Github user vrozov commented on a diff in the pull request:
https://github.com/apache/drill/pull/1041#discussion_r155433860
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
---
@@ -74,83 +70,42 @@ public void statusUpdate(final FragmentStatus status) {
public void addFragmentManager(final FragmentManager fragmentManager) {
if (logger.isDebugEnabled()) {
- logger.debug("Manager created: {}",
QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+ logger.debug("Fragment {} manager created: {}",
QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()), fragmentManager);
}
final FragmentManager old =
managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
- if (old != null) {
- throw new IllegalStateException(
- "Tried to set fragment manager when has already been set for
the provided fragment handle.");
- }
- }
-
- public FragmentManager getFragmentManagerIfExists(final FragmentHandle
handle) {
- synchronized (this) {
- return managers.get(handle);
+ if (old != null) {
+ throw new IllegalStateException(
+ String.format("Manager {} for fragment {} already exists.", old,
QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())));
}
}
- public FragmentManager getFragmentManager(final FragmentHandle handle)
throws FragmentSetupException {
- synchronized (this) {
- // Check if this was a recently finished (completed or cancelled)
fragment. If so, throw away message.
- if (recentlyFinishedFragments.asMap().containsKey(handle)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Fragment: {} was cancelled. Ignoring fragment
handle", handle);
- }
- return null;
- }
-
- // since non-leaf fragments are sent first, it is an error condition
if the manager is unavailable.
- final FragmentManager m = managers.get(handle);
- if (m != null) {
- return m;
- }
- }
- throw new FragmentSetupException("Failed to receive plan fragment that
was required for id: "
- + QueryIdHelper.getQueryIdentifier(handle));
+ public FragmentManager getFragmentManager(final FragmentHandle handle) {
+ return managers.get(handle);
}
/**
- * Removes fragment manager (for the corresponding the handle) from the
work event bus. This method can be called
- * multiple times. The manager will be removed only once (the first
call).
- * @param handle the handle to the fragment
- */
- public void removeFragmentManager(final FragmentHandle handle) {
- if (logger.isDebugEnabled()) {
- logger.debug("Removing fragment manager: {}",
QueryIdHelper.getQueryIdentifier(handle));
- }
-
- synchronized (this) {
- final FragmentManager manager = managers.get(handle);
- if (manager != null) {
- recentlyFinishedFragments.put(handle, 1);
- managers.remove(handle);
- } else {
- logger.warn("Fragment {} not found in the work bus.",
QueryIdHelper.getQueryIdentifier(handle));
- }
- }
- }
-
- /**
- * Cancels and removes fragment manager (for the corresponding the
handle) from the work event bus, Currently, used
- * for fragments waiting on data (root and intermediate).
+ * Optionally cancels and removes fragment manager (for the
corresponding the handle) from the work event bus. Currently, used
+ * for fragments waiting on data (root and intermediate). This method
can be called multiple times. The manager will be removed
+ * only once (the first call).
* @param handle the handle to the fragment
+ * @param cancel
* @return if the fragment was found and removed from the event bus
*/
- public boolean cancelAndRemoveFragmentManagerIfExists(final
FragmentHandle handle) {
- if (logger.isDebugEnabled()) {
- logger.debug("Cancelling and removing fragment manager: {}",
QueryIdHelper.getQueryIdentifier(handle));
- }
-
- synchronized (this) {
- final FragmentManager manager = managers.get(handle);
- if (manager == null) {
- return false;
+ public boolean removeFragmentManager(final FragmentHandle handle, final
boolean cancel) {
+ final FragmentManager manager = managers.remove(handle);
+ if (manager != null) {
+ assert !manager.isCancelled() : String.format("Fragment {} manager
{} is already cancelled.", QueryIdHelper.getQueryIdentifier(handle), manager);
+ if (cancel) {
+ manager.cancel();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} fragment {} manager {} from the work bus.",
cancel ? "Cancel and removed" : "Removed",
+ QueryIdHelper.getQueryIdentifier(handle), manager);
}
-
- manager.cancel();
- recentlyFinishedFragments.put(handle, 1);
- managers.remove(handle);
return true;
+ } else if (logger.isWarnEnabled()) {
+ logger.warn("Fragment {} manager is not found in the work bus.",
QueryIdHelper.getQueryIdentifier(handle));
}
+ return false;
--- End diff --
ControlMessageHandler checks the return value of removeFragmentManager on line 196.
---