Repository: apex-core Updated Branches: refs/heads/master 8829286d1 -> 9856080ed
APEXCORE-700 Uniform interface between setup and runtime plugins Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9856080e Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9856080e Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9856080e Branch: refs/heads/master Commit: 9856080ede62a4529d730bcb6724c757f5010990 Parents: 8829286 Author: Pramod Immaneni & Vlad Rozov <[email protected]> Authored: Tue Apr 18 09:37:22 2017 -0700 Committer: Pramod Immaneni <[email protected]> Committed: Wed Apr 26 22:26:46 2017 -0700 ---------------------------------------------------------------------- .../apache/apex/api/plugin/DAGSetupEvent.java | 87 ++++++++++++++ .../apache/apex/api/plugin/DAGSetupPlugin.java | 97 ++------------- .../java/org/apache/apex/api/plugin/Event.java | 59 ++++++++++ .../java/org/apache/apex/api/plugin/Plugin.java | 62 +++++++++- .../apache/apex/api/plugin/PluginContext.java | 32 ----- .../apex/common/util/BaseDAGSetupPlugin.java | 78 ------------ .../stram/StreamingAppMasterService.java | 3 +- .../stram/StreamingContainerManager.java | 14 +-- .../plan/logical/DAGSetupPluginManager.java | 118 ++++++++++++------- .../plan/logical/LogicalPlanConfiguration.java | 49 ++++---- .../engine/api/plugin/DAGExecutionEvent.java | 89 ++++++++++++++ .../engine/api/plugin/DAGExecutionPlugin.java | 64 ++++++++-- .../api/plugin/DAGExecutionPluginContext.java | 92 --------------- .../plugin/AbstractApexPluginDispatcher.java | 79 ++++--------- .../AbstractDAGExecutionPluginContext.java | 5 +- .../engine/plugin/ApexPluginDispatcher.java | 17 ++- .../plugin/DefaultApexPluginDispatcher.java | 30 ++--- .../engine/plugin/NoOpApexPluginDispatcher.java | 4 +- .../plan/logical/PropertyInjectorVisitor.java | 44 ++----- .../apache/apex/engine/plugin/DebugPlugin.java | 36 +++--- .../apache/apex/engine/plugin/NoOpPlugin.java | 5 +- .../apache/apex/engine/plugin/PluginTests.java | 13 +- 22 files changed, 549 insertions(+), 528 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java new file mode 100644 index 0000000..95d17e2 --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api.plugin; + +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; + +@Evolving +public class DAGSetupEvent extends Event.BaseEvent<DAGSetupEvent.Type> +{ + @Evolving + public enum Type implements Event.Type + { + /** + * This event is sent before platform adds operators and streams in the DAG. i.e this method + * will get called just before {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)} + * + * For Application specified using property and json file format, this will be sent + * before platform adds operators and streams in the DAG as per specification in the file. + */ + PRE_POPULATE_DAG, + + /** + * This event is sent after platform adds operators and streams in the DAG. i.e this method + * will get called just after {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)} + * in case application is specified in java. + * + * For Application specified using property and json file format, this will be sent + * after platform has added operators and streams in the DAG as per specification in the file. + */ + POST_POPULATE_DAG, + + /** + * This event is sent before DAG is configured, i.e operator and application + * properties/attributes are injected from configuration files. + */ + PRE_CONFIGURE_DAG, + + /** + * This event is sent after DAG is configured, i.e operator and application + * properties/attributes are injected from configuration files. + */ + POST_CONFIGURE_DAG, + + /** + * This event is sent just before dag is validated before final job submission. + */ + PRE_VALIDATE_DAG, + + /** + * This event is sent after dag is validated. If plugin makes in incompatible changes + * to the DAG at this stage, then application may get launched incorrectly or application + * launch may fail. + */ + POST_VALIDATE_DAG; + + public final DAGSetupEvent event; + + Type() + { + event = new DAGSetupEvent(this); + } + } + + private DAGSetupEvent(DAGSetupEvent.Type type) + { + super(type); + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java index faa6798..31ea1f3 100644 --- a/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java +++ b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java @@ -18,12 +18,9 @@ */ package org.apache.apex.api.plugin; -import java.util.Collection; - -import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.Attribute; import com.datatorrent.api.DAG; /** @@ -39,96 +36,18 @@ import com.datatorrent.api.DAG; * <li>After dag is validated</li> * </ul> */ [email protected] -public interface DAGSetupPlugin extends Plugin<DAGSetupPlugin.DAGSetupPluginContext> +@Evolving +public interface DAGSetupPlugin<T extends DAGSetupPlugin.Context> extends Plugin<T> { - - /** - * This method is called before platform adds operators and streams in the DAG. i.e this method - * will get called just before {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)} - * - * For Application specified using property and json file format, this method will get called - * before platform adds operators and streams in the DAG as per specification in the file. - */ - void prePopulateDAG(); - - /** - * This method is called after platform adds operators and streams in the DAG. i.e this method - * will get called just after {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)} - * in case application is specified in java. - * - * For Application specified using property and json file format, this method will get called - * after platform has added operators and streams in the DAG as per specification in the file. - */ - void postPopulateDAG(); - - /** - * This is method is called before DAG is configured, i.e operator and application - * properties/attributes are injected from configuration files. - */ - void preConfigureDAG(); - /** - * This is method is called after DAG is configured, i.e operator and application - * properties/attributes are injected from configuration files. + * The context for the setup plugins */ - void postConfigureDAG(); - - /** - * This method is called just before dag is validated before final job submission. - */ - void preValidateDAG(); - - /** - * This method is called after dag is validated. If plugin makes in incompatible changes - * to the DAG at this stage, then application may get launched incorrectly or application - * launch may fail. - */ - void postValidateDAG(); - - class DAGSetupPluginContext implements PluginContext + @Evolving + interface Context<E extends DAGSetupEvent> extends PluginContext<DAGSetupEvent.Type, E> { - private final DAG dag; - private final Configuration conf; - - public DAGSetupPluginContext(DAG dag, Configuration conf) - { - this.dag = dag; - this.conf = conf; - } - - public DAG getDAG() - { - return dag; - } - - public Configuration getConfiguration() - { - return conf; - } - - @Override - public Attribute.AttributeMap getAttributes() - { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public <T> T getValue(Attribute<T> key) - { - throw new UnsupportedOperationException("Not supported yet."); - } - @Override - public void setCounters(Object counters) - { - throw new UnsupportedOperationException("Not supported yet."); - } + DAG getDAG(); - @Override - public void sendMetrics(Collection<String> metricNames) - { - throw new UnsupportedOperationException("Not supported yet."); - } + Configuration getConfiguration(); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/api/src/main/java/org/apache/apex/api/plugin/Event.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/plugin/Event.java b/api/src/main/java/org/apache/apex/api/plugin/Event.java new file mode 100644 index 0000000..9b95187 --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/plugin/Event.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api.plugin; + +import org.apache.apex.api.plugin.Event.Type; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * The class represents a plugin event that is delivered to plugins to notify them of important system events. + * + * Plugins express interest in receiving events by registering handlers for the event type and their handlers receive + * the events. + * @param <T> event type + */ +@Evolving +public interface Event<T extends Type> +{ + /** + * Marker interface for plugin event type. + */ + @Evolving + interface Type + { + } + + T getType(); + + @Evolving + class BaseEvent<T extends Type> implements Event<T> + { + private T type; + + protected BaseEvent(T type) + { + this.type = type; + } + + public T getType() + { + return type; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/api/src/main/java/org/apache/apex/api/plugin/Plugin.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/plugin/Plugin.java b/api/src/main/java/org/apache/apex/api/plugin/Plugin.java index ffe52ea..e0a3872 100644 --- a/api/src/main/java/org/apache/apex/api/plugin/Plugin.java +++ b/api/src/main/java/org/apache/apex/api/plugin/Plugin.java @@ -18,15 +18,67 @@ */ package org.apache.apex.api.plugin; -import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import com.datatorrent.api.Component; +import com.datatorrent.api.Context; /** - * Marker interface for ApexPlugins. - * @param <T> + * An Apex plugin is user code which runs inside the Apex engine. Plugin implementations implement this interface. + * + * Plugins can identify extension points by registering interest in events in the {@link Component#setup(Context)} + * initialization method. They should also cleanup any additional resources created during shutdown such as helper + * threads and open files in the {@link Component#teardown()} method. + * @param <T> plugin context type */ [email protected] -public interface Plugin<T extends PluginContext> extends Component<T> +@Evolving +public interface Plugin<T extends Plugin.PluginContext> extends Component<T> { + + /** + * An Apex plugin is user code which runs inside the Apex engine. The interaction between plugin and engine is managed + * by PluginContext. Plugins can register interest in different events in the engine using the + * ${@link PluginContext#register(Event.Type, EventHandler)} method. + * + * @param <T> the type of the Event.Type + * @param <E> the event type + */ + @Evolving + interface PluginContext<T extends Event.Type, E extends Event<T>> extends Context + { + + /** + * Register interest in an event. + * + * Plugins register interest in events using this method. They would need to specify the event type and a handler to + * handle the event, that would get called when the event occurs. A plugin can register interest in several events but + * should register only a single handler for any specific event. In case register is called multiple times with the + * same event type, then the last registered handler will be used. + * + * When an event occurs the + * {@link EventHandler#handle(Event event)} method gets called with the event data. + * + * @param type The event type + * @param handler The event handler + */ + void register(T type, EventHandler<E> handler); + } + + /** + * A handler that handles an event in the Apex engine. Plugins register interest in events by registering handlers + * using the PluginContext. + * @param <E> The event type + */ + @Evolving + interface EventHandler<E extends Event> + { + /** + * Handle a event. + * + * This method is called when the event occurs. + * + * @param event + */ + void handle(E event); + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java b/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java deleted file mode 100644 index 2bdaf00..0000000 --- a/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.api.plugin; - -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.api.Context; - -/** - * Marker interface for Context used by ApexPlugins. Plugin interfaces with - * the Apex through the context. - */ [email protected] -public interface PluginContext extends Context -{ -} http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java b/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java deleted file mode 100644 index 9bc5d8e..0000000 --- a/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.common.util; - -import org.apache.apex.api.plugin.DAGSetupPlugin; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Base class for DAGSetupPlugin implementations that provides empty implementations - * for all interface methods. - */ [email protected] -public class BaseDAGSetupPlugin implements DAGSetupPlugin -{ - @Override - public void setup(DAGSetupPluginContext context) - { - - } - - @Override - public void prePopulateDAG() - { - - } - - @Override - public void teardown() - { - - } - - @Override - public void postPopulateDAG() - { - - } - - @Override - public void preConfigureDAG() - { - - } - - @Override - public void postConfigureDAG() - { - - } - - @Override - public void preValidateDAG() - { - - } - - @Override - public void postValidateDAG() - { - - } -} http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 2e88114..0ca8cd1 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -115,7 +115,6 @@ import com.datatorrent.stram.webapp.AppInfo; import com.datatorrent.stram.webapp.StramWebApp; import static java.lang.Thread.sleep; -import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT; /** * Streaming Application Master @@ -600,7 +599,7 @@ public class StreamingAppMasterService extends CompositeService apexPluginDispatcher = new DefaultApexPluginDispatcher(locator, appContext, dnmgr, stats); dnmgr.apexPluginDispatcher = apexPluginDispatcher; addService(apexPluginDispatcher); - apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, dnmgr.getLogicalPlan()); + apexPluginDispatcher.dispatch(new ApexPluginDispatcher.DAGChangeEvent(dnmgr.getLogicalPlan())); } @Override http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 8d99dc1..c4e76a5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -65,6 +65,7 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.engine.api.plugin.DAGExecutionEvent; import org.apache.apex.engine.plugin.ApexPluginDispatcher; import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher; import org.apache.apex.engine.util.CascadeStorageAgent; @@ -178,11 +179,6 @@ import com.datatorrent.stram.webapp.StreamInfo; import net.engio.mbassy.bus.MBassador; import net.engio.mbassy.bus.config.BusConfiguration; -import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT; -import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT; -import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT; -import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT; - /** * Tracks topology provisioning/allocation to containers<p> * <br> @@ -818,7 +814,7 @@ public class StreamingContainerManager implements PlanContext committedWindowId = updateCheckpoints(waitForRecovery); if (lastCommittedWindowId != committedWindowId) { - apexPluginDispatcher.dispatch(COMMIT_EVENT, committedWindowId); + apexPluginDispatcher.dispatch(new DAGExecutionEvent.CommitExecutionEvent(committedWindowId)); lastCommittedWindowId = committedWindowId; } calculateEndWindowStats(); @@ -1817,7 +1813,7 @@ public class StreamingContainerManager implements PlanContext rsp.stackTraceRequired = sca.stackTraceRequested; sca.stackTraceRequested = false; - apexPluginDispatcher.dispatch(HEARTBEAT, heartbeat); + apexPluginDispatcher.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(heartbeat)); return rsp; } @@ -2449,7 +2445,7 @@ public class StreamingContainerManager implements PlanContext @Override public void recordEventAsync(StramEvent ev) { - apexPluginDispatcher.dispatch(STRAM_EVENT, ev); + apexPluginDispatcher.dispatch(new DAGExecutionEvent.StramExecutionEvent(ev)); if (eventBus != null) { eventBus.publishAsync(ev); } @@ -3083,7 +3079,7 @@ public class StreamingContainerManager implements PlanContext recordEventAsync(new StramEvent.ChangeLogicalPlanEvent(request)); } pm.applyChanges(StreamingContainerManager.this); - apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, plan.getLogicalPlan()); + apexPluginDispatcher.dispatch(new ApexPluginDispatcher.DAGChangeEvent(plan.getLogicalPlan())); LOG.info("Plan changes applied: {}", requests); return null; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java index c7c2767..370aaaa 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java @@ -19,14 +19,23 @@ package com.datatorrent.stram.plan.logical; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.slf4j.Logger; +import org.apache.apex.api.plugin.DAGSetupEvent; import org.apache.apex.api.plugin.DAGSetupPlugin; +import org.apache.apex.api.plugin.Plugin.EventHandler; import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator; import org.apache.hadoop.conf.Configuration; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.DAG; + import static org.slf4j.LoggerFactory.getLogger; public class DAGSetupPluginManager @@ -37,7 +46,8 @@ public class DAGSetupPluginManager private Configuration conf; public static final String DAGSETUP_PLUGINS_CONF_KEY = "apex.plugin.dag.setup"; - private DAGSetupPlugin.DAGSetupPluginContext contex; + + private final Table<DAGSetupEvent.Type, DAGSetupPlugin, EventHandler<DAGSetupEvent>> table = HashBasedTable.create(); private void loadVisitors(Configuration conf) { @@ -50,56 +60,82 @@ public class DAGSetupPluginManager this.plugins.addAll(locator.discoverPlugins(conf)); } - public void setup(DAGSetupPlugin.DAGSetupPluginContext context) + private class DefaultDAGSetupPluginContext implements DAGSetupPlugin.Context<DAGSetupEvent> + { + private final DAG dag; + private final Configuration conf; + private DAGSetupPlugin plugin; + + public DefaultDAGSetupPluginContext(DAG dag, Configuration conf, DAGSetupPlugin plugin) + { + this.dag = dag; + this.conf = conf; + this.plugin = plugin; + } + + @Override + public void register(DAGSetupEvent.Type type, EventHandler<DAGSetupEvent> handler) + { + table.put(type, plugin, handler); + } + + public DAG getDAG() + { + return dag; + } + + public Configuration getConfiguration() + { + return conf; + } + + @Override + public Attribute.AttributeMap getAttributes() + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public <T> T getValue(Attribute<T> key) + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void setCounters(Object counters) + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void sendMetrics(Collection<String> metricNames) + { + throw new UnsupportedOperationException("Not supported yet."); + } + } + + public void setup(DAG dag) { - this.contex = context; for (DAGSetupPlugin plugin : plugins) { + DAGSetupPlugin.Context context = new DefaultDAGSetupPluginContext(dag, conf, plugin); plugin.setup(context); } } - public enum DispatchType + public void teardown() { - SETUP, - PRE_POPULATE, - POST_POPULATE, - PRE_CONFIGURE, - POST_CONFIGURE, - PRE_VALIDATE, - POST_VALIDATE, - TEARDOWN + for (DAGSetupPlugin plugin : plugins) { + plugin.teardown(); + } } - public void dispatch(DispatchType type, DAGSetupPlugin.DAGSetupPluginContext context) + public void dispatch(DAGSetupEvent event) { - for (DAGSetupPlugin plugin : plugins) { - switch (type) { - case SETUP: - plugin.setup(context); - break; - case PRE_POPULATE: - plugin.prePopulateDAG(); - break; - case POST_POPULATE: - plugin.postPopulateDAG(); - break; - case PRE_CONFIGURE: - plugin.preConfigureDAG(); - break; - case POST_CONFIGURE: - plugin.postValidateDAG(); - break; - case PRE_VALIDATE: - plugin.preValidateDAG(); - break; - case POST_VALIDATE: - plugin.postValidateDAG(); - break; - case TEARDOWN: - plugin.teardown(); - break; - default: - throw new UnsupportedOperationException("Not implemented "); + for (EventHandler<DAGSetupEvent> handler : table.row(event.getType()).values()) { + try { + handler.handle(event); + } catch (RuntimeException e) { + LOG.warn("Event {} caused an exception in {} handler", event, handler, e); } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index a7fad2a..01a4c7b 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -48,7 +48,6 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.api.plugin.DAGSetupPlugin.DAGSetupPluginContext; import org.apache.commons.beanutils.BeanMap; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.collections.CollectionUtils; @@ -86,14 +85,12 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; import com.datatorrent.stram.util.ObjectMapperFactory; -import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_CONFIGURE; -import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_POPULATE; -import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_VALIDATE; -import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_CONFIGURE; -import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_POPULATE; -import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_VALIDATE; -import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.SETUP; -import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.TEARDOWN; +import static org.apache.apex.api.plugin.DAGSetupEvent.Type.POST_CONFIGURE_DAG; +import static org.apache.apex.api.plugin.DAGSetupEvent.Type.POST_POPULATE_DAG; +import static org.apache.apex.api.plugin.DAGSetupEvent.Type.POST_VALIDATE_DAG; +import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_CONFIGURE_DAG; +import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_POPULATE_DAG; +import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_VALIDATE_DAG; /** * @@ -2076,19 +2073,18 @@ public class LogicalPlanConfiguration private LogicalPlan populateDAGAndValidate(LogicalPlanConfiguration tb, String appName) { LogicalPlan dag = new LogicalPlan(); - DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf); - pluginManager.dispatch(SETUP, context); - pluginManager.dispatch(PRE_POPULATE, context); + pluginManager.setup(dag); + pluginManager.dispatch(PRE_POPULATE_DAG.event); tb.populateDAG(dag); // configure with embedded settings tb.prepareDAG(dag, null, appName); - pluginManager.dispatch(POST_POPULATE, context); + pluginManager.dispatch(POST_POPULATE_DAG.event); // configure with external settings prepareDAG(dag, null, appName); - pluginManager.dispatch(PRE_VALIDATE, context); + pluginManager.dispatch(PRE_VALIDATE_DAG.event); dag.validate(); - pluginManager.dispatch(POST_VALIDATE, context); - pluginManager.dispatch(TEARDOWN, context); + pluginManager.dispatch(POST_VALIDATE_DAG.event); + pluginManager.teardown(); return dag; } @@ -2118,13 +2114,12 @@ public class LogicalPlanConfiguration public LogicalPlan createFromStreamingApplication(StreamingApplication app, String appName) { LogicalPlan dag = new LogicalPlan(); - DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf); - pluginManager.dispatch(SETUP, context); + pluginManager.setup(dag); prepareDAG(dag, app, appName); - pluginManager.dispatch(PRE_VALIDATE, context); + pluginManager.dispatch(PRE_VALIDATE_DAG.event); dag.validate(); - pluginManager.dispatch(POST_VALIDATE, context); - pluginManager.dispatch(TEARDOWN, context); + pluginManager.dispatch(POST_VALIDATE_DAG.event); + pluginManager.teardown(); return dag; } @@ -2256,14 +2251,13 @@ public class LogicalPlanConfiguration // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS); dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress); - DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf); + pluginManager.setup(dag); if (app != null) { - pluginManager.dispatch(SETUP, context); - pluginManager.dispatch(PRE_POPULATE, context); + pluginManager.dispatch(PRE_POPULATE_DAG.event); app.populateDAG(dag, conf); - pluginManager.dispatch(POST_POPULATE, context); + pluginManager.dispatch(POST_POPULATE_DAG.event); } - pluginManager.dispatch(PRE_CONFIGURE, context); + pluginManager.dispatch(PRE_CONFIGURE_DAG.event); String appAlias = getAppAlias(name); String appName = appAlias == null ? name : appAlias; List<AppConf> appConfs = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION); @@ -2279,7 +2273,8 @@ public class LogicalPlanConfiguration // inject external operator configuration setOperatorConfiguration(dag, appConfs, appName); setStreamConfiguration(dag, appConfs, appName); - pluginManager.dispatch(POST_CONFIGURE, context); + pluginManager.dispatch(POST_CONFIGURE_DAG.event); + pluginManager.teardown(); } private void flattenDAG(LogicalPlan dag, Configuration conf) http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java new file mode 100644 index 0000000..dfffbca --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.api.plugin; + +import org.apache.apex.api.plugin.Event; + +import com.datatorrent.stram.api.StramEvent; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; + +import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT; +import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT; +import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT; + +public class DAGExecutionEvent extends Event.BaseEvent<DAGExecutionEvent.Type> +{ + public enum Type implements Event.Type + { + HEARTBEAT_EVENT, STRAM_EVENT, COMMIT_EVENT + } + + public static class HeartbeatExecutionEvent extends DAGExecutionEvent + { + private final StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat; + + public HeartbeatExecutionEvent(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat) + { + super(HEARTBEAT_EVENT); + this.heartbeat = heartbeat; + } + + public StreamingContainerUmbilicalProtocol.ContainerHeartbeat getHeartbeat() + { + return heartbeat; + } + } + + public static class StramExecutionEvent extends DAGExecutionEvent + { + private final StramEvent stramEvent; + + public StramExecutionEvent(StramEvent stramEvent) + { + super(STRAM_EVENT); + this.stramEvent = stramEvent; + } + + public StramEvent getStramEvent() + { + return stramEvent; + } + } + + public static class CommitExecutionEvent extends DAGExecutionEvent + { + private final long commitWindow; + + public CommitExecutionEvent(long commitWindow) + { + super(COMMIT_EVENT); + this.commitWindow = commitWindow; + } + + public long getCommitWindow() + { + return commitWindow; + } + } + + protected DAGExecutionEvent(DAGExecutionEvent.Type eventType) + { + super(eventType); + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java index 060b240..7cfb1fa 100644 --- a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java +++ b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java @@ -18,26 +18,66 @@ */ package org.apache.apex.engine.api.plugin; +import java.util.List; +import java.util.Map; +import java.util.Queue; + import org.apache.apex.api.plugin.Plugin; -import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StatsListener.BatchedOperatorStats; +import com.datatorrent.common.util.Pair; +import com.datatorrent.stram.StramAppContext; +import com.datatorrent.stram.util.VersionInfo; +import com.datatorrent.stram.webapp.AppInfo; +import com.datatorrent.stram.webapp.LogicalOperatorInfo; /** - * An Apex plugin is a user code which runs inside Stram. The interaction - * between plugin and Stram is managed by DAGExecutionPluginContext. Plugin can register to handle event in interest - * with callback handler using ${@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)} + * DAGExecutionPlugin allows user provided code to respond to various events during the application runtime. * * Following events are supported * <ul> - * <li>{@see DAGExecutionPluginContext.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li> - * <li>{@see DAGExecutionPluginContext.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li> - * <li>{@see DAGExecutionPluginContext.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li> + * <li>{@see Context.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li> + * <li>{@see Context.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li> + * <li>{@see Context.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li> * </ul> * - * A plugin should register a single handler for an event, In case multiple handlers are registered for an event, - * then the last registered handler will be used. Plugin should cleanup additional resources created by it during shutdown - * such as helper threads and open files. */ [email protected] -public interface DAGExecutionPlugin extends Plugin<DAGExecutionPluginContext> +public interface DAGExecutionPlugin<T extends DAGExecutionPlugin.Context> extends Plugin<T> { + + /** + * The context for the execution plugins. + * + * Following events are supported + * <ul> + * <li>{@see Context.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li> + * <li>{@see Context.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li> + * <li>{@see Context.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li> + * </ul> + * + */ + interface Context<E extends DAGExecutionEvent> extends PluginContext<DAGExecutionEvent.Type, E> + { + VersionInfo getEngineVersion(); + + StramAppContext getApplicationContext(); + + AppInfo.AppStats getApplicationStats(); + + Configuration getLaunchConfig(); + + DAG getDAG(); + + String getOperatorName(int id); + + BatchedOperatorStats getPhysicalOperatorStats(int id); + + List<LogicalOperatorInfo> getLogicalOperatorInfoList(); + + Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName); + + long windowIdToMillis(long windowId); + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java deleted file mode 100644 index 73da7e6..0000000 --- a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.engine.api.plugin; - -import java.util.List; -import java.util.Map; -import java.util.Queue; - -import org.apache.apex.api.plugin.PluginContext; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StatsListener.BatchedOperatorStats; -import com.datatorrent.common.util.Pair; -import com.datatorrent.stram.StramAppContext; -import com.datatorrent.stram.api.StramEvent; -import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; -import com.datatorrent.stram.util.VersionInfo; -import com.datatorrent.stram.webapp.AppInfo; -import com.datatorrent.stram.webapp.LogicalOperatorInfo; - -/** - * An Apex plugin is a user code which runs inside Stram. The interaction - * between plugin and Stram is managed by DAGExecutionPluginContext. Plugin can register to handle event in interest - * with callback handler using ${@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)} - * - * Following events are supported - * <ul> - * <li>{@see DAGExecutionPluginContext.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li> - * <li>{@see DAGExecutionPluginContext.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li> - * <li>{@see DAGExecutionPluginContext.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li> - * </ul> - * - * A plugin should register a single handler for an event, In case multiple handlers are registered for an event, - * then the last registered handler will be used. Plugin should cleanup additional resources created by it during shutdown - * such as helper threads and open files. - */ [email protected] -public interface DAGExecutionPluginContext extends PluginContext -{ - class RegistrationType<T> - { - } - - RegistrationType<StreamingContainerUmbilicalProtocol.ContainerHeartbeat> HEARTBEAT = new RegistrationType<>(); - RegistrationType<StramEvent> STRAM_EVENT = new RegistrationType<>(); - RegistrationType<Long> COMMIT_EVENT = new RegistrationType<>(); - - <T> void register(RegistrationType<T> type, Handler<T> handler); - - interface Handler<T> - { - void handle(T data); - } - - VersionInfo getEngineVersion(); - - StramAppContext getApplicationContext(); - - AppInfo.AppStats getApplicationStats(); - - Configuration getLaunchConfig(); - - DAG getDAG(); - - String getOperatorName(int id); - - BatchedOperatorStats getPhysicalOperatorStats(int id); - - List<LogicalOperatorInfo> getLogicalOperatorInfoList(); - - Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName); - - long windowIdToMillis(long windowId); -} http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java index 2b96632..74ee0c8 100644 --- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java +++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java @@ -22,15 +22,15 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.api.plugin.Event; +import org.apache.apex.api.plugin.Plugin; +import org.apache.apex.api.plugin.Plugin.EventHandler; +import org.apache.apex.engine.api.plugin.DAGExecutionEvent; import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType; import org.apache.apex.engine.api.plugin.PluginLocator; import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.conf.Configuration; @@ -40,7 +40,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; +import com.google.common.collect.Table; import com.datatorrent.api.DAG; import com.datatorrent.stram.StramAppContext; @@ -62,7 +64,7 @@ public abstract class AbstractApexPluginDispatcher extends AbstractService imple private final AppInfo.AppStats stats; protected Configuration launchConfig; protected FileContext fileContext; - protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>(); + protected final Table<DAGExecutionEvent.Type, DAGExecutionPlugin, EventHandler<DAGExecutionEvent>> table = HashBasedTable.create(); private volatile DAG clonedDAG = null; public AbstractApexPluginDispatcher(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats) @@ -123,57 +125,18 @@ public abstract class AbstractApexPluginDispatcher extends AbstractService imple super.serviceStop(); } - /** - * Keeps information about plugin and its registrations. Dispatcher use this - * information while delivering events to plugin. - */ - protected class PluginInfo + public void register(DAGExecutionEvent.Type eventType, Plugin.EventHandler<DAGExecutionEvent> handler, DAGExecutionPlugin owner) { - private final DAGExecutionPlugin plugin; - private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>(); - - <T> void put(RegistrationType<T> registrationType, Handler<T> handler) - { - registrationMap.put(registrationType, handler); + synchronized (table) { + table.put(eventType, owner, handler); } - - <T> Handler<T> get(RegistrationType<T> registrationType) - { - return (Handler<T>)registrationMap.get(registrationType); - } - - public PluginInfo(DAGExecutionPlugin plugin) - { - this.plugin = plugin; - } - - public DAGExecutionPlugin getPlugin() - { - return plugin; - } - } - - PluginInfo getPluginInfo(DAGExecutionPlugin plugin) - { - PluginInfo pInfo = pluginInfoMap.get(plugin); - if (pInfo == null) { - pInfo = new PluginInfo(plugin); - pluginInfoMap.put(plugin, pInfo); - } - return pInfo; - } - - private <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner) - { - PluginInfo pInfo = getPluginInfo(owner); - pInfo.put(type, handler); } /** * A wrapper PluginManager to track registration from a plugin. with this plugin * don't need to pass explicit owner argument during registration. */ - private class PluginManagerImpl extends AbstractDAGExecutionPluginContext + private class PluginManagerImpl extends AbstractDAGExecutionPluginContext<DAGExecutionEvent> { private final DAGExecutionPlugin owner; @@ -184,7 +147,7 @@ public abstract class AbstractApexPluginDispatcher extends AbstractService imple } @Override - public <T> void register(RegistrationType<T> type, Handler<T> handler) + public void register(DAGExecutionEvent.Type type, EventHandler<DAGExecutionEvent> handler) { AbstractApexPluginDispatcher.this.register(type, handler, owner); } @@ -198,19 +161,19 @@ public abstract class AbstractApexPluginDispatcher extends AbstractService imple /** * Dispatch events to plugins. - * @param registrationType - * @param data - * @param <T> + * @param event The dag execution event */ - protected abstract <T> void dispatchEvent(RegistrationType<T> registrationType, T data); + protected abstract void dispatchExecutionEvent(DAGExecutionEvent event); @Override - public <T> void dispatch(RegistrationType<T> registrationType, T data) + public void dispatch(Event event) { - if (registrationType == ApexPluginDispatcher.DAG_CHANGE_EVENT) { - clonedDAG = SerializationUtils.clone((DAG)data); - } else { - dispatchEvent(registrationType, data); + if (!plugins.isEmpty()) { + if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) { + clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag); + } else if (event instanceof DAGExecutionEvent) { + dispatchExecutionEvent((DAGExecutionEvent)event); + } } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java index b17d5f8..21f29f8 100644 --- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java +++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java @@ -23,7 +23,8 @@ import java.util.List; import java.util.Map; import java.util.Queue; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext; +import org.apache.apex.engine.api.plugin.DAGExecutionEvent; +import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Attribute; @@ -37,7 +38,7 @@ import com.datatorrent.stram.util.VersionInfo; import com.datatorrent.stram.webapp.AppInfo; import com.datatorrent.stram.webapp.LogicalOperatorInfo; -public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionPluginContext +public abstract class AbstractDAGExecutionPluginContext<E extends DAGExecutionEvent> implements DAGExecutionPlugin.Context<E> { private final StreamingContainerManager dnmgr; private final Configuration launchConf; http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java index 234195f..9ef2a5d 100644 --- a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java +++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java @@ -18,7 +18,7 @@ */ package org.apache.apex.engine.plugin; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType; +import org.apache.apex.api.plugin.Event; import org.apache.hadoop.service.Service; import com.datatorrent.api.DAG; @@ -29,7 +29,18 @@ public interface ApexPluginDispatcher extends Service /** * This is internal event, which is not delivered to the plugins. */ - RegistrationType<DAG> DAG_CHANGE_EVENT = new RegistrationType<>(); + Event.Type DAG_CHANGE = new Event.Type(){}; - <T> void dispatch(RegistrationType<T> registrationType, T data); + class DAGChangeEvent extends Event.BaseEvent<Event.Type> + { + final DAG dag; + + public DAGChangeEvent(DAG dag) + { + super(DAG_CHANGE); + this.dag = dag; + } + } + + void dispatch(Event e); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java index bea011c..1252061 100644 --- a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java +++ b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java @@ -28,8 +28,9 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType; +import org.apache.apex.api.plugin.Event; +import org.apache.apex.api.plugin.Plugin.EventHandler; +import org.apache.apex.engine.api.plugin.DAGExecutionEvent; import org.apache.apex.engine.api.plugin.PluginLocator; import org.apache.hadoop.conf.Configuration; @@ -56,10 +57,10 @@ public class DefaultApexPluginDispatcher extends AbstractApexPluginDispatcher } @Override - protected <T> void dispatchEvent(RegistrationType<T> registrationType, T data) + protected void dispatchExecutionEvent(DAGExecutionEvent event) { if (executorService != null) { - executorService.submit(new ProcessEventTask<>(registrationType, data)); + executorService.submit(new ProcessEventTask<>(event)); } } @@ -98,24 +99,25 @@ public class DefaultApexPluginDispatcher extends AbstractApexPluginDispatcher executorService = null; } - private class ProcessEventTask<T> implements Runnable + private class ProcessEventTask<T extends DAGExecutionEvent.Type> implements Runnable { - private final RegistrationType<T> registrationType; - private final T data; + private final Event<T> event; - public ProcessEventTask(RegistrationType<T> type, T data) + public ProcessEventTask(Event<T> event) { - this.registrationType = type; - this.data = data; + this.event = event; } @Override public void run() { - for (final PluginInfo pInfo : pluginInfoMap.values()) { - final Handler<T> handler = pInfo.get(registrationType); - if (handler != null) { - handler.handle(data); + synchronized (table) { + for (EventHandler handler : table.row(event.getType()).values()) { + try { + handler.handle(event); + } catch (RuntimeException e) { + LOG.warn("Event {} caused exception in handler {}", event, handler, e); + } } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java index f3f3382..a629a3f 100644 --- a/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java +++ b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java @@ -18,7 +18,7 @@ */ package org.apache.apex.engine.plugin; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType; +import org.apache.apex.api.plugin.Event; import org.apache.hadoop.service.AbstractService; public class NoOpApexPluginDispatcher extends AbstractService implements ApexPluginDispatcher @@ -29,7 +29,7 @@ public class NoOpApexPluginDispatcher extends AbstractService implements ApexPlu } @Override - public <T> void dispatch(RegistrationType<T> registrationType, T data) + public void dispatch(Event event) { } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java index 4c8b4e5..7e38ee8 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java @@ -26,24 +26,27 @@ import java.util.Properties; import javax.validation.ValidationException; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.api.plugin.DAGSetupEvent; import org.apache.apex.api.plugin.DAGSetupPlugin; +import org.apache.apex.api.plugin.Plugin.EventHandler; import com.datatorrent.api.DAG; import com.datatorrent.api.Operator; -import static org.slf4j.LoggerFactory.getLogger; +import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_VALIDATE_DAG; -public class PropertyInjectorVisitor implements DAGSetupPlugin +public class PropertyInjectorVisitor implements DAGSetupPlugin<DAGSetupPlugin.Context>, EventHandler<DAGSetupEvent> { - private static final Logger LOG = getLogger(PropertyInjectorVisitor.class); + private static final Logger LOG = LoggerFactory.getLogger(PropertyInjectorVisitor.class); private String path; private Map<String, String> propertyMap = new HashMap<>(); private DAG dag; @Override - public void setup(DAGSetupPluginContext context) + public void setup(DAGSetupPlugin.Context context) { this.dag = context.getDAG(); try { @@ -56,34 +59,11 @@ public class PropertyInjectorVisitor implements DAGSetupPlugin } catch (IOException ex) { throw new ValidationException("Not able to load input file " + path); } + context.register(PRE_VALIDATE_DAG, this); } @Override - public void prePopulateDAG() - { - - } - - @Override - public void postPopulateDAG() - { - - } - - @Override - public void preConfigureDAG() - { - - } - - @Override - public void postConfigureDAG() - { - - } - - @Override - public void preValidateDAG() + public void handle(DAGSetupEvent event) { for (DAG.OperatorMeta ometa : dag.getAllOperatorsMeta()) { Operator o = ometa.getOperator(); @@ -91,12 +71,6 @@ public class PropertyInjectorVisitor implements DAGSetupPlugin } } - @Override - public void postValidateDAG() - { - - } - public PropertyInjectorVisitor() { } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java index 5b8ca11..833d69f 100644 --- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java +++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java @@ -21,52 +21,56 @@ package org.apache.apex.engine.plugin; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.datatorrent.stram.api.StramEvent; -import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; +import org.apache.apex.engine.api.plugin.DAGExecutionEvent; +import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; -import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT; -import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT; -import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT; +import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT; +import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT; +import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT; -public class DebugPlugin implements DAGExecutionPlugin +public class DebugPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Context> { + private static final Logger logger = LoggerFactory.getLogger(DebugPlugin.class); + private int eventCount = 0; private int heartbeatCount = 0; private int commitCount = 0; CountDownLatch latch = new CountDownLatch(3); @Override - public void setup(DAGExecutionPluginContext context) + public void setup(DAGExecutionPlugin.Context context) { - context.register(STRAM_EVENT, new Handler<StramEvent>() + context.register(STRAM_EVENT, new EventHandler<DAGExecutionEvent.StramExecutionEvent>() { @Override - public void handle(StramEvent stramEvent) + public void handle(DAGExecutionEvent.StramExecutionEvent event) { + logger.debug("Stram Event {}", event.getStramEvent()); eventCount++; latch.countDown(); } }); - context.register(HEARTBEAT, new Handler<StreamingContainerUmbilicalProtocol.ContainerHeartbeat>() + context.register(HEARTBEAT_EVENT, new EventHandler<DAGExecutionEvent.HeartbeatExecutionEvent>() { @Override - public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat) + public void handle(DAGExecutionEvent.HeartbeatExecutionEvent event) { + logger.debug("Heartbeat {}", event.getHeartbeat()); heartbeatCount++; latch.countDown(); } }); - context.register(COMMIT_EVENT, new Handler<Long>() + context.register(COMMIT_EVENT, new EventHandler<DAGExecutionEvent.CommitExecutionEvent>() { @Override - public void handle(Long aLong) + public void handle(DAGExecutionEvent.CommitExecutionEvent event) { + logger.debug("Commit window id {}", event.getCommitWindow()); commitCount++; latch.countDown(); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java index 786e0d6..f2563c2 100644 --- a/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java +++ b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java @@ -22,14 +22,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext; -public class NoOpPlugin implements DAGExecutionPlugin +public class NoOpPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Context> { private static final Logger LOG = LoggerFactory.getLogger(NoOpPlugin.class); @Override - public void setup(DAGExecutionPluginContext context) + public void setup(DAGExecutionPlugin.Context context) { LOG.info("NoOpPlugin plugin called init "); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/9856080e/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java index 84dc4ba..140dc65 100644 --- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java +++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java @@ -23,6 +23,7 @@ import java.util.Collection; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.engine.api.plugin.DAGExecutionEvent; import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator; import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator; @@ -34,10 +35,6 @@ import com.datatorrent.stram.api.StramEvent; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; import com.datatorrent.stram.support.StramTestSupport; -import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT; -import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT; -import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT; - public class PluginTests { @@ -86,16 +83,16 @@ public class PluginTests ApexPluginDispatcher pluginManager = new DefaultApexPluginDispatcher(locator, new StramTestSupport.TestAppContext(new Attribute.AttributeMap.DefaultAttributeMap()), null, null); pluginManager.init(new Configuration()); - pluginManager.dispatch(STRAM_EVENT, new StramEvent(StramEvent.LogLevel.DEBUG) + pluginManager.dispatch(new DAGExecutionEvent.StramExecutionEvent(new StramEvent(StramEvent.LogLevel.DEBUG) { @Override public String getType() { return "TestEvent"; } - }); - pluginManager.dispatch(COMMIT_EVENT, new Long(1234)); - pluginManager.dispatch(HEARTBEAT, new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()); + })); + pluginManager.dispatch(new DAGExecutionEvent.CommitExecutionEvent(1234)); + pluginManager.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(new StreamingContainerUmbilicalProtocol.ContainerHeartbeat())); debugPlugin.waitForEventDelivery(10); pluginManager.stop();
