DRILL-5961: For long-running queries (> 10 min), Drill may raise FragmentSetupException for completed/cancelled fragments
This closes #1041 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c5af3aef Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c5af3aef Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c5af3aef Branch: refs/heads/master Commit: c5af3aefe79c34d5b76bec8ce55875decca9e617 Parents: cbb79e5 Author: Vlad Rozov <[email protected]> Authored: Tue Nov 14 16:24:01 2017 -0800 Committer: Parth Chandra <[email protected]> Committed: Thu Jan 11 17:11:41 2018 -0800 ---------------------------------------------------------------------- .../drill/exec/rpc/control/WorkEventBus.java | 91 +++++--------------- .../org/apache/drill/exec/work/WorkManager.java | 4 +- .../exec/work/batch/ControlMessageHandler.java | 10 +-- .../work/fragment/NonRootFragmentManager.java | 5 -- .../drill/exec/rpc/data/TestBitBitKerberos.java | 3 - .../apache/drill/exec/rpc/data/TestBitRpc.java | 1 - 6 files changed, 30 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index 3e461ef..c889e07 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -39,10 +39,6 @@ public class WorkEventBus { private final ConcurrentMap<FragmentHandle, FragmentManager> managers = Maps.newConcurrentMap(); private final ConcurrentMap<QueryId, FragmentStatusListener> listeners = new ConcurrentHashMap<>(16, 0.75f, 16); - private final Cache<FragmentHandle, Integer> recentlyFinishedFragments = 
CacheBuilder.newBuilder() - .maximumSize(10000) - .expireAfterWrite(10, TimeUnit.MINUTES) - .build(); public void removeFragmentStatusListener(final QueryId queryId) { if (logger.isDebugEnabled()) { @@ -74,83 +70,42 @@ public class WorkEventBus { public void addFragmentManager(final FragmentManager fragmentManager) { if (logger.isDebugEnabled()) { - logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())); + logger.debug("Fragment {} manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()), fragmentManager); } final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager); - if (old != null) { - throw new IllegalStateException( - "Tried to set fragment manager when has already been set for the provided fragment handle."); - } - } - - public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) { - synchronized (this) { - return managers.get(handle); + if (old != null) { + throw new IllegalStateException( + String.format("Manager {} for fragment {} already exists.", old, QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()))); } } - public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException { - synchronized (this) { - // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message. - if (recentlyFinishedFragments.asMap().containsKey(handle)) { - if (logger.isDebugEnabled()) { - logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle); - } - return null; - } - - // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable. 
- final FragmentManager m = managers.get(handle); - if (m != null) { - return m; - } - } - throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " - + QueryIdHelper.getQueryIdentifier(handle)); + public FragmentManager getFragmentManager(final FragmentHandle handle) { + return managers.get(handle); } /** - * Removes fragment manager (for the corresponding the handle) from the work event bus. This method can be called - * multiple times. The manager will be removed only once (the first call). - * @param handle the handle to the fragment - */ - public void removeFragmentManager(final FragmentHandle handle) { - if (logger.isDebugEnabled()) { - logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle)); - } - - synchronized (this) { - final FragmentManager manager = managers.get(handle); - if (manager != null) { - recentlyFinishedFragments.put(handle, 1); - managers.remove(handle); - } else { - logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle)); - } - } - } - - /** - * Cancels and removes fragment manager (for the corresponding the handle) from the work event bus, Currently, used - * for fragments waiting on data (root and intermediate). + * Optionally cancels and removes fragment manager (for the corresponding the handle) from the work event bus. Currently, used + * for fragments waiting on data (root and intermediate). This method can be called multiple times. The manager will be removed + * only once (the first call). 
* @param handle the handle to the fragment + * @param cancel * @return if the fragment was found and removed from the event bus */ - public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) { - if (logger.isDebugEnabled()) { - logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle)); - } - - synchronized (this) { - final FragmentManager manager = managers.get(handle); - if (manager == null) { - return false; + public boolean removeFragmentManager(final FragmentHandle handle, final boolean cancel) { + final FragmentManager manager = managers.remove(handle); + if (manager != null) { + assert !manager.isCancelled() : String.format("Fragment {} manager {} is already cancelled.", QueryIdHelper.getQueryIdentifier(handle), manager); + if (cancel) { + manager.cancel(); + } + if (logger.isDebugEnabled()) { + logger.debug("{} fragment {} manager {} from the work bus.", cancel ? "Cancel and removed" : "Removed", + QueryIdHelper.getQueryIdentifier(handle), manager); } - - manager.cancel(); - recentlyFinishedFragments.put(handle, 1); - managers.remove(handle); return true; + } else if (logger.isWarnEnabled()) { + logger.warn("Fragment {} manager is not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle)); } + return false; } } http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index e935819..d75668c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -325,7 +325,9 @@ public class WorkManager implements AutoCloseable { @Override protected void cleanup() { 
runningFragments.remove(fragmentHandle); - workBus.removeFragmentManager(fragmentHandle); + if (!fragmentManager.isCancelled()) { + workBus.removeFragmentManager(fragmentHandle, false); + } indicateIfSafeToExit(); } }); http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java index 2bbaf1b..972b56a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java @@ -193,7 +193,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> // Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the // work bus. 
- final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle); + final boolean removed = bee.getContext().getWorkBus().removeFragmentManager(handle, true); if (removed) { return Acks.OK; } @@ -217,7 +217,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> private Ack resumeFragment(final FragmentHandle handle) { // resume a pending fragment - final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle); + final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle); if (manager != null) { manager.unpause(); return Acks.OK; @@ -237,14 +237,12 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { - final FragmentManager manager = - bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender()); + final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender()); - FragmentExecutor executor; if (manager != null) { manager.receivingFragmentFinished(finishedReceiver.getReceiver()); } else { - executor = bee.getFragmentRunner(finishedReceiver.getSender()); + final FragmentExecutor executor = bee.getFragmentRunner(finishedReceiver.getSender()); if (executor != null) { executor.receivingFragmentFinished(finishedReceiver.getReceiver()); } else { http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java index 7d1585b..17a5965 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java @@ -57,9 +57,4 @@ public class NonRootFragmentManager extends AbstractFragmentManager { public void receivingFragmentFinished(final FragmentHandle handle) { fragmentExecutor.receivingFragmentFinished(handle); } - - @Override - public synchronized void cancel() { - super.cancel(); - } } http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java index a24b0db..834f108 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java @@ -225,7 +225,6 @@ public class TestBitBitKerberos extends BaseTestQuery { public void success(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception { new NonStrictExpectations() {{ - workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager; workBus.getFragmentManager( (FragmentHandle) any); result = manager; }}; @@ -273,7 +272,6 @@ public class TestBitBitKerberos extends BaseTestQuery { updateTestCluster(1, newConfig); new NonStrictExpectations() {{ - workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager; workBus.getFragmentManager( (FragmentHandle) any); result = manager; }}; @@ -322,7 +320,6 @@ public class TestBitBitKerberos extends BaseTestQuery { updateTestCluster(1, newConfig); new NonStrictExpectations() {{ - workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager; workBus.getFragmentManager( 
(FragmentHandle) any); result = manager; }}; http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java index 8c53915..1e8318f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java @@ -105,7 +105,6 @@ public class TestBitRpc extends ExecTest { new NonStrictExpectations() {{ - workBus.getFragmentManagerIfExists((FragmentHandle) any); result = fman; workBus.getFragmentManager( (FragmentHandle) any); result = fman; }};
