Repository: asterixdb Updated Branches: refs/heads/master c2cd8bd93 -> 91756a341
Prevent hangs on active runtime stop Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1608 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/91756a34 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/91756a34 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/91756a34 Branch: refs/heads/master Commit: 91756a341ec47491228e6cdf024e8050806b6518 Parents: c2cd8bd Author: Abdullah Alamoudi <bamou...@gmail.com> Authored: Wed Mar 22 13:30:30 2017 -0700 Committer: abdullah alamoudi <bamou...@gmail.com> Committed: Wed Mar 22 14:28:57 2017 -0700 ---------------------------------------------------------------------- .../adapter/factory/GenericAdapterFactory.java | 4 ++ .../external/api/IDataFlowController.java | 2 +- .../external/api/IDataSourceAdapter.java | 3 +- .../AbstractFeedDataFlowController.java | 2 +- .../dataflow/FeedRecordDataFlowController.java | 23 ++++++-- .../external/dataset/adapter/FeedAdapter.java | 4 +- .../dataset/adapter/GenericAdapter.java | 2 +- .../external/feed/runtime/AdapterExecutor.java | 37 ++++++++---- .../feed/runtime/AdapterRuntimeManager.java | 61 +++++++++++++------- .../external/feed/runtime/IngestionRuntime.java | 3 +- 10 files changed, 96 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index 577da5e..46da770 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -20,6 +20,8 @@ package org.apache.asterix.external.adapter.factory; import java.util.List; import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.common.api.IAppRuntimeContext; import org.apache.asterix.common.library.ILibraryManager; @@ -52,6 +54,7 @@ import org.apache.hyracks.api.io.FileSplit; public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory { private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(GenericAdapterFactory.class.getName()); private IExternalDataSourceFactory dataSourceFactory; private IDataParserFactory dataParserFactory; private ARecordType recordType; @@ -90,6 +93,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF try { restoreExternalObjects(appCtx.getLibraryManager()); } catch (Exception e) { + LOGGER.log(Level.INFO, "Failure restoring external objects", e); throw HyracksDataException.create(e); } if (isFeed) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java index 33f262a..def0bf1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java @@ -24,7 +24,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IDataFlowController { //TODO: Refactor this interface. Remove writer from start() signature - public void start(IFrameWriter writer) throws HyracksDataException; + public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException; public default boolean pause() throws HyracksDataException { throw new HyracksDataException("Method not implemented"); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java index e37f2b1..48df79b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java @@ -21,6 +21,7 @@ package org.apache.asterix.external.api; import java.io.Serializable; import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.exceptions.HyracksDataException; /** * A super interface implemented by a data source adapter. An adapter can be a @@ -47,5 +48,5 @@ public interface IDataSourceAdapter extends Serializable { * operator using the instance of IFrameWriter. * @throws Exception */ - public void start(int partition, IFrameWriter writer) throws Exception; + public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java index 87ee167..213231b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java @@ -60,5 +60,5 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl @Override public abstract boolean stop() throws HyracksDataException; - public abstract boolean handleException(Throwable th); + public abstract boolean handleException(Throwable th) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 7ba3ae4..1b12dc1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -51,8 +51,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } @Override - public void start(IFrameWriter writer) throws HyracksDataException { - HyracksDataException hde = null; + public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException { try { failed = false; tupleForwarder.initialize(ctx, writer); @@ -69,13 +68,24 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } catch (InterruptedException e) { //TODO: Find out what could cause an interrupted exception beside termination of a job/feed LOGGER.warn("Feed has been interrupted. Closing the feed", e); - Thread.currentThread().interrupt(); + failed = true; + try { + finish(); + } catch (HyracksDataException hde) { + e.addSuppressed(hde); + } + throw e; } catch (Exception e) { failed = true; tupleForwarder.flush(); LOGGER.warn("Failure while operating a feed source", e); throw HyracksDataException.create(e); } + finish(); + } + + private void finish() throws HyracksDataException { + HyracksDataException hde = null; try { tupleForwarder.close(); } catch (Throwable th) { @@ -162,9 +172,12 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } @Override - public boolean handleException(Throwable th) { + public boolean handleException(Throwable th) throws HyracksDataException { // This is not a parser record. most likely, this error happened in the record reader. - return recordReader.handleException(th); + if (!recordReader.handleException(th)) { + finish(); + } + return closed.get(); } public IRecordReader<T> getReader() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java index d1bde71..8d80e6f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java @@ -32,7 +32,7 @@ public class FeedAdapter implements IDataSourceAdapter { } @Override - public void start(int partition, IFrameWriter writer) throws HyracksDataException { + public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException { controller.start(writer); } @@ -40,7 +40,7 @@ public class FeedAdapter implements IDataSourceAdapter { return controller.stop(); } - public boolean handleException(Throwable e) { + public boolean handleException(Throwable e) throws HyracksDataException { return controller.handleException(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java index 3ab370e..0681d71 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java @@ -33,7 +33,7 @@ public class GenericAdapter implements IDataSourceAdapter { } @Override - public void start(int partition, IFrameWriter writer) throws HyracksDataException { + public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException { controller.start(writer); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java index 2adce1c..c71b8a2 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java @@ -20,6 +20,7 @@ package org.apache.asterix.external.feed.runtime; import org.apache.asterix.external.dataset.adapter.FeedAdapter; import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.log4j.Logger; /** @@ -43,31 +44,45 @@ public class AdapterExecutor implements Runnable { @Override public void run() { // Start by getting the partition number from the manager - int partition = adapterManager.getPartition(); if (LOGGER.isInfoEnabled()) { - LOGGER.info("Starting ingestion for partition:" + partition); + LOGGER.info("Starting ingestion for partition:" + adapterManager.getPartition()); } + boolean failed = false; + try { + failed = doRun(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + failed = true; + LOGGER.error("Unhandled Exception", e); + } finally { + // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying + // the runtime manager + adapterManager.setFailed(failed); + adapterManager.setDone(true); + synchronized (adapterManager) { + adapterManager.notifyAll(); + } + } + } + + private boolean doRun() throws HyracksDataException, InterruptedException { boolean continueIngestion = true; boolean failedIngestion = false; while (continueIngestion) { try { // Start the adapter - adapter.start(partition, writer); + adapter.start(adapterManager.getPartition(), writer); // Adapter has completed execution continueIngestion = false; + } catch (InterruptedException e) { + throw e; } catch (Exception e) { LOGGER.error("Exception during feed ingestion ", e); continueIngestion = adapter.handleException(e); failedIngestion = !continueIngestion; } } - // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the - // runtime manager - adapterManager.setFailed(failedIngestion); - adapterManager.setDone(true); - synchronized (adapterManager) { - adapterManager.notifyAll(); - } + return failedIngestion; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java index 7f5372b..6214d9f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java @@ -19,12 +19,15 @@ package org.apache.asterix.external.feed.runtime; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.active.EntityId; import org.apache.asterix.external.dataset.adapter.FeedAdapter; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.log4j.Logger; +import org.apache.hyracks.api.exceptions.HyracksDataException; /** * This class manages the execution of an adapter within a feed @@ -43,15 +46,14 @@ public class AdapterRuntimeManager { private final IHyracksTaskContext ctx; - private IngestionRuntime ingestionRuntime; // Runtime representing the ingestion stage of a feed - private Future<?> execution; + private boolean started = false; private volatile boolean done = false; private volatile boolean failed = false; public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter, - IFrameWriter writer, int partition) { + IFrameWriter writer, int partition) { this.ctx = ctx; this.feedId = entityId; this.feedAdapter = feedAdapter; @@ -60,23 +62,42 @@ public class AdapterRuntimeManager { } public void start() { - execution = ctx.getExecutorService().submit(adapterExecutor); + synchronized (adapterExecutor) { + started = true; + if (!done) { + execution = ctx.getExecutorService().submit(adapterExecutor); + } else { + LOGGER.log(Level.WARNING, "Someone stopped me before I even start. I will simply not start"); + } + } } - public void stop() throws InterruptedException { - try { - if (feedAdapter.stop()) { - // stop() returned true, we wait for the process termination - execution.get(); - } else { - // stop() returned false, we try to force shutdown - execution.cancel(true); + public void stop() throws HyracksDataException, InterruptedException { + synchronized (adapterExecutor) { + try { + if (started) { + try { + ctx.getExecutorService().submit(() -> { + if (feedAdapter.stop()) { + execution.get(); + } + return null; + }).get(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "Interrupted while trying to stop an adapter runtime", e); + throw e; + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception while trying to stop an adapter runtime", e); + throw HyracksDataException.create(e); + } finally { + execution.cancel(true); + } + } else { + LOGGER.log(Level.WARNING, "Adapter executor was stopped before it starts"); + } + } finally { + done = true; } - } catch (InterruptedException e) { - LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e); - throw e; - } catch (Exception exception) { - LOGGER.error("Unable to stop adapter " + feedAdapter, exception); } } @@ -101,10 +122,6 @@ public class AdapterRuntimeManager { return partition; } - public IngestionRuntime getIngestionRuntime() { - return ingestionRuntime; - } - public boolean isFailed() { return failed; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java index 3100704..590af01 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java @@ -24,6 +24,7 @@ import java.util.logging.Logger; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveRuntime; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class IngestionRuntime implements IActiveRuntime { @@ -50,7 +51,7 @@ public class IngestionRuntime implements IActiveRuntime { } @Override - public void stop() throws InterruptedException { + public void stop() throws InterruptedException, HyracksDataException { adapterRuntimeManager.stop(); LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId); }