This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit d18d6ac28b154e1570f863d779b9ae59add6887f Author: Michael Blow <[email protected]> AuthorDate: Sat Apr 3 23:12:59 2021 -0400 [NO ISSUE][*DB][ACT] += ActiveManagerMessage.GENERIC_EVENT Change-Id: I8f8986a90a6ac34a24118ace1a76401d65924055 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10863 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../org/apache/asterix/active/ActiveManager.java | 21 +++++++++++++++++++-- .../org/apache/asterix/active/IActiveRuntime.java | 5 +++++ .../active/message/ActiveManagerMessage.java | 14 +++++++++++--- .../active/message/ActiveStatsRequestMessage.java | 6 +++--- .../active/message/StopRuntimeParameters.java | 12 ++---------- .../app/active/ActiveEntityEventsListener.java | 2 +- .../apache/asterix/external/api/IRecordReader.java | 5 +++++ .../dataflow/AbstractFeedDataFlowController.java | 5 +++++ .../dataflow/FeedRecordDataFlowController.java | 6 ++++++ .../external/dataset/adapter/FeedAdapter.java | 5 +++++ 10 files changed, 62 insertions(+), 19 deletions(-) diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index ba04967..08e1be4 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -112,14 +112,31 @@ public class ActiveManager { case REQUEST_STATS: requestStats((ActiveStatsRequestMessage) message); break; + case GENERIC_EVENT: + deliverGenericEvent(message); + break; default: LOGGER.warn("Unknown message type received: " + message.getKind()); } } + private void deliverGenericEvent(ActiveManagerMessage message) throws HyracksDataException { + try { + ActiveRuntimeId runtimeId = message.getRuntimeId(); + IActiveRuntime runtime = runtimes.get(runtimeId); + if (runtime == null) { + LOGGER.warn("Request for a runtime {} that is not registered {}", runtimeId, message); + return; + } + runtime.handleGenericEvent(message); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataException { try { - ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload(); + ActiveRuntimeId runtimeId = message.getRuntimeId(); IActiveRuntime runtime = runtimes.get(runtimeId); long reqId = message.getReqId(); if (runtime == null) { @@ -168,7 +185,7 @@ public class ActiveManager { @SuppressWarnings("squid:S1181") // Catch Error private void stopRuntime(ActiveManagerMessage message) { StopRuntimeParameters content = (StopRuntimeParameters) message.getPayload(); - ActiveRuntimeId runtimeId = content.getRuntimeId(); + ActiveRuntimeId runtimeId = message.getRuntimeId(); IActiveRuntime runtime = runtimes.get(runtimeId); if (runtime == null) { LOGGER.warn("Request to stop runtime: " + runtimeId diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java index a52f01e..b8edc64 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java @@ -20,6 +20,7 @@ package org.apache.asterix.active; import java.util.concurrent.TimeUnit; +import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IActiveRuntime { @@ -48,4 +49,8 @@ public interface IActiveRuntime { default String getStats() { return "\"Runtime stats is not available.\""; } + + default void handleGenericEvent(ActiveManagerMessage event) throws HyracksDataException { + throw new IllegalStateException("generic events not supported for runtime " + getRuntimeId()); + } } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java index 1a2af13..4d726cf 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java @@ -21,6 +21,7 @@ package org.apache.asterix.active.message; import java.io.Serializable; import org.apache.asterix.active.ActiveManager; +import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.CcIdentifiedMessage; import org.apache.asterix.common.messaging.api.INcAddressedMessage; @@ -29,15 +30,18 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddressedMessage { public enum Kind { STOP_ACTIVITY, - REQUEST_STATS + REQUEST_STATS, + GENERIC_EVENT } - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final Kind kind; + private final ActiveRuntimeId runtimeId; private final Serializable payload; - public ActiveManagerMessage(Kind kind, Serializable payload) { + public ActiveManagerMessage(Kind kind, ActiveRuntimeId runtimeId, Serializable payload) { this.kind = kind; + this.runtimeId = runtimeId; this.payload = payload; } @@ -45,6 +49,10 @@ public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddr return payload; } + public ActiveRuntimeId getRuntimeId() { + return runtimeId; + } + public Kind getKind() { return kind; } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java index 117a68c..94668a0 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java @@ -18,14 +18,14 @@ */ package org.apache.asterix.active.message; -import java.io.Serializable; +import org.apache.asterix.active.ActiveRuntimeId; public class ActiveStatsRequestMessage extends ActiveManagerMessage { private static final long serialVersionUID = 1L; private final long reqId; - public ActiveStatsRequestMessage(Serializable payload, long reqId) { - super(Kind.REQUEST_STATS, payload); + public ActiveStatsRequestMessage(ActiveRuntimeId runtimeId, long reqId) { + super(Kind.REQUEST_STATS, runtimeId, null); this.reqId = reqId; } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java index fbc41a1..c21f06e 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java @@ -21,25 +21,17 @@ package org.apache.asterix.active.message; import java.io.Serializable; import java.util.concurrent.TimeUnit; -import org.apache.asterix.active.ActiveRuntimeId; - public class StopRuntimeParameters implements Serializable { - private static final long serialVersionUID = 1L; - private final ActiveRuntimeId runtimeId; + private static final long serialVersionUID = 2L; private final long timeout; private final TimeUnit unit; - public StopRuntimeParameters(ActiveRuntimeId runtimeId, long timeout, TimeUnit unit) { - this.runtimeId = runtimeId; + public StopRuntimeParameters(long timeout, TimeUnit unit) { this.timeout = timeout; this.unit = unit; } - public ActiveRuntimeId getRuntimeId() { - return runtimeId; - } - public long getTimeout() { return timeout; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index c90cde0..5f7d65e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -515,7 +515,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl } ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++); messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, - new StopRuntimeParameters(runtimeId, timeout, unit)), location); + runtimeId, new StopRuntimeParameters(timeout, unit)), location); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java index cb97526..0e6ddb2 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.function.LongSupplier; import java.util.function.Supplier; +import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; @@ -95,4 +96,8 @@ public interface IRecordReader<T> extends Closeable { default LongSupplier getLineNumber() { return ExternalDataConstants.NO_LINES; } + + default void handleGenericEvent(ActiveManagerMessage event) { + throw new IllegalStateException("unexpected generic event " + event); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java index b58e604..94d9e6e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java @@ -21,6 +21,7 @@ package org.apache.asterix.external.dataflow; import java.io.Closeable; import java.io.IOException; +import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.external.api.IDataFlowController; import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -57,4 +58,8 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl feedLogManager.close(); } } + + public void handleGenericEvent(ActiveManagerMessage event) { + throw new IllegalStateException("unexpected generic event " + event); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 56257a8..8cec5de 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -21,6 +21,7 @@ package org.apache.asterix.external.dataflow; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.IRawRecord; @@ -280,4 +281,9 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl .append("}"); return str.toString(); } + + @Override + public void handleGenericEvent(ActiveManagerMessage event) { + recordReader.handleGenericEvent(event); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java index 54e633a..fc9b727 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java @@ -21,6 +21,7 @@ package org.apache.asterix.external.dataset.adapter; import java.io.Closeable; import java.io.IOException; +import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.hyracks.api.comm.IFrameWriter; @@ -69,4 +70,8 @@ public class FeedAdapter implements IDataSourceAdapter, Closeable { public void close() throws IOException { controller.close(); } + + public void handleGenericEvent(ActiveManagerMessage event) { + controller.handleGenericEvent(event); + } }
