APEX-103 - Add module and dag interface in API
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/2f1e1dfe Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/2f1e1dfe Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/2f1e1dfe Branch: refs/heads/master Commit: 2f1e1dfeb400bd7f01275a95522766abe909d936 Parents: 0d5bfa5 Author: Vlad Rozov <[email protected]> Authored: Wed Sep 23 20:30:51 2015 -0700 Committer: Tushar R. Gosavi <[email protected]> Committed: Mon Dec 21 23:41:04 2015 +0530 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/DAG.java | 24 +++++++++++++++++ .../main/java/com/datatorrent/api/Module.java | 28 ++++++++++++++++++++ .../stram/plan/logical/LogicalPlan.java | 24 +++++++++++++++++ 3 files changed, 76 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/2f1e1dfe/api/src/main/java/com/datatorrent/api/DAG.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index 6b1d1b2..abe2954 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -20,6 +20,8 @@ package com.datatorrent.api; import java.io.Serializable; +import org.apache.hadoop.classification.InterfaceStability; + import com.datatorrent.api.Context.DAGContext; /** @@ -157,6 +159,16 @@ public interface DAG extends DAGContext, Serializable public OutputPortMeta getMeta(Operator.OutputPort<?> port); } + @InterfaceStability.Evolving + interface ModuleMeta extends Serializable, Context + { + String getName(); + + InputPortMeta getMeta(Operator.InputPort<?> port); + + OutputPortMeta getMeta(Operator.OutputPort<?> port); + } + /** * Add new instance of operator under given name to the DAG. * The operator class must have a default constructor. @@ -179,6 +191,12 @@ public interface DAG extends DAGContext, Serializable */ public abstract <T extends Operator> T addOperator(String name, T operator); + @InterfaceStability.Evolving + <T extends Module> T addModule(String name, Class<T> moduleClass); + + @InterfaceStability.Evolving + <T extends Module> T addModule(String name, T module); + /** * <p>addStream.</p> * @param id Identifier of the stream that will be used to identify stream in DAG @@ -256,9 +274,15 @@ public interface DAG extends DAGContext, Serializable */ public abstract OperatorMeta getOperatorMeta(String operatorId); + @InterfaceStability.Evolving + ModuleMeta getModuleMeta(String moduleId); + /** * <p>getMeta.</p> */ public abstract OperatorMeta getMeta(Operator operator); + @InterfaceStability.Evolving + ModuleMeta getMeta(Module module); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/2f1e1dfe/api/src/main/java/com/datatorrent/api/Module.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Module.java b/api/src/main/java/com/datatorrent/api/Module.java new file mode 100644 index 0000000..1220fc1 --- /dev/null +++ b/api/src/main/java/com/datatorrent/api/Module.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.api; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + [email protected] +public interface Module +{ + void populateDAG(DAG dag, Configuration conf); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/2f1e1dfe/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 6405644..cca45d8 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -1079,6 +1079,18 @@ public class LogicalPlan implements Serializable, DAG return operator; } + @Override + public <T extends Module> T addModule(String name, Class<T> moduleClass) + { + throw new UnsupportedOperationException("Modules are not supported"); + } + + @Override + public <T extends Module> T addModule(String name, T module) + { + throw new UnsupportedOperationException("Modules are not supported"); + } + public void removeOperator(Operator operator) { OperatorMeta om = getMeta(operator); @@ -1231,6 +1243,12 @@ public class LogicalPlan implements Serializable, DAG } @Override + public ModuleMeta getModuleMeta(String moduleName) + { + throw new UnsupportedOperationException("Modules are not supported"); + } + + @Override public OperatorMeta getMeta(Operator operator) { // TODO: cache mapping @@ -1242,6 +1260,12 @@ public class LogicalPlan implements Serializable, DAG throw new IllegalArgumentException("Operator not associated with the DAG: " + operator); } + @Override + public ModuleMeta getMeta(Module module) + { + throw new UnsupportedOperationException("Modules are not supported"); + } + public int getMaxContainerCount() { return this.getValue(CONTAINERS_MAX_COUNT);
