http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java new file mode 100644 index 0000000..caf4bec --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -0,0 +1,659 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.asterix.app.active;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveEvent.Kind;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IRetryPolicy;
import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.StatsRequestMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.metadata.api.IActiveEntityController;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;

/**
 * Base class for controllers of active entities.
 * <p>
 * An instance tracks the {@link ActivityState} of one {@link EntityId}, reacts to the job lifecycle and partition
 * events routed to it by the {@link ActiveNotificationHandler}, and drives start, stop, suspend, resume and recovery
 * of the entity's job. Every state change is broadcast to the registered subscribers and wakes any thread waiting on
 * this object's monitor.
 */
public abstract class ActiveEntityEventsListener implements IActiveEntityController {

    private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName());
    private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, Kind.STATE_CHANGED, null, null);
    // states during which a new action request must wait (see waitForNonTransitionState)
    private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING,
            ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING);
    // finals
    protected final IClusterStateManager clusterStateManager;
    protected final ActiveNotificationHandler handler;
    protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>();
    protected final IStatementExecutor statementExecutor;
    protected final ICcApplicationContext appCtx;
    protected final MetadataProvider metadataProvider;
    protected final IHyracksClientConnection hcc;
    protected final EntityId entityId;
    private final List<Dataset> datasets;
    protected final ActiveEvent statsUpdatedEvent;
    protected final String runtimeName;
    protected final IRetryPolicyFactory retryPolicyFactory;
    // mutables
    protected volatile ActivityState state;
    private AlgebricksAbsolutePartitionConstraint locations;
    protected ActivityState prevState;
    protected JobId jobId;
    protected long statsTimestamp;
    protected String stats;
    protected boolean isFetchingStats;
    protected int numRegistered;
    protected volatile Future<Void> recoveryTask;
    protected volatile boolean cancelRecovery;
    protected volatile boolean suspended = false;
    // failures
    protected Exception jobFailure;
    protected Exception resumeFailure;
    protected Exception startFailure;
    protected Exception stopFailure;
    protected Exception recoverFailure;

    /**
     * Creates the listener in the {@link ActivityState#STOPPED} state and registers it with the application's
     * active notification handler.
     *
     * @throws HyracksDataException
     *             if the handler rejects the registration
     */
    public ActiveEntityEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx,
            IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets,
            AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory)
            throws HyracksDataException {
        this.statementExecutor = statementExecutor;
        this.appCtx = appCtx;
        this.clusterStateManager = appCtx.getClusterStateManager();
        this.metadataProvider = new MetadataProvider(appCtx, null);
        metadataProvider.setConfig(new HashMap<>());
        this.hcc = hcc;
        this.entityId = entityId;
        this.datasets = datasets;
        this.retryPolicyFactory = retryPolicyFactory;
        this.state = ActivityState.STOPPED;
        this.statsTimestamp = -1;
        this.isFetchingStats = false;
        this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null);
        this.stats = "{\"Stats\":\"N/A\"}";
        this.runtimeName = runtimeName;
        this.locations = locations;
        this.numRegistered = 0;
        this.handler =
                (ActiveNotificationHandler) metadataProvider.getApplicationContext().getActiveNotificationHandler();
        handler.registerListener(this);
    }

    /**
     * Moves to {@code newState}, remembering the current state in {@link #prevState}. Entering
     * {@link ActivityState#SUSPENDED} latches the {@link #suspended} flag. Waiters and subscribers are notified.
     */
    protected synchronized void setState(ActivityState newState) {
        LOGGER.log(Level.WARNING, "State is being set to " + newState + " from " + state);
        this.prevState = state;
        this.state = newState;
        if (newState == ActivityState.SUSPENDED) {
            suspended = true;
        }
        notifySubscribers(STATE_CHANGED);
    }

    /**
     * Dispatches an event by kind and then forwards it to the subscribers. Any exception is logged and
     * swallowed so that the caller's event loop keeps running.
     */
    @Override
    public synchronized void notify(ActiveEvent event) {
        try {
            LOGGER.warning("EventListener is notified.");
            ActiveEvent.Kind eventKind = event.getEventKind();
            switch (eventKind) {
                case JOB_CREATED:
                    jobCreated(event);
                    break;
                case JOB_STARTED:
                    start(event);
                    break;
                case JOB_FINISHED:
                    finish(event);
                    break;
                case PARTITION_EVENT:
                    handle((ActivePartitionMessage) event.getEventObject());
                    break;
                default:
                    LOGGER.log(Level.WARNING, "Unhandled feed event notification: " + event);
                    break;
            }
            notifySubscribers(event);
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Unhandled Exception", e);
        }
    }

    /** Hook invoked when the entity's job is created; does nothing by default. */
    protected void jobCreated(ActiveEvent event) {
        // Do nothing
    }

    /**
     * Counts runtime (de)registrations. Once every location has registered a runtime, the entity becomes
     * {@link ActivityState#RUNNING}.
     */
    protected synchronized void handle(ActivePartitionMessage message) {
        if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
            numRegistered++;
            if (numRegistered == locations.getLocations().length) {
                setState(ActivityState.RUNNING);
            }
        } else if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED) {
            numRegistered--;
        }
    }

    /**
     * Handles the end of the entity's job. On failure, the first reported exception (or a generic one) is
     * recorded and the entity becomes temporarily failed. Recovery is then triggered unless the job was being
     * suspended or was itself a recovery attempt. On normal completion the entity becomes SUSPENDED or STOPPED.
     */
    @SuppressWarnings("unchecked")
    protected void finish(ActiveEvent event) throws HyracksDataException {
        jobId = null;
        Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
        JobStatus jobStatus = status.getLeft();
        List<Exception> exceptions = status.getRight();
        if (jobStatus.equals(JobStatus.FAILURE)) {
            // tolerate a missing exception list as well as an empty one
            jobFailure = exceptions == null || exceptions.isEmpty()
                    ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) : exceptions.get(0);
            setState(ActivityState.TEMPORARILY_FAILED);
            if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING) {
                recover();
            }
        } else {
            setState(state == ActivityState.SUSPENDING ? ActivityState.SUSPENDED : ActivityState.STOPPED);
        }
    }

    /** Records the id of the newly started job and resets the runtime registration counter. */
    protected void start(ActiveEvent event) {
        this.jobId = event.getJobId();
        numRegistered = 0;
    }

    @Override
    public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException {
        subscriber.subscribed(this);
        if (!subscriber.isDone()) {
            subscribers.add(subscriber);
        }
    }

    @Override
    public EntityId getEntityId() {
        return entityId;
    }

    @Override
    public ActivityState getState() {
        return state;
    }

    @Override
    public synchronized boolean isEntityUsingDataset(IDataset dataset) {
        return isActive() && getDatasets().contains(dataset);
    }

    @Override
    public synchronized void remove(Dataset dataset) throws HyracksDataException {
        if (isActive()) {
            throw new RuntimeDataException(ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY, entityId, state);
        }
        getDatasets().remove(dataset);
    }

    @Override
    public synchronized void add(Dataset dataset) throws HyracksDataException {
        if (isActive()) {
            throw new RuntimeDataException(ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY, entityId, state);
        }
        getDatasets().add(dataset);
    }

    public JobId getJobId() {
        return jobId;
    }

    @Override
    public String getStats() {
        return stats;
    }

    @Override
    public long getStatsTimeStamp() {
        return statsTimestamp;
    }

    /**
     * Joins per-partition stats responses into {@code {"Stats": [r0, r1, ...]}}. Expects at least one response.
     */
    public String formatStats(List<String> responses) {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append("{\"Stats\": [").append(responses.get(0));
        for (int i = 1; i < responses.size(); i++) {
            strBuilder.append(", ").append(responses.get(i));
        }
        strBuilder.append("]}");
        return strBuilder.toString();
    }

    /**
     * Requests fresh stats from every partition and publishes them to the subscribers. The call returns
     * immediately when the entity is not running or another refresh is already in flight.
     *
     * @throws HyracksDataException
     *             if the stats request fails or times out
     */
    @SuppressWarnings("unchecked")
    @Override
    public void refreshStats(long timeout) throws HyracksDataException {
        LOGGER.log(Level.WARNING, "refreshStats called");
        synchronized (this) {
            if (state != ActivityState.RUNNING || isFetchingStats) {
                LOGGER.log(Level.WARNING,
                        "returning immediately since state = " + state + " and fetchingStats = " + isFetchingStats);
                return;
            } else {
                isFetchingStats = true;
            }
        }
        try {
            ICCMessageBroker messageBroker =
                    (ICCMessageBroker) metadataProvider.getApplicationContext().getServiceContext().getMessageBroker();
            long reqId = messageBroker.newRequestId();
            List<INcAddressedMessage> requests = new ArrayList<>();
            List<String> ncs = Arrays.asList(locations.getLocations());
            for (int i = 0; i < ncs.size(); i++) {
                requests.add(new StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS,
                        new ActiveRuntimeId(entityId, runtimeName, i), reqId));
            }
            try {
                List<String> responses =
                        (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
                stats = formatStats(responses);
                statsTimestamp = System.currentTimeMillis();
                notifySubscribers(statsUpdatedEvent);
            } catch (Exception e) {
                throw HyracksDataException.create(e);
            }
        } finally {
            // always clear the flag (under the same monitor that set it); otherwise a single failed
            // request would block every future refresh
            synchronized (this) {
                isFetchingStats = false;
            }
        }
    }

    /**
     * Wakes all threads waiting on this listener, then delivers {@code event} to each subscriber. Subscribers
     * that report themselves done are dropped.
     */
    protected synchronized void notifySubscribers(ActiveEvent event) {
        notifyAll();
        Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
        while (it.hasNext()) {
            IActiveEntityEventSubscriber subscriber = it.next();
            if (subscriber.isDone()) {
                it.remove();
            } else {
                try {
                    subscriber.notify(event);
                } catch (HyracksDataException e) {
                    LOGGER.log(Level.WARNING, "Failed to notify subscriber", e);
                }
                if (subscriber.isDone()) {
                    it.remove();
                }
            }
        }
    }

    public AlgebricksAbsolutePartitionConstraint getLocations() {
        return locations;
    }

    /**
     * this method is called whenever an action is requested. It ensures no interleaved requests
     *
     * @throws InterruptedException
     */
    protected synchronized void waitForNonTransitionState() throws InterruptedException {
        while (TRANSITION_STATES.contains(state) || suspended) {
            this.wait();
        }
    }

    /**
     * this method is called before an action call is returned. It ensures that the request didn't fail
     *
     */
    protected synchronized void checkNoFailure() throws HyracksDataException {
        if (state == ActivityState.PERMANENTLY_FAILED) {
            throw HyracksDataException.create(jobFailure);
        }
    }

    /**
     * Starts recovery of the entity. Without a retry policy the entity becomes permanently failed. Otherwise a
     * recovery task is submitted to the controller's executor.
     *
     * @throws HyracksDataException
     *             if a recovery task is already registered
     */
    @Override
    public synchronized void recover() throws HyracksDataException {
        LOGGER.log(Level.WARNING, "Recover is called on " + entityId);
        if (recoveryTask != null) {
            LOGGER.log(Level.WARNING,
                    "But recovery task for " + entityId + " is already there!! throwing an exception");
            throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
        }
        if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
            LOGGER.log(Level.WARNING, "But it has no recovery policy, so it is set to permanent failure");
            setState(ActivityState.PERMANENTLY_FAILED);
        } else {
            ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
            IRetryPolicy policy = retryPolicyFactory.create(this);
            cancelRecovery = false;
            setState(ActivityState.TEMPORARILY_FAILED);
            LOGGER.log(Level.WARNING, "Recovery task has been submitted");
            recoveryTask = executor.submit(() -> doRecover(policy));
        }
    }

    /**
     * Recovery loop. While {@code policy} allows retries, it waits for an active cluster and for no competing
     * transition, takes the metadata locks and tries to restart the entity. When retries are exhausted the
     * entity is marked permanently failed. Every exit path that ends recovery clears {@link #recoveryTask}.
     */
    protected Void doRecover(IRetryPolicy policy)
            throws AlgebricksException, HyracksDataException, InterruptedException {
        LOGGER.log(Level.WARNING, "Actual Recovery task has started");
        if (getState() != ActivityState.TEMPORARILY_FAILED) {
            LOGGER.log(Level.WARNING, "but its state is not temp failure and so we're just returning");
            return null;
        }
        LOGGER.log(Level.WARNING, "calling the policy");
        while (policy.retry()) {
            synchronized (this) {
                if (cancelRecovery) {
                    recoveryTask = null;
                    return null;
                }
                while (clusterStateManager.getState() != ClusterState.ACTIVE) {
                    if (cancelRecovery) {
                        recoveryTask = null;
                        return null;
                    }
                    wait();
                }
            }
            waitForNonTransitionState();
            IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
            boolean locksAcquired = false;
            try {
                acquireRecoveryLocks(lockManager);
                locksAcquired = true;
            } finally {
                if (!locksAcquired) {
                    // acquisition failed part-way: release whatever was taken so no lock leaks
                    metadataProvider.getLocks().reset();
                }
            }
            synchronized (this) {
                try {
                    setState(ActivityState.RECOVERING);
                    doStart(metadataProvider);
                    // the entity is back up: drop the task handle (as resumeOrRecover does) so that a later
                    // job failure can schedule a new recovery instead of hitting DOUBLE_RECOVERY_ATTEMPTS
                    recoveryTask = null;
                    return null;
                } catch (Exception e) {
                    LOGGER.log(Level.WARNING, "Attempt to revive " + entityId + " failed", e);
                    setState(ActivityState.TEMPORARILY_FAILED);
                    recoverFailure = e;
                } finally {
                    metadataProvider.getLocks().reset();
                }
                notifyAll();
            }
        }
        IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
        try {
            acquireRecoveryLocks(lockManager);
            synchronized (this) {
                if (state == ActivityState.TEMPORARILY_FAILED) {
                    setState(ActivityState.PERMANENTLY_FAILED);
                    recoveryTask = null;
                }
                notifyAll();
            }
        } finally {
            metadataProvider.getLocks().reset();
        }
        return null;
    }

    /**
     * Takes the entity write lock and a modify lock on every dataset used by the entity, recording them in the
     * metadata provider's lock list. The caller is responsible for resetting that list.
     */
    private void acquireRecoveryLocks(IMetadataLockManager lockManager)
            throws AlgebricksException, HyracksDataException, InterruptedException {
        lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
                entityId.getDataverse() + '.' + entityId.getEntityName());
        for (Dataset dataset : getDatasets()) {
            // the third argument is the dataverse name (the original final-failure path passed the dataset name)
            MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDataverseName(),
                    DatasetUtil.getFullyQualifiedName(dataset));
        }
    }

    /**
     * Starts the entity from the STOPPED or PERMANENTLY_FAILED state.
     *
     * @throws HyracksDataException
     *             if the entity is in another state or starting it fails (the entity is then permanently failed)
     */
    @Override
    public synchronized void start(MetadataProvider metadataProvider)
            throws HyracksDataException, InterruptedException {
        waitForNonTransitionState();
        if (state != ActivityState.PERMANENTLY_FAILED && state != ActivityState.STOPPED) {
            throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED, entityId, state);
        }
        try {
            setState(ActivityState.STARTING);
            doStart(metadataProvider);
            setRunning(metadataProvider, true);
        } catch (Exception e) {
            setState(ActivityState.PERMANENTLY_FAILED);
            LOGGER.log(Level.SEVERE, "Failed to start the entity " + entityId, e);
            throw HyracksDataException.create(e);
        }
    }

    protected abstract void doStart(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException;

    protected abstract Void doStop(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException;

    protected abstract Void doSuspend(MetadataProvider metadataProvider)
            throws HyracksDataException, AlgebricksException;

    protected abstract void doResume(MetadataProvider metadataProvider)
            throws HyracksDataException, AlgebricksException;

    protected abstract void setRunning(MetadataProvider metadataProvider, boolean running)
            throws HyracksDataException, AlgebricksException;

    /**
     * Stops the entity.
     * <ul>
     * <li>A failed entity has any pending recovery cancelled and is marked STOPPED.</li>
     * <li>A running entity is stopped through {@link #doStop}.</li>
     * </ul>
     *
     * @throws HyracksDataException
     *             if the entity cannot be stopped from its current state or stopping fails
     */
    @Override
    public void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
        Future<Void> aRecoveryTask = null;
        synchronized (this) {
            waitForNonTransitionState();
            if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED
                    && state != ActivityState.TEMPORARILY_FAILED) {
                throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
            }
            if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) {
                if (recoveryTask != null) {
                    aRecoveryTask = recoveryTask;
                    cancelRecovery = true;
                    recoveryTask.cancel(true);
                }
                setState(ActivityState.STOPPED);
                try {
                    setRunning(metadataProvider, false);
                } catch (Exception e) {
                    LOGGER.log(Level.SEVERE, "Failed to set the entity state as not running " + entityId, e);
                    throw HyracksDataException.create(e);
                }
            } else if (state == ActivityState.RUNNING) {
                setState(ActivityState.STOPPING);
                try {
                    doStop(metadataProvider);
                    setRunning(metadataProvider, false);
                } catch (Exception e) {
                    setState(ActivityState.PERMANENTLY_FAILED);
                    LOGGER.log(Level.SEVERE, "Failed to stop the entity " + entityId, e);
                    throw HyracksDataException.create(e);
                }
            } else {
                throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state);
            }
        }
        try {
            if (aRecoveryTask != null) {
                aRecoveryTask.get();
            }
        } catch (CancellationException ignored) {
            // expected: the recovery task was cancelled above and the entity is already STOPPED;
            // Future.get() on a cancelled task always throws, so this is not a failure of stop()
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            throw HyracksDataException.create(e);
        }
    }

    /**
     * Suspends the entity, waiting for the suspension to reach SUSPENDED or TEMPORARILY_FAILED. Stopped or
     * permanently failed entities only get the {@link #suspended} flag set.
     *
     * @throws HyracksDataException
     *             if the entity is already suspended or the suspension fails
     */
    @Override
    public void suspend(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
        WaitForStateSubscriber subscriber;
        Future<Void> suspendTask;
        synchronized (this) {
            LOGGER.log(Level.WARNING, "suspending entity " + entityId);
            LOGGER.log(Level.WARNING, "Waiting for ongoing activities");
            waitForNonTransitionState();
            LOGGER.log(Level.WARNING, "Proceeding with suspension. Current state is " + state);
            if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) {
                suspended = true;
                return;
            }
            if (state == ActivityState.SUSPENDED) {
                throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_ALREADY_SUSPENDED, entityId, state);
            }
            if (state == ActivityState.TEMPORARILY_FAILED) {
                suspended = true;
                setState(ActivityState.SUSPENDED);
                return;
            }
            setState(ActivityState.SUSPENDING);
            subscriber = new WaitForStateSubscriber(this,
                    EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED));
            suspendTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService()
                    .getExecutor().submit(() -> doSuspend(metadataProvider));
            LOGGER.log(Level.WARNING, "Suspension task has been submitted");
        }
        try {
            LOGGER.log(Level.WARNING, "Waiting for suspension task to complete");
            suspendTask.get();
            LOGGER.log(Level.WARNING, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED");
            subscriber.sync();
        } catch (Exception e) {
            synchronized (this) {
                LOGGER.log(Level.SEVERE, "Failure while waiting for " + entityId + " to become suspended", e);
                // failed to suspend
                if (state == ActivityState.SUSPENDING) {
                    if (jobId != null) {
                        // job is still running
                        // restore state
                        setState(prevState);
                    } else {
                        setState(ActivityState.PERMANENTLY_FAILED);
                    }
                }
                throw HyracksDataException.create(e);
            }
        }
    }

    /**
     * Resumes a suspended entity and always clears the {@link #suspended} flag. An entity that was temporarily
     * failed before suspension goes back to TEMPORARILY_FAILED. Otherwise the job is resumed asynchronously,
     * and the call waits for RUNNING or TEMPORARILY_FAILED.
     *
     * @throws HyracksDataException
     *             if the entity cannot be resumed from its current state or waiting fails
     */
    @Override
    public synchronized void resume(MetadataProvider metadataProvider) throws HyracksDataException {
        if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) {
            suspended = false;
            notifyAll();
            return;
        }
        if (state != ActivityState.SUSPENDED && state != ActivityState.TEMPORARILY_FAILED) {
            throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_RESUME_FROM_STATE, entityId, state);
        }
        try {
            if (prevState == ActivityState.TEMPORARILY_FAILED) {
                setState(ActivityState.TEMPORARILY_FAILED);
                return;
            }
            setState(ActivityState.RESUMING);
            WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this,
                    EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED));
            recoveryTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService()
                    .getExecutor().submit(() -> resumeOrRecover(metadataProvider));
            try {
                subscriber.sync();
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, "Failure while attempting to resume " + entityId, e);
                throw HyracksDataException.create(e);
            }
        } finally {
            suspended = false;
            notifyAll();
        }
    }

    /**
     * Background task behind {@link #resume}. It tries {@link #doResume} once. On failure it either marks the
     * entity permanently failed (no retry policy) or falls into the regular recovery loop.
     */
    protected Void resumeOrRecover(MetadataProvider metadataProvider)
            throws HyracksDataException, AlgebricksException, InterruptedException {
        try {
            doResume(metadataProvider);
            synchronized (this) {
                setState(ActivityState.RUNNING);
                recoveryTask = null;
            }
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, "First attempt to resume " + entityId + " Failed", e);
            setState(ActivityState.TEMPORARILY_FAILED);
            if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
                setState(ActivityState.PERMANENTLY_FAILED);
            } else {
                IRetryPolicy policy = retryPolicyFactory.create(this);
                cancelRecovery = false;
                doRecover(policy);
            }
        }
        return null;
    }

    /** An entity is active unless it is STOPPED or PERMANENTLY_FAILED. */
    @Override
    public boolean isActive() {
        return state != ActivityState.STOPPED && state != ActivityState.PERMANENTLY_FAILED;
    }

    @Override
    public void unregister() throws HyracksDataException {
        handler.unregisterListener(this);
    }

    public void setLocations(AlgebricksAbsolutePartitionConstraint locations) {
        this.locations = locations;
    }

    public Future<Void> getRecoveryTask() {
        return recoveryTask;
    }

    /** Asks a running recovery loop to give up and wakes it if it is waiting. */
    public synchronized void cancelRecovery() {
        cancelRecovery = true;
        notifyAll();
    }

    @Override
    public Exception getJobFailure() {
        return jobFailure;
    }

    @Override
    public List<Dataset> getDatasets() {
        return datasets;
    }

    /** Replaces the dataset equal to {@code dataset} with this instance, if it is present. */
    @Override
    public synchronized void replace(Dataset dataset) {
        if (getDatasets().contains(dataset)) {
            getDatasets().remove(dataset);
            getDatasets().add(dataset);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java new file mode 100644 index 0000000..b34d011 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.asterix.app.active;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveEvent.Kind;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveNotificationHandler;
import org.apache.asterix.active.SingleThreadEventProcessor;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.metadata.api.IActiveEntityController;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;

/**
 * Routes job lifecycle and partition events of active jobs to the listener of the owning entity.
 * <p>
 * Events are queued through {@link #add} and consumed by {@link #handle} on the event-processor thread. The
 * job-to-entity and entity-to-listener maps are plain {@link HashMap}s, so every access to them is guarded by
 * this object's monitor.
 */
public class ActiveNotificationHandler extends SingleThreadEventProcessor<ActiveEvent>
        implements IActiveNotificationHandler, IJobLifecycleListener {

    private static final Logger LOGGER = Logger.getLogger(ActiveNotificationHandler.class.getName());
    private static final boolean DEBUG = false;
    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
    private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners;
    private final Map<JobId, EntityId> jobId2EntityId;
    private boolean initialized = false;
    private boolean suspended = false;

    public ActiveNotificationHandler() {
        super(ActiveNotificationHandler.class.getSimpleName());
        jobId2EntityId = new HashMap<>();
        entityEventListeners = new HashMap<>();
    }

    // *** SingleThreadEventProcessor<ActiveEvent>

    /**
     * Delivers one event to the listener of the entity that owns the event's job. A JOB_FINISHED event also
     * stops tracking the job. The map lookups happen under the monitor; the listener is notified after the
     * monitor is released.
     */
    @Override
    protected void handle(ActiveEvent event) {
        EntityId entityId;
        IActiveEntityEventsListener listener = null;
        synchronized (this) {
            // the maps are mutated by synchronized methods on other threads, so read/remove under the lock
            entityId = jobId2EntityId.get(event.getJobId());
            if (entityId != null) {
                listener = entityEventListeners.get(entityId);
                LOGGER.log(Level.WARNING, "Next event is of type " + event.getEventKind());
                if (event.getEventKind() == Kind.JOB_FINISHED) {
                    LOGGER.log(Level.WARNING, "Removing the job");
                    jobId2EntityId.remove(event.getJobId());
                }
            }
        }
        if (entityId == null) {
            LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId());
            return;
        }
        if (listener != null) {
            LOGGER.log(Level.WARNING, "Notifying the listener");
            listener.notify(event);
        }
    }

    // *** IJobLifecycleListener

    /**
     * Starts tracking a job whose specification carries an {@link EntityId} under
     * {@link #ACTIVE_ENTITY_PROPERTY_NAME}, and queues a JOB_CREATED event for it. Other jobs are ignored.
     */
    @Override
    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
        LOGGER.log(Level.WARNING,
                "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId);
        Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
        // instanceof is false for null, so no separate null check is needed
        if (!(property instanceof EntityId)) {
            LOGGER.log(Level.WARNING, "Job is not of type active job. property found to be: " + property);
            return;
        }
        EntityId entityId = (EntityId) property;
        monitorJob(jobId, entityId);
        boolean found;
        synchronized (this) {
            found = jobId2EntityId.get(jobId) != null;
        }
        LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
        add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
    }

    /** Maps {@code jobId} to {@code entityId} unless the job is already tracked. */
    private synchronized void monitorJob(JobId jobId, EntityId entityId) {
        if (DEBUG) {
            LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
            boolean found = jobId2EntityId.get(jobId) != null;
            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
        }
        if (entityEventListeners.containsKey(entityId)) {
            if (jobId2EntityId.containsKey(jobId)) {
                LOGGER.severe("Job is already being monitored for job: " + jobId);
                return;
            }
            if (DEBUG) {
                LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId);
            }
        } else {
            LOGGER.severe("No listener was found for the entity: " + entityId);
        }
        jobId2EntityId.put(jobId, entityId);
    }

    @Override
    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
        EntityId entityId = jobId2EntityId.get(jobId);
        if (entityId != null) {
            add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
        }
    }

    @Override
    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
            throws HyracksException {
        EntityId entityId = jobId2EntityId.get(jobId);
        if (entityId != null) {
            add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions)));
        } else {
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
            }
        }
    }

    // *** IActiveNotificationHandler

    @Override
    public void receive(ActivePartitionMessage message) {
        add(new ActiveEvent(message.getJobId(), Kind.PARTITION_EVENT, message.getActiveRuntimeId().getEntityId(),
                message));
    }

    /** Returns the listener registered for {@code entityId}, or {@code null} if none is registered. */
    @Override
    public synchronized IActiveEntityEventsListener getListener(EntityId entityId) {
        if (DEBUG) {
            LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
            IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
            LOGGER.log(Level.WARNING, "Listener found: " + listener);
        }
        return entityEventListeners.get(entityId);
    }

    @Override
    public synchronized IActiveEntityEventsListener[] getEventListeners() {
        if (DEBUG) {
            LOGGER.log(Level.WARNING, "getEventListeners() was called");
            LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners");
        }
        return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]);
    }

    /**
     * @throws HyracksDataException
     *             if the handler is suspended or a listener for the same entity is already registered
     */
    @Override
    public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
        if (suspended) {
            throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
        }
        if (DEBUG) {
            LOGGER.log(Level.WARNING,
                    "registerListener(IActiveEntityEventsListener listener) was called for the entity "
                            + listener.getEntityId());
        }
        if (entityEventListeners.containsKey(listener.getEntityId())) {
            throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId());
        }
        entityEventListeners.put(listener.getEntityId(), listener);
    }

    /**
     * @throws HyracksDataException
     *             if the handler is suspended, the listener is not registered, or the registered listener is
     *             still active (in which case it stays registered)
     */
    @Override
    public synchronized void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException {
        if (suspended) {
            throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
        }
        LOGGER.log(Level.WARNING, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
                + listener.getEntityId());
        IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
        if (registeredListener == null) {
            throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, listener.getEntityId());
        }
        if (registeredListener.isActive()) {
            entityEventListeners.put(registeredListener.getEntityId(), registeredListener);
            throw new RuntimeDataException(ErrorCode.CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER,
                    listener.getEntityId());
        }
    }

    @Override
    public boolean isInitialized() {
        return initialized;
    }

    @Override
    public void setInitialized(boolean initialized) throws HyracksDataException {
        if (this.initialized) {
            throw new RuntimeDataException(ErrorCode.DOUBLE_INITIALIZATION_OF_ACTIVE_NOTIFICATION_HANDLER);
        }
        this.initialized = initialized;
    }

    /**
     * Asks every permanently failed controller to recover, and wakes the threads waiting on every other
     * listener.
     */
    @Override
    public synchronized void recover() throws HyracksDataException {
        LOGGER.log(Level.WARNING, "Starting active recovery");
        for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
            synchronized (listener) {
                // log the state, which is what the following branch decides on
                LOGGER.log(Level.WARNING, "Entity " + listener.getEntityId() + " is " + listener.getState());
                if (listener.getState() == ActivityState.PERMANENTLY_FAILED
                        && listener instanceof IActiveEntityController) {
                    LOGGER.log(Level.WARNING, "Recovering");
                    ((IActiveEntityController) listener).recover();
                } else {
                    LOGGER.log(Level.WARNING, "Only notifying");
                    listener.notifyAll();
                }
            }
        }
    }

    /**
     * Marks the handler suspended, which rejects further (un)registrations. Then, for each registered entity, it
     * takes the entity write lock and exclusive modification locks on the entity's datasets, and suspends the
     * entity. The locks are recorded in {@code mdProvider}'s lock list.
     */
    public void suspend(MetadataProvider mdProvider)
            throws AsterixException, HyracksDataException, InterruptedException {
        synchronized (this) {
            if (suspended) {
                throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
            }
            LOGGER.log(Level.WARNING, "Suspending active events handler");
            suspended = true;
        }
        IMetadataLockManager lockManager = mdProvider.getApplicationContext().getMetadataLockManager();
        // safe to iterate without the lock: registrations are rejected while suspended
        Collection<IActiveEntityEventsListener> registeredListeners = entityEventListeners.values();
        for (IActiveEntityEventsListener listener : registeredListeners) {
            // write lock the listener
            // exclusive lock all the datasets
            String dataverseName = listener.getEntityId().getDataverse();
            String entityName = listener.getEntityId().getEntityName();
            LOGGER.log(Level.WARNING, "Suspending " + listener.getEntityId());
            LOGGER.log(Level.WARNING, "Acquiring locks");
            lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName);
            List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
            for (Dataset dataset : datasets) {
                lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
                        DatasetUtil.getFullyQualifiedName(dataset));
            }
            LOGGER.log(Level.WARNING, "locks acquired");
            ((ActiveEntityEventsListener) listener).suspend(mdProvider);
            LOGGER.log(Level.WARNING, listener.getEntityId() + " suspended");
        }
    }

    /** Resumes every registered entity, then lifts the handler's suspension. */
    public void resume(MetadataProvider mdProvider)
            throws AsterixException, HyracksDataException, InterruptedException {
        LOGGER.log(Level.WARNING, "Resuming active events handler");
        for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
            LOGGER.log(Level.WARNING, "Resuming " + listener.getEntityId());
            ((ActiveEntityEventsListener) listener).resume(mdProvider);
            LOGGER.log(Level.WARNING, listener.getEntityId() + " resumed");
        }
        synchronized (this) {
            suspended = false;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java new file mode 100644 index 0000000..45c79a0 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.active; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; + +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IActiveEntityEventSubscriber; +import org.apache.asterix.active.IRetryPolicyFactory; +import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; +import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.utils.JobUtils; +import org.apache.asterix.compiler.provider.AqlCompilationProvider; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; +import org.apache.asterix.file.StorageComponentProvider; +import org.apache.asterix.lang.common.statement.StartFeedStatement; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.utils.FeedOperations; +import org.apache.commons.lang3.tuple.Pair; +import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobSpecification; + +/** + * Events listener for a feed active entity. Starting builds and runs the feed job for the feed and its + * current connections; stopping sends a stop message to every intake location; suspend and resume are + * not supported. + */ +public class FeedEventsListener extends ActiveEntityEventsListener { + + private final Feed feed; + /* Held by reference, not copied: remove() and addFeedConnection() mutate the list given to the constructor. */ + private final List<FeedConnection> feedConnections; + + public FeedEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx, + IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets, + AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory, + Feed feed, final List<FeedConnection> feedConnections) throws HyracksDataException { + super(statementExecutor, appCtx, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory); + this.feed = feed; + this.feedConnections = feedConnections; + } + + /** Removes the dataset via the superclass, then drops every feed connection that targets that dataset. */ + @Override + public synchronized void remove(Dataset dataset) throws HyracksDataException { + super.remove(dataset); + feedConnections.removeIf(o -> o.getDataverseName().equals(dataset.getDataverseName()) + && o.getDatasetName().equals(dataset.getDatasetName())); + } + + /** Adds a connection; it is passed to the feed job built by the next doStart. */ + public synchronized void addFeedConnection(FeedConnection feedConnection) { + feedConnections.add(feedConnection); + } + + /** Returns the feed this listener was created for. */ + public Feed getFeed() { + return feed; + } + + /** + * Builds and submits the feed job, then waits (via WaitForStateSubscriber) for the RUNNING state. If the + * StartFeedStatement.WAIT_FOR_COMPLETION config value parses as true, it also waits for STOPPED or + * PERMANENTLY_FAILED. + * + * @throws AlgebricksException rethrown unchanged + * @throws HyracksDataException wrapping any other failure; if the failure is an interruption, the + * thread's interrupt status is restored before wrapping + */ + @Override + protected void doStart(MetadataProvider mdProvider) throws HyracksDataException, AlgebricksException { + try { + ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); + IStorageComponentProvider storageComponentProvider = new StorageComponentProvider(); + DefaultStatementExecutorFactory statementExecutorFactory = new DefaultStatementExecutorFactory(); + Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = FeedOperations.buildStartFeedJob( 
+ ((QueryTranslator) statementExecutor).getSessionOutput(), mdProvider, feed, feedConnections, + compilationProvider, storageComponentProvider, statementExecutorFactory, hcc); + JobSpecification feedJob = jobInfo.getLeft(); + /* Created before the job is submitted, presumably so the RUNNING transition cannot be missed. */ + IActiveEntityEventSubscriber eventSubscriber = + new WaitForStateSubscriber(this, Collections.singleton(ActivityState.RUNNING)); + feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); + // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. + // We will need to design general exception handling mechanism for feeds. + setLocations(jobInfo.getRight()); + boolean waitForCompletion = + Boolean.parseBoolean(mdProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)); + JobUtils.runJob(hcc, feedJob, false); + eventSubscriber.sync(); + if (waitForCompletion) { + IActiveEntityEventSubscriber stoppedSubscriber = new WaitForStateSubscriber(this, + EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED)); + stoppedSubscriber.sync(); + } + } catch (AlgebricksException e) { + throw e; + } catch (Exception e) { + if (e instanceof InterruptedException) { + /* Wrapping would hide the interrupt; restore the flag so callers can still observe it. */ + Thread.currentThread().interrupt(); + } + throw HyracksDataException.create(e); + } + } + + /** + * Sends a stop message to every intake location of the feed, then waits (via WaitForStateSubscriber) + * for the STOPPED state. + * + * @return always {@code null} + * @throws AlgebricksException rethrown unchanged + * @throws HyracksDataException wrapping any other failure; if the failure is an interruption, the + * thread's interrupt status is restored before wrapping + */ + @Override + protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException { + /* Created before the stop messages are sent, presumably so the STOPPED transition cannot be missed. */ + IActiveEntityEventSubscriber eventSubscriber = + new WaitForStateSubscriber(this, Collections.singleton(ActivityState.STOPPED)); + try { + /* Read the locations once so the length and every index below refer to the same array. */ + String[] intakeLocations = getLocations().getLocations(); + for (int i = 0; i < intakeLocations.length; i++) { + FeedOperations.SendStopMessageToNode(metadataProvider.getApplicationContext(), entityId, + intakeLocations[i], i); + } + eventSubscriber.sync(); + } catch (AlgebricksException e) { + throw e; + } catch (Exception e) { + if (e instanceof InterruptedException) { + /* Wrapping would hide the interrupt; restore the flag so callers can still observe it. */ + Thread.currentThread().interrupt(); + } + throw HyracksDataException.create(e); + } + return null; + } + + /** No-op for feeds. */ + @Override + protected void setRunning(MetadataProvider metadataProvider, boolean running) + throws HyracksDataException, AlgebricksException { + // No op 
+ } + + /** Suspending a feed is not supported. @throws RuntimeDataException always, with OPERATION_NOT_SUPPORTED */ + @Override + protected Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException { + throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED); + } + + /** Resuming a feed is not supported. @throws RuntimeDataException always, with OPERATION_NOT_SUPPORTED */ + @Override + protected void doResume(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException { + throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java index 00fb1b0..f6bd708 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java @@ -50,7 +50,6 @@ public class DefaultStatementExecutorFactory implements IStatementExecutorFactor @Override public IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) { - return new QueryTranslator(appCtx, statements, output, compilationProvider, storageComponentProvider, - executorService); + return new QueryTranslator(appCtx, statements, output, compilationProvider, executorService); } }
