[ASTERIXDB-1950][ING][API] REST API for ActiveEntity stats - user model changes: yes Added ActiveEntity stats REST API. - storage format changes: no - interface changes: yes Changed ActiveEntityEventsListener & EventSubscriber interfaces. Added getStats method to IActiveRuntime.
Details: 1. Added HttpAPI for active feed stats. 2. Replaced FeedEventSubscriber with WaitForStateSubscriber. 3. Added StatsSubscriber for monitoring stats request. 4. Moved the message related methods from FeedEventsListener to ActiveEntityEventsListener for possible reuses in other cases. Change-Id: I46b48b52a1c9906510c5bdce778d1672845f75ca Reviewed-on: https://asterix-gerrit.ics.uci.edu/1748 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/604c921a Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/604c921a Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/604c921a Branch: refs/heads/master Commit: 604c921ae8ab650b61e754f85b16b4c95db3f58e Parents: f86a25b Author: Xikui Wang <[email protected]> Authored: Wed Jun 28 19:35:04 2017 -0700 Committer: Xikui Wang <[email protected]> Committed: Wed Jun 28 21:04:25 2017 -0700 ---------------------------------------------------------------------- asterixdb/asterix-active/pom.xml | 8 + .../org/apache/asterix/active/ActiveEvent.java | 25 ++- .../apache/asterix/active/ActiveManager.java | 41 +++- .../apache/asterix/active/ActiveRuntimeId.java | 5 +- .../ActiveSourceOperatorNodePushable.java | 18 +- .../apache/asterix/active/ActivityState.java | 4 + .../org/apache/asterix/active/EntityId.java | 13 +- .../active/IActiveEntityEventsListener.java | 37 +++- .../asterix/active/IActiveEventSubscriber.java | 18 +- .../apache/asterix/active/IActiveRuntime.java | 13 ++ .../active/message/ActiveManagerMessage.java | 9 +- .../active/message/ActivePartitionMessage.java | 27 ++- .../active/message/ActiveStatsResponse.java | 77 ++++++++ .../active/message/StatsRequestMessage.java | 35 ++++ .../api/http/server/ActiveStatsApiServlet.java | 86 ++++++++ 
.../asterix/app/nc/NCAppRuntimeContext.java | 3 +- .../asterix/app/translator/QueryTranslator.java | 45 +++-- .../hyracks/bootstrap/CCApplication.java | 4 + .../asterix/messaging/CCMessageBroker.java | 1 + .../apache/asterix/utils/FeedOperations.java | 2 +- .../asterix/test/active/ActiveMessageTest.java | 141 +++++++++++++ .../asterix/common/exceptions/ErrorCode.java | 4 +- .../common/messaging/api/ICCMessageBroker.java | 5 + .../apache/asterix/common/utils/Servlets.java | 1 + .../main/resources/asx_errormsg/en.properties | 28 +-- .../AbstractFeedDataFlowController.java | 2 + .../dataflow/FeedRecordDataFlowController.java | 17 +- .../dataflow/FeedStreamDataFlowController.java | 6 + .../external/dataset/adapter/FeedAdapter.java | 4 + .../feed/dataflow/FeedRuntimeInputHandler.java | 15 +- .../management/ActiveEntityEventsListener.java | 196 ++++++++++++++++++- .../feed/management/FeedEventsListener.java | 146 -------------- .../external/feed/runtime/AdapterExecutor.java | 6 + .../feed/runtime/AdapterRuntimeManager.java | 4 + .../external/feed/runtime/IngestionRuntime.java | 62 ------ .../external/feed/watch/AbstractSubscriber.java | 56 ++++++ .../feed/watch/FeedEventSubscriber.java | 64 ------ .../external/feed/watch/NoOpSubscriber.java | 14 +- .../external/feed/watch/StatsSubscriber.java | 43 ++++ .../feed/watch/WaitForStateSubscriber.java | 58 ++++++ .../FeedIntakeOperatorNodePushable.java | 9 + .../hyracks/http/server/AbstractServlet.java | 5 +- 42 files changed, 981 insertions(+), 376 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml index 950e4d6..3dd24b6 100644 --- a/asterixdb/asterix-active/pom.xml +++ b/asterixdb/asterix-active/pom.xml @@ -42,5 +42,13 @@ 
<groupId>org.apache.hyracks</groupId> <artifactId>hyracks-dataflow-std</artifactId> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-control-nc</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java index 2669990..1141912 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.active; +import java.util.Objects; + import org.apache.hyracks.api.job.JobId; public class ActiveEvent { @@ -27,7 +29,8 @@ public class ActiveEvent { JOB_STARTED, JOB_FINISHED, PARTITION_EVENT, - EXTENSION_EVENT + EXTENSION_EVENT, + STATS_UPDATED } private final JobId jobId; @@ -64,6 +67,24 @@ public class ActiveEvent { @Override public String toString() { - return "JobId:" + jobId + ", " + "EntityId:" + entityId + ", " + "Kind" + eventKind; + return "JobId:" + jobId + "," + "EntityId:" + entityId + ", " + "Kind" + eventKind; + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof ActiveEvent)) { + return false; + } + if (this == o) { + return true; + } + ActiveEvent other = (ActiveEvent) o; + return Objects.equals(entityId, other.entityId) && Objects.equals(eventKind, other.eventKind) && Objects + .equals(eventObject, other.eventObject); + } + + @Override + public int hashCode() { + return Objects.hash(jobId, entityId, eventKind, eventObject); } } \ 
No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index 44d6dae..fcf2be9 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -28,11 +28,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.asterix.active.message.ActiveManagerMessage; +import org.apache.asterix.active.message.ActiveStatsResponse; +import org.apache.asterix.active.message.StatsRequestMessage; import org.apache.asterix.common.api.ThreadExecutor; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.memory.ConcurrentFramePool; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.JavaSerializationUtils; +import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.log4j.Logger; public class ActiveManager { @@ -44,14 +49,16 @@ public class ActiveManager { private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes; private final ConcurrentFramePool activeFramePool; private final String nodeId; + private final INCServiceContext serviceCtx; private volatile boolean shutdown; - public ActiveManager(ThreadExecutor executor, String nodeId, long activeMemoryBudget, int frameSize) - throws HyracksDataException { + public ActiveManager(ThreadExecutor executor, String nodeId, long activeMemoryBudget, int frameSize, + 
INCServiceContext serviceCtx) throws HyracksDataException { this.executor = executor; this.nodeId = nodeId; this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize); this.runtimes = new ConcurrentHashMap<>(); + this.serviceCtx = serviceCtx; } public ConcurrentFramePool getFramePool() { @@ -78,16 +85,44 @@ public class ActiveManager { return ActiveManager.class.getSimpleName() + "[" + nodeId + "]"; } - public void submit(ActiveManagerMessage message) { + public void submit(ActiveManagerMessage message) throws HyracksDataException { switch (message.getKind()) { case ActiveManagerMessage.STOP_ACTIVITY: stopRuntime(message); break; + case ActiveManagerMessage.REQUEST_STATS: + requestStats((StatsRequestMessage) message); + break; default: LOGGER.warn("Unknown message type received: " + message.getKind()); } } + private void requestStats(StatsRequestMessage message) throws HyracksDataException { + try { + ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload(); + IActiveRuntime runtime = runtimes.get(runtimeId); + long reqId = message.getReqId(); + if (runtime == null) { + LOGGER.warn("Request stats of a runtime that is not registered " + runtimeId); + // Send a failure message + ((NodeControllerService) serviceCtx.getControllerService()) + .sendApplicationMessageToCC( + JavaSerializationUtils + .serialize(new ActiveStatsResponse(reqId, null, new RuntimeDataException( + ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, runtimeId.toString()))), + null); + return; + } + String stats = runtime.getStats(); + ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null); + ((NodeControllerService) serviceCtx.getControllerService()) + .sendApplicationMessageToCC(JavaSerializationUtils.serialize(response), null); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + public void shutdown() { LOGGER.warn("Shutting down ActiveManager on node " + nodeId); Map<ActiveRuntimeId, Future<Void>> stopFutures = new 
HashMap<>(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java index f1f5876..882ed11 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java @@ -19,6 +19,7 @@ package org.apache.asterix.active; import java.io.Serializable; +import java.util.Objects; public class ActiveRuntimeId implements Serializable { @@ -27,13 +28,11 @@ public class ActiveRuntimeId implements Serializable { private final EntityId entityId; private final String runtimeName; private final int partition; - private final int hashCode; public ActiveRuntimeId(EntityId entityId, String runtimeName, int partition) { this.entityId = entityId; this.runtimeName = runtimeName; this.partition = partition; - this.hashCode = toString().hashCode(); } @Override @@ -56,7 +55,7 @@ public class ActiveRuntimeId implements Serializable { @Override public int hashCode() { - return hashCode; + return Objects.hash(entityId, runtimeName, partition); } public String getRuntimeName() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java index 98a6979..a7d7796 100644 --- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java @@ -26,12 +26,13 @@ import org.apache.asterix.common.api.INcApplicationContext; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable implements IActiveRuntime { - private final Logger LOGGER = Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName()); + private static final Logger LOGGER = Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName()); protected final IHyracksTaskContext ctx; protected final ActiveManager activeManager; /** A unique identifier for the runtime **/ @@ -88,15 +89,15 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp try { // notify cc that runtime has been registered ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null); + ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null), null); start(); } catch (InterruptedException e) { LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e); Thread.currentThread().interrupt(); - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } catch (Exception e) { LOGGER.log(Level.INFO, "initialize() failed on ActiveSourceOperatorNodePushable", e); - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } finally { synchronized (this) { done = true; @@ -111,10 +112,10 @@ public abstract class 
ActiveSourceOperatorNodePushable extends AbstractUnaryOutp activeManager.deregisterRuntime(runtimeId); try { ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED), null); + ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null), null); } catch (Exception e) { LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e); - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } finally { LOGGER.log(Level.INFO, "deinitialize() returning on ActiveSourceOperatorNodePushable"); } @@ -124,4 +125,9 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp public final IFrameWriter getInputFrameWriter(int index) { return null; } + + @Override + public JobId getJobId() { + return ctx.getJobletContext().getJobId(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java index c8abb84..af8f5ca 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java @@ -20,6 +20,10 @@ package org.apache.asterix.active; public enum ActivityState { /** + * The initial state of an activity. + */ + CREATED, + /** * The starting state and a possible terminal state. 
Next state can only be {@code ActivityState.STARTING} */ STOPPED, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java index cdf702d..9e20e2f 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java @@ -19,6 +19,7 @@ package org.apache.asterix.active; import java.io.Serializable; +import java.util.Objects; /** * A unique identifier for a data feed. @@ -30,13 +31,11 @@ public class EntityId implements Serializable { private final String extensionName; private final String dataverse; private final String entityName; - private final int hashCode; public EntityId(String extentionName, String dataverse, String entityName) { this.extensionName = extentionName; this.dataverse = dataverse; this.entityName = entityName; - this.hashCode = toString().hashCode(); } public String getDataverse() { @@ -52,17 +51,17 @@ public class EntityId implements Serializable { if (o == null || !(o instanceof EntityId)) { return false; } - if (this == o || ((EntityId) o).getExtensionName().equals(extensionName) - && ((EntityId) o).getEntityName().equals(entityName) - && ((EntityId) o).getDataverse().equals(dataverse)) { + if (o == this) { return true; } - return false; + EntityId other = (EntityId) o; + return Objects.equals(other.dataverse, dataverse) && Objects.equals(other.entityName, entityName) && + Objects.equals(other.extensionName, extensionName); } @Override public int hashCode() { - return hashCode; + return Objects.hash(dataverse, entityName, extensionName); } @Override 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java index ee8e776..4bc02f3 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java @@ -41,16 +41,6 @@ public interface IActiveEntityEventsListener { ActivityState getState(); /** - * get a subscriber that waits till state has been reached. - * - * @param state - * the desired state - * @throws HyracksDataException - * a failure happened while waiting for the state - */ - IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException; - - /** * @return the active entity id */ EntityId getEntityId(); @@ -62,4 +52,31 @@ public interface IActiveEntityEventsListener { */ boolean isEntityUsingDataset(IDataset dataset); + /** + * subscribe to events. 
subscription ends when subscriber.done() returns true + * + * @param subscriber + * @throws HyracksDataException + */ + void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException; + + /** + * The most recent acquired stats for the active entity + * + * @return + */ + String getStats(); + + /** + * @return The timestamp of the most recent acquired stats for the active entity + */ + long getStatsTimeStamp(); + + /** + * refresh the stats + * + * @param timeout + * @throws HyracksDataException + */ + void refreshStats(long timeout) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java index 7be5737..69f7f1c 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.active; +import org.apache.hyracks.api.exceptions.HyracksDataException; + /** * An active event subscriber that subscribe to events related to active entity */ @@ -25,18 +27,22 @@ public interface IActiveEventSubscriber { /** * Notify the subscriber of a new event + * * @param event + * @throws HyracksDataException */ - void notify(ActiveEvent event); + void notify(ActiveEvent event) throws HyracksDataException; /** * Checkcs whether the subscriber is done receiving events + * * @return */ - boolean done(); + boolean isDone(); /** * Wait until the terminal event has been received + * * @throws InterruptedException */ void sync() throws InterruptedException; @@ -45,4 +51,12 @@ public interface 
IActiveEventSubscriber { * Stop watching events */ void unsubscribe(); + + /** + * callback upon successful subscription + * + * @param eventsListener + * @throws HyracksDataException + */ + void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java index 528c220..f37b2e8 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java @@ -19,6 +19,7 @@ package org.apache.asterix.active; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; public interface IActiveRuntime { @@ -34,4 +35,16 @@ public interface IActiveRuntime { * @throws InterruptedException */ void stop() throws HyracksDataException, InterruptedException; + + /** + * @return the job id associated with this active runtime + */ + JobId getJobId(); + + /** + * @return the runtime stats for monitoring purposes + */ + default String getStats() { + return "\"Runtime stats is not available.\""; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java index 231ec25..9772698 100644 --- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java @@ -27,15 +27,14 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class ActiveManagerMessage implements INcAddressedMessage { public static final byte STOP_ACTIVITY = 0x00; + public static final byte REQUEST_STATS = 0x01; private static final long serialVersionUID = 1L; private final byte kind; - private final String src; private final Serializable payload; - public ActiveManagerMessage(byte kind, String src, Serializable payload) { + public ActiveManagerMessage(byte kind, Serializable payload) { this.kind = kind; - this.src = src; this.payload = payload; } @@ -47,10 +46,6 @@ public class ActiveManagerMessage implements INcAddressedMessage { return kind; } - public String getSrc() { - return src; - } - @Override public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { ((ActiveManager) appCtx.getActiveManager()).submit(this); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java index 335121a..9391044 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java @@ -19,6 +19,7 @@ package org.apache.asterix.active.message; import java.io.Serializable; +import java.util.Objects; import org.apache.asterix.active.ActiveLifecycleListener; import 
org.apache.asterix.active.ActiveRuntimeId; @@ -29,19 +30,15 @@ import org.apache.hyracks.api.job.JobId; public class ActivePartitionMessage implements ICcAddressedMessage { + private static final long serialVersionUID = 1L; public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00; public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01; public static final byte GENERIC_EVENT = 0x02; - private static final long serialVersionUID = 1L; private final ActiveRuntimeId activeRuntimeId; private final JobId jobId; private final Serializable payload; private final byte event; - public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event) { - this(activeRuntimeId, jobId, event, null); - } - public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event, Serializable payload) { this.activeRuntimeId = activeRuntimeId; this.jobId = jobId; @@ -73,6 +70,24 @@ public class ActivePartitionMessage implements ICcAddressedMessage { @Override public String toString() { - return ActivePartitionMessage.class.getSimpleName(); + return ActivePartitionMessage.class.getSimpleName() + event; + } + + @Override + public int hashCode() { + return Objects.hash(activeRuntimeId, jobId, payload, event); + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof ActivePartitionMessage)) { + return false; + } + if (this == o) { + return true; + } + ActivePartitionMessage other = (ActivePartitionMessage) o; + return Objects.equals(other.activeRuntimeId, activeRuntimeId) && Objects.equals(other.jobId, jobId) && Objects + .equals(other.payload, payload); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java new file mode 100644 index 0000000..8738a06 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.active.message; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INcResponse; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class ActiveStatsResponse implements ICcAddressedMessage, INcResponse { + + private static final long serialVersionUID = 1L; + private final long reqId; + private final String stats; + private final Exception failure; + + public ActiveStatsResponse(long reqId, String stats, Exception failure) { + this.reqId = reqId; + this.stats = stats; + this.failure = failure; + } + + @SuppressWarnings("unchecked") + @Override + public void setResult(MutablePair<ResponseState, Object> result) { + ResponseState responseState = result.getLeft(); + if (failure != null) { + result.setLeft(ResponseState.FAILURE); + result.setRight(failure); + return; + } + switch (responseState) { + case UNINITIALIZED: + // First to arrive + result.setRight(new ArrayList<String>()); + // No failure, change state to success + result.setLeft(ResponseState.SUCCESS); + // Fallthrough + case SUCCESS: + List<String> response = (List<String>) result.getRight(); + response.add(stats); + break; + default: + break; + + } + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + broker.respond(reqId, this); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java 
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
new file mode 100644
index 0000000..8fa5f19
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.io.Serializable;
+
+/**
+ * An {@link ActiveManagerMessage} that additionally carries the id of the request it belongs to.
+ */
+public class StatsRequestMessage extends ActiveManagerMessage {
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+
+    /**
+     * @param kind the message kind, passed through to {@link ActiveManagerMessage}
+     * @param payload the message payload, passed through to {@link ActiveManagerMessage}
+     * @param reqId the id of this request
+     */
+    public StatsRequestMessage(byte kind, Serializable payload, long reqId) {
+        super(kind, payload);
+        this.reqId = reqId;
+    }
+
+    /**
+     * @return the request id given at construction
+     */
+    public long getReqId() {
+        return reqId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
new file mode 100644
index 0000000..e02f09b
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.external.feed.watch.StatsSubscriber;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Servlet that reports the stats of all active entity event listeners as a single JSON object, keyed by the
+ * string form of each entity id. An integer given after the servlet path is used as the maximum accepted age of
+ * cached stats (compared against {@link System#currentTimeMillis()} differences); stats older than that are
+ * refreshed before they are reported.
+ */
+public class ActiveStatsApiServlet extends AbstractServlet {
+
+    /** Maximum accepted age of cached stats when the request path does not specify one. */
+    private static final int DEFAULT_EXPIRE_TIME = 2000;
+    /** Argument of refreshStats; presumably a timeout in milliseconds -- TODO confirm. */
+    private static final int REFRESH_STATS_TIMEOUT = 5000;
+    /** Shared mapper, configured once here and only used for reading/writing afterwards. */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
+
+    private final ActiveLifecycleListener activeLifecycleListener;
+
+    public ActiveStatsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+        super(ctx, paths);
+        this.activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+    }
+
+    /**
+     * Returns the stats of {@code eventListener} parsed as JSON. When the cached stats are older than
+     * {@code ttl} (relative to {@code currentTime}), a refresh is requested and awaited first.
+     */
+    private JsonNode constructNode(IActiveEntityEventsListener eventListener, long currentTime, long ttl)
+            throws InterruptedException, IOException {
+        if (currentTime - eventListener.getStatsTimeStamp() > ttl) {
+            // The subscriber is created before the refresh is requested; presumably so completion is not missed.
+            StatsSubscriber subscriber = new StatsSubscriber(eventListener);
+            eventListener.refreshStats(REFRESH_STATS_TIMEOUT);
+            subscriber.sync();
+        }
+        return OBJECT_MAPPER.readTree(eventListener.getStats());
+    }
+
+    /**
+     * Extracts the expire time from the request's local path.
+     *
+     * @param localPath the local path of the request; its first character is skipped (presumably the leading '/')
+     * @return {@link #DEFAULT_EXPIRE_TIME} when at most one character is given, the parsed integer otherwise
+     * @throws IllegalArgumentException when the remainder of the path is not a valid integer
+     */
+    private static int parseExpireTime(String localPath) {
+        if (localPath.length() <= 1) {
+            return DEFAULT_EXPIRE_TIME;
+        }
+        String expireTime = localPath.substring(1);
+        try {
+            return Integer.parseInt(expireTime);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Invalid expire time: " + expireTime, e);
+        }
+    }
+
+    /**
+     * Writes the stats of all known active entity event listeners to the response as indented JSON.
+     */
+    @Override
+    protected void get(IServletRequest request, IServletResponse response) throws Exception {
+        IActiveEntityEventsListener[] listeners = activeLifecycleListener.getNotificationHandler().getEventListeners();
+        int expireTime = parseExpireTime(localPath(request));
+        ObjectNode resNode = OBJECT_MAPPER.createObjectNode();
+        long currentTime = System.currentTimeMillis();
+        for (IActiveEntityEventsListener listener : listeners) {
+            resNode.putPOJO(listener.getEntityId().toString(), constructNode(listener, currentTime, expireTime));
+        }
+
+        // Construct Response
+        PrintWriter responseWriter = response.writer();
+        responseWriter.write(OBJECT_MAPPER.writeValueAsString(resNode));
+        responseWriter.flush();
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 55b9adc..29bc95e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -200,7 +200,8 @@ public class NCAppRuntimeContext implements INcApplicationContext {
         isShuttingdown = false;
 
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
-                activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
+                activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
+                this.ncServiceContext);
 
         if (replicationProperties.isParticipant(getServiceContext().getNodeId())) {
             String nodeId = getServiceContext().getNodeId();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 8a7d757..79d660c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -73,10 +73,12 @@ import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.external.feed.management.ActiveEntityEventsListener; import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.feed.management.FeedEventsListener; +import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; @@ -937,7 +939,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. 
add a new index with PendingAddOp index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields, keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), overridesFieldTypes, - stmtCreateIndex.isEnforced(),false, MetadataUtil.PENDING_ADD_OP); + stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); // #. prepare to create the index artifact in NC. @@ -1087,8 +1089,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected void validateIndexKeyFields(CreateIndexStatement stmtCreateIndex, List<Integer> keySourceIndicators, ARecordType aRecordType, ARecordType metaRecordType, List<List<String>> indexFields, List<IAType> indexFieldTypes) throws AlgebricksException { - ValidateUtil.validateKeyFields(aRecordType, metaRecordType, indexFields, keySourceIndicators, - indexFieldTypes, stmtCreateIndex.getIndexType()); + ValidateUtil.validateKeyFields(aRecordType, metaRecordType, indexFields, keySourceIndicators, indexFieldTypes, + stmtCreateIndex.getIndexType()); } protected void handleCreateTypeStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { @@ -1192,11 +1194,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); for (int k = 0; k < indexes.size(); k++) { if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) { - jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, - dataset)); - } else { jobsToExecute.add( - IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, dataset)); + ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, dataset)); + } else { + jobsToExecute + .add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, dataset)); } } 
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(dataset); @@ -1957,7 +1959,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - FeedEventsListener listener = (FeedEventsListener) activeEventHandler.getActiveEntityListener(feedId); + ActiveEntityEventsListener listener = + (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(feedId); if (listener != null) { throw new AlgebricksException("Feed " + feedId + " is currently active and connected to the following dataset(s) \n" + listener.toString()); @@ -2028,7 +2031,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory(); ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - FeedEventsListener listener = (FeedEventsListener) activeEventHandler.getActiveEntityListener(entityId); + ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler + .getActiveEntityListener(entityId); if (listener != null) { throw new AlgebricksException("Feed " + feedName + " is started already."); } @@ -2047,11 +2051,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen compilationProvider, storageComponentProvider, qtFactory, hcc); JobSpecification feedJob = jobInfo.getLeft(); - listener = new FeedEventsListener(appCtx, entityId, datasets, jobInfo.getRight().getLocations()); + listener = new ActiveEntityEventsListener(appCtx, entityId, datasets, jobInfo.getRight(), + 
FeedIntakeOperatorNodePushable.class.getSimpleName()); activeEventHandler.registerListener(listener); - IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED); + IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(listener, ActivityState.STARTED); feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); - // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. // We will need to design general exception handling mechanism for feeds. JobUtils.runJob(hcc, feedJob, @@ -2077,11 +2081,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); // Obtain runtime info from ActiveListener - FeedEventsListener listener = (FeedEventsListener) activeEventHandler.getActiveEntityListener(feedId); + ActiveEntityEventsListener listener = + (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(feedId); if (listener == null) { throw new AlgebricksException("Feed " + feedName + " is not started."); } - IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STOPPED); + IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(listener, ActivityState.STOPPED); // Transaction MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); @@ -2090,8 +2095,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // validate FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, mdTxnCtx); // Construct ActiveMessage - for (int i = 0; i < listener.getSources().length; i++) { - String intakeLocation = listener.getSources()[i]; + for (int i = 0; i < listener.getLocations().getLocations().length; i++) { + String 
intakeLocation = listener.getLocations().getLocations()[i]; FeedOperations.SendStopMessageToNode(appCtx, feedId, intakeLocation, i); } eventSubscriber.sync(); @@ -2231,8 +2236,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (ds.getDatasetType() == DatasetType.INTERNAL) { for (Index index : indexes) { if (index.isSecondaryIndex()) { - jobsToExecute - .add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, metadataProvider)); + jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, metadataProvider)); } } } else { @@ -2257,8 +2261,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } protected void prepareCompactJobsForExternalDataset(List<Index> indexes, Dataset ds, - List<JobSpecification> jobsToExecute, MetadataProvider metadataProvider) - throws AlgebricksException { + List<JobSpecification> jobsToExecute, MetadataProvider metadataProvider) throws AlgebricksException { for (int j = 0; j < indexes.size(); j++) { jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, indexes.get(j), metadataProvider)); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 35e0466..cd9138a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -38,6 +38,7 @@ import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet; import org.apache.asterix.api.http.server.ConnectorApiServlet; import org.apache.asterix.api.http.server.DdlApiServlet; import 
org.apache.asterix.api.http.server.DiagnosticsApiServlet; +import org.apache.asterix.api.http.server.ActiveStatsApiServlet; import org.apache.asterix.api.http.server.FullApiServlet; import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet; import org.apache.asterix.api.http.server.QueryApiServlet; @@ -233,6 +234,7 @@ public class CCApplication extends BaseCCApplication { addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_NODE_DETAIL); // must not precede add of CLUSTER_STATE addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must not precede add of CLUSTER_STATE addServlet(jsonAPIServer, Servlets.DIAGNOSTICS); + addServlet(jsonAPIServer, Servlets.ACTIVE_STATS); return jsonAPIServer; } @@ -305,6 +307,8 @@ public class CCApplication extends BaseCCApplication { return new ClusterControllerDetailsApiServlet(ctx, paths); case Servlets.DIAGNOSTICS: return new DiagnosticsApiServlet(ctx, paths, appCtx); + case Servlets.ACTIVE_STATS: + return new ActiveStatsApiServlet(ctx, paths, appCtx); default: throw new IllegalStateException(String.valueOf(key)); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java index 5932aff..de2ca11 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -72,6 +72,7 @@ public class CCMessageBroker implements ICCMessageBroker { state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId); } + @Override public long newRequestId() { return REQUEST_ID_GENERATOR.incrementAndGet(); } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 6155450..09c4983 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -381,7 +381,7 @@ public class FeedOperations {
 
     public static void SendStopMessageToNode(ICcApplicationContext appCtx, EntityId feedId, String intakeNodeLocation,
             Integer partition) throws Exception {
-        ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "SRC",
+        ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY,
                 new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
         SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation);
     }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
new file mode 100644
index 0000000..2dc1782
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.active;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveRuntime;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
+import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
+import org.apache.asterix.runtime.utils.CcApplicationContext;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Exercises {@code refreshStats} and {@code getStats} of {@link ActiveEntityEventsListener} against a mocked
+ * {@link IActiveRuntime} registered on the first NC of the integration cluster.
+ */
+public class ActiveMessageTest {
+
+    private static final String EXPECTED_STATS = "Mock stats";
+    private static final String NO_STATS = "N/A";
+
+    protected boolean cleanUp = true;
+
+    // NOTE(review): no matching @After/tearDown is visible here -- confirm the test cluster is shut down elsewhere.
+    @Before
+    public void setUp() throws Exception {
+        ExecutionTestUtil.setUp(cleanUp);
+    }
+
+    /**
+     * Stats must stay at "N/A" until a runtime partition has been reported to the listener, must contain the
+     * runtime's stats (as valid JSON) afterwards, and refreshing must fail with ACTIVE_MANAGER_INVALID_RUNTIME
+     * once the runtime is deregistered.
+     */
+    @Test
+    public void refreshStatsTest() throws HyracksException {
+        // Entities to be used
+        EntityId entityId = new EntityId("MockExtension", "MockDataverse", "MockEntity");
+        ActiveRuntimeId activeRuntimeId =
+                new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0);
+        List<IDataset> datasetList = new ArrayList<>();
+        AlgebricksAbsolutePartitionConstraint partitionConstraint =
+                new AlgebricksAbsolutePartitionConstraint(new String[] { "asterix_nc1" });
+        CcApplicationContext appCtx =
+                (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+        ActiveLifecycleListener activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeJobNotificationHandler = activeLifecycleListener.getNotificationHandler();
+        JobId jobId = new JobId(1);
+
+        // Mock ActiveRuntime
+        IActiveRuntime mockRuntime = Mockito.mock(IActiveRuntime.class);
+        Mockito.when(mockRuntime.getRuntimeId()).thenReturn(activeRuntimeId);
+        Mockito.when(mockRuntime.getStats()).thenReturn(EXPECTED_STATS);
+
+        // Mock JobSpecification
+        JobSpecification jobSpec = Mockito.mock(JobSpecification.class);
+        Mockito.when(jobSpec.getProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
+                .thenReturn(entityId);
+
+        // Add event listener
+        ActiveEntityEventsListener eventsListener =
+                new ActiveEntityEventsListener(appCtx, entityId, datasetList, partitionConstraint,
+                        FeedIntakeOperatorNodePushable.class.getSimpleName());
+        activeJobNotificationHandler.registerListener(eventsListener);
+
+        // Register mock runtime
+        NCAppRuntimeContext nc1AppCtx =
+                (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext();
+        nc1AppCtx.getActiveManager().registerRuntime(mockRuntime);
+
+        // Check init stats
+        Assert.assertEquals(NO_STATS, eventsListener.getStats());
+
+        // Update stats of not-started job
+        eventsListener.refreshStats(1000);
+        Assert.assertEquals(NO_STATS, eventsListener.getStats());
+
+        // Update stats of created/started job without joined partition
+        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
+        activeLifecycleListener.notifyJobStart(jobId);
+        eventsListener.refreshStats(1000);
+        Assert.assertEquals(NO_STATS, eventsListener.getStats());
+
+        // Fake partition message and notify eventListener
+        ActivePartitionMessage partitionMessage =
+                new ActivePartitionMessage(activeRuntimeId, jobId, ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED,
+                        null);
+        eventsListener.notify(new ActiveEvent(jobId, ActiveEvent.Kind.PARTITION_EVENT, entityId, partitionMessage));
+        eventsListener.refreshStats(100000);
+        String requestedStats = eventsListener.getStats();
+        Assert.assertTrue(requestedStats.contains(EXPECTED_STATS));
+        try {
+            new ObjectMapper().readTree(requestedStats);
+        } catch (IOException e) {
+            throw new AssertionError("Stats are not valid JSON: " + requestedStats, e);
+        }
+
+        // Ask for runtime that is not registered
+        HyracksDataException expectedException = null;
+        nc1AppCtx.getActiveManager().deregisterRuntime(activeRuntimeId);
+        try {
+            eventsListener.refreshStats(100000);
+        } catch (HyracksDataException e) {
+            expectedException = e;
+        }
+        Assert.assertNotNull("Refreshing stats of a deregistered runtime should fail", expectedException);
+        Assert.assertEquals(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, expectedException.getErrorCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 5deaf54..5b28d3f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -191,10 +191,12 @@ public class ErrorCode { public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED = 3081; public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC = 3082; public static final int PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING = 3083; - public static final int PROVIDER_STREAM_RECORD_READER_DUPLICATE_FORMAT_MAPPING = 3084; + public static final int CANNOT_WAIT_FOR_STATE = 3084; public static final int FEED_UNKNOWN_ADAPTER_NAME = 3085; public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086; public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087; + public static final int ACTIVE_MANAGER_INVALID_RUNTIME = 3088; + public static final int CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY = 3089; // Lifecycle management errors public static final int DUPLICATE_PARTITION_ID = 4000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java index 69c0ca0..33a8ff3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java @@ -56,4 +56,9 @@ public interface ICCMessageBroker extends IMessageBroker { * @param response */ void respond(Long reqId, INcResponse response); + + /** + * @return a new request id + */ + long newRequestId(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java index a8a7390..0f7ab4d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java @@ -41,6 +41,7 @@ public class Servlets { public static final String CLUSTER_STATE_NODE_DETAIL = "/admin/cluster/node/*"; public static final String CLUSTER_STATE_CC_DETAIL = "/admin/cluster/cc/*"; public static final String DIAGNOSTICS = "/admin/diagnostics"; + public static final String ACTIVE_STATS = "/admin/active/*"; private Servlets() { } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index c755e40..8045531 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -52,8 +52,8 @@ 16 = Storage metadata directory of %1$s in %2$s couldn't be created 17 = Unknown external file pending operation %1$s 18 = Cannot convert the %1$s type to 
the %2$s type. -19 = Can't convert integer types. The source type should be one of %1$s. -20 = Can't convert integer types. The target type should be one of %1$s. +19 = Cannot convert integer types. The source type should be one of %1$s. +20 = Cannot convert integer types. The target type should be one of %1$s. 21 = Source value %1$s is out of range that %2$s can hold - %2$s.MAX_VALUE: %3$s, %2$s.MIN_VALUE: %4$s 22 = The accessed field is untyped, but should be typed 23 = %1$ss passed before getting back the responses from NCs @@ -76,7 +76,7 @@ 1020 = Cannot autogenerate a primary key for primary key of type %1$s. Autogenerated primary keys must be of type %2$s 1021 = The primary key field \"%1$s\" cannot be nullable 1022 = Field of type %1$s cannot be used as a primary key field -1023 = Can't drop dataset %1$s since it is connected to active entity: %2$s +1023 = Cannot drop dataset %1$s since it is connected to active entity: %2$s 1024 = Identifier %1$s is not found in AQL+ meta-scope 1025 = There is no such join type in AQL+ 1026 = The given function expression %1$s cannot utilize index @@ -108,15 +108,15 @@ 3008 = Unable to ingest data 3009 = Exception in get record type %1$s for feed 3010 = Doesn't support Hive data with list of non-primitive types -3011 = Can't get hive type for field of type %1$s +3011 = Cannot get hive type for field of type %1$s 3012 = Failed to get columns of record -3013 = Can't deserialize Hive records with no closed columns +3013 = Cannot deserialize Hive records with no closed columns 3014 = Non-optional UNION type is not supported. 
3015 = Failed to get the type information for field %1$s -3016 = can't parse null field -3017 = can't parse hive list with null values +3016 = Cannot parse null field +3017 = Cannot parse hive list with null values 3018 = Field %1$s of meta record is not an optional type so it cannot accept null value -3019 = Can't get PK from record part +3019 = Cannot get PK from record part 3020 = This operation cannot be done when Feed %1$s is alive 3021 = Malformed input stream 3022 = Unknown data source type: %1$s @@ -168,7 +168,7 @@ 3069 = Found COMMA before any list item 3070 = Found COMMA while expecting a list item 3071 = Found END_RECORD while expecting a list item -3072 = Can't cast the %1$s type to the %2$s type +3072 = Cannot cast the %1$s type to the %2$s type 3073 = Missing deserializer method for constructor: %1$s 3074 = This can not be an instance of %1$s 3075 = Closed field %1$s has null value @@ -178,12 +178,14 @@ 3079 = Cannot register runtime, active manager has been shutdown 3080 = Unexpected feed datatype '%1$s' 3081 = socket is not properly configured -3082 = "Invalid %1$s %2$s as it is not part of the AsterixDB cluster. Valid choices are %3$s" +3082 = Invalid %1$s %2$s as it is not part of the AsterixDB cluster. Valid choices are %3$s 3083 = Duplicate feed adaptor name: %1$s -3084 = Duplicate record reader format: %1$s +3084 = Cannot wait for state %1$s. The only states that can be waited for are STARTED or STOPPED 3085 = Unknown Adapter Name. -3086 = Cannot find record reader %1$s with specified configuration. 
+3086 = Cannot find record reader %1$s with specified configuration 3087 = Cannot find function %1$s +3088 = %1$s is not a valid runtime Id +3089 = Cannot subscribe to events of a failed active entity # Lifecycle management errors -4000 = Partition id %1$d for node %2$s already in use by node %3$s \ No newline at end of file +4000 = Partition id %1$d for node %2$s already in use by node %3$s http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java index 213231b..bcf5e25 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java @@ -61,4 +61,6 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl public abstract boolean stop() throws HyracksDataException; public abstract boolean handleException(Throwable th) throws HyracksDataException; + + public abstract String getStats(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index d01859e..1e62159 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -39,6 +39,8 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl protected final AtomicBoolean closed = new AtomicBoolean(false); protected static final long INTERVAL = 1000; protected boolean failed = false; + protected long incomingRecordsCount = 0; + protected long failedRecordsCount = 0; public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser, @@ -63,7 +65,10 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl continue; } tb.reset(); - parseAndForward(record); + incomingRecordsCount++; + if (!parseAndForward(record)) { + failedRecordsCount++; + } } } catch (InterruptedException e) { //TODO: Find out what could cause an interrupted exception beside termination of a job/feed @@ -104,19 +109,20 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } } - private void parseAndForward(IRawRecord<? extends T> record) throws IOException { + private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException { try { dataParser.parse(record, tb.getDataOutput()); } catch (Exception e) { LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e); feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD); // continue the outer loop - return; + return false; } tb.addFieldEndOffset(); addMetaPart(tb, record); addPrimaryKeys(tb, record); tupleForwarder.addTuple(tb); + return true; } protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? 
extends T> record) throws IOException { @@ -187,4 +193,9 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl public IRecordDataParser<T> getParser() { return dataParser; } + + public String getStats() { + return "{\"incoming-records-count\": " + incomingRecordsCount + ", \"failed-at-parser-records-count\": " + + failedRecordsCount + "}"; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java index 0d72682..cad11cd 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java @@ -29,6 +29,7 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController private final IStreamDataParser dataParser; private final AsterixInputStream stream; + protected long incomingRecordsCount = 0; public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, FeedLogManager feedLogManager, IStreamDataParser streamParser, AsterixInputStream inputStream) { @@ -48,6 +49,7 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController } tb.addFieldEndOffset(); tupleForwarder.addTuple(tb); + incomingRecordsCount++; } } catch (Exception e) { throw new HyracksDataException(e); @@ -83,4 +85,8 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController } return handled; } + + public String getStats() { + return 
"{\"incoming-records-number\": " + incomingRecordsCount + "}"; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java index 8d80e6f..e6d81d3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java @@ -51,4 +51,8 @@ public class FeedAdapter implements IDataSourceAdapter { public boolean resume() throws HyracksDataException { return controller.resume(); } + + public String getStats() { + return controller.getStats(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java index 11561b5..90a8852 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java @@ -76,12 +76,11 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool) 
throws HyracksDataException { this.writer = writer; - this.spiller = fpa.spillToDiskOnCongestion() - ? new FrameSpiller(ctx, - connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_" - + runtimeId.getRuntimeName() + "_" + runtimeId.getPartition(), - fpa.getMaxSpillOnDisk()) - : null; + this.spiller = fpa.spillToDiskOnCongestion() ? + new FrameSpiller(ctx, + connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_" + runtimeId.getPartition(), + fpa.getMaxSpillOnDisk()) : + null; this.exceptionHandler = new FeedExceptionHandler(ctx, fta); this.fpa = fpa; this.framePool = framePool; @@ -289,8 +288,8 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) { if (DEBUG) { LOGGER.info("in stall(frame). Spilling has been consumed. We will wait for it to be less than " - + MAX_SPILL_USED_BEFORE_RESUME + " consumed. Current consumption = " - + spiller.usedBudget()); + + MAX_SPILL_USED_BEFORE_RESUME + " consumed. 
Current consumption = " + spiller + .usedBudget()); } spiller.wait(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/604c921a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java index 365c3ce..cee6fa9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java @@ -18,21 +18,135 @@ */ package org.apache.asterix.external.feed.management; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.asterix.active.ActiveEvent; +import org.apache.asterix.active.ActiveEvent.Kind; +import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.active.IActiveEventSubscriber; +import org.apache.asterix.active.message.ActiveManagerMessage; +import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.active.message.StatsRequestMessage; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import 
org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.metadata.IDataset; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobStatus; -public abstract class ActiveEntityEventsListener implements IActiveEntityEventsListener { +public class ActiveEntityEventsListener implements IActiveEntityEventsListener { + + private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName()); + + enum RequestState { + INIT, + STARTED, + FINISHED + } // members - protected EntityId entityId; - protected List<IDataset> datasets; protected volatile ActivityState state; protected JobId jobId; + protected final List<IActiveEventSubscriber> subscribers = new ArrayList<>(); + protected final ICcApplicationContext appCtx; + protected final EntityId entityId; + protected final List<IDataset> datasets; + protected final ActiveEvent statsUpdatedEvent; + protected long statsTimestamp; + protected String stats; + protected RequestState statsRequestState; + protected final String runtimeName; + protected final AlgebricksAbsolutePartitionConstraint locations; + protected int numRegistered; + + public ActiveEntityEventsListener(ICcApplicationContext appCtx, EntityId entityId, List<IDataset> datasets, + AlgebricksAbsolutePartitionConstraint locations, String runtimeName) { + this.appCtx = appCtx; + this.entityId = entityId; + this.datasets = datasets; + this.state = ActivityState.STOPPED; + this.statsTimestamp = Long.MIN_VALUE; + this.statsRequestState = RequestState.INIT; + this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId); + this.stats = "N/A"; + this.runtimeName = runtimeName; + this.locations = locations; + this.numRegistered = 0; + } + + public 
synchronized void notify(ActiveEvent event) { + try { + LOGGER.finer("EventListener is notified."); + ActiveEvent.Kind eventKind = event.getEventKind(); + switch (eventKind) { + case JOB_CREATED: + state = ActivityState.CREATED; + break; + case JOB_STARTED: + start(event); + break; + case JOB_FINISHED: + finish(); + break; + case PARTITION_EVENT: + handle((ActivePartitionMessage) event.getEventObject()); + break; + default: + LOGGER.log(Level.WARNING, "Unhandled feed event notification: " + event); + break; + } + notifySubscribers(event); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Unhandled Exception", e); + } + } + + protected synchronized void handle(ActivePartitionMessage message) { + if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) { + numRegistered++; + if (numRegistered == locations.getLocations().length) { + state = ActivityState.STARTED; + } + } + } + + private void finish() throws Exception { + IHyracksClientConnection hcc = appCtx.getHcc(); + JobStatus status = hcc.getJobStatus(jobId); + state = status.equals(JobStatus.FAILURE) ? 
ActivityState.FAILED : ActivityState.STOPPED; + ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); + activeLcListener.getNotificationHandler().removeListener(this); + } + + private void start(ActiveEvent event) { + this.jobId = event.getJobId(); + state = ActivityState.STARTING; + } + + @Override + public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException { + if (this.state == ActivityState.FAILED) { + throw new RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY); + } + subscriber.subscribed(this); + if (!subscriber.isDone()) { + subscribers.add(subscriber); + } + } @Override public EntityId getEntityId() { @@ -52,4 +166,80 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityEventsL public JobId getJobId() { return jobId; } + + @Override + public String getStats() { + return stats; + } + + @Override + public long getStatsTimeStamp() { + return statsTimestamp; + } + + public String formatStats(List<String> responses) { + StringBuilder strBuilder = new StringBuilder(); + strBuilder.append("{" + "\"EntityId\": \"" + entityId + "\", "); + strBuilder.append("\"Stats\": [").append("\"" + responses.get(0) + "\""); + for (int i = 1; i < responses.size(); i++) { + strBuilder.append(", ").append("\"" + responses.get(i) + "\""); + } + strBuilder.append("]}"); + return strBuilder.toString(); + } + + @SuppressWarnings("unchecked") + @Override + public void refreshStats(long timeout) throws HyracksDataException { + synchronized (this) { + if (state != ActivityState.STARTED || statsRequestState == RequestState.STARTED) { + return; + } else { + statsRequestState = RequestState.STARTED; + } + } + ICCMessageBroker messageBroker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + long reqId = messageBroker.newRequestId(); + List<INcAddressedMessage> requests = new ArrayList<>(); + List<String> ncs = 
Arrays.asList(locations.getLocations()); + for (int i = 0; i < ncs.size(); i++) { + requests.add(new StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS, + new ActiveRuntimeId(entityId, runtimeName, i), reqId)); + } + try { + List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout); + stats = formatStats(responses); + statsTimestamp = System.currentTimeMillis(); + notifySubscribers(statsUpdatedEvent); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + // Same as above + statsRequestState = RequestState.FINISHED; + } + + protected synchronized void notifySubscribers(ActiveEvent event) { + notifyAll(); + Iterator<IActiveEventSubscriber> it = subscribers.iterator(); + while (it.hasNext()) { + IActiveEventSubscriber subscriber = it.next(); + if (subscriber.isDone()) { + it.remove(); + } else { + try { + subscriber.notify(event); + } catch (HyracksDataException e) { + LOGGER.log(Level.WARNING, "Failed to notify subscriber", e); + } + if (subscriber.isDone()) { + it.remove(); + } + } + } + } + + public AlgebricksAbsolutePartitionConstraint getLocations() { + return locations; + } + }
