[ASTERIXDB-2008][CLUS] Only add pending removal if node known

[ASTERIXDB-2023][ING] Introduce Enums instead of using bytes

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Only nodes that are known to the cluster manager are added
to the list of nodes pending removal. Other nodes are ignored.
- Enums introduced:
  - ActiveManagerMessage.Kind
  - ActivePartitionMessage.Event
- Remove AdapterRuntimeManager
- Remove AdapterExecutor

Change-Id: I7044896559798426c04a3f46861bc5335b25d140
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1921
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/07075667
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/07075667
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/07075667

Branch: refs/heads/master
Commit: 07075667c39d1b44939dfb83389de2626b21d8b7
Parents: 3f66d09
Author: Abdullah Alamoudi <bamou...@gmail.com>
Authored: Mon Aug 7 22:28:36 2017 -0700
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Tue Aug 8 04:45:08 2017 -0700

----------------------------------------------------------------------
 asterixdb/asterix-active/pom.xml                |   4 -
 .../apache/asterix/active/ActiveManager.java    |  32 ++--
 .../ActiveSourceOperatorNodePushable.java       |   7 +-
 .../active/SingleThreadEventProcessor.java      |  32 ++--
 .../active/message/ActiveManagerMessage.java    |  12 +-
 .../active/message/ActivePartitionMessage.java  |  14 +-
 .../active/message/StatsRequestMessage.java     |   4 +-
 .../AppRuntimeContextProviderForRecovery.java   |   5 +-
 .../app/active/ActiveEntityEventsListener.java  |  48 +++---
 .../app/active/ActiveNotificationHandler.java   |  77 ++++-----
 .../asterix/app/active/FeedEventsListener.java  |   2 +-
 .../message/ExecuteStatementRequestMessage.java |   1 -
 .../asterix/app/nc/NCAppRuntimeContext.java     |   9 +-
 .../asterix/app/result/ResultPrinter.java       |   2 +-
 .../apache/asterix/utils/FeedOperations.java    |   8 +-
 .../org/apache/asterix/test/active/Action.java  |  10 +-
 .../asterix/test/active/ActiveStatsTest.java    |   5 +-
 .../org/apache/asterix/test/active/Actor.java   |   2 +-
 .../test/active/TestNodeControllerActor.java    |  11 +-
 .../asterix/test/common/TestExecutor.java       |  52 +++---
 .../results/api/feed-stats/feed-stats.1.adm     |   3 +-
 .../common/context/DatasetLifecycleManager.java |   5 +-
 .../asterix/common/exceptions/ErrorCode.java    |   1 +
 .../asterix/common/transactions/Checkpoint.java |   4 +-
 .../IAppRuntimeContextProvider.java             |   5 +-
 .../asterix/common/utils/StoragePathUtil.java   |   2 +-
 .../main/resources/asx_errormsg/en.properties   |   1 +
 .../adapter/factory/LookupAdapterFactory.java   |   2 +-
 .../external/api/IDataFlowController.java       |  12 +-
 .../external/api/IDataSourceAdapter.java        |   4 +-
 .../AbstractFeedDataFlowController.java         |   9 --
 .../dataflow/FeedRecordDataFlowController.java  | 160 ++++++++++++-------
 .../dataflow/FeedStreamDataFlowController.java  |  22 ++-
 .../external/dataflow/FeedTupleForwarder.java   |   2 +-
 .../dataflow/IndexingDataFlowController.java    |   6 +-
 .../dataflow/RateControlledTupleForwarder.java  |   3 +-
 .../external/dataset/adapter/FeedAdapter.java   |   9 +-
 .../dataset/adapter/GenericAdapter.java         |   1 -
 .../external/feed/runtime/AdapterExecutor.java  |  98 ------------
 .../feed/runtime/AdapterRuntimeManager.java     | 144 -----------------
 .../external/input/HDFSDataSourceFactory.java   |   2 +-
 .../input/stream/LocalFSInputStream.java        |  18 ++-
 .../factory/SocketClientInputStreamFactory.java |   2 +-
 .../factory/SocketServerInputStreamFactory.java |   2 +-
 .../factory/TwitterFirehoseStreamFactory.java   |   2 +-
 .../external/library/java/JObjectAccessors.java |  23 +--
 ...rnalIndexBulkModifyOperatorNodePushable.java |   2 +-
 .../ExternalLookupOperatorDescriptor.java       |   7 +-
 .../operators/FeedIntakeOperatorDescriptor.java |   5 +-
 .../FeedIntakeOperatorNodePushable.java         |  98 +++++++-----
 .../classad/BuiltinClassAdFunctions.java        |   4 +-
 .../library/adapter/TestTypedAdapter.java       |   7 -
 .../adapter/TestTypedAdapterFactory.java        |   4 +-
 .../asterix/builders/AbstractListBuilder.java   |   6 +-
 .../records/RecordAddFieldsDescriptor.java      |  30 ++--
 .../std/FlushDatasetOperatorDescriptor.java     |   6 +-
 .../transaction/GlobalResourceIdFactory.java    |   4 +-
 .../runtime/utils/ClusterStateManager.java      |   5 +-
 .../LockThenSearchOperationCallback.java        |   6 +-
 ...maryIndexInstantSearchOperationCallback.java |   6 +-
 ...imaryIndexModificationOperationCallback.java |   4 +-
 ...dexModificationOperationCallbackFactory.java |   2 +-
 ...dexModificationOperationCallbackFactory.java |   2 +-
 .../locking/TestRuntimeContextProvider.java     |   6 +-
 .../AlgebricksAbsolutePartitionConstraint.java  |   8 +
 .../operators/std/SplitOperatorDescriptor.java  |   6 +-
 .../std/StringStreamingRuntimeFactory.java      |   7 +-
 .../org/apache/hyracks/api/util/IoUtil.java     |  37 ++++-
 .../hyracks/api/test/FrameWriterTestUtils.java  |   5 +
 .../cc/dataset/DatasetDirectoryService.java     |   6 +
 .../control/cc/executor/JobExecutor.java        |  51 +++---
 .../control/cc/work/TaskFailureWork.java        |   4 +
 .../nc/dataset/DatasetPartitionWriter.java      |   2 +-
 .../apache/hyracks/control/nc/io/IOManager.java |   4 +-
 .../control/nc/work/ApplicationMessageWork.java |   5 +-
 .../control/nc/work/NotifyTaskCompleteWork.java |  11 +-
 .../control/nc/work/NotifyTaskFailureWork.java  |   3 +-
 .../common/comm/io/ArrayTupleBuilder.java       |   4 +-
 .../AbstractReplicateOperatorDescriptor.java    |  10 +-
 .../std/collectors/InputChannelFrameReader.java |   3 +-
 .../LocalityAwarePartitionDataWriter.java       |   4 +-
 .../MToNBroadcastConnectorDescriptor.java       |   6 +-
 .../file/FrameFileWriterOperatorDescriptor.java |   4 +-
 .../file/PlainFileWriterOperatorDescriptor.java |   6 +-
 ...eflectionBasedDeserializedMapperFactory.java |   6 +-
 .../org/apache/hyracks/hdfs/ContextFactory.java |   5 +-
 .../org/apache/hyracks/hdfs/ContextFactory.java |   5 +-
 .../hyracks/hdfs/dataflow/ConfFactory.java      |   4 +-
 .../dataflow/HDFSWriteOperatorDescriptor.java   |   3 +-
 .../hdfs/dataflow/InputSplitsFactory.java       |   2 +-
 .../hyracks/hdfs2/dataflow/ConfFactory.java     |   5 +-
 .../hdfs2/dataflow/FileSplitsFactory.java       |   5 +-
 .../dataflow/HDFSWriteOperatorDescriptor.java   |   3 +-
 .../org/apache/hyracks/ipc/impl/IPCSystem.java  |   1 +
 .../IndexSearchOperatorNodePushable.java        |   4 +-
 .../freepage/MutableArrayValueReference.java    |   7 +
 .../lsm/btree/impls/LSMBTreeDiskComponent.java  |   5 +
 .../impls/LSMBTreeWithBuddyDiskComponent.java   |   4 +
 .../storage/am/lsm/common/impls/LSMHarness.java |  16 +-
 .../lsm/common/utils/ComponentMetadataUtil.java |  23 ++-
 .../impls/LSMInvertedIndexDiskComponent.java    |   5 +
 .../ondisk/OnDiskInvertedIndex.java             |   4 +-
 .../lsm/rtree/impls/LSMRTreeDiskComponent.java  |   5 +
 103 files changed, 665 insertions(+), 716 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 3dd24b6..6568795 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,10 +31,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index fcf2be9..c0717b9 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -23,14 +23,16 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActiveStatsResponse;
 import org.apache.asterix.active.message.StatsRequestMessage;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -38,21 +40,20 @@ import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.log4j.Logger;
 
 public class ActiveManager {
 
     private static final Logger LOGGER = 
Logger.getLogger(ActiveManager.class.getName());
     private static final int SHUTDOWN_TIMEOUT_SECS = 60;
 
-    private final ThreadExecutor executor;
+    private final ExecutorService executor;
     private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes;
     private final ConcurrentFramePool activeFramePool;
     private final String nodeId;
     private final INCServiceContext serviceCtx;
     private volatile boolean shutdown;
 
-    public ActiveManager(ThreadExecutor executor, String nodeId, long 
activeMemoryBudget, int frameSize,
+    public ActiveManager(ExecutorService executor, String nodeId, long 
activeMemoryBudget, int frameSize,
             INCServiceContext serviceCtx) throws HyracksDataException {
         this.executor = executor;
         this.nodeId = nodeId;
@@ -86,15 +87,16 @@ public class ActiveManager {
     }
 
     public void submit(ActiveManagerMessage message) throws 
HyracksDataException {
+        LOGGER.log(Level.INFO, "Message of type " + message.getKind() + " 
received in " + nodeId);
         switch (message.getKind()) {
-            case ActiveManagerMessage.STOP_ACTIVITY:
+            case STOP_ACTIVITY:
                 stopRuntime(message);
                 break;
-            case ActiveManagerMessage.REQUEST_STATS:
+            case REQUEST_STATS:
                 requestStats((StatsRequestMessage) message);
                 break;
             default:
-                LOGGER.warn("Unknown message type received: " + 
message.getKind());
+                LOGGER.warning("Unknown message type received: " + 
message.getKind());
         }
     }
 
@@ -104,7 +106,7 @@ public class ActiveManager {
             IActiveRuntime runtime = runtimes.get(runtimeId);
             long reqId = message.getReqId();
             if (runtime == null) {
-                LOGGER.warn("Request stats of a runtime that is not registered 
" + runtimeId);
+                LOGGER.warning("Request stats of a runtime that is not 
registered " + runtimeId);
                 // Send a failure message
                 ((NodeControllerService) serviceCtx.getControllerService())
                         .sendApplicationMessageToCC(
@@ -124,7 +126,7 @@ public class ActiveManager {
     }
 
     public void shutdown() {
-        LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
+        LOGGER.warning("Shutting down ActiveManager on node " + nodeId);
         Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>();
         shutdown = true;
         runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, 
executor.submit(() -> {
@@ -136,29 +138,29 @@ public class ActiveManager {
             try {
                 entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
             } catch (InterruptedException e) {
-                LOGGER.warn("Interrupted waiting to stop runtime: " + 
entry.getKey());
+                LOGGER.warning("Interrupted waiting to stop runtime: " + 
entry.getKey());
                 Thread.currentThread().interrupt();
             } catch (ExecutionException e) {
-                LOGGER.warn("Exception while stopping runtime: " + 
entry.getKey(), e);
+                LOGGER.log(Level.WARNING, "Exception while stopping runtime: " 
+ entry.getKey(), e);
             } catch (TimeoutException e) {
-                LOGGER.warn("Timed out waiting to stop runtime: " + 
entry.getKey(), e);
+                LOGGER.log(Level.WARNING, "Timed out waiting to stop runtime: 
" + entry.getKey(), e);
             }
         });
-        LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
+        LOGGER.warning("Shutdown ActiveManager on node " + nodeId + " 
complete");
     }
 
     private void stopRuntime(ActiveManagerMessage message) {
         ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
         IActiveRuntime runtime = runtimes.get(runtimeId);
         if (runtime == null) {
-            LOGGER.warn("Request to stop a runtime that is not registered " + 
runtimeId);
+            LOGGER.warning("Request to stop a runtime that is not registered " 
+ runtimeId);
         } else {
             executor.execute(() -> {
                 try {
                     stopIfRunning(runtimeId, runtime);
                 } catch (Exception e) {
                     // TODO(till) Figure out a better way to handle failure to 
stop a runtime
-                    LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
+                    LOGGER.log(Level.WARNING, "Failed to stop runtime: " + 
runtimeId, e);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index a7d7796..27ecb52 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -22,6 +22,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -36,6 +37,7 @@ public abstract class ActiveSourceOperatorNodePushable 
extends AbstractUnaryOutp
     protected final IHyracksTaskContext ctx;
     protected final ActiveManager activeManager;
     /** A unique identifier for the runtime **/
+    protected Thread taskThread;
     protected final ActiveRuntimeId runtimeId;
     private volatile boolean done = false;
 
@@ -85,11 +87,12 @@ public abstract class ActiveSourceOperatorNodePushable 
extends AbstractUnaryOutp
     @Override
     public final void initialize() throws HyracksDataException {
         LOGGER.log(Level.INFO, "initialize() called on 
ActiveSourceOperatorNodePushable");
+        taskThread = Thread.currentThread();
         activeManager.registerRuntime(this);
         try {
             // notify cc that runtime has been registered
             ctx.sendApplicationMessageToCC(new 
ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null), 
null);
+                    Event.RUNTIME_REGISTERED, null), null);
             start();
         } catch (InterruptedException e) {
             LOGGER.log(Level.INFO, "initialize() interrupted on 
ActiveSourceOperatorNodePushable", e);
@@ -112,7 +115,7 @@ public abstract class ActiveSourceOperatorNodePushable 
extends AbstractUnaryOutp
         activeManager.deregisterRuntime(runtimeId);
         try {
             ctx.sendApplicationMessageToCC(new 
ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null), 
null);
+                    Event.RUNTIME_DEREGISTERED, null), null);
         } catch (Exception e) {
             LOGGER.log(Level.INFO, "deinitialize() failed on 
ActiveSourceOperatorNodePushable", e);
             throw HyracksDataException.create(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
index 0a36216..de6682d 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
@@ -18,15 +18,12 @@
  */
 package org.apache.asterix.active;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class SingleThreadEventProcessor<T> implements Runnable {
@@ -34,20 +31,20 @@ public abstract class SingleThreadEventProcessor<T> 
implements Runnable {
     private static final Logger LOGGER = 
Logger.getLogger(SingleThreadEventProcessor.class.getName());
     private final String name;
     private final LinkedBlockingQueue<T> eventInbox;
-    private final ExecutorService executorService;
-    private final Future<?> future;
+    private volatile Thread executorThread;
+    private volatile boolean stopped = false;
 
     public SingleThreadEventProcessor(String threadName) {
         this.name = threadName;
         eventInbox = new LinkedBlockingQueue<>();
-        executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, 
threadName));
-        future = executorService.submit(this);
+        executorThread = new Thread(this, threadName);
+        executorThread.start();
     }
 
     @Override
     public final void run() {
         LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
-        while (!Thread.currentThread().isInterrupted()) {
+        while (!stopped) {
             try {
                 T event = eventInbox.take();
                 handle(event);
@@ -69,10 +66,19 @@ public abstract class SingleThreadEventProcessor<T> 
implements Runnable {
     }
 
     public void stop() throws HyracksDataException, InterruptedException {
-        future.cancel(true);
-        executorService.shutdown();
-        if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
-            throw 
HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+        stopped = true;
+        executorThread.interrupt();
+        executorThread.join(1000);
+        int attempt = 0;
+        while (executorThread.isAlive()) {
+            attempt++;
+            LOGGER.log(Level.WARNING,
+                    "Failed to stop event processor after " + attempt + " 
attempts. Interrupted exception swallowed?");
+            if (attempt == 10) {
+                throw new 
RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+            }
+            executorThread.interrupt();
+            executorThread.join(1000);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 9772698..bef418b 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -26,14 +26,16 @@ import 
org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ActiveManagerMessage implements INcAddressedMessage {
-    public static final byte STOP_ACTIVITY = 0x00;
-    public static final byte REQUEST_STATS = 0x01;
+    public enum Kind {
+        STOP_ACTIVITY,
+        REQUEST_STATS
+    }
 
     private static final long serialVersionUID = 1L;
-    private final byte kind;
+    private final Kind kind;
     private final Serializable payload;
 
-    public ActiveManagerMessage(byte kind, Serializable payload) {
+    public ActiveManagerMessage(Kind kind, Serializable payload) {
         this.kind = kind;
         this.payload = payload;
     }
@@ -42,7 +44,7 @@ public class ActiveManagerMessage implements 
INcAddressedMessage {
         return payload;
     }
 
-    public byte getKind() {
+    public Kind getKind() {
         return kind;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index a47d5a5..9ace417 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -29,17 +29,19 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 
 public class ActivePartitionMessage implements ICcAddressedMessage {
+    public enum Event {
+        RUNTIME_REGISTERED,
+        RUNTIME_DEREGISTERED,
+        GENERIC_EVENT
+    }
 
     private static final long serialVersionUID = 1L;
-    public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
-    public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
-    public static final byte GENERIC_EVENT = 0x02;
     private final ActiveRuntimeId activeRuntimeId;
     private final JobId jobId;
     private final Serializable payload;
-    private final byte event;
+    private final Event event;
 
-    public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId 
jobId, byte event, Serializable payload) {
+    public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId 
jobId, Event event, Serializable payload) {
         this.activeRuntimeId = activeRuntimeId;
         this.jobId = jobId;
         this.event = event;
@@ -58,7 +60,7 @@ public class ActivePartitionMessage implements 
ICcAddressedMessage {
         return payload;
     }
 
-    public byte getEvent() {
+    public Event getEvent() {
         return event;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
index 8fa5f19..d43f00e 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
@@ -24,8 +24,8 @@ public class StatsRequestMessage extends ActiveManagerMessage 
{
     private static final long serialVersionUID = 1L;
     private final long reqId;
 
-    public StatsRequestMessage(byte kind, Serializable payload, long reqId) {
-        super(kind, payload);
+    public StatsRequestMessage(Serializable payload, long reqId) {
+        super(Kind.REQUEST_STATS, payload);
         this.reqId = reqId;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
index 1fea840..18ef143 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
@@ -18,10 +18,11 @@
  */
 package org.apache.asterix.api.common;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.io.IIOManager;
@@ -84,7 +85,7 @@ public class AppRuntimeContextProviderForRecovery implements 
IAppRuntimeContextP
     }
 
     @Override
-    public ThreadExecutor getThreadExecutor() {
+    public ExecutorService getThreadExecutor() {
         return asterixAppRuntimeContext.getThreadExecutor();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 995e372..acb1614 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,8 +38,8 @@ import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IRetryPolicy;
 import org.apache.asterix.active.IRetryPolicyFactory;
 import org.apache.asterix.active.NoRetryPolicyFactory;
-import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.active.message.StatsRequestMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.IMetadataLockManager;
@@ -68,6 +68,7 @@ import org.apache.hyracks.api.job.JobStatus;
 public abstract class ActiveEntityEventsListener implements 
IActiveEntityController {
 
     private static final Logger LOGGER = 
Logger.getLogger(ActiveEntityEventsListener.class.getName());
+    private static final Level level = Level.INFO;
     private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, 
Kind.STATE_CHANGED, null, null);
     private static final EnumSet<ActivityState> TRANSITION_STATES = 
EnumSet.of(ActivityState.RESUMING,
             ActivityState.STARTING, ActivityState.STOPPING, 
ActivityState.RECOVERING);
@@ -130,7 +131,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
     }
 
     protected synchronized void setState(ActivityState newState) {
-        LOGGER.log(Level.FINE, "State is being set to " + newState + " from " 
+ state);
+        LOGGER.log(level, "State of " + getEntityId() + "is being set to " + 
newState + " from " + state);
         this.prevState = state;
         this.state = newState;
         if (newState == ActivityState.SUSPENDED) {
@@ -142,7 +143,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
     @Override
     public synchronized void notify(ActiveEvent event) {
         try {
-            LOGGER.fine("EventListener is notified.");
+            LOGGER.log(level, "EventListener is notified.");
             ActiveEvent.Kind eventKind = event.getEventKind();
             switch (eventKind) {
                 case JOB_CREATED:
@@ -172,22 +173,24 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
     }
 
     protected synchronized void handle(ActivePartitionMessage message) {
-        if (message.getEvent() == 
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+        if (message.getEvent() == Event.RUNTIME_REGISTERED) {
             numRegistered++;
             if (numRegistered == locations.getLocations().length) {
                 setState(ActivityState.RUNNING);
             }
-        } else if (message.getEvent() == 
ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED) {
+        } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) {
             numRegistered--;
         }
     }
 
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
+        LOGGER.log(level, "the job " + jobId + " finished");
         jobId = null;
         Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, 
List<Exception>>) event.getEventObject();
         JobStatus jobStatus = status.getLeft();
         List<Exception> exceptions = status.getRight();
+        LOGGER.log(level, "The job finished with status: " + jobStatus);
         if (jobStatus.equals(JobStatus.FAILURE)) {
             jobFailure = exceptions.isEmpty() ? new 
RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
@@ -271,10 +274,10 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
     @SuppressWarnings("unchecked")
     @Override
     public void refreshStats(long timeout) throws HyracksDataException {
-        LOGGER.log(Level.FINE, "refreshStats called");
+        LOGGER.log(level, "refreshStats called");
         synchronized (this) {
             if (state != ActivityState.RUNNING || isFetchingStats) {
-                LOGGER.log(Level.FINE,
+                LOGGER.log(level,
                         "returning immediately since state = " + state + " and 
fetchingStats = " + isFetchingStats);
                 return;
             } else {
@@ -287,8 +290,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
         List<INcAddressedMessage> requests = new ArrayList<>();
         List<String> ncs = Arrays.asList(locations.getLocations());
         for (int i = 0; i < ncs.size(); i++) {
-            requests.add(new 
StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS,
-                    new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+            requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, 
runtimeName, i), reqId));
         }
         try {
             List<String> responses = (List<String>) 
messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
@@ -348,32 +350,32 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
 
     @Override
     public synchronized void recover() throws HyracksDataException {
-        LOGGER.log(Level.FINE, "Recover is called on " + entityId);
+        LOGGER.log(level, "Recover is called on " + entityId);
         if (recoveryTask != null) {
-            LOGGER.log(Level.FINE, "But recovery task for " + entityId + " is 
already there!! throwing an exception");
+            LOGGER.log(level, "But recovery task for " + entityId + " is 
already there!! throwing an exception");
             throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
         }
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
-            LOGGER.log(Level.FINE, "But it has no recovery policy, so it is 
set to permanent failure");
+            LOGGER.log(level, "But it has no recovery policy, so it is set to 
permanent failure");
             setState(ActivityState.PERMANENTLY_FAILED);
         } else {
             ExecutorService executor = 
appCtx.getServiceContext().getControllerService().getExecutor();
             IRetryPolicy policy = retryPolicyFactory.create(this);
             cancelRecovery = false;
             setState(ActivityState.TEMPORARILY_FAILED);
-            LOGGER.log(Level.FINE, "Recovery task has been submitted");
+            LOGGER.log(level, "Recovery task has been submitted");
             recoveryTask = executor.submit(() -> doRecover(policy));
         }
     }
 
     protected Void doRecover(IRetryPolicy policy)
             throws AlgebricksException, HyracksDataException, 
InterruptedException {
-        LOGGER.log(Level.FINE, "Actual Recovery task has started");
+        LOGGER.log(level, "Actual Recovery task has started");
         if (getState() != ActivityState.TEMPORARILY_FAILED) {
-            LOGGER.log(Level.FINE, "but its state is not temp failure and so 
we're just returning");
+            LOGGER.log(level, "but its state is not temp failure and so we're 
just returning");
             return null;
         }
-        LOGGER.log(Level.FINE, "calling the policy");
+        LOGGER.log(level, "calling the policy");
         while (policy.retry()) {
             synchronized (this) {
                 if (cancelRecovery) {
@@ -402,7 +404,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
                     doStart(metadataProvider);
                     return null;
                 } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Attempt to revive " + entityId 
+ " failed", e);
+                    LOGGER.log(level, "Attempt to revive " + entityId + " 
failed", e);
                     setState(ActivityState.TEMPORARILY_FAILED);
                     recoverFailure = e;
                 } finally {
@@ -515,10 +517,10 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
         WaitForStateSubscriber subscriber;
         Future<Void> suspendTask;
         synchronized (this) {
-            LOGGER.log(Level.FINE, "suspending entity " + entityId);
-            LOGGER.log(Level.FINE, "Waiting for ongoing activities");
+            LOGGER.log(level, "suspending entity " + entityId);
+            LOGGER.log(level, "Waiting for ongoing activities");
             waitForNonTransitionState();
-            LOGGER.log(Level.FINE, "Proceeding with suspension. Current state 
is " + state);
+            LOGGER.log(level, "Proceeding with suspension. Current state is " 
+ state);
             if (state == ActivityState.STOPPED || state == 
ActivityState.PERMANENTLY_FAILED) {
                 suspended = true;
                 return;
@@ -536,12 +538,12 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
                     EnumSet.of(ActivityState.SUSPENDED, 
ActivityState.TEMPORARILY_FAILED));
             suspendTask = 
metadataProvider.getApplicationContext().getServiceContext().getControllerService()
                     .getExecutor().submit(() -> doSuspend(metadataProvider));
-            LOGGER.log(Level.FINE, "Suspension task has been submitted");
+            LOGGER.log(level, "Suspension task has been submitted");
         }
         try {
-            LOGGER.log(Level.FINE, "Waiting for suspension task to complete");
+            LOGGER.log(level, "Waiting for suspension task to complete");
             suspendTask.get();
-            LOGGER.log(Level.FINE, "waiting for state to become SUSPENDED or 
TEMPORARILY_FAILED");
+            LOGGER.log(level, "waiting for state to become SUSPENDED or 
TEMPORARILY_FAILED");
             subscriber.sync();
         } catch (Exception e) {
             synchronized (this) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index c5e5dbb..d36d9b7 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -53,7 +53,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         implements IActiveNotificationHandler, IJobLifecycleListener {
 
     private static final Logger LOGGER = 
Logger.getLogger(ActiveNotificationHandler.class.getName());
-    private static final boolean DEBUG = false;
+    private static final Level level = Level.INFO;
     public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
     private final Map<EntityId, IActiveEntityEventsListener> 
entityEventListeners;
     private final Map<JobId, EntityId> jobId2EntityId;
@@ -73,13 +73,13 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         EntityId entityId = jobId2EntityId.get(event.getJobId());
         if (entityId != null) {
             IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
-            LOGGER.log(Level.FINE, "Next event is of type " + 
event.getEventKind());
+            LOGGER.log(level, "Next event is of type " + event.getEventKind());
             if (event.getEventKind() == Kind.JOB_FINISHED) {
-                LOGGER.log(Level.FINE, "Removing the job");
+                LOGGER.log(level, "Removing the job");
                 jobId2EntityId.remove(event.getJobId());
             }
             if (listener != null) {
-                LOGGER.log(Level.FINE, "Notifying the listener");
+                LOGGER.log(level, "Notifying the listener");
                 listener.notify(event);
             }
         } else {
@@ -91,34 +91,30 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
 
     @Override
     public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) throws HyracksDataException {
-        LOGGER.log(Level.FINE,
+        LOGGER.log(level,
                 "notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) was called with jobId = " + jobId);
         Object property = 
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (property == null || !(property instanceof EntityId)) {
-            LOGGER.log(Level.FINE, "Job is not of type active job. property 
found to be: " + property);
+            LOGGER.log(level, "Job is not of type active job. property found 
to be: " + property);
             return;
         }
         EntityId entityId = (EntityId) property;
         monitorJob(jobId, entityId);
         boolean found = jobId2EntityId.get(jobId) != null;
-        LOGGER.log(Level.FINE, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
+        LOGGER.log(level, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
         add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, 
jobSpecification));
     }
 
     private synchronized void monitorJob(JobId jobId, EntityId entityId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob 
activeJob) called with job id: " + jobId);
-            boolean found = jobId2EntityId.get(jobId) != null;
-            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? 
"Active" : "Inactive"));
-        }
+        LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called 
with job id: " + jobId);
+        boolean found = jobId2EntityId.get(jobId) != null;
+        LOGGER.log(level, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
         if (entityEventListeners.containsKey(entityId)) {
             if (jobId2EntityId.containsKey(jobId)) {
                 LOGGER.severe("Job is already being monitored for job: " + 
jobId);
                 return;
             }
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "monitoring started for job id: " + 
jobId);
-            }
+            LOGGER.log(level, "monitoring started for job id: " + jobId);
         } else {
             LOGGER.info("No listener was found for the entity: " + entityId);
         }
@@ -140,9 +136,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, 
Pair.of(jobStatus, exceptions)));
         } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
-            }
+            LOGGER.log(level, "NO NEED TO NOTIFY JOB FINISH!");
         }
     }
 
@@ -156,20 +150,16 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
 
     @Override
     public IActiveEntityEventsListener getListener(EntityId entityId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId 
entityId) was called with entity " + entityId);
-            IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
-            LOGGER.log(Level.WARNING, "Listener found: " + listener);
-        }
+        LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was 
called with entity " + entityId);
+        IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
+        LOGGER.log(level, "Listener found: " + listener);
         return entityEventListeners.get(entityId);
     }
 
     @Override
     public synchronized IActiveEntityEventsListener[] getEventListeners() {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getEventListeners() was called");
-            LOGGER.log(Level.WARNING, "returning " + 
entityEventListeners.size() + " Listeners");
-        }
+        LOGGER.log(level, "getEventListeners() was called");
+        LOGGER.log(level, "returning " + entityEventListeners.size() + " 
Listeners");
         return entityEventListeners.values().toArray(new 
IActiveEntityEventsListener[entityEventListeners.size()]);
     }
 
@@ -178,11 +168,8 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         if (suspended) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING,
-                    "registerListener(IActiveEntityEventsListener listener) 
was called for the entity "
-                            + listener.getEntityId());
-        }
+        LOGGER.log(level, "registerListener(IActiveEntityEventsListener 
listener) was called for the entity "
+                + listener.getEntityId());
         if (entityEventListeners.containsKey(listener.getEntityId())) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, 
listener.getEntityId());
         }
@@ -194,7 +181,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         if (suspended) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
-        LOGGER.log(Level.FINE, "unregisterListener(IActiveEntityEventsListener 
listener) was called for the entity "
+        LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener 
listener) was called for the entity "
                 + listener.getEntityId());
         IActiveEntityEventsListener registeredListener = 
entityEventListeners.remove(listener.getEntityId());
         if (registeredListener == null) {
@@ -221,16 +208,16 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
 
     @Override
     public synchronized void recover() throws HyracksDataException {
-        LOGGER.log(Level.FINE, "Starting active recovery");
+        LOGGER.log(level, "Starting active recovery");
         for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
             synchronized (listener) {
-                LOGGER.log(Level.FINE, "Entity " + listener.getEntityId() + " 
is " + listener.getStats());
+                LOGGER.log(level, "Entity " + listener.getEntityId() + " is " 
+ listener.getStats());
                 if (listener.getState() == ActivityState.PERMANENTLY_FAILED
                         && listener instanceof IActiveEntityController) {
-                    LOGGER.log(Level.FINE, "Recovering");
+                    LOGGER.log(level, "Recovering");
                     ((IActiveEntityController) listener).recover();
                 } else {
-                    LOGGER.log(Level.FINE, "Only notifying");
+                    LOGGER.log(level, "Only notifying");
                     listener.notifyAll();
                 }
             }
@@ -243,7 +230,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
             if (suspended) {
                 throw new 
RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
             }
-            LOGGER.log(Level.FINE, "Suspending active events handler");
+            LOGGER.log(level, "Suspending active events handler");
             suspended = true;
         }
         IMetadataLockManager lockManager = 
mdProvider.getApplicationContext().getMetadataLockManager();
@@ -253,27 +240,27 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
             // exclusive lock all the datasets
             String dataverseName = listener.getEntityId().getDataverse();
             String entityName = listener.getEntityId().getEntityName();
-            LOGGER.log(Level.FINE, "Suspending " + listener.getEntityId());
-            LOGGER.log(Level.FINE, "Acquiring locks");
+            LOGGER.log(level, "Suspending " + listener.getEntityId());
+            LOGGER.log(level, "Acquiring locks");
             lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), 
dataverseName + '.' + entityName);
             List<Dataset> datasets = ((ActiveEntityEventsListener) 
listener).getDatasets();
             for (Dataset dataset : datasets) {
                 
lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
                         DatasetUtil.getFullyQualifiedName(dataset));
             }
-            LOGGER.log(Level.FINE, "locks acquired");
+            LOGGER.log(level, "locks acquired");
             ((ActiveEntityEventsListener) listener).suspend(mdProvider);
-            LOGGER.log(Level.FINE, listener.getEntityId() + " suspended");
+            LOGGER.log(level, listener.getEntityId() + " suspended");
         }
     }
 
     public void resume(MetadataProvider mdProvider)
             throws AsterixException, HyracksDataException, 
InterruptedException {
-        LOGGER.log(Level.FINE, "Resuming active events handler");
+        LOGGER.log(level, "Resuming active events handler");
         for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
-            LOGGER.log(Level.FINE, "Resuming " + listener.getEntityId());
+            LOGGER.log(level, "Resuming " + listener.getEntityId());
             ((ActiveEntityEventsListener) listener).resume(mdProvider);
-            LOGGER.log(Level.FINE, listener.getEntityId() + " resumed");
+            LOGGER.log(level, listener.getEntityId() + " resumed");
         }
         synchronized (this) {
             suspended = false;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 45c79a0..124e56e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -114,7 +114,7 @@ public class FeedEventsListener extends 
ActiveEntityEventsListener {
     @Override
     protected Void doStop(MetadataProvider metadataProvider) throws 
HyracksDataException, AlgebricksException {
         IActiveEntityEventSubscriber eventSubscriber =
-                new WaitForStateSubscriber(this, 
Collections.singleton(ActivityState.STOPPED));
+                new WaitForStateSubscriber(this, 
EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED));
         try {
             // Construct ActiveMessage
             for (int i = 0; i < getLocations().getLocations().length; i++) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 9faa9e9..e7919fa 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -137,7 +137,6 @@ public final class ExecuteStatementRequestMessage 
implements ICcAddressedMessage
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected 
exception", e);
                 responseMsg.setError(new Exception(e.toString()));
             }
-
             try {
                 messageBroker.sendApplicationMessageToNC(responseMsg, 
requestNodeId);
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 29bc95e..7647881 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -24,6 +24,8 @@ import java.rmi.server.UnicastRemoteObject;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -31,7 +33,6 @@ import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.AsterixExtension;
@@ -113,7 +114,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     private ReplicationProperties replicationProperties;
     private MessagingProperties messagingProperties;
     private final NodeProperties nodeProperties;
-    private ThreadExecutor threadExecutor;
+    private ExecutorService threadExecutor;
     private IDatasetLifecycleManager datasetLifecycleManager;
     private IBufferCache bufferCache;
     private ITransactionSubsystem txnSubsystem;
@@ -164,7 +165,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     @Override
     public void initialize(boolean initialRun) throws IOException, 
ACIDException {
         ioManager = getServiceContext().getIoManager();
-        threadExecutor = new 
ThreadExecutor(getServiceContext().getThreadFactory());
+        threadExecutor = 
Executors.newCachedThreadPool(getServiceContext().getThreadFactory());
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
         IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
         IPageReplacementStrategy prs = new 
ClockPageReplacementStrategy(allocator,
@@ -383,7 +384,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     }
 
     @Override
-    public ThreadExecutor getThreadExecutor() {
+    public ExecutorService getThreadExecutor() {
         return threadExecutor;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
index 452d13e..56975d1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -107,7 +107,7 @@ public class ResultPrinter {
             }
             app.append("\r\n");
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 09c4983..9fc9940 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -31,6 +31,7 @@ import java.util.TreeSet;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -272,8 +273,9 @@ public class FeedOperations {
             }
 
             // make connections between operators
-            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>,
-                    Pair<IOperatorDescriptor, Integer>>> entry : 
subJob.getConnectorOperatorMap().entrySet()) {
+            for (Entry<ConnectorDescriptorId,
+                       Pair<Pair<IOperatorDescriptor, Integer>, 
Pair<IOperatorDescriptor, Integer>>> entry
+                       : subJob.getConnectorOperatorMap().entrySet()) {
                 ConnectorDescriptorId newId = 
connectorIdMapping.get(entry.getKey());
                 IConnectorDescriptor connDesc = 
jobSpec.getConnectorMap().get(newId);
                 Pair<IOperatorDescriptor, Integer> leftOp = 
entry.getValue().getLeft();
@@ -381,7 +383,7 @@ public class FeedOperations {
 
     public static void SendStopMessageToNode(ICcApplicationContext appCtx, 
EntityId feedId, String intakeNodeLocation,
             Integer partition) throws Exception {
-        ActiveManagerMessage stopFeedMessage = new 
ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY,
+        ActiveManagerMessage stopFeedMessage = new 
ActiveManagerMessage(Kind.STOP_ACTIVITY,
                 new ActiveRuntimeId(feedId, 
FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
         SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
index 71cb038..74c4364 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
@@ -21,7 +21,7 @@ package org.apache.asterix.test.active;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-abstract class Action {
+public abstract class Action {
     boolean done = false;
     HyracksDataException failure;
 
@@ -39,21 +39,21 @@ abstract class Action {
 
     protected abstract void doExecute(MetadataProvider mdProvider) throws 
Exception;
 
-    boolean hasFailed() {
+    public boolean hasFailed() {
         return failure != null;
     }
 
-    HyracksDataException getFailure() {
+    public HyracksDataException getFailure() {
         return failure;
     }
 
-    synchronized void sync() throws InterruptedException {
+    public synchronized void sync() throws InterruptedException {
         while (!done) {
             wait();
         }
     }
 
-    boolean isDone() {
+    public boolean isDone() {
         return done;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index f8baa0e..e1fdb69 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -30,6 +30,7 @@ import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveRuntime;
 import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.algebra.base.ILangExtension.Language;
 import org.apache.asterix.app.active.ActiveEntityEventsListener;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -126,8 +127,8 @@ public class ActiveStatsTest {
         requestedStats = eventsListener.getStats();
         Assert.assertTrue(requestedStats.contains("N/A"));
         // Fake partition message and notify eventListener
-        ActivePartitionMessage partitionMessage = new 
ActivePartitionMessage(activeRuntimeId, jobId,
-                ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
+        ActivePartitionMessage partitionMessage =
+                new ActivePartitionMessage(activeRuntimeId, jobId, 
Event.RUNTIME_REGISTERED, null);
         partitionMessage.handle(appCtx);
         start.sync();
         if (start.hasFailed()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index 3f68651..8d21b55 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -21,7 +21,7 @@ package org.apache.asterix.test.active;
 import org.apache.asterix.active.SingleThreadEventProcessor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 
-class Actor extends SingleThreadEventProcessor<Action> {
+public class Actor extends SingleThreadEventProcessor<Action> {
 
     private final MetadataProvider actorMdProvider;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
index e7e21b6..99499a3 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
@@ -23,6 +23,7 @@ import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.hyracks.api.job.JobId;
 
@@ -41,9 +42,8 @@ public class TestNodeControllerActor extends Actor {
         Action registration = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
-                ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId,
-                        new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId,
-                                ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null));
+                ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+                        new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_REGISTERED, null));
                 clusterController.activeEvent(event);
             }
         };
@@ -55,9 +55,8 @@ public class TestNodeControllerActor extends Actor {
         Action registration = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
-                ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId,
-                        new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId,
-                                ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null));
+                ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+                        new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_DEREGISTERED, null));
                 clusterController.activeEvent(event);
             }
         };

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 1c29e86..a96adc0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -103,13 +103,13 @@ public class TestExecutor {
     // see
     // https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers/417184
     private static final long MAX_URL_LENGTH = 2000l;
-    private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = Pattern.compile("/\\*.*\\*/",
-            Pattern.MULTILINE | Pattern.DOTALL);
+    private static final Pattern JAVA_BLOCK_COMMENT_PATTERN =
+            Pattern.compile("/\\*.*\\*/", Pattern.MULTILINE | Pattern.DOTALL);
     private static final Pattern JAVA_LINE_COMMENT_PATTERN = Pattern.compile("^//.*$", Pattern.MULTILINE);
     private static final Pattern SHELL_LINE_COMMENT_PATTERN = Pattern.compile("^#.*$", Pattern.MULTILINE);
     private static final Pattern REGEX_LINES_PATTERN = Pattern.compile("^(-)?/(.*)/([im]*)$");
-    private static final Pattern POLL_TIMEOUT_PATTERN = Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)",
-            Pattern.MULTILINE);
+    private static final Pattern POLL_TIMEOUT_PATTERN =
+            Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
     private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
     private static final Pattern HANDLE_VARIABLE_PATTERN = Pattern.compile("handlevariable=(\\w+)");
     private static final Pattern VARIABLE_REF_PATTERN = Pattern.compile("\\$(\\w+)");
@@ -168,10 +168,10 @@ public class TestExecutor {
     public void runScriptAndCompareWithResult(File scriptFile, PrintWriter print, File expectedFile, File actualFile,
             ComparisonEnum compare) throws Exception {
         System.err.println("Expected results file: " + expectedFile.toString());
-        BufferedReader readerExpected = new BufferedReader(
-                new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
-        BufferedReader readerActual = new BufferedReader(
-                new InputStreamReader(new FileInputStream(actualFile), "UTF-8"));
+        BufferedReader readerExpected =
+                new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
+        BufferedReader readerActual =
+                new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8"));
         boolean regex = false;
         try {
             if (ComparisonEnum.BINARY.equals(compare)) {
@@ -354,10 +354,10 @@ public class TestExecutor {
     public void runScriptAndCompareWithResultRegex(File scriptFile, File expectedFile, File actualFile)
             throws Exception {
         String lineExpected, lineActual;
-        try (BufferedReader readerExpected = new BufferedReader(
-                new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
-                BufferedReader readerActual = new BufferedReader(
-                        new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) {
+        try (BufferedReader readerExpected =
+                new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
+                BufferedReader readerActual =
+                        new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) {
             StringBuilder actual = new StringBuilder();
             while ((lineActual = readerActual.readLine()) != null) {
                 actual.append(lineActual).append('\n');
@@ -534,7 +534,7 @@ public class TestExecutor {
         return executeQueryService(str, fmt, uri, params, jsonEncoded, responseCodeValidator, false);
     }
 
-    protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params,
+    public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params,
             boolean jsonEncoded, Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception {
         final List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType());
         HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams)
@@ -697,8 +697,8 @@ public class TestExecutor {
     // Insert and Delete statements are executed here
     public void executeUpdate(String str, URI uri) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
-                .build();
+        HttpUriRequest request =
+                RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build();
 
         // Execute the method.
         executeAndCheckHttpRequest(request);
@@ -708,10 +708,10 @@ public class TestExecutor {
     public InputStream executeAnyAQLAsync(String statement, boolean defer, OutputFormat fmt, URI uri,
             Map<String, Object> variableCtx) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(uri)
-                .addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous")
-                .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType())
-                .build();
+        HttpUriRequest request =
+                RequestBuilder.post(uri).addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous")
+                        .setEntity(new StringEntity(statement, StandardCharsets.UTF_8))
+                        .setHeader("Accept", fmt.mimeType()).build();
 
         String handleVar = getHandleVariable(statement);
 
@@ -737,8 +737,8 @@ public class TestExecutor {
     // create function statement
     public void executeDDL(String str, URI uri) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
-                .build();
+        HttpUriRequest request =
+                RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build();
 
         // Execute the method.
         executeAndCheckHttpRequest(request);
@@ -748,8 +748,8 @@ public class TestExecutor {
     // and returns the contents as a string
     // This string is later passed to REST API for execution.
     public String readTestFile(File testFile) throws Exception {
-        BufferedReader reader = new BufferedReader(
-                new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8));
+        BufferedReader reader =
+                new BufferedReader(new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8));
         String line;
         StringBuilder stringBuilder = new StringBuilder();
         String ls = System.getProperty("line.separator");
@@ -804,8 +804,8 @@ public class TestExecutor {
 
     private static String getProcessOutput(Process p) throws Exception {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        Future<Integer> future = Executors.newSingleThreadExecutor()
-                .submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() {
+        Future<Integer> future =
+                Executors.newSingleThreadExecutor().submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() {
                     @Override
                     public void write(int b) throws IOException {
                         baos.write(b);
@@ -1357,7 +1357,7 @@ public class TestExecutor {
                         if (failedGroup != null) {
                             
failedGroup.getTestCase().add(testCaseCtx.getTestCase());
                         }
-                        throw new Exception("Test \"" + testFile + "\" FAILED!");
+                        throw new Exception("Test \"" + testFile + "\" FAILED!", e);
                     }
                 } finally {
                     if (numOfFiles == testFileCtxs.size()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
index d0b0ea0..0fcdf15 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
@@ -4,8 +4,7 @@
       "adapter-stats" : {
         "incoming-records-count" : 13,
         "failed-at-parser-records-count" : 3
-      },
-      "executor-restart-times" : 0
+      }
     } ]
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 3da58e9..8abbeab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -438,14 +438,15 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
                 try {
                     dsInfo.wait();
                 } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
                 }
             }
         }
         try {
             flushDatasetOpenIndexes(dsInfo, false);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
         for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
             if (iInfo.isOpen()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 94fe951..5aed5f2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -228,6 +228,7 @@ public class ErrorCode {
     public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107;
     public static final int FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD = 3108;
     public static final int METADATA_DROP_FUCTION_IN_USE = 3109;
+    public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
index a74898e..a4c41df 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -120,7 +120,7 @@ public class Checkpoint implements Comparable<Checkpoint> {
         try {
             return new ObjectMapper().writeValueAsString(this);
         } catch (JsonProcessingException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -128,7 +128,7 @@ public class Checkpoint implements Comparable<Checkpoint> {
         try {
             return new ObjectMapper().readValue(json, Checkpoint.class);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
index 49f5457..229fb6d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
@@ -18,9 +18,10 @@
  */
 package org.apache.asterix.common.transactions;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -29,7 +30,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 
 public interface IAppRuntimeContextProvider {
 
-    ThreadExecutor getThreadExecutor();
+    ExecutorService getThreadExecutor();
 
     IBufferCache getBufferCache();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 2e98abd..824e30b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -126,7 +126,7 @@ public class StoragePathUtil {
             }
             return file;
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 4cc06a6..a7a1990 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -217,6 +217,7 @@
 3107 = Active Notification Handler is already suspended
 3108 = Feed stopped while waiting for a new record
 3109 = Function %1$s is being used. It cannot be dropped.
+3110 = Feed failed while reading a new record
 
 # Lifecycle management errors
 4000 = Partition id %1$d for node %2$s already in use by node %3$s

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index a31b46d..b51416a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -74,7 +74,7 @@ public class LookupAdapterFactory<T> implements Serializable {
             return new LookupAdapter<>(dataParser, reader, inRecDesc, ridReader, retainInput, retainMissing,
                     isMissingWriterFactory, ctx, writer);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index def0bf1..7412338 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -18,27 +18,29 @@
  */
 package org.apache.asterix.external.api;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+@FunctionalInterface
 public interface IDataFlowController {
 
-    //TODO: Refactor this interface. Remove writer from start() signature
     public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException;
 
     public default boolean pause() throws HyracksDataException {
-        throw new HyracksDataException("Method not implemented");
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 
     public default boolean resume() throws HyracksDataException {
-        throw new HyracksDataException("Method not implemented");
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 
     public default void flush() throws HyracksDataException {
-        throw new HyracksDataException("Method not implemented");
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 
     public default boolean stop() throws HyracksDataException {
-        throw new HyracksDataException("Method not implemented");
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
index e62672d..472cdae 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.api;
 
-import java.io.Serializable;
-
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -30,7 +28,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
  * adapter(pull or push).
  */
 @FunctionalInterface
-public interface IDataSourceAdapter extends Serializable {
+public interface IDataSourceAdapter {
 
     public enum AdapterType {
         INTERNAL,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index c87fe2d..53fa137 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -57,14 +57,5 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
         tupleForwarder.flush();
     }
 
-    @Override
-    public abstract boolean stop() throws HyracksDataException;
-
-    public abstract boolean handleException(Throwable th) throws HyracksDataException;
-
     public abstract String getStats();
-
-    public void fail() throws HyracksDataException {
-        tupleForwarder.fail();
-    }
 }

Reply via email to