DRILL-5874: NPE in AnonWebUserConnection.cleanupSession() closes #993
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a26fbec1 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a26fbec1 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a26fbec1 Branch: refs/heads/master Commit: a26fbec13134f249e258be6735b82cf09ab1f406 Parents: 8eda4d7 Author: Sorabh Hamirwasia <[email protected]> Authored: Wed Oct 11 15:57:21 2017 -0700 Committer: Paul Rogers <[email protected]> Committed: Mon Oct 30 11:43:08 2017 -0700 ---------------------------------------------------------------------- .../drill/exec/server/rest/DrillRestServer.java | 37 +++- .../exec/server/rest/WebSessionResources.java | 16 +- .../exec/server/rest/WebUserConnection.java | 11 +- .../server/rest/WebSessionResourcesTest.java | 168 +++++++++++++++++++ 4 files changed, 219 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/a26fbec1/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java index 6eb47e6..1545847 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java @@ -29,6 +29,9 @@ import freemarker.cache.TemplateLoader; import freemarker.cache.WebappTemplateLoader; import freemarker.core.HTMLOutputFormat; import freemarker.template.Configuration; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.util.concurrent.EventExecutor; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import 
org.apache.drill.exec.memory.BufferAllocator; @@ -108,10 +111,17 @@ public class DrillRestServer extends ResourceConfig { provider.setMapper(workManager.getContext().getLpPersistence().getMapper()); register(provider); + // Get an EventExecutor out of the BitServer EventLoopGroup to notify listeners for WebUserConnection. For + // actual connections between Drillbits this EventLoopGroup is used to handle network related events. Though + // there is no actual network connection associated with WebUserConnection but we need a CloseFuture in + // WebSessionResources, so we are using EventExecutor from network EventLoopGroup pool. + final EventExecutor executor = workManager.getContext().getBitLoopGroup().next(); + register(new AbstractBinder() { @Override protected void configure() { bind(workManager).to(WorkManager.class); + bind(executor).to(EventExecutor.class); bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class); bind(workManager.getContext().getStoreProvider()).to(PersistentStoreProvider.class); bind(workManager.getContext().getStorage()).to(StoragePluginRegistry.class); @@ -159,6 +169,9 @@ public class DrillRestServer extends ResourceConfig { @Inject WorkManager workManager; + @Inject + EventExecutor executor; + @SuppressWarnings("resource") @Override public WebUserConnection provide() { @@ -204,9 +217,15 @@ public class DrillRestServer extends ResourceConfig { config.getLong(ExecConstants.HTTP_SESSION_MEMORY_RESERVATION), config.getLong(ExecConstants.HTTP_SESSION_MEMORY_MAXIMUM)); + // Create a dummy close future which is needed by Foreman only. Foreman uses this future to add a close + // listener to know about channel close event from underlying layer. We use this future to notify Foreman + // listeners when the Web session (not connection) between Web Client and WebServer is closed. This will help + // Foreman to cancel all the running queries for this Web Client. 
+ final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor); + // Create a WebSessionResource instance which owns the lifecycle of all the session resources. - // Set this instance as an attribute of HttpSession, since it will be used until session is destroyed. - webSessionResources = new WebSessionResources(sessionAllocator, remoteAddress, drillUserSession); + // Set this instance as an attribute of HttpSession, since it will be used until session is destroyed + webSessionResources = new WebSessionResources(sessionAllocator, remoteAddress, drillUserSession, closeFuture); session.setAttribute(WebSessionResources.class.getSimpleName(), webSessionResources); } // Create a new WebUserConnection for the request @@ -227,6 +246,9 @@ public class DrillRestServer extends ResourceConfig { @Inject WorkManager workManager; + @Inject + EventExecutor executor; + @SuppressWarnings("resource") @Override public WebUserConnection provide() { @@ -260,8 +282,15 @@ public class DrillRestServer extends ResourceConfig { logger.trace("Failed to get the remote address of the http session request", ex); } - final WebSessionResources webSessionResources = new WebSessionResources(sessionAllocator, - remoteAddress, drillUserSession); + // Create a dummy close future which is needed by Foreman only. Foreman uses this future to add a close + // listener to know about channel close event from underlying layer. + // + // The invocation of this close future is no-op as it will be triggered after query completion in unsecure case. + // But we need this close future as it's expected by Foreman. 
+ final ChannelPromise closeFuture = new DefaultChannelPromise(null, executor); + + final WebSessionResources webSessionResources = new WebSessionResources(sessionAllocator, remoteAddress, + drillUserSession, closeFuture); // Create a AnonWenUserConnection for this request return new AnonWebUserConnection(webSessionResources); http://git-wip-us.apache.org/repos/asf/drill/blob/a26fbec1/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java index aeed51a..2ca457c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.server.rest; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultChannelPromise; import org.apache.drill.common.AutoCloseables; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.rpc.ChannelClosedException; @@ -43,11 +42,12 @@ public class WebSessionResources implements AutoCloseable { private ChannelPromise closeFuture; - WebSessionResources(BufferAllocator allocator, SocketAddress remoteAddress, UserSession userSession) { + WebSessionResources(BufferAllocator allocator, SocketAddress remoteAddress, + UserSession userSession, ChannelPromise closeFuture) { this.allocator = allocator; this.remoteAddress = remoteAddress; this.webUserSession = userSession; - closeFuture = new DefaultChannelPromise(null); + this.closeFuture = closeFuture; } public UserSession getSession() { @@ -68,16 +68,20 @@ public class WebSessionResources implements AutoCloseable { @Override public void close() { - try { AutoCloseables.close(webUserSession, 
allocator); } catch (Exception ex) { logger.error("Failure while closing the session resources", ex); } - // Set the close future associated with this session. + // Notify all the listeners of this closeFuture for failure events so that listeners can do cleanup related to this + // WebSession. This will be called after every query execution by AnonymousWebUserConnection::cleanupSession and + // for authenticated user it is called when session is invalidated. + // For authenticated user it will cancel the in-flight queries based on session invalidation. Whereas for + // unauthenticated user it's a no-op since there is no session associated with it. We don't have mechanism currently + // to call this close future upon Http connection close. if (closeFuture != null) { - closeFuture.setFailure(new ChannelClosedException("Http Session of the user is closed.")); + closeFuture.setFailure(new ChannelClosedException("Http connection is closed by Web Client")); closeFuture = null; } } http://git-wip-us.apache.org/repos/asf/drill/blob/a26fbec1/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java index bcce9eb..f46b5e5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java @@ -42,9 +42,14 @@ import java.util.Map; import java.util.Set; /** - * WebUserConnectionWrapper which represents the UserClientConnection for the WebUser submitting the query. It provides - * access to the UserSession executing the query. There is no actual physical channel corresponding to this connection - * wrapper. 
+ * WebUserConnectionWrapper which represents the UserClientConnection between WebServer and Foreman, for the WebUser + * submitting the query. It provides access to the UserSession executing the query. There is no actual physical + * channel corresponding to this connection wrapper. + * + * It returns a close future with no actual underlying {@link io.netty.channel.Channel} associated with it but do have an + * EventExecutor out of BitServer EventLoopGroup. Since there is no actual connection established using this class, + * hence the close event will never be fired by underlying layer and close future is set only when the + * {@link WebSessionResources} are closed. */ public class WebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle { http://git-wip-us.apache.org/repos/asf/drill/blob/a26fbec1/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java new file mode 100644 index 0000000..bb990de --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.server.rest; + +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.rpc.TransportCheck; +import org.apache.drill.exec.rpc.user.UserSession; +import org.junit.Test; + +import java.net.SocketAddress; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Validates {@link WebSessionResources} close works as expected w.r.t {@link io.netty.channel.AbstractChannel.CloseFuture} + * associated with it. + */ +public class WebSessionResourcesTest { + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebSessionResourcesTest.class); + + private WebSessionResources webSessionResources; + + private boolean listenerComplete; + + private CountDownLatch latch; + + private EventExecutor executor; + + // A close listener added in close future in one of the test to see if it's invoked correctly. 
+ private class TestClosedListener implements GenericFutureListener<Future<Void>> { + @Override + public void operationComplete(Future<Void> future) throws Exception { + listenerComplete = true; + latch.countDown(); + } + } + + /** + * Validates {@link WebSessionResources#close()} throws NPE when closefuture passed to WebSessionResources doesn't + * have a valid channel and EventExecutor associated with it. + * @throws Exception + */ + @Test + public void testChannelPromiseWithNullExecutor() throws Exception { + try { + ChannelPromise closeFuture = new DefaultChannelPromise(null); + webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock + (UserSession.class), closeFuture); + webSessionResources.close(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof NullPointerException); + verify(webSessionResources.getAllocator()).close(); + verify(webSessionResources.getSession()).close(); + } + } + + /** + * Validates successful {@link WebSessionResources#close()} with valid CloseFuture and other parameters. + * @throws Exception + */ + @Test + public void testChannelPromiseWithValidExecutor() throws Exception { + try { + EventExecutor mockExecutor = mock(EventExecutor.class); + ChannelPromise closeFuture = new DefaultChannelPromise(null, mockExecutor); + webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock + (UserSession.class), closeFuture); + webSessionResources.close(); + verify(webSessionResources.getAllocator()).close(); + verify(webSessionResources.getSession()).close(); + verify(mockExecutor).inEventLoop(); + verify(mockExecutor).execute(any(Runnable.class)); + assertTrue(webSessionResources.getCloseFuture() == null); + assertTrue(!listenerComplete); + } catch (Exception e) { + fail(); + } + } + + /** + * Validates double call to {@link WebSessionResources#close()} doesn't throw any exception. 
+ * @throws Exception + */ + @Test + public void testDoubleClose() throws Exception { + try { + ChannelPromise closeFuture = new DefaultChannelPromise(null, mock(EventExecutor.class)); + webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock + (UserSession.class), closeFuture); + webSessionResources.close(); + + verify(webSessionResources.getAllocator()).close(); + verify(webSessionResources.getSession()).close(); + assertTrue(webSessionResources.getCloseFuture() == null); + + webSessionResources.close(); + } catch (Exception e) { + fail(); + } + } + + /** + * Validates successful {@link WebSessionResources#close()} with valid CloseFuture and {@link TestClosedListener} + * getting invoked which is added to the close future. + * @throws Exception + */ + @Test + public void testCloseWithListener() throws Exception { + try { + // Assign latch, executor and closeListener for this test case + GenericFutureListener<Future<Void>> closeListener = new TestClosedListener(); + latch = new CountDownLatch(1); + executor = TransportCheck.createEventLoopGroup(1, "Test-Thread").next(); + ChannelPromise closeFuture = new DefaultChannelPromise(null, executor); + + // create WebSessionResources with above ChannelPromise to notify listener + webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), + mock(UserSession.class), closeFuture); + + // Add the Test Listener to close future + assertTrue(!listenerComplete); + closeFuture.addListener(closeListener); + + // Close the WebSessionResources + webSessionResources.close(); + + // Verify the states + verify(webSessionResources.getAllocator()).close(); + verify(webSessionResources.getSession()).close(); + assertTrue(webSessionResources.getCloseFuture() == null); + + // Since listener will be invoked so test should not wait forever + latch.await(); + assertTrue(listenerComplete); + } catch (Exception e) { + fail(); + } finally { + 
listenerComplete = false; + executor.shutdownGracefully(); + } + } +} \ No newline at end of file
