Update WorkEventBus to fail immediately if a fragment's manager is unavailable. There is no longer any need to wait, now that we propagate intermediate fragments before leaf fragments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/797b1bcf Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/797b1bcf Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/797b1bcf Branch: refs/heads/master Commit: 797b1bcffbcda4dd5e7c3be1bb3ba1e9593cff16 Parents: 06f0e17 Author: Jacques Nadeau <jacq...@apache.org> Authored: Thu Nov 6 20:43:32 2014 -0800 Committer: Jinfeng Ni <j...@maprtech.com> Committed: Fri Nov 7 10:50:57 2014 -0800 ---------------------------------------------------------------------- .../drill/exec/rpc/control/WorkEventBus.java | 49 +++----------------- 1 file changed, 7 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/797b1bcf/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index 23380ff..eae7b5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -17,25 +17,19 @@ */ package org.apache.drill.exec.rpc.control; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import org.apache.drill.exec.cache.DistributedMap; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.proto.BitControl.FragmentStatus; -import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.QueryId; 
import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.work.WorkManager.WorkerBee; -import org.apache.drill.exec.work.foreman.Foreman; import org.apache.drill.exec.work.foreman.FragmentStatusListener; import org.apache.drill.exec.work.fragment.FragmentManager; -import org.apache.drill.exec.work.fragment.NonRootFragmentManager; -import org.apache.drill.exec.work.fragment.RootFragmentManager; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -48,8 +42,9 @@ public class WorkEventBus { private final ConcurrentMap<QueryId, FragmentStatusListener> listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>( 16, 0.75f, 16); private final WorkerBee bee; - private final Cache<FragmentHandle,Void> cancelledFragments = CacheBuilder.newBuilder() + private final Cache<FragmentHandle,Integer> recentlyFinishedFragments = CacheBuilder.newBuilder() .maximumSize(10000) + .expireAfterWrite(10, TimeUnit.MINUTES) .build(); @@ -85,14 +80,10 @@ public class WorkEventBus { public void setFragmentManager(FragmentManager fragmentManager) { logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())); - - synchronized (managers) { - FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager); - managers.notifyAll(); + FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager); if (old != null) { throw new IllegalStateException( "Tried to set fragment manager when has already been set for the provided fragment handle."); - } } } @@ -102,54 +93,28 @@ public class WorkEventBus { } public FragmentManager getFragmentManager(FragmentHandle handle) throws FragmentSetupException { - // check if this was a recently canceled fragment. If so, throw away message. 
- if (cancelledFragments.asMap().containsKey(handle)) { + if (recentlyFinishedFragments.asMap().containsKey(handle)) { logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle); return null; } - // chm manages concurrency better then everyone fighting for the same lock so we'll do a double check. + // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable. FragmentManager m = managers.get(handle); if(m != null){ return m; } - - logger.debug("Fragment was requested but no manager exists. Waiting for manager for fragment: {}", QueryIdHelper.getQueryIdentifier(handle)); - try{ - // We need to handle the race condition between the fragments being sent to leaf nodes and intermediate nodes. It is possible that a leaf node would send a data batch to a intermediate node before the intermediate node received the associated plan. As such, we will wait here for a bit to see if the appropriate fragment shows up. - long expire = System.currentTimeMillis() + 30*1000; - synchronized(managers){ - - // we loop because we may be woken up by some other, unrelated manager insertion. 
- while(true){ - m = managers.get(handle); - if(m != null) { - return m; - } - long timeToWait = expire - System.currentTimeMillis(); - if(timeToWait <= 0){ - break; - } - - managers.wait(timeToWait); - } - - throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle)); - } - }catch(InterruptedException e){ - throw new FragmentSetupException("Interrupted while waiting to receive plan fragment.."); - } + throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle)); } public void cancelFragment(FragmentHandle handle) { logger.debug("Fragment canceled: {}", QueryIdHelper.getQueryIdentifier(handle)); - cancelledFragments.put(handle, null); removeFragmentManager(handle); } public void removeFragmentManager(FragmentHandle handle) { logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle)); + recentlyFinishedFragments.put(handle, 1); managers.remove(handle); }