APEX-39 Added store and embeddable interfaces for app data which allow query operators to be embedded in a store. Also added detection of a store with an embeddable query operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d748ed46 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d748ed46 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d748ed46 Branch: refs/heads/devel-3 Commit: d748ed46fb2ef91d74a70c2be52dc4a56bb4df71 Parents: aceaeeb Author: Timothy Farkas <[email protected]> Authored: Fri Aug 7 17:47:46 2015 -0700 Committer: David Yan <[email protected]> Committed: Fri Aug 28 14:19:55 2015 -0700 ---------------------------------------------------------------------- .../common/experimental/AppData.java | 53 +++++++++++++++++++- .../stram/StreamingContainerManager.java | 31 ++++++++++-- 2 files changed, 79 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/common/src/main/java/com/datatorrent/common/experimental/AppData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/experimental/AppData.java b/common/src/main/java/com/datatorrent/common/experimental/AppData.java index fbdc82c..22259c5 100644 --- a/common/src/main/java/com/datatorrent/common/experimental/AppData.java +++ b/common/src/main/java/com/datatorrent/common/experimental/AppData.java @@ -15,13 +15,15 @@ */ package com.datatorrent.common.experimental; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; - import org.apache.hadoop.classification.InterfaceStability; /** @@ -33,6 +35,55 @@ import org.apache.hadoop.classification.InterfaceStability; public interface AppData { /** + * This interface is for App Data stores which support embedding a query operator. + * @param <QUERY_TYPE> The type of the query tuple emitted by the embedded query operator. + */ + interface Store<QUERY_TYPE> extends Operator.ActivationListener<OperatorContext> + { + /** + * Gets the query connector which is used by the store operator to receive queries. If this method returns + * null then this Store should have a separate query operator connected to it. + * @return The query connector which is used by the store operator to receive queries. + */ + public EmbeddableQueryInfoProvider<QUERY_TYPE> getEmbeddableQueryInfoProvider(); + + /** + * Sets the query connector which is used by the store operator to receive queries. The store operator will call + * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method of the embeddable query operator before + * its {@link Operator#setup} method is called. + * @param embeddableQueryInfoProvider The query connector which is used by the store operator to receive queries. + */ + public void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<QUERY_TYPE> embeddableQueryInfoProvider); + } + + /** + * This interface represents a query operator which can be embedded into an AppData data source. This operator could also + * be used as a standalone operator. The distinction between being used in a standalone or embedded context is made by + * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method. If this method is called at least once then the {@link EmbeddableQueryInfoProvider} + * will operate as if it were embedded in an {@link AppData.Store} operator. If this method is never called then the operator will behave as if + * it were a standalone operator.<br/><br/> + * <b>Note:</b> When an {@link EmbeddableQueryInfoProvider} is set on an {@link AppData.Store} then it's {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} + * method is called before {@link Operator#setup}. + * @param <QUERY_TYPE> The type of the query emitted by the operator. + */ + interface EmbeddableQueryInfoProvider<QUERY_TYPE> extends Operator, ConnectionInfoProvider, Operator.ActivationListener<OperatorContext> + { + /** + * Gets the output port for queries. + * @return The output port for queries. + */ + public DefaultOutputPort<QUERY_TYPE> getOutputPort(); + + /** + * If this method is called at least once then this operator will work as if it were embedded in an {@link AppData.Store}. + * If this method is never called then this operator will behave as a standalone operator. When an {@link EmbeddableQueryInfoProvider} + * is set on an {@link AppData.Store} then the {@link AppData.Store} will call the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} + * method once before the {@link Operator.setup} is called. + */ + public void enableEmbeddedMode(); + } + + /** * This interface should be implemented by AppData Query and Result operators. */ interface ConnectionInfoProvider http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 7002c1d..7944a4b 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -132,6 +132,7 @@ public class StreamingContainerManager implements PlanContext public final static String APP_META_FILENAME = "meta.json"; public final static String APP_META_KEY_ATTRIBUTES = "attributes"; public final static String APP_META_KEY_METRICS = "metrics"; + public static final String EMBEDDABLE_QUERY_NAME_SUFFIX = ".query"; public final static long LATENCY_WARNING_THRESHOLD_MILLIS = 10 * 60 * 1000; // 10 minutes public final static Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty(); @@ -567,6 +568,22 @@ public class StreamingContainerManager implements PlanContext String queryUrl = null; String queryTopic = null; + boolean hasEmbeddedQuery = false; + + //Discover embeddable query connectors + if (operatorMeta.getOperator() instanceof AppData.Store<?>) { + AppData.Store<?> store = (AppData.Store<?>)operatorMeta.getOperator(); + AppData.EmbeddableQueryInfoProvider<?> embeddableQuery = store.getEmbeddableQueryInfoProvider(); + + if (embeddableQuery != null) { + hasEmbeddedQuery = true; + queryOperatorName = operatorMeta.getName() + EMBEDDABLE_QUERY_NAME_SUFFIX; + queryUrl = embeddableQuery.getAppDataURL(); + queryTopic = embeddableQuery.getTopic(); + } + } + + //Discover separate query operators LOG.warn("DEBUG: looking at operator {} {}", operatorMeta.getName(), Thread.currentThread().getId()); for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : inputStreams.entrySet()) { LogicalPlan.InputPortMeta portMeta = entry.getKey(); @@ -574,10 +591,16 @@ public class StreamingContainerManager implements PlanContext if (queryUrl == null) { OperatorMeta queryOperatorMeta = entry.getValue().getSource().getOperatorMeta(); if (queryOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) { - AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider) queryOperatorMeta.getOperator(); - queryOperatorName = queryOperatorMeta.getName(); - queryUrl = queryOperator.getAppDataURL(); - queryTopic = queryOperator.getTopic(); + if (!hasEmbeddedQuery) { + AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider)queryOperatorMeta.getOperator(); + queryOperatorName = queryOperatorMeta.getName(); + queryUrl = queryOperator.getAppDataURL(); + queryTopic = queryOperator.getTopic(); + } else { + LOG.warn("An embeddable query connector and the {} query operator were discovered. " + + "The query operator will be ignored and the embeddable query connector will be used instead.", + operatorMeta.getName()); + } } } else { LOG.warn("Multiple query ports found in operator {}. Ignoring the App Data Source.", operatorMeta.getName());
