Repository: incubator-apex-core Updated Branches: refs/heads/feature-module 507fac34b -> 68183f5f8
APEX-103 - Add module and dag interface in API Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/77c09aa1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/77c09aa1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/77c09aa1 Branch: refs/heads/feature-module Commit: 77c09aa168065b0f368c4488e90efa4bcd328e5b Parents: 507fac3 Author: Vlad Rozov <[email protected]> Authored: Wed Sep 23 20:30:51 2015 -0700 Committer: Vlad Rozov <[email protected]> Committed: Wed Sep 23 20:30:51 2015 -0700 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/DAG.java | 24 +++++++++++++++++++ .../main/java/com/datatorrent/api/Module.java | 25 ++++++++++++++++++++ .../stram/plan/logical/LogicalPlan.java | 24 +++++++++++++++++++ 3 files changed, 73 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/77c09aa1/api/src/main/java/com/datatorrent/api/DAG.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index 9c6c492..35cb2c4 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -17,6 +17,8 @@ package com.datatorrent.api; import java.io.Serializable; +import org.apache.hadoop.classification.InterfaceStability; + import com.datatorrent.api.Context.DAGContext; /** @@ -154,6 +156,16 @@ public interface DAG extends DAGContext, Serializable public OutputPortMeta getMeta(Operator.OutputPort<?> port); } + @InterfaceStability.Evolving + interface ModuleMeta extends Serializable, Context + { + String getName(); + + InputPortMeta getMeta(Operator.InputPort<?> port); + OutputPortMeta getMeta(Operator.OutputPort<?> port); + + } + 
/** * Add new instance of operator under given name to the DAG. * The operator class must have a default constructor. @@ -176,6 +188,12 @@ public interface DAG extends DAGContext, Serializable */ public abstract <T extends Operator> T addOperator(String name, T operator); + @InterfaceStability.Evolving + <T extends Module> T addModule(String name, Class<T> moduleClass); + + @InterfaceStability.Evolving + <T extends Module> T addModule(String name, T module); + /** * <p>addStream.</p> * @param id Identifier of the stream that will be used to identify stream in DAG @@ -253,9 +271,15 @@ public interface DAG extends DAGContext, Serializable */ public abstract OperatorMeta getOperatorMeta(String operatorId); + @InterfaceStability.Evolving + ModuleMeta getModuleMeta(String moduleId); + /** * <p>getMeta.</p> */ public abstract OperatorMeta getMeta(Operator operator); + @InterfaceStability.Evolving + ModuleMeta getMeta(Module module); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/77c09aa1/api/src/main/java/com/datatorrent/api/Module.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Module.java b/api/src/main/java/com/datatorrent/api/Module.java new file mode 100644 index 0000000..24a02a3 --- /dev/null +++ b/api/src/main/java/com/datatorrent/api/Module.java @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.api; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + [email protected] +public interface Module +{ + void populateDAG(DAG dag, Configuration conf); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/77c09aa1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 94d18ba..ad4c8bb 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -1075,6 +1075,18 @@ public class LogicalPlan implements Serializable, DAG return operator; } + @Override + public <T extends Module> T addModule(String name, Class<T> moduleClass) + { + throw new UnsupportedOperationException("Modules are not supported"); + } + + @Override + public <T extends Module> T addModule(String name, T module) + { + throw new UnsupportedOperationException("Modules are not supported"); + } + public void removeOperator(Operator operator) { OperatorMeta om = getMeta(operator); @@ -1227,6 +1239,12 @@ public class LogicalPlan implements Serializable, DAG } @Override + public ModuleMeta getModuleMeta(String moduleName) + { + throw new UnsupportedOperationException("Modules are not supported"); + } + + @Override public OperatorMeta getMeta(Operator operator) { // TODO: cache mapping @@ -1238,6 +1256,12 @@ public class LogicalPlan implements Serializable, DAG throw new IllegalArgumentException("Operator not associated with the DAG: " + operator); } + @Override + public ModuleMeta getMeta(Module module) + { + throw new UnsupportedOperationException("Modules are not supported"); + } + public int getMaxContainerCount() { return 
this.getValue(CONTAINERS_MAX_COUNT);
