Repository: asterixdb
Updated Branches:
  refs/heads/master c2cd8bd93 -> 91756a341


Prevent hangs on active runtime stop

Change-Id: I2e60f633cac8e835dcc7211e87d104ecbb8947b0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1608
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/91756a34
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/91756a34
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/91756a34

Branch: refs/heads/master
Commit: 91756a341ec47491228e6cdf024e8050806b6518
Parents: c2cd8bd
Author: Abdullah Alamoudi <bamou...@gmail.com>
Authored: Wed Mar 22 13:30:30 2017 -0700
Committer: abdullah alamoudi <bamou...@gmail.com>
Committed: Wed Mar 22 14:28:57 2017 -0700

----------------------------------------------------------------------
 .../adapter/factory/GenericAdapterFactory.java  |  4 ++
 .../external/api/IDataFlowController.java       |  2 +-
 .../external/api/IDataSourceAdapter.java        |  3 +-
 .../AbstractFeedDataFlowController.java         |  2 +-
 .../dataflow/FeedRecordDataFlowController.java  | 23 ++++++--
 .../external/dataset/adapter/FeedAdapter.java   |  4 +-
 .../dataset/adapter/GenericAdapter.java         |  2 +-
 .../external/feed/runtime/AdapterExecutor.java  | 37 ++++++++----
 .../feed/runtime/AdapterRuntimeManager.java     | 61 +++++++++++++-------
 .../external/feed/runtime/IngestionRuntime.java |  3 +-
 10 files changed, 96 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 577da5e..46da770 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -20,6 +20,8 @@ package org.apache.asterix.external.adapter.factory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -52,6 +54,7 @@ import org.apache.hyracks.api.io.FileSplit;
 public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
 
     private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(GenericAdapterFactory.class.getName());
     private IExternalDataSourceFactory dataSourceFactory;
     private IDataParserFactory dataParserFactory;
     private ARecordType recordType;
@@ -90,6 +93,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
         try {
             restoreExternalObjects(appCtx.getLibraryManager());
         } catch (Exception e) {
+            LOGGER.log(Level.INFO, "Failure restoring external objects", e);
             throw HyracksDataException.create(e);
         }
         if (isFeed) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index 33f262a..def0bf1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -24,7 +24,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public interface IDataFlowController {
 
     //TODO: Refactor this interface. Remove writer from start() signature
-    public void start(IFrameWriter writer) throws HyracksDataException;
+    public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException;
 
     public default boolean pause() throws HyracksDataException {
         throw new HyracksDataException("Method not implemented");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
index e37f2b1..48df79b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.api;
 import java.io.Serializable;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * A super interface implemented by a data source adapter. An adapter can be a
@@ -47,5 +48,5 @@ public interface IDataSourceAdapter extends Serializable {
      *            operator using the instance of IFrameWriter.
      * @throws Exception
      */
-    public void start(int partition, IFrameWriter writer) throws Exception;
+    public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 87ee167..213231b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -60,5 +60,5 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
     @Override
     public abstract boolean stop() throws HyracksDataException;
 
-    public abstract boolean handleException(Throwable th);
+    public abstract boolean handleException(Throwable th) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 7ba3ae4..1b12dc1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -51,8 +51,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
     }
 
     @Override
-    public void start(IFrameWriter writer) throws HyracksDataException {
-        HyracksDataException hde = null;
+    public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException {
         try {
             failed = false;
             tupleForwarder.initialize(ctx, writer);
@@ -69,13 +68,24 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
         } catch (InterruptedException e) {
             //TODO: Find out what could cause an interrupted exception beside termination of a job/feed
             LOGGER.warn("Feed has been interrupted. Closing the feed", e);
-            Thread.currentThread().interrupt();
+            failed = true;
+            try {
+                finish();
+            } catch (HyracksDataException hde) {
+                e.addSuppressed(hde);
+            }
+            throw e;
         } catch (Exception e) {
             failed = true;
             tupleForwarder.flush();
             LOGGER.warn("Failure while operating a feed source", e);
             throw HyracksDataException.create(e);
         }
+        finish();
+    }
+
+    private void finish() throws HyracksDataException {
+        HyracksDataException hde = null;
         try {
             tupleForwarder.close();
         } catch (Throwable th) {
@@ -162,9 +172,12 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
     }
 
     @Override
-    public boolean handleException(Throwable th) {
+    public boolean handleException(Throwable th) throws HyracksDataException {
         // This is not a parser record. most likely, this error happened in the record reader.
-        return recordReader.handleException(th);
+        if (!recordReader.handleException(th)) {
+            finish();
+        }
+        return closed.get();
     }
 
     public IRecordReader<T> getReader() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index d1bde71..8d80e6f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -32,7 +32,7 @@ public class FeedAdapter implements IDataSourceAdapter {
     }
 
     @Override
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException {
+    public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
         controller.start(writer);
     }
 
@@ -40,7 +40,7 @@ public class FeedAdapter implements IDataSourceAdapter {
         return controller.stop();
     }
 
-    public boolean handleException(Throwable e) {
+    public boolean handleException(Throwable e) throws HyracksDataException {
         return controller.handleException(e);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 3ab370e..0681d71 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -33,7 +33,7 @@ public class GenericAdapter implements IDataSourceAdapter {
     }
 
     @Override
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException {
+    public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
         controller.start(writer);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index 2adce1c..c71b8a2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -20,6 +20,7 @@ package org.apache.asterix.external.feed.runtime;
 
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.log4j.Logger;
 
 /**
@@ -43,31 +44,45 @@ public class AdapterExecutor implements Runnable {
     @Override
     public void run() {
         // Start by getting the partition number from the manager
-        int partition = adapterManager.getPartition();
         if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Starting ingestion for partition:" + partition);
+            LOGGER.info("Starting ingestion for partition:" + adapterManager.getPartition());
         }
+        boolean failed = false;
+        try {
+            failed = doRun();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            failed = true;
+            LOGGER.error("Unhandled Exception", e);
+        } finally {
+            // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying
+            // the runtime manager
+            adapterManager.setFailed(failed);
+            adapterManager.setDone(true);
+            synchronized (adapterManager) {
+                adapterManager.notifyAll();
+            }
+        }
+    }
+
+    private boolean doRun() throws HyracksDataException, InterruptedException {
         boolean continueIngestion = true;
         boolean failedIngestion = false;
         while (continueIngestion) {
             try {
                 // Start the adapter
-                adapter.start(partition, writer);
+                adapter.start(adapterManager.getPartition(), writer);
                 // Adapter has completed execution
                 continueIngestion = false;
+            } catch (InterruptedException e) {
+                throw e;
             } catch (Exception e) {
                 LOGGER.error("Exception during feed ingestion ", e);
                 continueIngestion = adapter.handleException(e);
                 failedIngestion = !continueIngestion;
             }
         }
-        // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the
-        // runtime manager
-        adapterManager.setFailed(failedIngestion);
-        adapterManager.setDone(true);
-        synchronized (adapterManager) {
-            adapterManager.notifyAll();
-        }
+        return failedIngestion;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 7f5372b..6214d9f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -19,12 +19,15 @@
 package org.apache.asterix.external.feed.runtime;
 
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.log4j.Logger;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * This class manages the execution of an adapter within a feed
@@ -43,15 +46,14 @@ public class AdapterRuntimeManager {
 
     private final IHyracksTaskContext ctx;
 
-    private IngestionRuntime ingestionRuntime; // Runtime representing the ingestion stage of a feed
-
     private Future<?> execution;
 
+    private boolean started = false;
     private volatile boolean done = false;
     private volatile boolean failed = false;
 
     public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter,
-                                 IFrameWriter writer, int partition) {
+            IFrameWriter writer, int partition) {
         this.ctx = ctx;
         this.feedId = entityId;
         this.feedAdapter = feedAdapter;
@@ -60,23 +62,42 @@ public class AdapterRuntimeManager {
     }
 
     public void start() {
-        execution = ctx.getExecutorService().submit(adapterExecutor);
+        synchronized (adapterExecutor) {
+            started = true;
+            if (!done) {
+                execution = ctx.getExecutorService().submit(adapterExecutor);
+            } else {
+                LOGGER.log(Level.WARNING, "Someone stopped me before I even start. I will simply not start");
+            }
+        }
     }
 
-    public void stop() throws InterruptedException {
-        try {
-            if (feedAdapter.stop()) {
-                // stop() returned true, we wait for the process termination
-                execution.get();
-            } else {
-                // stop() returned false, we try to force shutdown
-                execution.cancel(true);
+    public void stop() throws HyracksDataException, InterruptedException {
+        synchronized (adapterExecutor) {
+            try {
+                if (started) {
+                    try {
+                        ctx.getExecutorService().submit(() -> {
+                            if (feedAdapter.stop()) {
+                                execution.get();
+                            }
+                            return null;
+                        }).get(30, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        LOGGER.log(Level.WARNING, "Interrupted while trying to stop an adapter runtime", e);
+                        throw e;
+                    } catch (Exception e) {
+                        LOGGER.log(Level.WARNING, "Exception while trying to stop an adapter runtime", e);
+                        throw HyracksDataException.create(e);
+                    } finally {
+                        execution.cancel(true);
+                    }
+                } else {
+                    LOGGER.log(Level.WARNING, "Adapter executor was stopped before it starts");
+                }
+            } finally {
+                done = true;
             }
-        } catch (InterruptedException e) {
-            LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
-            throw e;
-        } catch (Exception exception) {
-            LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
         }
     }
 
@@ -101,10 +122,6 @@ public class AdapterRuntimeManager {
         return partition;
     }
 
-    public IngestionRuntime getIngestionRuntime() {
-        return ingestionRuntime;
-    }
-
     public boolean isFailed() {
         return failed;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/91756a34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 3100704..590af01 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -24,6 +24,7 @@ import java.util.logging.Logger;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveRuntime;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class IngestionRuntime implements IActiveRuntime {
 
@@ -50,7 +51,7 @@ public class IngestionRuntime implements IActiveRuntime {
     }
 
     @Override
-    public void stop() throws InterruptedException {
+    public void stop() throws InterruptedException, HyracksDataException {
         adapterRuntimeManager.stop();
         LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId);
     }

Reply via email to