Repository: apex-core Updated Branches: refs/heads/master 6cb3e3510 -> cfe9cefed
APEXCORE-649 Provide snapshot of DAG to plugins instead of actual DAG Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d705ed43 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d705ed43 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d705ed43 Branch: refs/heads/master Commit: d705ed433f3bc6750e4ff693196490d9e9b07061 Parents: 25e4c4c Author: Tushar R. Gosavi <[email protected]> Authored: Mon Mar 27 11:21:16 2017 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Fri Apr 21 12:09:30 2017 +0530 ---------------------------------------------------------------------- .../stram/StreamingAppMasterService.java | 2 + .../stram/StreamingContainerManager.java | 2 + .../plugin/AbstractApexPluginDispatcher.java | 216 +++++++++++++++++++ .../AbstractDAGExecutionPluginContext.java | 9 +- .../engine/plugin/ApexPluginDispatcher.java | 8 + .../apex/engine/plugin/ApexPluginManager.java | 191 ---------------- .../plugin/DefaultApexPluginDispatcher.java | 4 +- .../apache/apex/engine/plugin/DebugPlugin.java | 29 +-- .../apache/apex/engine/plugin/PluginTests.java | 25 +-- 9 files changed, 242 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 4e4f501..2e88114 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -115,6 +115,7 @@ import com.datatorrent.stram.webapp.AppInfo; import com.datatorrent.stram.webapp.StramWebApp; import static java.lang.Thread.sleep; +import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT; /** * Streaming Application Master @@ -599,6 +600,7 @@ public class StreamingAppMasterService extends CompositeService apexPluginDispatcher = new DefaultApexPluginDispatcher(locator, appContext, dnmgr, stats); dnmgr.apexPluginDispatcher = apexPluginDispatcher; addService(apexPluginDispatcher); + apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, dnmgr.getLogicalPlan()); } @Override http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 92fce54..6ec5267 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -181,6 +181,7 @@ import net.engio.mbassy.bus.config.BusConfiguration; import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT; import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT; import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT; +import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT; /** * Tracks topology provisioning/allocation to containers<p> @@ -3081,6 +3082,7 @@ public class StreamingContainerManager implements PlanContext recordEventAsync(new StramEvent.ChangeLogicalPlanEvent(request)); } pm.applyChanges(StreamingContainerManager.this); + apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, plan.getLogicalPlan()); LOG.info("Plan changes applied: {}", requests); return null; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java new file mode 100644 index 0000000..2b96632 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.plugin; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; +import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler; +import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType; +import org.apache.apex.engine.api.plugin.PluginLocator; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.stram.StramAppContext; +import com.datatorrent.stram.StreamingContainerManager; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.webapp.AppInfo; + +/** + * A default implementation for ApexPluginDispatcher. It handles common tasks, such as handler + * registrations. Actual dispatching is left for classes extending from it. + */ +public abstract class AbstractApexPluginDispatcher extends AbstractService implements ApexPluginDispatcher +{ + private static final Logger LOG = LoggerFactory.getLogger(AbstractApexPluginDispatcher.class); + protected final Collection<DAGExecutionPlugin> plugins = Lists.newArrayList(); + protected final StramAppContext appContext; + protected final StreamingContainerManager dmgr; + private final PluginLocator locator; + private final AppInfo.AppStats stats; + protected Configuration launchConfig; + protected FileContext fileContext; + protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>(); + private volatile DAG clonedDAG = null; + + public AbstractApexPluginDispatcher(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats) + { + super(AbstractApexPluginDispatcher.class.getName()); + this.locator = locator; + this.appContext = context; + this.dmgr = dmgr; + this.stats = stats; + LOG.debug("Creating apex service "); + } + + private Configuration readLaunchConfiguration() throws IOException + { + Path appPath = new Path(appContext.getApplicationPath()); + Path configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME); + try { + LOG.debug("Reading launch configuration file "); + URI uri = appPath.toUri(); + Configuration config = new YarnConfiguration(); + fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config); + FSDataInputStream is = fileContext.open(configFilePath); + config.addResource(is); + LOG.debug("Read launch configuration"); + return config; + } catch (FileNotFoundException ex) { + LOG.warn("Configuration file not found {}", configFilePath); + return new Configuration(); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception + { + super.serviceInit(conf); + this.launchConfig = readLaunchConfiguration(); + if (locator != null) { + Collection<DAGExecutionPlugin> plugins = locator.discoverPlugins(this.launchConfig); + if (plugins != null) { + this.plugins.addAll(plugins); + for (DAGExecutionPlugin plugin : plugins) { + LOG.info("Detected plugin {}", plugin); + } + } + } + + for (DAGExecutionPlugin plugin : plugins) { + plugin.setup(new PluginManagerImpl(plugin)); + } + } + + @Override + protected void serviceStop() throws Exception + { + for (DAGExecutionPlugin plugin : plugins) { + plugin.teardown(); + } + super.serviceStop(); + } + + /** + * Keeps information about plugin and its registrations. Dispatcher use this + * information while delivering events to plugin. + */ + protected class PluginInfo + { + private final DAGExecutionPlugin plugin; + private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>(); + + <T> void put(RegistrationType<T> registrationType, Handler<T> handler) + { + registrationMap.put(registrationType, handler); + } + + <T> Handler<T> get(RegistrationType<T> registrationType) + { + return (Handler<T>)registrationMap.get(registrationType); + } + + public PluginInfo(DAGExecutionPlugin plugin) + { + this.plugin = plugin; + } + + public DAGExecutionPlugin getPlugin() + { + return plugin; + } + } + + PluginInfo getPluginInfo(DAGExecutionPlugin plugin) + { + PluginInfo pInfo = pluginInfoMap.get(plugin); + if (pInfo == null) { + pInfo = new PluginInfo(plugin); + pluginInfoMap.put(plugin, pInfo); + } + return pInfo; + } + + private <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner) + { + PluginInfo pInfo = getPluginInfo(owner); + pInfo.put(type, handler); + } + + /** + * A wrapper PluginManager to track registration from a plugin. with this plugin + * don't need to pass explicit owner argument during registration. + */ + private class PluginManagerImpl extends AbstractDAGExecutionPluginContext + { + private final DAGExecutionPlugin owner; + + PluginManagerImpl(DAGExecutionPlugin plugin) + { + super(appContext, dmgr, stats, launchConfig); + this.owner = plugin; + } + + @Override + public <T> void register(RegistrationType<T> type, Handler<T> handler) + { + AbstractApexPluginDispatcher.this.register(type, handler, owner); + } + + @Override + public DAG getDAG() + { + return clonedDAG; + } + } + + /** + * Dispatch events to plugins. + * @param registrationType + * @param data + * @param <T> + */ + protected abstract <T> void dispatchEvent(RegistrationType<T> registrationType, T data); + + @Override + public <T> void dispatch(RegistrationType<T> registrationType, T data) + { + if (registrationType == ApexPluginDispatcher.DAG_CHANGE_EVENT) { + clonedDAG = SerializationUtils.clone((DAG)data); + } else { + dispatchEvent(registrationType, data); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java index 19ef91a..b17d5f8 100644 --- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java +++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java @@ -53,6 +53,9 @@ public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionP } @Override + public abstract DAG getDAG(); + + @Override public StramAppContext getApplicationContext() { return appContext; @@ -65,12 +68,6 @@ public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionP } @Override - public DAG getDAG() - { - return dnmgr.getLogicalPlan(); - } - - @Override public String getOperatorName(int id) { PTOperator ptOperator = dnmgr.getPhysicalPlan().getAllOperators().get(id); http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java index c6ef54d..234195f 100644 --- a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java +++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java @@ -21,7 +21,15 @@ package org.apache.apex.engine.plugin; import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType; import org.apache.hadoop.service.Service; +import com.datatorrent.api.DAG; + public interface ApexPluginDispatcher extends Service { + + /** + * This is internal event, which is not delivered to the plugins. + */ + RegistrationType<DAG> DAG_CHANGE_EVENT = new RegistrationType<>(); + <T> void dispatch(RegistrationType<T> registrationType, T data); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java deleted file mode 100644 index 9f070a1..0000000 --- a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginManager.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.engine.plugin; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler; -import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType; -import org.apache.apex.engine.api.plugin.PluginLocator; -import org.apache.commons.digester.plugins.PluginContext; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.conf.YarnConfiguration; - -import com.google.common.collect.Lists; - -import com.datatorrent.stram.StramAppContext; -import com.datatorrent.stram.StreamingContainerManager; -import com.datatorrent.stram.plan.logical.LogicalPlan; -import com.datatorrent.stram.webapp.AppInfo; - -/** - * A default implementation for ApexPluginDispatcher. It handler common tasks such as per handler - * registration. actual dispatching is left for classes extending from it. - */ -public abstract class ApexPluginManager extends AbstractService -{ - private static final Logger LOG = LoggerFactory.getLogger(ApexPluginManager.class); - protected final Collection<DAGExecutionPlugin> plugins = Lists.newArrayList(); - protected final StramAppContext appContext; - protected final StreamingContainerManager dmgr; - private final PluginLocator locator; - private final AppInfo.AppStats stats; - protected Configuration launchConfig; - protected FileContext fileContext; - protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>(); - protected PluginContext pluginContext; - - public ApexPluginManager(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats) - { - super(ApexPluginManager.class.getName()); - this.locator = locator; - this.appContext = context; - this.dmgr = dmgr; - this.stats = stats; - LOG.debug("Creating apex service "); - } - - private Configuration readLaunchConfiguration() throws IOException - { - Path appPath = new Path(appContext.getApplicationPath()); - Path configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME); - try { - LOG.debug("Reading launch configuration file "); - URI uri = appPath.toUri(); - Configuration config = new YarnConfiguration(); - fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config); - FSDataInputStream is = fileContext.open(configFilePath); - config.addResource(is); - LOG.debug("Read launch configuration"); - return config; - } catch (FileNotFoundException ex) { - LOG.warn("Configuration file not found {}", configFilePath); - return new Configuration(); - } - } - - @Override - protected void serviceInit(Configuration conf) throws Exception - { - super.serviceInit(conf); - this.launchConfig = readLaunchConfiguration(); - if (locator != null) { - Collection<DAGExecutionPlugin> plugins = locator.discoverPlugins(this.launchConfig); - if (plugins != null) { - this.plugins.addAll(plugins); - for (DAGExecutionPlugin plugin : plugins) { - LOG.info("Detected plugin {}", plugin); - } - } - } - - for (DAGExecutionPlugin plugin : plugins) { - plugin.setup(new PluginManagerImpl(plugin)); - } - } - - @Override - protected void serviceStop() throws Exception - { - for (DAGExecutionPlugin plugin : plugins) { - plugin.teardown(); - } - super.serviceStop(); - } - - /** - * Keeps information about plugin and its registrations. Dispatcher use this - * information while delivering events to plugin. - */ - class PluginInfo - { - private final DAGExecutionPlugin plugin; - private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>(); - - <T> void put(RegistrationType<T> registrationType, Handler<T> handler) - { - registrationMap.put(registrationType, handler); - } - - <T> Handler<T> get(RegistrationType<T> registrationType) - { - return (Handler<T>)registrationMap.get(registrationType); - } - - public PluginInfo(DAGExecutionPlugin plugin) - { - this.plugin = plugin; - } - - public DAGExecutionPlugin getPlugin() - { - return plugin; - } - } - - PluginInfo getPluginInfo(DAGExecutionPlugin plugin) - { - PluginInfo pInfo = pluginInfoMap.get(plugin); - if (pInfo == null) { - pInfo = new PluginInfo(plugin); - pluginInfoMap.put(plugin, pInfo); - } - return pInfo; - } - - public <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner) - { - PluginInfo pInfo = getPluginInfo(owner); - pInfo.put(type, handler); - } - - /** - * A wrapper PluginManager to track registration from a plugin. with this plugin - * don't need to pass explicit owner argument during registration. - */ - class PluginManagerImpl extends AbstractDAGExecutionPluginContext - { - private final DAGExecutionPlugin owner; - - PluginManagerImpl(DAGExecutionPlugin plugin) - { - super(appContext, dmgr, stats, launchConfig); - this.owner = plugin; - } - - @Override - public <T> void register(RegistrationType<T> type, Handler<T> handler) - { - ApexPluginManager.this.register(type, handler, owner); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java index 0eee85e..bea011c 100644 --- a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java +++ b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java @@ -43,7 +43,7 @@ import com.datatorrent.stram.webapp.AppInfo; * service to process the event asynchronously. A separate task {@link DefaultApexPluginDispatcher.ProcessEventTask} * is created to process an event and then submitted to the executor for execution. */ -public class DefaultApexPluginDispatcher extends ApexPluginManager implements ApexPluginDispatcher +public class DefaultApexPluginDispatcher extends AbstractApexPluginDispatcher { private static final Logger LOG = LoggerFactory.getLogger(DefaultApexPluginDispatcher.class); private int qsize = 4098; @@ -56,7 +56,7 @@ public class DefaultApexPluginDispatcher extends ApexPluginManager implements Ap } @Override - public <T> void dispatch(RegistrationType<T> registrationType, T data) + protected <T> void dispatchEvent(RegistrationType<T> registrationType, T data) { if (executorService != null) { executorService.submit(new ProcessEventTask<>(registrationType, data)); http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java index 4a64b10..5b8ca11 100644 --- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java +++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java @@ -18,9 +18,8 @@ */ package org.apache.apex.engine.plugin; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext; @@ -38,8 +37,7 @@ public class DebugPlugin implements DAGExecutionPlugin private int eventCount = 0; private int heartbeatCount = 0; private int commitCount = 0; - private final Lock lock = new ReentrantLock(); - final Condition events = lock.newCondition(); + CountDownLatch latch = new CountDownLatch(3); @Override public void setup(DAGExecutionPluginContext context) @@ -49,10 +47,8 @@ public class DebugPlugin implements DAGExecutionPlugin @Override public void handle(StramEvent stramEvent) { - lock(); eventCount++; - events.signal(); - unlock(); + latch.countDown(); } }); @@ -61,10 +57,8 @@ public class DebugPlugin implements DAGExecutionPlugin @Override public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat) { - lock(); heartbeatCount++; - events.signal(); - unlock(); + latch.countDown(); } }); @@ -73,10 +67,8 @@ public class DebugPlugin implements DAGExecutionPlugin @Override public void handle(Long aLong) { - lock(); commitCount++; - events.signal(); - unlock(); + latch.countDown(); } }); } @@ -102,13 +94,8 @@ public class DebugPlugin implements DAGExecutionPlugin return commitCount; } - void lock() + public void waitForEventDelivery(long timeout) throws InterruptedException { - this.lock.lock(); - } - - void unlock() - { - this.lock.unlock(); + latch.await(timeout, TimeUnit.SECONDS); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/d705ed43/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java index fda607f..84dc4ba 100644 --- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java +++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java @@ -19,7 +19,6 @@ package org.apache.apex.engine.plugin; import java.util.Collection; -import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; @@ -87,7 +86,6 @@ public class PluginTests ApexPluginDispatcher pluginManager = new DefaultApexPluginDispatcher(locator, new StramTestSupport.TestAppContext(new Attribute.AttributeMap.DefaultAttributeMap()), null, null); pluginManager.init(new Configuration()); - int count = debugPlugin.getEventCount(); pluginManager.dispatch(STRAM_EVENT, new StramEvent(StramEvent.LogLevel.DEBUG) { @Override @@ -96,30 +94,9 @@ public class PluginTests return "TestEvent"; } }); - - debugPlugin.lock(); - while (debugPlugin.getEventCount() == count) { - debugPlugin.events.await(5, TimeUnit.SECONDS); - } - debugPlugin.unlock(); - - Assert.assertEquals("Total stram event received ", debugPlugin.getEventCount(), 1); - - count = debugPlugin.getCommitCount(); pluginManager.dispatch(COMMIT_EVENT, new Long(1234)); - debugPlugin.lock(); - while (debugPlugin.getCommitCount() == count) { - debugPlugin.events.await(5, TimeUnit.SECONDS); - } - debugPlugin.unlock(); - - count = debugPlugin.getHeartbeatCount(); pluginManager.dispatch(HEARTBEAT, new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()); - debugPlugin.lock(); - while (debugPlugin.getHeartbeatCount() == count) { - debugPlugin.events.await(5, TimeUnit.SECONDS); - } - debugPlugin.unlock(); + debugPlugin.waitForEventDelivery(10); pluginManager.stop(); Assert.assertEquals(1, debugPlugin.getEventCount());
