Repository: apex-core Updated Branches: refs/heads/master 16d1bf62d -> 5f95ee0e9
APEXCORE-649 Infrastructure for user define stram event listeners. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/5f95ee0e Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/5f95ee0e Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/5f95ee0e Branch: refs/heads/master Commit: 5f95ee0e954fb505f4a0022fdf9ecdc0a80df6ca Parents: 16d1bf6 Author: Tushar R. Gosavi <[email protected]> Authored: Tue Mar 21 12:39:43 2017 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Fri Mar 24 13:57:02 2017 +0530 ---------------------------------------------------------------------- .../java/org/apache/apex/api/ApexPlugin.java | 29 +++ .../org/apache/apex/api/ApexPluginContext.java | 29 +++ .../org/apache/apex/api/DAGSetupPlugin.java | 6 +- .../stram/StreamingAppMasterService.java | 23 ++- .../stram/StreamingContainerManager.java | 18 ++ .../plan/logical/DAGSetupPluginManager.java | 21 +- .../apex/engine/api/DAGExecutionPlugin.java | 44 +++++ .../engine/api/DAGExecutionPluginContext.java | 89 +++++++++ .../apache/apex/engine/api/PluginLocator.java | 39 ++++ .../AbstractDAGExecutionPluginContext.java | 139 ++++++++++++++ .../engine/plugin/ApexPluginDispatcher.java | 27 +++ .../apex/engine/plugin/ApexPluginManager.java | 191 +++++++++++++++++++ .../plugin/DefaultApexPluginDispatcher.java | 123 ++++++++++++ .../engine/plugin/NoOpApexPluginDispatcher.java | 36 ++++ .../plugin/loaders/ChainedPluginLocator.java | 58 ++++++ .../loaders/PropertyBasedPluginLocator.java | 65 +++++++ .../ServiceLoaderBasedPluginLocator.java | 48 +++++ .../plugin/loaders/StaticPluginLocator.java | 46 +++++ .../apache/apex/engine/plugin/DebugPlugin.java | 114 +++++++++++ .../apache/apex/engine/plugin/NoOpPlugin.java | 42 ++++ .../apache/apex/engine/plugin/PluginTests.java | 130 +++++++++++++ ...rg.apache.apex.engine.api.DAGExecutionPlugin | 19 ++ 22 files changed, 1314 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/api/src/main/java/org/apache/apex/api/ApexPlugin.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/ApexPlugin.java b/api/src/main/java/org/apache/apex/api/ApexPlugin.java new file mode 100644 index 0000000..b9a8b78 --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/ApexPlugin.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api; + +import com.datatorrent.api.Component; + +/** + * Marker interface for ApexPlugins. + * @param <T> + */ +public interface ApexPlugin<T extends ApexPluginContext> extends Component<T> +{ +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/api/src/main/java/org/apache/apex/api/ApexPluginContext.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/ApexPluginContext.java b/api/src/main/java/org/apache/apex/api/ApexPluginContext.java new file mode 100644 index 0000000..1b72f63 --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/ApexPluginContext.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api; + +import com.datatorrent.api.Context; + +/** + * Marker interface for Context used by ApexPlugins. Plugin interfaces with + * the Apex through the context. + */ +public interface ApexPluginContext extends Context +{ +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java index d2e7199..6c54bed 100644 --- a/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java +++ b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java @@ -25,8 +25,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Attribute; -import com.datatorrent.api.Component; -import com.datatorrent.api.Context; import com.datatorrent.api.DAG; /** @@ -43,7 +41,7 @@ import com.datatorrent.api.DAG; * </ul> */ @InterfaceStability.Evolving -public interface DAGSetupPlugin extends Component<DAGSetupPlugin.DAGSetupPluginContext>, Serializable +public interface DAGSetupPlugin extends ApexPlugin<DAGSetupPlugin.DAGSetupPluginContext>, Serializable { /** @@ -89,7 +87,7 @@ public interface DAGSetupPlugin extends Component<DAGSetupPlugin.DAGSetupPluginC */ void postValidateDAG(); - public static class DAGSetupPluginContext implements Context + public static class DAGSetupPluginContext implements ApexPluginContext { private final DAG dag; private final Configuration conf; http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index c0e09ab..a885a49 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -40,6 +40,14 @@ import javax.xml.bind.annotation.XmlElement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.apex.engine.api.DAGExecutionPlugin; +import org.apache.apex.engine.api.PluginLocator; +import org.apache.apex.engine.plugin.ApexPluginDispatcher; +import org.apache.apex.engine.plugin.DefaultApexPluginDispatcher; +import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator; +import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator; +import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.MutablePair; @@ -151,6 +159,7 @@ public class StreamingAppMasterService extends CompositeService private final ClusterAppStats stats = new ClusterAppStats(); private StramDelegationTokenManager delegationTokenManager = null; private AppDataPushAgent appDataPushAgent; + private ApexPluginDispatcher apexPluginDispatcher; public StreamingAppMasterService(ApplicationAttemptId appAttemptID) { @@ -575,10 +584,22 @@ public class StreamingAppMasterService extends CompositeService this.appDataPushAgent = new AppDataPushAgent(dnmgr, appContext); addService(this.appDataPushAgent); } - // initialize all services added above + initApexPluginDispatcher(); + + // Initialize all services added above super.serviceInit(conf); } + public static final String PLUGINS_CONF_KEY = "apex.plugin.stram.plugins"; + private void initApexPluginDispatcher() + { + PluginLocator<DAGExecutionPlugin> locator = new ChainedPluginLocator<>(new ServiceLoaderBasedPluginLocator<>(DAGExecutionPlugin.class), + new PropertyBasedPluginLocator<>(DAGExecutionPlugin.class, PLUGINS_CONF_KEY)); + apexPluginDispatcher = new DefaultApexPluginDispatcher(locator, appContext, dnmgr, stats); + dnmgr.apexPluginDispatcher = apexPluginDispatcher; + addService(apexPluginDispatcher); + } + @Override protected void serviceStart() throws Exception { http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index ee07af1..f229e80 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -65,6 +65,8 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.engine.plugin.ApexPluginDispatcher; +import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -175,6 +177,10 @@ import com.datatorrent.stram.webapp.StreamInfo; import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.config.BusConfiguration; +import static org.apache.apex.engine.api.DAGExecutionPluginContext.COMMIT_EVENT; +import static org.apache.apex.engine.api.DAGExecutionPluginContext.HEARTBEAT; +import static org.apache.apex.engine.api.DAGExecutionPluginContext.STRAM_EVENT; + /** * Tracks topology provisioning/allocation to containers<p> * <br> @@ -231,6 +237,7 @@ public class StreamingContainerManager implements PlanContext private final ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap = new ConcurrentSkipListMap<>(); private final ConcurrentMap<PTOperator, PTOperator> slowestUpstreamOp = new ConcurrentHashMap<>(); private long committedWindowId; + private long lastCommittedWindowId = Checkpoint.INITIAL_CHECKPOINT.getWindowId(); // (operator id, port name) to timestamp private final Map<Pair<Integer, String>, Long> operatorPortLastEndWindowTimestamps = Maps.newConcurrentMap(); private final Map<Integer, Long> operatorLastEndWindowTimestamps = Maps.newConcurrentMap(); @@ -252,6 +259,7 @@ public class StreamingContainerManager implements PlanContext //logical operator name to latest counters. exists for backward compatibility. private final Map<String, Object> latestLogicalCounters = Maps.newHashMap(); + public transient ApexPluginDispatcher apexPluginDispatcher = new NoOpApexPluginDispatcher(); private final LinkedHashMap<String, ContainerInfo> completedContainers = new LinkedHashMap<String, ContainerInfo>() { @@ -807,6 +815,10 @@ public class StreamingContainerManager implements PlanContext processEvents(); committedWindowId = updateCheckpoints(waitForRecovery); + if (lastCommittedWindowId != committedWindowId) { + apexPluginDispatcher.dispatch(COMMIT_EVENT, committedWindowId); + lastCommittedWindowId = committedWindowId; + } calculateEndWindowStats(); if (this.vars.enableStatsRecording) { recordStats(currentTms); @@ -1802,6 +1814,7 @@ public class StreamingContainerManager implements PlanContext rsp.stackTraceRequired = sca.stackTraceRequested; sca.stackTraceRequested = false; + apexPluginDispatcher.dispatch(HEARTBEAT, heartbeat); return rsp; } @@ -2394,6 +2407,7 @@ public class StreamingContainerManager implements PlanContext @Override public void recordEventAsync(StramEvent ev) { + apexPluginDispatcher.dispatch(STRAM_EVENT, ev); if (eventBus != null) { eventBus.publishAsync(ev); } @@ -3299,4 +3313,8 @@ public class StreamingContainerManager implements PlanContext return latestLogicalCounters.get(operatorName); } + public void setApexPluginDispatcher(ApexPluginDispatcher manager) + { + this.apexPluginDispatcher = manager; + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java index ad37071..2f1a904 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java @@ -24,10 +24,9 @@ import java.util.List; import org.slf4j.Logger; import org.apache.apex.api.DAGSetupPlugin; +import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator; import org.apache.hadoop.conf.Configuration; -import com.datatorrent.stram.StramUtils; - import static org.slf4j.LoggerFactory.getLogger; public class DAGSetupPluginManager @@ -37,7 +36,7 @@ public class DAGSetupPluginManager private final transient List<DAGSetupPlugin> plugins = new ArrayList<>(); private Configuration conf; - public static final String DAGSETUP_PLUGINS_CONF_KEY = "org.apache.apex.api"; + public static final String DAGSETUP_PLUGINS_CONF_KEY = "apex.plugin.dag.setup"; private DAGSetupPlugin.DAGSetupPluginContext contex; private void loadVisitors(Configuration conf) @@ -47,20 +46,8 @@ public class DAGSetupPluginManager return; } - String classNamesStr = conf.get(DAGSETUP_PLUGINS_CONF_KEY); - if (classNamesStr == null) { - return; - } - String[] classNames = classNamesStr.split(","); - for (String className : classNames) { - try { - Class<? extends DAGSetupPlugin> plugin = StramUtils.classForName(className, DAGSetupPlugin.class); - plugins.add(StramUtils.newInstance(plugin)); - LOG.info("Found DAG setup plugin {}", plugin); - } catch (IllegalArgumentException e) { - LOG.warn("Could not load plugin {}", className); - } - } + PropertyBasedPluginLocator<DAGSetupPlugin> locator = new PropertyBasedPluginLocator<>(DAGSetupPlugin.class, DAGSETUP_PLUGINS_CONF_KEY); + this.plugins.addAll(locator.discoverPlugins(conf)); } public void setup(DAGSetupPlugin.DAGSetupPluginContext context) http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPlugin.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPlugin.java b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPlugin.java new file mode 100644 index 0000000..5a3b5b9 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPlugin.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.api; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Component; + +/** + * An Apex plugin is a user code which runs inside Stram. The interaction + * between plugin and Stram is managed by DAGExecutionPluginContext. Plugin can register to handle event in interest + * with callback handler using ${@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)} + * + * Following events are supported + * <ul> + * <li>{@see DAGExecutionPluginContext.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li> + * <li>{@see DAGExecutionPluginContext.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li> + * <li>{@see DAGExecutionPluginContext.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li> + * </ul> + * + * A plugin should register a single handler for an event, In case multiple handlers are registered for an event, + * then the last registered handler will be used. Plugin should cleanup additional resources created by it during shutdown + * such as helper threads and open files. + */ [email protected] +public interface DAGExecutionPlugin extends Component<DAGExecutionPluginContext> +{ +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java new file mode 100644 index 0000000..dc3153e --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.api; + +import java.util.List; +import java.util.Map; +import java.util.Queue; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StatsListener.BatchedOperatorStats; +import com.datatorrent.common.util.Pair; +import com.datatorrent.stram.StramAppContext; +import com.datatorrent.stram.api.StramEvent; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; +import com.datatorrent.stram.webapp.AppInfo; +import com.datatorrent.stram.webapp.LogicalOperatorInfo; + +/** + * An Apex plugin is a user code which runs inside Stram. The interaction + * between plugin and Stram is managed by DAGExecutionPluginContext. Plugin can register to handle event in interest + * with callback handler using ${@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)} + * + * Following events are supported + * <ul> + * <li>{@see DAGExecutionPluginContext.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li> + * <li>{@see DAGExecutionPluginContext.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li> + * <li>{@see DAGExecutionPluginContext.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li> + * </ul> + * + * A plugin should register a single handler for an event, In case multiple handlers are registered for an event, + * then the last registered handler will be used. Plugin should cleanup additional resources created by it during shutdown + * such as helper threads and open files. + */ [email protected] +public interface DAGExecutionPluginContext extends Context +{ + class RegistrationType<T> + { + } + + RegistrationType<StreamingContainerUmbilicalProtocol.ContainerHeartbeat> HEARTBEAT = new RegistrationType<>(); + RegistrationType<StramEvent> STRAM_EVENT = new RegistrationType<>(); + RegistrationType<Long> COMMIT_EVENT = new RegistrationType<>(); + + <T> void register(RegistrationType<T> type, Handler<T> handler); + + interface Handler<T> + { + void handle(T data); + } + + public StramAppContext getApplicationContext(); + + public AppInfo.AppStats getApplicationStats(); + + public Configuration getLaunchConfig(); + + public DAG getDAG(); + + public String getOperatorName(int id); + + public BatchedOperatorStats getPhysicalOperatorStats(int id); + + public List<LogicalOperatorInfo> getLogicalOperatorInfoList(); + + public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName); + + public long windowIdToMillis(long windowId); +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/api/PluginLocator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/api/PluginLocator.java b/engine/src/main/java/org/apache/apex/engine/api/PluginLocator.java new file mode 100644 index 0000000..e0f70be --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/api/PluginLocator.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.api; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +/** + * Interface to discover plugins during Apex Master initialization. This should return collection of + * objects implementing an interface of type T. + */ [email protected] +public interface PluginLocator<T> +{ + /** + * Discover list of apex plugins. + * + * @return list of apex plugins. + */ + Collection<T> discoverPlugins(Configuration conf); +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java new file mode 100644 index 0000000..a92b57b --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +import org.apache.apex.engine.api.DAGExecutionPluginContext; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StatsListener; +import com.datatorrent.common.util.Pair; +import com.datatorrent.stram.StramAppContext; +import com.datatorrent.stram.StreamingContainerManager; +import com.datatorrent.stram.plan.physical.PTOperator; +import com.datatorrent.stram.webapp.AppInfo; +import com.datatorrent.stram.webapp.LogicalOperatorInfo; + +public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionPluginContext +{ + private final StreamingContainerManager dnmgr; + private final Configuration launchConf; + private final StramAppContext appContext; + private final AppInfo.AppStats stats; + + public AbstractDAGExecutionPluginContext(StramAppContext appContext, StreamingContainerManager dnmgr, AppInfo.AppStats stats, Configuration launcConf) + { + this.appContext = appContext; + this.dnmgr = dnmgr; + this.launchConf = launcConf; + this.stats = stats; + } + + @Override + public StramAppContext getApplicationContext() + { + return appContext; + } + + @Override + public AppInfo.AppStats getApplicationStats() + { + return stats; + } + + @Override + public DAG getDAG() + { + return dnmgr.getLogicalPlan(); + } + + @Override + public String getOperatorName(int id) + { + PTOperator ptOperator = dnmgr.getPhysicalPlan().getAllOperators().get(id); + if (ptOperator != null) { + return ptOperator.getName(); + } + return null; + } + + @Override + public Configuration getLaunchConfig() + { + return launchConf; + } + + @Override + public StatsListener.BatchedOperatorStats getPhysicalOperatorStats(int id) + { + PTOperator ptOperator = dnmgr.getPhysicalPlan().getAllOperators().get(id); + if (ptOperator != null) { + return ptOperator.stats; + } + return null; + } + + @Override + public List<LogicalOperatorInfo> getLogicalOperatorInfoList() + { + return dnmgr.getLogicalOperatorInfoList(); + } + + @Override + public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName) + { + return dnmgr.getWindowMetrics(operatorName); + } + + @Override + public long windowIdToMillis(long windowId) + { + return dnmgr.windowIdToMillis(windowId); + } + + @Override + public Attribute.AttributeMap getAttributes() + { + return appContext.getAttributes(); + } + + @Override + public <T> T getValue(Attribute<T> key) + { + return appContext.getValue(key); + } + + @Override + public void setCounters(Object counters) + { + appContext.setCounters(counters); + } + + @Override + public void sendMetrics(Collection<String> metricNames) + { + appContext.sendMetrics(metricNames); + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java new file mode 100644 index 0000000..62dd255 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin; + +import org.apache.apex.engine.api.DAGExecutionPluginContext.RegistrationType; +import org.apache.hadoop.service.Service; + +public interface ApexPluginDispatcher extends Service +{ + <T> void dispatch(RegistrationType<T> registrationType, T data); +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java new file mode 100644 index 0000000..190cb6b --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.engine.api.DAGExecutionPlugin; +import org.apache.apex.engine.api.DAGExecutionPluginContext.Handler; +import org.apache.apex.engine.api.DAGExecutionPluginContext.RegistrationType; +import org.apache.apex.engine.api.PluginLocator; +import org.apache.commons.digester.plugins.PluginContext; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.collect.Lists; + +import com.datatorrent.stram.StramAppContext; +import com.datatorrent.stram.StreamingContainerManager; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.webapp.AppInfo; + +/** + * A default implementation for ApexPluginDispatcher. It handler common tasks such as per handler + * registration. actual dispatching is left for classes extending from it. + */ +public abstract class ApexPluginManager extends AbstractService +{ + private static final Logger LOG = LoggerFactory.getLogger(ApexPluginManager.class); + protected final Collection<DAGExecutionPlugin> plugins = Lists.newArrayList(); + protected final StramAppContext appContext; + protected final StreamingContainerManager dmgr; + private final PluginLocator locator; + private final AppInfo.AppStats stats; + protected Configuration launchConfig; + protected FileContext fileContext; + protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>(); + protected PluginContext pluginContext; + + public ApexPluginManager(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats) + { + super(ApexPluginManager.class.getName()); + this.locator = locator; + this.appContext = context; + this.dmgr = dmgr; + this.stats = stats; + LOG.debug("Creating apex service "); + } + + private Configuration readLaunchConfiguration() throws IOException + { + Path appPath = new Path(appContext.getApplicationPath()); + Path configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME); + try { + LOG.debug("Reading launch configuration file "); + URI uri = appPath.toUri(); + Configuration config = new YarnConfiguration(); + fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config); + FSDataInputStream is = fileContext.open(configFilePath); + config.addResource(is); + LOG.debug("Read launch configuration"); + return config; + } catch (FileNotFoundException ex) { + LOG.warn("Configuration file not found {}", configFilePath); + return new Configuration(); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception + { + super.serviceInit(conf); + this.launchConfig = readLaunchConfiguration(); + if (locator != null) { + Collection<DAGExecutionPlugin> plugins = locator.discoverPlugins(this.launchConfig); + if (plugins != null) { + this.plugins.addAll(plugins); + for (DAGExecutionPlugin plugin : plugins) { + LOG.info("Detected plugin {}", plugin); + } + } + } + + for (DAGExecutionPlugin plugin : plugins) { + plugin.setup(new PluginManagerImpl(plugin)); + } + } + + @Override + protected void serviceStop() throws Exception + { + for (DAGExecutionPlugin plugin : plugins) { + plugin.teardown(); + } + super.serviceStop(); + } + + /** + * Keeps information about plugin and its registrations. Dispatcher use this + * information while delivering events to plugin. + */ + class PluginInfo + { + private final DAGExecutionPlugin plugin; + private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>(); + + <T> void put(RegistrationType<T> registrationType, Handler<T> handler) + { + registrationMap.put(registrationType, handler); + } + + <T> Handler<T> get(RegistrationType<T> registrationType) + { + return (Handler<T>)registrationMap.get(registrationType); + } + + public PluginInfo(DAGExecutionPlugin plugin) + { + this.plugin = plugin; + } + + public DAGExecutionPlugin getPlugin() + { + return plugin; + } + } + + PluginInfo getPluginInfo(DAGExecutionPlugin plugin) + { + PluginInfo pInfo = pluginInfoMap.get(plugin); + if (pInfo == null) { + pInfo = new PluginInfo(plugin); + pluginInfoMap.put(plugin, pInfo); + } + return pInfo; + } + + public <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner) + { + PluginInfo pInfo = getPluginInfo(owner); + pInfo.put(type, handler); + } + + /** + * A wrapper PluginManager to track registration from a plugin. with this plugin + * don't need to pass explicit owner argument during registration. + */ + class PluginManagerImpl extends AbstractDAGExecutionPluginContext + { + private final DAGExecutionPlugin owner; + + PluginManagerImpl(DAGExecutionPlugin plugin) + { + super(appContext, dmgr, stats, launchConfig); + this.owner = plugin; + } + + @Override + public <T> void register(RegistrationType<T> type, Handler<T> handler) + { + ApexPluginManager.this.register(type, handler, owner); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java new file mode 100644 index 0000000..0c30943 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin; + +import java.util.NoSuchElementException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.engine.api.DAGExecutionPluginContext.Handler; +import org.apache.apex.engine.api.DAGExecutionPluginContext.RegistrationType; +import org.apache.apex.engine.api.PluginLocator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.common.util.NameableThreadFactory; +import com.datatorrent.stram.StramAppContext; +import com.datatorrent.stram.StreamingContainerManager; +import com.datatorrent.stram.webapp.AppInfo; + +/** + * Handle dispatching of events from Stram to Plugins. This implementation creates an executor + * service to process the event asynchronously. A separate task {@link DefaultApexPluginDispatcher.ProcessEventTask} + * is created to process an event and then submitted to the executor for execution. + */ +public class DefaultApexPluginDispatcher extends ApexPluginManager implements ApexPluginDispatcher +{ + private static final Logger LOG = LoggerFactory.getLogger(DefaultApexPluginDispatcher.class); + private int qsize = 4098; + private ArrayBlockingQueue<Runnable> blockingQueue; + private ExecutorService executorService; + + public DefaultApexPluginDispatcher(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats) + { + super(locator, context, dmgr, stats); + } + + @Override + public <T> void dispatch(RegistrationType<T> registrationType, T data) + { + if (executorService != null) { + executorService.submit(new ProcessEventTask<>(registrationType, data)); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception + { + super.serviceInit(conf); + LOG.debug("Creating plugin dispatch queue with size {}", qsize); + blockingQueue = new ArrayBlockingQueue<>(qsize); + RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() + { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) + { + try { + blockingQueue.remove(); + executor.submit(r); + } catch (NoSuchElementException ex) { + // Ignore no-such element as queue may finish, while this handler is called. + } + } + }; + + executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + blockingQueue, new NameableThreadFactory("PluginExecutorThread"), rejectionHandler); + } + + @Override + protected void serviceStop() throws Exception + { + executorService.shutdownNow(); + executorService.awaitTermination(10, TimeUnit.SECONDS); + if (!executorService.isTerminated()) { + LOG.warn("Executor service still active for plugins"); + } + executorService = null; + } + + private class ProcessEventTask<T> implements Runnable + { + private final RegistrationType<T> registrationType; + private final T data; + + public ProcessEventTask(RegistrationType<T> type, T data) + { + this.registrationType = type; + this.data = data; + } + + @Override + public void run() + { + for (final PluginInfo pInfo : pluginInfoMap.values()) { + final Handler<T> handler = pInfo.get(registrationType); + if (handler != null) { + handler.handle(data); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java new file mode 100644 index 0000000..11eb5d1 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin; + +import org.apache.apex.engine.api.DAGExecutionPluginContext.RegistrationType; +import org.apache.hadoop.service.AbstractService; + +public class NoOpApexPluginDispatcher extends AbstractService implements ApexPluginDispatcher +{ + public NoOpApexPluginDispatcher() + { + super(NoOpApexPluginDispatcher.class.getName()); + } + + @Override + public <T> void dispatch(RegistrationType<T> registrationType, T data) + { + + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ChainedPluginLocator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ChainedPluginLocator.java b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ChainedPluginLocator.java new file mode 100644 index 0000000..42d4dc4 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ChainedPluginLocator.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin.loaders; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.engine.api.PluginLocator; +import org.apache.hadoop.conf.Configuration; + +public class ChainedPluginLocator<T> implements PluginLocator<T> +{ + private static final Logger LOG = LoggerFactory.getLogger(ChainedPluginLocator.class); + List<PluginLocator> locators = new ArrayList<>(); + + public ChainedPluginLocator(PluginLocator<T>... locators) + { + for (PluginLocator locator : locators) { + this.locators.add(locator); + } + } + + @Override + public Collection<T> discoverPlugins(Configuration conf) + { + List<T> plugins = new ArrayList<>(); + + for (PluginLocator<T> locator : locators) { + Collection<T> currentPlugins = locator.discoverPlugins(conf); + if (currentPlugins != null) { + LOG.info("Loader {} detected {} plugins ", locator.getClass().getName(), currentPlugins.size()); + plugins.addAll(currentPlugins); + } + } + + return plugins; + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/loaders/PropertyBasedPluginLocator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/loaders/PropertyBasedPluginLocator.java b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/PropertyBasedPluginLocator.java new file mode 100644 index 0000000..b9fc2a5 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/PropertyBasedPluginLocator.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin.loaders; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.engine.api.PluginLocator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.stram.StramUtils; + +public class PropertyBasedPluginLocator<T> implements PluginLocator<T> +{ + private static final Logger LOG = LoggerFactory.getLogger(PropertyBasedPluginLocator.class); + private final Class<T> klass; + private final String propertyName; + + public PropertyBasedPluginLocator(Class<T> klass, String propertyName) + { + this.klass = klass; + this.propertyName = propertyName; + } + + @Override + public Collection<T> discoverPlugins(Configuration conf) + { + List<T> detectedPlugins = new ArrayList<>(); + String classNamesStr = conf.get(this.propertyName); + if (classNamesStr == null) { + return detectedPlugins; + } + + String[] classNames = classNamesStr.split(","); + for (String className : classNames) { + try { + Class<? extends T> plugin = StramUtils.classForName(className, this.klass); + detectedPlugins.add(StramUtils.newInstance(plugin)); + } catch (IllegalArgumentException e) { + LOG.warn("Could not load plugin {}", className); + } + } + return detectedPlugins; + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ServiceLoaderBasedPluginLocator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ServiceLoaderBasedPluginLocator.java b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ServiceLoaderBasedPluginLocator.java new file mode 100644 index 0000000..3295329 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/ServiceLoaderBasedPluginLocator.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin.loaders; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.ServiceLoader; + +import org.apache.apex.engine.api.PluginLocator; +import org.apache.hadoop.conf.Configuration; + +public class ServiceLoaderBasedPluginLocator<T> implements PluginLocator<T> +{ + private final Class<T> klass; + + public ServiceLoaderBasedPluginLocator(Class<T> klass) + { + this.klass = klass; + } + + @Override + public Collection<T> discoverPlugins(Configuration conf) + { + List<T> discovered = new ArrayList<>(); + ServiceLoader<T> loader = ServiceLoader.load(this.klass); + for (T plugin : loader) { + discovered.add(plugin); + } + return discovered; + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/main/java/org/apache/apex/engine/plugin/loaders/StaticPluginLocator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/loaders/StaticPluginLocator.java b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/StaticPluginLocator.java new file mode 100644 index 0000000..f6b0dfc --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/loaders/StaticPluginLocator.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin.loaders; + +import java.util.Arrays; +import java.util.Collection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.engine.api.PluginLocator; +import org.apache.hadoop.conf.Configuration; + +public class StaticPluginLocator<T> implements PluginLocator<T> +{ + private static final Logger LOG = LoggerFactory.getLogger(StaticPluginLocator.class); + + private final T[] plugins; + + public StaticPluginLocator(T... plugins) + { + this.plugins = plugins; + } + + @Override + public Collection<T> discoverPlugins(Configuration conf) + { + return Arrays.asList(plugins); + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java new file mode 100644 index 0000000..6ad8073 --- /dev/null +++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.apex.engine.api.DAGExecutionPlugin; +import org.apache.apex.engine.api.DAGExecutionPluginContext; +import org.apache.apex.engine.api.DAGExecutionPluginContext.Handler; + +import com.datatorrent.stram.api.StramEvent; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; + +import static org.apache.apex.engine.api.DAGExecutionPluginContext.COMMIT_EVENT; +import static org.apache.apex.engine.api.DAGExecutionPluginContext.HEARTBEAT; +import static org.apache.apex.engine.api.DAGExecutionPluginContext.STRAM_EVENT; + +public class DebugPlugin implements DAGExecutionPlugin +{ + private int eventCount = 0; + private int heartbeatCount = 0; + private int commitCount = 0; + private final Lock lock = new ReentrantLock(); + final Condition events = lock.newCondition(); + + @Override + public void setup(DAGExecutionPluginContext context) + { + context.register(STRAM_EVENT, new Handler<StramEvent>() + { + @Override + public void handle(StramEvent stramEvent) + { + lock(); + eventCount++; + events.signal(); + unlock(); + } + }); + + context.register(HEARTBEAT, new Handler<StreamingContainerUmbilicalProtocol.ContainerHeartbeat>() + { + @Override + public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat) + { + lock(); + heartbeatCount++; + events.signal(); + unlock(); + } + }); + + context.register(COMMIT_EVENT, new Handler<Long>() + { + @Override + public void handle(Long aLong) + { + lock(); + commitCount++; + events.signal(); + unlock(); + } + }); + } + + @Override + public void teardown() + { + + } + + public int getEventCount() + { + return eventCount; + } + + public int getHeartbeatCount() + { + return heartbeatCount; + } + + public int getCommitCount() + { + return commitCount; + } + + void lock() + { + this.lock.lock(); + } + + void unlock() + { + this.lock.unlock(); + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java new file mode 100644 index 0000000..4aad641 --- /dev/null +++ b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.engine.api.DAGExecutionPlugin; +import org.apache.apex.engine.api.DAGExecutionPluginContext; + +public class NoOpPlugin implements DAGExecutionPlugin +{ + private static final Logger LOG = LoggerFactory.getLogger(NoOpPlugin.class); + + @Override + public void setup(DAGExecutionPluginContext context) + { + LOG.info("NoOpPlugin plugin called init "); + } + + @Override + public void teardown() + { + LOG.info("NoOpPlugin plugin teardown called "); + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java new file mode 100644 index 0000000..4848983 --- /dev/null +++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.apex.engine.api.DAGExecutionPlugin; +import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator; +import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator; +import org.apache.apex.engine.plugin.loaders.StaticPluginLocator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Attribute; +import com.datatorrent.stram.api.StramEvent; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; +import com.datatorrent.stram.support.StramTestSupport; + +import static org.apache.apex.engine.api.DAGExecutionPluginContext.COMMIT_EVENT; +import static org.apache.apex.engine.api.DAGExecutionPluginContext.HEARTBEAT; +import static org.apache.apex.engine.api.DAGExecutionPluginContext.STRAM_EVENT; + +public class PluginTests +{ + + private static final Configuration conf = new Configuration(); + + @Test + public void testStaticPluginLoader() + { + DAGExecutionPlugin plugin1 = new NoOpPlugin(); + DAGExecutionPlugin plugin2 = new DebugPlugin(); + + StaticPluginLocator<DAGExecutionPlugin> locator1 = new StaticPluginLocator<>(plugin1); + StaticPluginLocator<DAGExecutionPlugin> locator2 = new StaticPluginLocator<>(plugin2); + + Collection<DAGExecutionPlugin> discovered1 = locator1.discoverPlugins(conf); + Assert.assertEquals("Number of plugins discovered ", 1, discovered1.size()); + Assert.assertEquals("Type is NoOpPlugin", discovered1.iterator().next().getClass(), NoOpPlugin.class); + Assert.assertEquals("Type is NoOpPlugin", discovered1.iterator().next(), plugin1); + + Collection<DAGExecutionPlugin> discovered2 = locator2.discoverPlugins(conf); + Assert.assertEquals("Number of plugins discovered ", 1, discovered2.size()); + Assert.assertEquals("Type is NoOpPlugin", discovered2.iterator().next().getClass(), DebugPlugin.class); + Assert.assertEquals("Type is NoOpPlugin", discovered2.iterator().next(), plugin2); + + ChainedPluginLocator<DAGExecutionPlugin> chained = new ChainedPluginLocator<>(locator1, locator2); + Collection<DAGExecutionPlugin> chainedDiscovered = chained.discoverPlugins(conf); + Assert.assertEquals("Number of plugins discovered ", 2, chainedDiscovered.size()); + Assert.assertTrue(chainedDiscovered.contains(plugin1)); + Assert.assertTrue(chainedDiscovered.contains(plugin2)); + } + + @Test + public void testServicePluginLoader() + { + ServiceLoaderBasedPluginLocator<DAGExecutionPlugin> locator = new ServiceLoaderBasedPluginLocator<>(DAGExecutionPlugin.class); + Collection<DAGExecutionPlugin> discovered = locator.discoverPlugins(conf); + Assert.assertEquals("Total number of plugins detected ", 1, discovered.size()); + Assert.assertEquals("Type is NoOpPlugin", discovered.iterator().next().getClass(), DebugPlugin.class); + } + + @Test + public void testDispatch() throws InterruptedException + { + DebugPlugin debugPlugin = new DebugPlugin(); + StaticPluginLocator<? extends DAGExecutionPlugin> locator = new StaticPluginLocator<>(debugPlugin); + ApexPluginDispatcher pluginManager = new DefaultApexPluginDispatcher(locator, + new StramTestSupport.TestAppContext(new Attribute.AttributeMap.DefaultAttributeMap()), null, null); + pluginManager.init(new Configuration()); + int count = debugPlugin.getEventCount(); + pluginManager.dispatch(STRAM_EVENT, new StramEvent(StramEvent.LogLevel.DEBUG) + { + @Override + public String getType() + { + return "TestEvent"; + } + }); + + debugPlugin.lock(); + while (debugPlugin.getEventCount() == count) { + debugPlugin.events.await(5, TimeUnit.SECONDS); + } + debugPlugin.unlock(); + + Assert.assertEquals("Total stram event received ", debugPlugin.getEventCount(), 1); + + count = debugPlugin.getCommitCount(); + pluginManager.dispatch(COMMIT_EVENT, new Long(1234)); + debugPlugin.lock(); + while (debugPlugin.getCommitCount() == count) { + debugPlugin.events.await(5, TimeUnit.SECONDS); + } + debugPlugin.unlock(); + + count = debugPlugin.getHeartbeatCount(); + pluginManager.dispatch(HEARTBEAT, new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()); + debugPlugin.lock(); + while (debugPlugin.getHeartbeatCount() == count) { + debugPlugin.events.await(5, TimeUnit.SECONDS); + } + debugPlugin.unlock(); + pluginManager.stop(); + + Assert.assertEquals(1, debugPlugin.getEventCount()); + Assert.assertEquals(1, debugPlugin.getHeartbeatCount()); + Assert.assertEquals(1, debugPlugin.getCommitCount()); + } + +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/5f95ee0e/engine/src/test/resources/META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin ---------------------------------------------------------------------- diff --git a/engine/src/test/resources/META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin b/engine/src/test/resources/META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin new file mode 100644 index 0000000..cd70a45 --- /dev/null +++ b/engine/src/test/resources/META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin @@ -0,0 +1,19 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.apex.engine.plugin.DebugPlugin
