http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
new file mode 100644
index 0000000..caf4bec
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.active;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveEvent.Kind;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventSubscriber;
+import org.apache.asterix.active.IRetryPolicy;
+import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.active.NoRetryPolicyFactory;
+import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
+import org.apache.asterix.metadata.api.IActiveEntityController;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+
+public abstract class ActiveEntityEventsListener implements 
IActiveEntityController {
+
+    private static final Logger LOGGER = 
Logger.getLogger(ActiveEntityEventsListener.class.getName());
+    private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, 
Kind.STATE_CHANGED, null, null);
+    private static final EnumSet<ActivityState> TRANSITION_STATES = 
EnumSet.of(ActivityState.RESUMING,
+            ActivityState.STARTING, ActivityState.STOPPING, 
ActivityState.RECOVERING);
+    // finals
+    protected final IClusterStateManager clusterStateManager;
+    protected final ActiveNotificationHandler handler;
+    protected final List<IActiveEntityEventSubscriber> subscribers = new 
ArrayList<>();
+    protected final IStatementExecutor statementExecutor;
+    protected final ICcApplicationContext appCtx;
+    protected final MetadataProvider metadataProvider;
+    protected final IHyracksClientConnection hcc;
+    protected final EntityId entityId;
+    private final List<Dataset> datasets;
+    protected final ActiveEvent statsUpdatedEvent;
+    protected final String runtimeName;
+    protected final IRetryPolicyFactory retryPolicyFactory;
+    // mutables
+    protected volatile ActivityState state;
+    private AlgebricksAbsolutePartitionConstraint locations;
+    protected ActivityState prevState;
+    protected JobId jobId;
+    protected long statsTimestamp;
+    protected String stats;
+    protected boolean isFetchingStats;
+    protected int numRegistered;
+    protected volatile Future<Void> recoveryTask;
+    protected volatile boolean cancelRecovery;
+    protected volatile boolean suspended = false;
+    // failures
+    protected Exception jobFailure;
+    protected Exception resumeFailure;
+    protected Exception startFailure;
+    protected Exception stopFailure;
+    protected Exception recoverFailure;
+
+    public ActiveEntityEventsListener(IStatementExecutor statementExecutor, 
ICcApplicationContext appCtx,
+            IHyracksClientConnection hcc, EntityId entityId, List<Dataset> 
datasets,
+            AlgebricksAbsolutePartitionConstraint locations, String 
runtimeName, IRetryPolicyFactory retryPolicyFactory)
+            throws HyracksDataException {
+        this.statementExecutor = statementExecutor;
+        this.appCtx = appCtx;
+        this.clusterStateManager = appCtx.getClusterStateManager();
+        this.metadataProvider = new MetadataProvider(appCtx, null);
+        metadataProvider.setConfig(new HashMap<>());
+        this.hcc = hcc;
+        this.entityId = entityId;
+        this.datasets = datasets;
+        this.retryPolicyFactory = retryPolicyFactory;
+        this.state = ActivityState.STOPPED;
+        this.statsTimestamp = -1;
+        this.isFetchingStats = false;
+        this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, 
entityId, null);
+        this.stats = "{\"Stats\":\"N/A\"}";
+        this.runtimeName = runtimeName;
+        this.locations = locations;
+        this.numRegistered = 0;
+        this.handler =
+                (ActiveNotificationHandler) 
metadataProvider.getApplicationContext().getActiveNotificationHandler();
+        handler.registerListener(this);
+    }
+
+    protected synchronized void setState(ActivityState newState) {
+        LOGGER.log(Level.WARNING, "State is being set to " + newState + " from 
" + state);
+        this.prevState = state;
+        this.state = newState;
+        if (newState == ActivityState.SUSPENDED) {
+            suspended = true;
+        }
+        notifySubscribers(STATE_CHANGED);
+    }
+
+    @Override
+    public synchronized void notify(ActiveEvent event) {
+        try {
+            LOGGER.warning("EventListener is notified.");
+            ActiveEvent.Kind eventKind = event.getEventKind();
+            switch (eventKind) {
+                case JOB_CREATED:
+                    jobCreated(event);
+                    break;
+                case JOB_STARTED:
+                    start(event);
+                    break;
+                case JOB_FINISHED:
+                    finish(event);
+                    break;
+                case PARTITION_EVENT:
+                    handle((ActivePartitionMessage) event.getEventObject());
+                    break;
+                default:
+                    LOGGER.log(Level.WARNING, "Unhandled feed event 
notification: " + event);
+                    break;
+            }
+            notifySubscribers(event);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Unhandled Exception", e);
+        }
+    }
+
+    protected void jobCreated(ActiveEvent event) {
+        // Do nothing
+    }
+
+    protected synchronized void handle(ActivePartitionMessage message) {
+        if (message.getEvent() == 
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+            numRegistered++;
+            if (numRegistered == locations.getLocations().length) {
+                setState(ActivityState.RUNNING);
+            }
+        } else if (message.getEvent() == 
ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED) {
+            numRegistered--;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void finish(ActiveEvent event) throws HyracksDataException {
+        jobId = null;
+        Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, 
List<Exception>>) event.getEventObject();
+        JobStatus jobStatus = status.getLeft();
+        List<Exception> exceptions = status.getRight();
+        if (jobStatus.equals(JobStatus.FAILURE)) {
+            jobFailure = exceptions.isEmpty() ? new 
RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
+                    : exceptions.get(0);
+            setState(ActivityState.TEMPORARILY_FAILED);
+            if (prevState != ActivityState.SUSPENDING && prevState != 
ActivityState.RECOVERING) {
+                recover();
+            }
+        } else {
+            setState(state == ActivityState.SUSPENDING ? 
ActivityState.SUSPENDED : ActivityState.STOPPED);
+        }
+    }
+
+    protected void start(ActiveEvent event) {
+        this.jobId = event.getJobId();
+        numRegistered = 0;
+    }
+
+    @Override
+    public synchronized void subscribe(IActiveEntityEventSubscriber 
subscriber) throws HyracksDataException {
+        subscriber.subscribed(this);
+        if (!subscriber.isDone()) {
+            subscribers.add(subscriber);
+        }
+    }
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    @Override
+    public ActivityState getState() {
+        return state;
+    }
+
+    @Override
+    public synchronized boolean isEntityUsingDataset(IDataset dataset) {
+        return isActive() && getDatasets().contains(dataset);
+    }
+
+    @Override
+    public synchronized void remove(Dataset dataset) throws 
HyracksDataException {
+        if (isActive()) {
+            throw new 
RuntimeDataException(ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY, 
entityId, state);
+        }
+        getDatasets().remove(dataset);
+    }
+
+    @Override
+    public synchronized void add(Dataset dataset) throws HyracksDataException {
+        if (isActive()) {
+            throw new 
RuntimeDataException(ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY, entityId, 
state);
+        }
+        getDatasets().add(dataset);
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public String getStats() {
+        return stats;
+    }
+
+    @Override
+    public long getStatsTimeStamp() {
+        return statsTimestamp;
+    }
+
+    public String formatStats(List<String> responses) {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append("{\"Stats\": [").append(responses.get(0));
+        for (int i = 1; i < responses.size(); i++) {
+            strBuilder.append(", ").append(responses.get(i));
+        }
+        strBuilder.append("]}");
+        return strBuilder.toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void refreshStats(long timeout) throws HyracksDataException {
+        LOGGER.log(Level.WARNING, "refreshStats called");
+        synchronized (this) {
+            if (state != ActivityState.RUNNING || isFetchingStats) {
+                LOGGER.log(Level.WARNING,
+                        "returning immediately since state = " + state + " and 
fetchingStats = " + isFetchingStats);
+                return;
+            } else {
+                isFetchingStats = true;
+            }
+        }
+        ICCMessageBroker messageBroker =
+                (ICCMessageBroker) 
metadataProvider.getApplicationContext().getServiceContext().getMessageBroker();
+        long reqId = messageBroker.newRequestId();
+        List<INcAddressedMessage> requests = new ArrayList<>();
+        List<String> ncs = Arrays.asList(locations.getLocations());
+        for (int i = 0; i < ncs.size(); i++) {
+            requests.add(new 
StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS,
+                    new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+        }
+        try {
+            List<String> responses = (List<String>) 
messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
+            stats = formatStats(responses);
+            statsTimestamp = System.currentTimeMillis();
+            notifySubscribers(statsUpdatedEvent);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+        isFetchingStats = false;
+    }
+
+    protected synchronized void notifySubscribers(ActiveEvent event) {
+        notifyAll();
+        Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
+        while (it.hasNext()) {
+            IActiveEntityEventSubscriber subscriber = it.next();
+            if (subscriber.isDone()) {
+                it.remove();
+            } else {
+                try {
+                    subscriber.notify(event);
+                } catch (HyracksDataException e) {
+                    LOGGER.log(Level.WARNING, "Failed to notify subscriber", 
e);
+                }
+                if (subscriber.isDone()) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    public AlgebricksAbsolutePartitionConstraint getLocations() {
+        return locations;
+    }
+
+    /**
+     * this method is called whenever an action is requested. It ensures no 
interleaved requests
+     *
+     * @throws InterruptedException
+     */
+    protected synchronized void waitForNonTransitionState() throws 
InterruptedException {
+        while (TRANSITION_STATES.contains(state) || suspended) {
+            this.wait();
+        }
+    }
+
+    /**
+     * this method is called before an action call is returned. It ensures 
that the request didn't fail
+     *
+     */
+    protected synchronized void checkNoFailure() throws HyracksDataException {
+        if (state == ActivityState.PERMANENTLY_FAILED) {
+            throw HyracksDataException.create(jobFailure);
+        }
+    }
+
+    @Override
+    public synchronized void recover() throws HyracksDataException {
+        LOGGER.log(Level.WARNING, "Recover is called on " + entityId);
+        if (recoveryTask != null) {
+            LOGGER.log(Level.WARNING,
+                    "But recovery task for " + entityId + " is already there!! 
throwing an exception");
+            throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
+        }
+        if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
+            LOGGER.log(Level.WARNING, "But it has no recovery policy, so it is 
set to permanent failure");
+            setState(ActivityState.PERMANENTLY_FAILED);
+        } else {
+            ExecutorService executor = 
appCtx.getServiceContext().getControllerService().getExecutor();
+            IRetryPolicy policy = retryPolicyFactory.create(this);
+            cancelRecovery = false;
+            setState(ActivityState.TEMPORARILY_FAILED);
+            LOGGER.log(Level.WARNING, "Recovery task has been submitted");
+            recoveryTask = executor.submit(() -> doRecover(policy));
+        }
+    }
+
+    protected Void doRecover(IRetryPolicy policy)
+            throws AlgebricksException, HyracksDataException, 
InterruptedException {
+        LOGGER.log(Level.WARNING, "Actual Recovery task has started");
+        if (getState() != ActivityState.TEMPORARILY_FAILED) {
+            LOGGER.log(Level.WARNING, "but its state is not temp failure and 
so we're just returning");
+            return null;
+        }
+        LOGGER.log(Level.WARNING, "calling the policy");
+        while (policy.retry()) {
+            synchronized (this) {
+                if (cancelRecovery) {
+                    recoveryTask = null;
+                    return null;
+                }
+                while (clusterStateManager.getState() != ClusterState.ACTIVE) {
+                    if (cancelRecovery) {
+                        recoveryTask = null;
+                        return null;
+                    }
+                    wait();
+                }
+            }
+            waitForNonTransitionState();
+            IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
+            
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+                    entityId.getDataverse() + '.' + entityId.getEntityName());
+            for (Dataset dataset : getDatasets()) {
+                MetadataLockUtil.modifyDatasetBegin(lockManager, 
metadataProvider.getLocks(),
+                        dataset.getDataverseName(), 
DatasetUtil.getFullyQualifiedName(dataset));
+            }
+            synchronized (this) {
+                try {
+                    setState(ActivityState.RECOVERING);
+                    doStart(metadataProvider);
+                    return null;
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Attempt to revive " + entityId 
+ " failed", e);
+                    setState(ActivityState.TEMPORARILY_FAILED);
+                    recoverFailure = e;
+                } finally {
+                    metadataProvider.getLocks().reset();
+                }
+                notifyAll();
+            }
+        }
+        IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
+        try {
+            
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+                    entityId.getDataverse() + '.' + entityId.getEntityName());
+            for (Dataset dataset : getDatasets()) {
+                MetadataLockUtil.modifyDatasetBegin(lockManager, 
metadataProvider.getLocks(), dataset.getDatasetName(),
+                        DatasetUtil.getFullyQualifiedName(dataset));
+            }
+            synchronized (this) {
+                if (state == ActivityState.TEMPORARILY_FAILED) {
+                    setState(ActivityState.PERMANENTLY_FAILED);
+                    recoveryTask = null;
+                }
+                notifyAll();
+            }
+        } finally {
+            metadataProvider.getLocks().reset();
+        }
+        return null;
+    }
+
+    @Override
+    public synchronized void start(MetadataProvider metadataProvider)
+            throws HyracksDataException, InterruptedException {
+        waitForNonTransitionState();
+        if (state != ActivityState.PERMANENTLY_FAILED && state != 
ActivityState.STOPPED) {
+            throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED, entityId, state);
+        }
+        try {
+            setState(ActivityState.STARTING);
+            doStart(metadataProvider);
+            setRunning(metadataProvider, true);
+        } catch (Exception e) {
+            setState(ActivityState.PERMANENTLY_FAILED);
+            LOGGER.log(Level.SEVERE, "Failed to start the entity " + entityId, 
e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    protected abstract void doStart(MetadataProvider metadataProvider) throws 
HyracksDataException, AlgebricksException;
+
+    protected abstract Void doStop(MetadataProvider metadataProvider) throws 
HyracksDataException, AlgebricksException;
+
+    protected abstract Void doSuspend(MetadataProvider metadataProvider)
+            throws HyracksDataException, AlgebricksException;
+
+    protected abstract void doResume(MetadataProvider metadataProvider)
+            throws HyracksDataException, AlgebricksException;
+
+    protected abstract void setRunning(MetadataProvider metadataProvider, 
boolean running)
+            throws HyracksDataException, AlgebricksException;
+
+    @Override
+    public void stop(MetadataProvider metadataProvider) throws 
HyracksDataException, InterruptedException {
+        Future<Void> aRecoveryTask = null;
+        synchronized (this) {
+            waitForNonTransitionState();
+            if (state != ActivityState.RUNNING && state != 
ActivityState.PERMANENTLY_FAILED
+                    && state != ActivityState.TEMPORARILY_FAILED) {
+                throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, 
state);
+            }
+            if (state == ActivityState.TEMPORARILY_FAILED || state == 
ActivityState.PERMANENTLY_FAILED) {
+                if (recoveryTask != null) {
+                    aRecoveryTask = recoveryTask;
+                    cancelRecovery = true;
+                    recoveryTask.cancel(true);
+                }
+                setState(ActivityState.STOPPED);
+                try {
+                    setRunning(metadataProvider, false);
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Failed to set the entity state 
as not running " + entityId, e);
+                    throw HyracksDataException.create(e);
+                }
+            } else if (state == ActivityState.RUNNING) {
+                setState(ActivityState.STOPPING);
+                try {
+                    doStop(metadataProvider);
+                    setRunning(metadataProvider, false);
+                } catch (Exception e) {
+                    setState(ActivityState.PERMANENTLY_FAILED);
+                    LOGGER.log(Level.SEVERE, "Failed to stop the entity " + 
entityId, e);
+                    throw HyracksDataException.create(e);
+                }
+            } else {
+                throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, 
state);
+            }
+        }
+        try {
+            if (aRecoveryTask != null) {
+                aRecoveryTask.get();
+            }
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void suspend(MetadataProvider metadataProvider) throws 
HyracksDataException, InterruptedException {
+        WaitForStateSubscriber subscriber;
+        Future<Void> suspendTask;
+        synchronized (this) {
+            LOGGER.log(Level.WARNING, "suspending entity " + entityId);
+            LOGGER.log(Level.WARNING, "Waiting for ongoing activities");
+            waitForNonTransitionState();
+            LOGGER.log(Level.WARNING, "Proceeding with suspension. Current 
state is " + state);
+            if (state == ActivityState.STOPPED || state == 
ActivityState.PERMANENTLY_FAILED) {
+                suspended = true;
+                return;
+            }
+            if (state == ActivityState.SUSPENDED) {
+                throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_ALREADY_SUSPENDED, entityId, 
state);
+            }
+            if (state == ActivityState.TEMPORARILY_FAILED) {
+                suspended = true;
+                setState(ActivityState.SUSPENDED);
+                return;
+            }
+            setState(ActivityState.SUSPENDING);
+            subscriber = new WaitForStateSubscriber(this,
+                    EnumSet.of(ActivityState.SUSPENDED, 
ActivityState.TEMPORARILY_FAILED));
+            suspendTask = 
metadataProvider.getApplicationContext().getServiceContext().getControllerService()
+                    .getExecutor().submit(() -> doSuspend(metadataProvider));
+            LOGGER.log(Level.WARNING, "Suspension task has been submitted");
+        }
+        try {
+            LOGGER.log(Level.WARNING, "Waiting for suspension task to 
complete");
+            suspendTask.get();
+            LOGGER.log(Level.WARNING, "waiting for state to become SUSPENDED 
or TEMPORARILY_FAILED");
+            subscriber.sync();
+        } catch (Exception e) {
+            synchronized (this) {
+                LOGGER.log(Level.SEVERE, "Failure while waiting for " + 
entityId + " to become suspended", e);
+                // failed to suspend
+                if (state == ActivityState.SUSPENDING) {
+                    if (jobId != null) {
+                        // job is still running
+                        // restore state
+                        setState(prevState);
+                    } else {
+                        setState(ActivityState.PERMANENTLY_FAILED);
+                    }
+                }
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void resume(MetadataProvider metadataProvider) throws 
HyracksDataException {
+        if (state == ActivityState.STOPPED || state == 
ActivityState.PERMANENTLY_FAILED) {
+            suspended = false;
+            notifyAll();
+            return;
+        }
+        if (state != ActivityState.SUSPENDED && state != 
ActivityState.TEMPORARILY_FAILED) {
+            throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_RESUME_FROM_STATE, 
entityId, state);
+        }
+        try {
+            if (prevState == ActivityState.TEMPORARILY_FAILED) {
+                setState(ActivityState.TEMPORARILY_FAILED);
+                return;
+            }
+            setState(ActivityState.RESUMING);
+            WaitForStateSubscriber subscriber = new 
WaitForStateSubscriber(this,
+                    EnumSet.of(ActivityState.RUNNING, 
ActivityState.TEMPORARILY_FAILED));
+            recoveryTask = 
metadataProvider.getApplicationContext().getServiceContext().getControllerService()
+                    .getExecutor().submit(() -> 
resumeOrRecover(metadataProvider));
+            try {
+                subscriber.sync();
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failure while attempting to resume 
" + entityId, e);
+                throw HyracksDataException.create(e);
+            }
+        } finally {
+            suspended = false;
+            notifyAll();
+        }
+    }
+
+    protected Void resumeOrRecover(MetadataProvider metadataProvider)
+            throws HyracksDataException, AlgebricksException, 
InterruptedException {
+        try {
+            doResume(metadataProvider);
+            synchronized (this) {
+                setState(ActivityState.RUNNING);
+                recoveryTask = null;
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "First attempt to resume " + entityId + 
" Failed", e);
+            setState(ActivityState.TEMPORARILY_FAILED);
+            if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
+                setState(ActivityState.PERMANENTLY_FAILED);
+            } else {
+                IRetryPolicy policy = retryPolicyFactory.create(this);
+                cancelRecovery = false;
+                doRecover(policy);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isActive() {
+        return state != ActivityState.STOPPED && state != 
ActivityState.PERMANENTLY_FAILED;
+    }
+
+    @Override
+    public void unregister() throws HyracksDataException {
+        handler.unregisterListener(this);
+    }
+
+    public void setLocations(AlgebricksAbsolutePartitionConstraint locations) {
+        this.locations = locations;
+    }
+
+    public Future<Void> getRecoveryTask() {
+        return recoveryTask;
+    }
+
+    public synchronized void cancelRecovery() {
+        cancelRecovery = true;
+        notifyAll();
+    }
+
+    @Override
+    public Exception getJobFailure() {
+        return jobFailure;
+    }
+
+    @Override
+    public List<Dataset> getDatasets() {
+        return datasets;
+    }
+
+    @Override
+    public synchronized void replace(Dataset dataset) {
+        if (getDatasets().contains(dataset)) {
+            getDatasets().remove(dataset);
+            getDatasets().add(dataset);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
new file mode 100644
index 0000000..b34d011
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.active;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveEvent.Kind;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.active.IActiveNotificationHandler;
+import org.apache.asterix.active.SingleThreadEventProcessor;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.metadata.api.IActiveEntityController;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+
+public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<ActiveEvent>
+        implements IActiveNotificationHandler, IJobLifecycleListener {
+
+    private static final Logger LOGGER = 
Logger.getLogger(ActiveNotificationHandler.class.getName());
+    private static final boolean DEBUG = false;
+    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
+    private final Map<EntityId, IActiveEntityEventsListener> 
entityEventListeners;
+    private final Map<JobId, EntityId> jobId2EntityId;
+    private boolean initialized = false;
+    private boolean suspended = false;
+
+    public ActiveNotificationHandler() {
+        super(ActiveNotificationHandler.class.getSimpleName());
+        jobId2EntityId = new HashMap<>();
+        entityEventListeners = new HashMap<>();
+    }
+
+    // *** SingleThreadEventProcessor<ActiveEvent>
+
+    @Override
+    protected void handle(ActiveEvent event) {
+        EntityId entityId = jobId2EntityId.get(event.getJobId());
+        if (entityId != null) {
+            IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
+            LOGGER.log(Level.WARNING, "Next event is of type " + 
event.getEventKind());
+            if (event.getEventKind() == Kind.JOB_FINISHED) {
+                LOGGER.log(Level.WARNING, "Removing the job");
+                jobId2EntityId.remove(event.getJobId());
+            }
+            if (listener != null) {
+                LOGGER.log(Level.WARNING, "Notifying the listener");
+                listener.notify(event);
+            }
+        } else {
+            LOGGER.log(Level.SEVERE, "Entity not found for received message 
for job " + event.getJobId());
+        }
+    }
+
+    // *** IJobLifecycleListener
+
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) throws HyracksDataException {
+        LOGGER.log(Level.WARNING,
+                "notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) was called with jobId = " + jobId);
+        Object property = 
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
+        if (property == null || !(property instanceof EntityId)) {
+            LOGGER.log(Level.WARNING, "Job is not of type active job. property 
found to be: " + property);
+            return;
+        }
+        EntityId entityId = (EntityId) property;
+        monitorJob(jobId, entityId);
+        boolean found = jobId2EntityId.get(jobId) != null;
+        LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" 
: "Inactive"));
+        add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, 
jobSpecification));
+    }
+
+    private synchronized void monitorJob(JobId jobId, EntityId entityId) {
+        if (DEBUG) {
+            LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob 
activeJob) called with job id: " + jobId);
+            boolean found = jobId2EntityId.get(jobId) != null;
+            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? 
"Active" : "Inactive"));
+        }
+        if (entityEventListeners.containsKey(entityId)) {
+            if (jobId2EntityId.containsKey(jobId)) {
+                LOGGER.severe("Job is already being monitored for job: " + 
jobId);
+                return;
+            }
+            if (DEBUG) {
+                LOGGER.log(Level.WARNING, "monitoring started for job id: " + 
jobId);
+            }
+        } else {
+            LOGGER.severe("No listener was found for the entity: " + entityId);
+        }
+        jobId2EntityId.put(jobId, entityId);
+    }
+
+    @Override
+    public synchronized void notifyJobStart(JobId jobId) throws 
HyracksException {
+        EntityId entityId = jobId2EntityId.get(jobId);
+        if (entityId != null) {
+            add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
+        }
+    }
+
+    @Override
+    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions)
+            throws HyracksException {
+        EntityId entityId = jobId2EntityId.get(jobId);
+        if (entityId != null) {
+            add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, 
Pair.of(jobStatus, exceptions)));
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
+            }
+        }
+    }
+
+    // *** IActiveNotificationHandler
+
+    @Override
+    public void receive(ActivePartitionMessage message) {
+        add(new ActiveEvent(message.getJobId(), Kind.PARTITION_EVENT, 
message.getActiveRuntimeId().getEntityId(),
+                message));
+    }
+
+    @Override
+    public IActiveEntityEventsListener getListener(EntityId entityId) {
+        if (DEBUG) {
+            LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId 
entityId) was called with entity " + entityId);
+            IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
+            LOGGER.log(Level.WARNING, "Listener found: " + listener);
+        }
+        return entityEventListeners.get(entityId);
+    }
+
+    @Override
+    public synchronized IActiveEntityEventsListener[] getEventListeners() {
+        if (DEBUG) {
+            LOGGER.log(Level.WARNING, "getEventListeners() was called");
+            LOGGER.log(Level.WARNING, "returning " + 
entityEventListeners.size() + " Listeners");
+        }
+        return entityEventListeners.values().toArray(new 
IActiveEntityEventsListener[entityEventListeners.size()]);
+    }
+
+    @Override
+    public synchronized void registerListener(IActiveEntityEventsListener 
listener) throws HyracksDataException {
+        if (suspended) {
+            throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
+        }
+        if (DEBUG) {
+            LOGGER.log(Level.WARNING,
+                    "registerListener(IActiveEntityEventsListener listener) 
was called for the entity "
+                            + listener.getEntityId());
+        }
+        if (entityEventListeners.containsKey(listener.getEntityId())) {
+            throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, 
listener.getEntityId());
+        }
+        entityEventListeners.put(listener.getEntityId(), listener);
+    }
+
+    @Override
+    public synchronized void unregisterListener(IActiveEntityEventsListener 
listener) throws HyracksDataException {
+        if (suspended) {
+            throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
+        }
+        LOGGER.log(Level.WARNING, 
"unregisterListener(IActiveEntityEventsListener listener) was called for the 
entity "
+                + listener.getEntityId());
+        IActiveEntityEventsListener registeredListener = 
entityEventListeners.remove(listener.getEntityId());
+        if (registeredListener == null) {
+            throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, 
listener.getEntityId());
+        }
+        if (registeredListener.isActive()) {
+            entityEventListeners.put(registeredListener.getEntityId(), 
registeredListener);
+            throw new 
RuntimeDataException(ErrorCode.CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER, 
listener.getEntityId());
+        }
+    }
+
+    @Override
+    public boolean isInitialized() {
+        return initialized;
+    }
+
+    @Override
+    public void setInitialized(boolean initialized) throws 
HyracksDataException {
+        if (this.initialized) {
+            throw new 
RuntimeDataException(ErrorCode.DOUBLE_INITIALIZATION_OF_ACTIVE_NOTIFICATION_HANDLER);
+        }
+        this.initialized = initialized;
+    }
+
+    @Override
+    public synchronized void recover() throws HyracksDataException {
+        LOGGER.log(Level.WARNING, "Starting active recovery");
+        for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
+            synchronized (listener) {
+                LOGGER.log(Level.WARNING, "Entity " + listener.getEntityId() + 
" is " + listener.getStats());
+                if (listener.getState() == ActivityState.PERMANENTLY_FAILED
+                        && listener instanceof IActiveEntityController) {
+                    LOGGER.log(Level.WARNING, "Recovering");
+                    ((IActiveEntityController) listener).recover();
+                } else {
+                    LOGGER.log(Level.WARNING, "Only notifying");
+                    listener.notifyAll();
+                }
+            }
+        }
+    }
+
+    public void suspend(MetadataProvider mdProvider)
+            throws AsterixException, HyracksDataException, 
InterruptedException {
+        synchronized (this) {
+            if (suspended) {
+                throw new 
RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
+            }
+            LOGGER.log(Level.WARNING, "Suspending active events handler");
+            suspended = true;
+        }
+        IMetadataLockManager lockManager = 
mdProvider.getApplicationContext().getMetadataLockManager();
+        Collection<IActiveEntityEventsListener> registeredListeners = 
entityEventListeners.values();
+        for (IActiveEntityEventsListener listener : registeredListeners) {
+            // write lock the listener
+            // exclusive lock all the datasets
+            String dataverseName = listener.getEntityId().getDataverse();
+            String entityName = listener.getEntityId().getEntityName();
+            LOGGER.log(Level.WARNING, "Suspending " + listener.getEntityId());
+            LOGGER.log(Level.WARNING, "Acquiring locks");
+            lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), 
dataverseName + '.' + entityName);
+            List<Dataset> datasets = ((ActiveEntityEventsListener) 
listener).getDatasets();
+            for (Dataset dataset : datasets) {
+                
lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
+                        DatasetUtil.getFullyQualifiedName(dataset));
+            }
+            LOGGER.log(Level.WARNING, "locks acquired");
+            ((ActiveEntityEventsListener) listener).suspend(mdProvider);
+            LOGGER.log(Level.WARNING, listener.getEntityId() + " suspended");
+        }
+    }
+
+    public void resume(MetadataProvider mdProvider)
+            throws AsterixException, HyracksDataException, 
InterruptedException {
+        LOGGER.log(Level.WARNING, "Resuming active events handler");
+        for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
+            LOGGER.log(Level.WARNING, "Resuming " + listener.getEntityId());
+            ((ActiveEntityEventsListener) listener).resume(mdProvider);
+            LOGGER.log(Level.WARNING, listener.getEntityId() + " resumed");
+        }
+        synchronized (this) {
+            suspended = false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
new file mode 100644
index 0000000..45c79a0
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.active;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventSubscriber;
+import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.utils.FeedOperations;
+import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedEventsListener extends ActiveEntityEventsListener {
+
+    private final Feed feed;
+    private final List<FeedConnection> feedConnections;
+
+    public FeedEventsListener(IStatementExecutor statementExecutor, 
ICcApplicationContext appCtx,
+            IHyracksClientConnection hcc, EntityId entityId, List<Dataset> 
datasets,
+            AlgebricksAbsolutePartitionConstraint locations, String 
runtimeName, IRetryPolicyFactory retryPolicyFactory,
+            Feed feed, final List<FeedConnection> feedConnections) throws 
HyracksDataException {
+        super(statementExecutor, appCtx, hcc, entityId, datasets, locations, 
runtimeName, retryPolicyFactory);
+        this.feed = feed;
+        this.feedConnections = feedConnections;
+    }
+
+    @Override
+    public synchronized void remove(Dataset dataset) throws 
HyracksDataException {
+        super.remove(dataset);
+        feedConnections.removeIf(o -> 
o.getDataverseName().equals(dataset.getDataverseName())
+                && o.getDatasetName().equals(dataset.getDatasetName()));
+    }
+
+    public synchronized void addFeedConnection(FeedConnection feedConnection) {
+        feedConnections.add(feedConnection);
+    }
+
+    public Feed getFeed() {
+        return feed;
+    }
+
+    @Override
+    protected void doStart(MetadataProvider mdProvider) throws 
HyracksDataException, AlgebricksException {
+        try {
+            ILangCompilationProvider compilationProvider = new 
AqlCompilationProvider();
+            IStorageComponentProvider storageComponentProvider = new 
StorageComponentProvider();
+            DefaultStatementExecutorFactory statementExecutorFactory = new 
DefaultStatementExecutorFactory();
+            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> 
jobInfo = FeedOperations.buildStartFeedJob(
+                    ((QueryTranslator) statementExecutor).getSessionOutput(), 
mdProvider, feed, feedConnections,
+                    compilationProvider, storageComponentProvider, 
statementExecutorFactory, hcc);
+            JobSpecification feedJob = jobInfo.getLeft();
+            IActiveEntityEventSubscriber eventSubscriber =
+                    new WaitForStateSubscriber(this, 
Collections.singleton(ActivityState.RUNNING));
+            
feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, 
entityId);
+            // TODO(Yingyi): currently we do not check IFrameWriter protocol 
violations for Feed jobs.
+            // We will need to design general exception handling mechanism for 
feeds.
+            setLocations(jobInfo.getRight());
+            boolean wait = 
Boolean.parseBoolean(mdProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
+            JobUtils.runJob(hcc, feedJob, false);
+            eventSubscriber.sync();
+            if (wait) {
+                IActiveEntityEventSubscriber stoppedSubscriber = new 
WaitForStateSubscriber(this,
+                        EnumSet.of(ActivityState.STOPPED, 
ActivityState.PERMANENTLY_FAILED));
+                stoppedSubscriber.sync();
+            }
+        } catch (AlgebricksException e) {
+            throw e;
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    protected Void doStop(MetadataProvider metadataProvider) throws 
HyracksDataException, AlgebricksException {
+        IActiveEntityEventSubscriber eventSubscriber =
+                new WaitForStateSubscriber(this, 
Collections.singleton(ActivityState.STOPPED));
+        try {
+            // Construct ActiveMessage
+            for (int i = 0; i < getLocations().getLocations().length; i++) {
+                String intakeLocation = getLocations().getLocations()[i];
+                
FeedOperations.SendStopMessageToNode(metadataProvider.getApplicationContext(), 
entityId, intakeLocation,
+                        i);
+            }
+            eventSubscriber.sync();
+        } catch (AlgebricksException e) {
+            throw e;
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+        return null;
+    }
+
+    @Override
+    protected void setRunning(MetadataProvider metadataProvider, boolean 
running)
+            throws HyracksDataException, AlgebricksException {
+        // No op
+    }
+
+    @Override
+    protected Void doSuspend(MetadataProvider metadataProvider) throws 
HyracksDataException, AlgebricksException {
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
+    }
+
+    @Override
+    protected void doResume(MetadataProvider metadataProvider) throws 
HyracksDataException, AlgebricksException {
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index 00fb1b0..f6bd708 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -50,7 +50,6 @@ public class DefaultStatementExecutorFactory implements 
IStatementExecutorFactor
     @Override
     public IStatementExecutor create(ICcApplicationContext appCtx, 
List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, 
IStorageComponentProvider storageComponentProvider) {
-        return new QueryTranslator(appCtx, statements, output, 
compilationProvider, storageComponentProvider,
-                executorService);
+        return new QueryTranslator(appCtx, statements, output, 
compilationProvider, executorService);
     }
 }

Reply via email to