Repository: asterixdb Updated Branches: refs/heads/master 87411c22c -> 979012d5e
[ASTERIXDB-2064][ING] Timeout Stop Feed - user model changes: no - storage format changes: no - interface changes: no details: - The abort feed message stops the reader and waits for the dataflow controller to signal end of life. - If the reader returns true to stop but the dataflow controller never signals the end, it can get stuck. - This change adds a timeout after which the task thread is interrupted. Change-Id: If609a8343767ee7a80689a58af35a1b3fca2964b Reviewed-on: https://asterix-gerrit.ics.uci.edu/1964 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/979012d5 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/979012d5 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/979012d5 Branch: refs/heads/master Commit: 979012d5e5c2dacde53ac6144c35a155293a8d4e Parents: 87411c2 Author: Abdullah Alamoudi <[email protected]> Authored: Thu Aug 24 09:42:29 2017 -0700 Committer: abdullah alamoudi <[email protected]> Committed: Thu Aug 24 13:31:58 2017 -0700 ---------------------------------------------------------------------- .../external/api/IDataFlowController.java | 2 +- .../dataflow/FeedRecordDataFlowController.java | 16 ++++++++++++---- .../dataflow/FeedStreamDataFlowController.java | 2 +- .../external/dataset/adapter/FeedAdapter.java | 4 ++-- .../FeedIntakeOperatorNodePushable.java | 19 ++++++++++++++++--- .../library/adapter/TestTypedAdapter.java | 2 +- .../apache/hyracks/api/exceptions/ErrorCode.java | 1 + .../src/main/resources/errormsg/en.properties | 1 + 8 files changed, 35 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/979012d5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java index 7412338..f59b82e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java @@ -40,7 +40,7 @@ public interface IDataFlowController { throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED); } - public default boolean stop() throws HyracksDataException { + public default boolean stop(long timeout) throws HyracksDataException { throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/979012d5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index e24c26d..4ed1b08 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -213,16 +213,24 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } } - private void waitForSignal() throws InterruptedException { + private void 
waitForSignal(long timeout) throws InterruptedException, HyracksDataException { + if (timeout <= 0) { + throw new IllegalArgumentException("timeout must be greater than 0"); + } synchronized (closed) { while (!closed.get()) { - closed.wait(); + long before = System.currentTimeMillis(); + closed.wait(timeout); + timeout -= System.currentTimeMillis() - before; + if (!closed.get() && timeout <= 0) { + throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.TIMEOUT); + } } } } @Override - public boolean stop() throws HyracksDataException { + public boolean stop(long timeout) throws HyracksDataException { synchronized (this) { switch (state) { case CREATED: @@ -238,7 +246,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } if (recordReader.stop()) { try { - waitForSignal(); + waitForSignal(timeout); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw HyracksDataException.create(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/979012d5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java index 1f1f545..025520e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java @@ -71,7 +71,7 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController } @Override - public boolean stop() throws HyracksDataException { + public boolean stop(long timeout) throws HyracksDataException { try { if 
(stream.stop()) { return true; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/979012d5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java index eeda80c..fd9db7e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java @@ -35,8 +35,8 @@ public class FeedAdapter implements IDataSourceAdapter { controller.start(writer); } - public boolean stop() throws HyracksDataException { - return controller.stop(); + public boolean stop(long timeout) throws HyracksDataException { + return controller.stop(timeout); } public boolean pause() throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/979012d5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index 16b8fba..7907e69 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -30,6 +30,7 @@ import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.VSizeFrame; 
import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; @@ -42,6 +43,8 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil; */ public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable { private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName()); + // TODO: Make configurable https://issues.apache.org/jira/browse/ASTERIXDB-2065 + public static final int DEFAULT_ABORT_TIMEOUT = 10000; private final FeedIntakeOperatorDescriptor opDesc; private final FeedAdapter adapter; private boolean poisoned = false; @@ -121,9 +124,19 @@ public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePush LOGGER.info(runtimeId + " aborting..."); synchronized (this) { poisoned = true; - if (!adapter.stop()) { - LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread..."); - taskThread.interrupt(); + try { + if (!adapter.stop(DEFAULT_ABORT_TIMEOUT)) { + LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread..."); + taskThread.interrupt(); + } + } catch (HyracksDataException hde) { + if (hde.getComponent() == ErrorCode.HYRACKS && hde.getErrorCode() == ErrorCode.TIMEOUT) { + LOGGER.log(Level.WARNING, runtimeId + " stop adapter timed out. 
interrupting the thread...", hde); + taskThread.interrupt(); + } else { + LOGGER.log(Level.WARNING, "Failure during attempt to stop " + runtimeId, hde); + throw hde; + } } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/979012d5/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java index 2273bea..fcd010d 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java @@ -137,7 +137,7 @@ public class TestTypedAdapter extends FeedAdapter { } @Override - public boolean stop() { + public boolean stop(long timeout) { generator.stop(); return true; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/979012d5/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index b054faf..ff98efa 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -124,6 +124,7 @@ public class ErrorCode { public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88; public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89; public static final int 
ILLEGAL_MEMORY_BUDGET = 90; + public static final int TIMEOUT = 91; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/979012d5/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 1d2143b..6d4ccdb 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -107,5 +107,6 @@ 88 = Cannot modify index (Disk is full) 89 = The byte size of a single group (%1$s bytes) exceeds the budget for a group by operator (%2$s bytes) 90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes) +91 = Operation timed out 10000 = The given rule collection %1$s is not an instance of the List class.
