Repository: asterixdb Updated Branches: refs/heads/master 979012d5e -> 4f79620cd
[ASTERIXDB-2067][ING] Handle Failures in Controller Flush - user model changes: no - storage format changes: no - interface changes: no details: - Failures that happen in a feed while reading from external sources allow the ingestion pipeline to close gracefully, pushing parsed records in the frame forward before failing. - There was an assumption that when hasNext() or next() is called on a data reader and fails, the failure does not affect the integrity of the pipeline. - This assumption is incorrect, as hasNext() and next() can themselves flush the pipeline, and if the failure happens during the flush call, the pipeline must be failed. Change-Id: Ib9be729088bd94338ef2353333eaea34ba3da99f Reviewed-on: https://asterix-gerrit.ics.uci.edu/1968 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4f79620c Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4f79620c Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4f79620c Branch: refs/heads/master Commit: 4f79620cd71bd750548a749db352c418fbadddae Parents: 979012d Author: Abdullah Alamoudi <[email protected]> Authored: Thu Aug 24 17:33:29 2017 -0700 Committer: abdullah alamoudi <[email protected]> Committed: Fri Aug 25 10:31:38 2017 -0700 ---------------------------------------------------------------------- .../org/apache/asterix/app/active/RecoveryTask.java | 3 +++ .../dataflow/AbstractFeedDataFlowController.java | 3 +++ .../dataflow/FeedRecordDataFlowController.java | 16 ++++++++++++++-- 3 files changed, 20 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f79620c/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java index 7b7de93..73439ce 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java @@ -171,6 +171,9 @@ public class RecoveryTask { DatasetUtil.getFullyQualifiedName(dataset)); } synchronized (listener) { + if (cancelRecovery) { + return null; + } if (listener.getState() == ActivityState.TEMPORARILY_FAILED) { listener.setState(ActivityState.PERMANENTLY_FAILED); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f79620c/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java index 53fa137..3437de1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java @@ -30,6 +30,7 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl protected final int numOfFields; protected final ArrayTupleBuilder tb; protected final FeedLogManager feedLogManager; + protected boolean flushing; public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, FeedLogManager feedLogManager, int numOfFields) { @@ -54,7 +55,9 @@ public abstract class AbstractFeedDataFlowController 
implements IDataFlowControl @Override public void flush() throws HyracksDataException { + flushing = true; tupleForwarder.flush(); + flushing = false; } public abstract String getStats(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f79620c/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 4ed1b08..c85e236 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -131,12 +131,18 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl return state; } - private IRawRecord<? extends T> next() throws HyracksDataException { + private IRawRecord<? 
extends T> next() throws Exception { try { return recordReader.next(); } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline + if (flushing) { + throw e; + } throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e); } catch (Exception e) { + if (flushing) { + throw e; + } if (!recordReader.handleException(e)) { throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e); } @@ -144,13 +150,19 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } } - private boolean hasNext() throws HyracksDataException { + private boolean hasNext() throws Exception { while (true) { try { return recordReader.hasNext(); } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline + if (flushing) { + throw e; + } throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e); } catch (Exception e) { + if (flushing) { + throw e; + } if (!recordReader.handleException(e)) { throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e); }
