Repository: samza
Updated Branches:
  refs/heads/master 1bb737a8d -> 9d52e996b
SAMZA-1137: Instantiate ApplicationRunner in SamzaContainer Create an ApplicationRunner in SamzaContainer to provide StreamSpecs for fluent API. Author: Xinyu Liu <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #94 from xinyuiscool/SAMZA-1137 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d52e996 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d52e996 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d52e996 Branch: refs/heads/master Commit: 9d52e996b5221c8aae71446b9ce2ebfa0dde3e82 Parents: 1bb737a Author: Xinyu Liu <[email protected]> Authored: Fri Mar 24 13:49:30 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Fri Mar 24 13:49:30 2017 -0700 ---------------------------------------------------------------------- .../processor/SamzaContainerController.java | 4 +- .../apache/samza/processor/StreamProcessor.java | 11 -- .../samza/runtime/LocalContainerRunner.java | 124 +++++++++++++++++++ .../org/apache/samza/task/TaskFactoryUtil.java | 65 +++++----- .../apache/samza/container/SamzaContainer.scala | 53 +------- .../samza/job/local/ThreadJobFactory.scala | 10 +- .../samza/runtime/TestLocalContainerRunner.java | 49 ++++++++ .../apache/samza/task/TestTaskFactoryUtil.java | 68 +++++----- .../samza/container/TestSamzaContainer.scala | 24 ---- samza-shell/src/main/bash/run-container.sh | 2 +- .../test/processor/TestStreamProcessor.java | 4 +- 11 files changed, 255 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java 
index 81f9d2b..76e2053 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java +++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java @@ -105,9 +105,7 @@ public class SamzaContainerController { localityManager, new JmxServer(), Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap), - taskFactory, - // TODO: need to use the correct local ApplicationRunner here - null); + taskFactory); log.info("About to start container: " + containerModel.getContainerId()); containerFuture = executorService.submit(() -> container.run()); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 4d3e8ab..15d9b9d 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -108,17 +108,6 @@ public class StreamProcessor { this(processorId, config, customMetricsReporters, (Object) streamTaskFactory); } - /** - * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created - * using the "task.class" configuration instead of a task factory. 
- * @param processorId - this processor Id - * @param config - config - * @param customMetricsReporters metrics - */ - public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters) { - this(processorId, config, customMetricsReporters, (Object) null); - } - private StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory) { this.processorId = processorId; http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java new file mode 100644 index 0000000..4bfad65 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.runtime; + +import java.util.HashMap; +import java.util.Random; +import org.apache.log4j.MDC; +import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.ShellCommandConfig; +import org.apache.samza.container.SamzaContainer; +import org.apache.samza.container.SamzaContainer$; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.metrics.JmxServer; +import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.task.TaskFactoryUtil; +import org.apache.samza.util.ScalaToJavaUtils; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. It is an intermediate step to + * have a local runner for yarn before we consolidate the Yarn container and coordination into a + * {@link org.apache.samza.processor.StreamProcessor}. This class will be replaced by the {@link org.apache.samza.processor.StreamProcessor} + * local runner once that's done. + * + * Since we don't have the {@link org.apache.samza.coordinator.JobCoordinator} implementation in Yarn, the components (jobModel and containerId) + * are directly inside the runner. 
+ */ +public class LocalContainerRunner extends AbstractApplicationRunner { + private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class); + private final JobModel jobModel; + private final int containerId; + + public LocalContainerRunner(JobModel jobModel, int containerId) { + super(jobModel.getConfig()); + this.jobModel = jobModel; + this.containerId = containerId; + } + + @Override + public void run(StreamApplication streamApp) { + JmxServer jmxServer = null; + try { + jmxServer = new JmxServer(); + ContainerModel containerModel = jobModel.getContainers().get(containerId); + Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this); + + SamzaContainer container = SamzaContainer$.MODULE$.apply( + containerModel.getContainerId(), + containerModel, + config, + jobModel.maxChangeLogStreamPartitions, + SamzaContainer.getLocalityManager(containerId, config), + jmxServer, + Util.javaMapAsScalaMap(new HashMap<String, MetricsReporter>()), + taskFactory); + + container.run(); + } finally { + if (jmxServer != null) { + jmxServer.stop(); + } + } + } + + + public static void main(String[] args) throws Exception { + setExceptionHandler(() -> { + log.info("Exiting process now."); + System.exit(1); + }); + + Integer containerId = Integer.valueOf(System.getenv(ShellCommandConfig.ENV_CONTAINER_ID())); + log.info(String.format("Got container ID: %d", containerId)); + String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL()); + log.info(String.format("Got coordinator URL: %s", coordinatorUrl)); + int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1; + JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay); + Config config = jobModel.getConfig(); + JobConfig jobConfig = new JobConfig(config); + if (jobConfig.getName().isEmpty()) { + throw new SamzaException("can not find the job name"); + } + String jobName = jobConfig.getName().get(); + String jobId = 
jobConfig.getJobId().getOrElse(ScalaToJavaUtils.defaultValue("1")); + MDC.put("containerName", "samza-container-" + containerId); + MDC.put("jobName", jobName); + MDC.put("jobId", jobId); + + StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); + new LocalContainerRunner(jobModel, containerId).run(streamApp); + } + + /* package private */ static void setExceptionHandler(Runnable runnable) { + Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> { + log.error(String.format("Uncaught exception in thread (name=%s).", t.getName(), e)); + e.printStackTrace(System.err); + runnable.run(); + }; + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java index e79d278..f8d1c87 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java @@ -39,28 +39,35 @@ public class TaskFactoryUtil { private static final Logger log = LoggerFactory.getLogger(TaskFactoryUtil.class); /** - * This method loads a task factory class based on the configuration + * This method creates a task factory class based on the configuration and {@link StreamApplication} * * @param config the {@link Config} for this job + * @param streamApp the {@link StreamApplication} * @param runner the {@link ApplicationRunner} to run this job * @return a task factory object, either a instance of {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} */ - public static Object fromTaskClassConfig(Config config, ApplicationRunner runner) { + public static Object createTaskFactory(Config config, StreamApplication streamApp, 
ApplicationRunner runner) { + return (streamApp != null) ? createStreamOperatorTaskFactory(streamApp, runner) : fromTaskClassConfig(config); + } - String taskClassName; + private static StreamTaskFactory createStreamOperatorTaskFactory(StreamApplication streamApp, ApplicationRunner runner) { + return () -> new StreamOperatorTask(streamApp, runner); + } + /** + * Create {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} based on the configured task.class. + * @param config the {@link Config} + * @return task factory instance + */ + private static Object fromTaskClassConfig(Config config) { // if there is configuration to set the job w/ a specific type of task, instantiate the corresponding task factory - if (isStreamOperatorTask(config)) { - taskClassName = StreamOperatorTask.class.getName(); - } else { - taskClassName = new TaskConfig(config).getTaskClass().getOrElse( - new AbstractFunction0<String>() { - @Override - public String apply() { - throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory"); - } - }); - } + String taskClassName = new TaskConfig(config).getTaskClass().getOrElse( + new AbstractFunction0<String>() { + @Override + public String apply() { + throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory"); + } + }); log.info("Got task class name: {}", taskClassName); @@ -89,9 +96,6 @@ public class TaskFactoryUtil { @Override public StreamTask createInstance() { try { - if (taskClassName.equals(StreamOperatorTask.class.getName())) { - return createStreamOperatorTask(config, runner); - } return (StreamTask) Class.forName(taskClassName).newInstance(); } catch (Throwable t) { log.error("Error loading StreamTask class: {}. 
error: {}", taskClassName, t); @@ -147,29 +151,28 @@ public class TaskFactoryUtil { } } - private static StreamTask createStreamOperatorTask(Config config, ApplicationRunner runner) throws Exception { - Class<?> builderClass = Class.forName(config.get(StreamApplication.APP_CLASS_CONFIG)); - StreamApplication graphBuilder = (StreamApplication) builderClass.newInstance(); - return new StreamOperatorTask(graphBuilder, runner); - } - - private static boolean isStreamOperatorTask(Config config) { + /** + * Returns {@link StreamApplication} if it's configured, otherwise null. + * @param config Config + * throws {@link ConfigException} if there is misconfiguration of StreamApp. + * @return {@link StreamApplication} instance + */ + public static StreamApplication createStreamApplication(Config config) { if (config.get(StreamApplication.APP_CLASS_CONFIG) != null && !config.get(StreamApplication.APP_CLASS_CONFIG).isEmpty()) { - TaskConfig taskConfig = new TaskConfig(config); if (taskConfig.getTaskClass() != null && !taskConfig.getTaskClass().isEmpty()) { throw new ConfigException("High level StreamApplication API cannot be used together with low-level API using task.class."); } + String appClassName = config.get(StreamApplication.APP_CLASS_CONFIG); try { - Class<?> builderClass = Class.forName(config.get(StreamApplication.APP_CLASS_CONFIG)); - return StreamApplication.class.isAssignableFrom(builderClass); + Class<?> builderClass = Class.forName(appClassName); + return (StreamApplication) builderClass.newInstance(); } catch (Throwable t) { - log.error("Failed to validate StreamApplication class from the config. 
{}={}", - StreamApplication.APP_CLASS_CONFIG, config.get(StreamApplication.APP_CLASS_CONFIG)); - return false; + throw new ConfigException(String.format("%s is not a StreamApplication.", appClassName)); } + } else { + return null; } - return false; } } http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 77959dd..e43ddfe 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -23,7 +23,6 @@ import java.io.File import java.nio.file.Path import java.util import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit} -import java.lang.Thread.UncaughtExceptionHandler import java.net.{URL, UnknownHostException} import org.apache.samza.SamzaException @@ -82,49 +81,6 @@ object SamzaContainer extends Logging { val DEFAULT_READ_JOBMODEL_DELAY_MS = 100 val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms" - def main(args: Array[String]) { - safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1))) - } - - def safeMain( - newJmxServer: () => JmxServer, - exceptionHandler: UncaughtExceptionHandler = null) { - if (exceptionHandler != null) { - Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) - } - putMDC("containerName", "samza-container-" + System.getenv(ShellCommandConfig.ENV_CONTAINER_ID)) - // Break out the main method to make the JmxServer injectable so we can - // validate that we don't leak JMX non-daemon threads if we have an - // exception in the main method. 
- val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt - logger.info("Got container ID: %s" format containerId) - val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL) - logger.info("Got coordinator URL: %s" format coordinatorUrl) - val jobModel = readJobModel(coordinatorUrl) - val containerModel = jobModel.getContainers()(containerId.toInt) - val config = jobModel.getConfig - putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name"))) - putMDC("jobId", config.getJobId.getOrElse("1")) - var jmxServer: JmxServer = null - - try { - jmxServer = newJmxServer() - val containerModel = jobModel.getContainers.get(containerId.toInt) - // TODO: add actual local runner in a container to the parameters - SamzaContainer( - containerId.toInt, - containerModel, - config, - jobModel.maxChangeLogStreamPartitions, - getLocalityManager(containerId, config), - jmxServer).run - } finally { - if (jmxServer != null) { - jmxServer.stop - } - } - } - def getLocalityManager(containerId: Int, config: Config): LocalityManager = { val containerName = getSamzaContainerName(containerId) val registryMap = new MetricsRegistryMap(containerName) @@ -164,9 +120,7 @@ object SamzaContainer extends Logging { localityManager: LocalityManager, jmxServer: JmxServer, customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](), - taskFactory: Object = null, - // SAMZA-1137: need to instantiate the ApplicationRunner in the container local JVM and pass it in - appRunner: ApplicationRunner = null) = { + taskFactory: Object) = { val containerName = getSamzaContainerName(containerId) val containerPID = Util.getContainerPID @@ -438,11 +392,8 @@ object SamzaContainer extends Logging { else null - val taskFactoryInstance = Option(taskFactory) - .getOrElse(TaskFactoryUtil.fromTaskClassConfig(config, appRunner)) - val finalTaskFactory = TaskFactoryUtil.finalizeTaskFactory( - taskFactoryInstance, + taskFactory, 
singleThreadMode, taskThreadPool) http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 164e319..f218543 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -20,7 +20,10 @@ package org.apache.samza.job.local +import org.apache.samza.metrics.MetricsReporter import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap} +import org.apache.samza.runtime.LocalContainerRunner +import org.apache.samza.task.TaskFactoryUtil import org.apache.samza.util.Logging import org.apache.samza.SamzaException import org.apache.samza.config.Config @@ -41,6 +44,9 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val jobModel = coordinator.jobModel val containerModel = jobModel.getContainers.get(0) val jmxServer = new JmxServer + val streamApp = TaskFactoryUtil.createStreamApplication(config) + val appRunner = new LocalContainerRunner(jobModel, 0) + val taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, appRunner) // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job. 
config.getTaskOpts match { @@ -57,7 +63,9 @@ class ThreadJobFactory extends StreamJobFactory with Logging { config, jobModel.maxChangeLogStreamPartitions, null, - jmxServer)) + jmxServer, + Map[String, MetricsReporter](), + taskFactory)) } finally { coordinator.stop jmxServer.stop http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java new file mode 100644 index 0000000..f9465e3 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.runtime; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestLocalContainerRunner { + private boolean caughtException = false; + + @Test + public void testUncaughtExceptionHandler() throws Exception { + Runnable runnable = () -> { caughtException = true; }; + LocalContainerRunner.setExceptionHandler(runnable); + + try { + ((String) null).length(); + } catch (Exception e) { + // catch null pointer exception + } + assertFalse(caughtException); + + Thread t = new Thread(() -> { + throw new RuntimeException("Uncaught exception in another thread. Catch this."); + }); + t.start(); + t.join(); + assertTrue(caughtException); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java index 27168f3..0b051e8 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java @@ -33,6 +33,8 @@ import java.util.HashMap; import java.util.concurrent.ExecutorService; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -51,7 +53,7 @@ public class TestTaskFactoryUtil { this.put("task.class", "org.apache.samza.testUtils.TestStreamTask"); } }); - Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + Object retFactory = TaskFactoryUtil.createTaskFactory(config, null, null); assertTrue(retFactory instanceof 
StreamTaskFactory); assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask); @@ -61,7 +63,7 @@ public class TestTaskFactoryUtil { } }); try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + TaskFactoryUtil.createTaskFactory(config, null, null); fail("Should have failed w/ no.such.class"); } catch (ConfigException cfe) { // expected @@ -69,13 +71,15 @@ public class TestTaskFactoryUtil { } @Test - public void testStreamOperatorTaskClass() { + public void testCreateStreamApplication() throws Exception { Config config = new MapConfig(new HashMap<String, String>() { { this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamApplication"); } }); - Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); + assertNotNull(streamApp); + Object retFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, mockRunner); assertTrue(retFactory instanceof StreamTaskFactory); assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask); @@ -85,7 +89,7 @@ public class TestTaskFactoryUtil { } }); try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + TaskFactoryUtil.createStreamApplication(config); fail("Should have failed w/ no.such.class"); } catch (ConfigException ce) { // expected @@ -97,7 +101,7 @@ public class TestTaskFactoryUtil { } }); try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + TaskFactoryUtil.createStreamApplication(config); fail("Should have failed w/ no.such.class"); } catch (ConfigException ce) { // expected @@ -108,32 +112,23 @@ public class TestTaskFactoryUtil { this.put(StreamApplication.APP_CLASS_CONFIG, ""); } }); - try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); - fail("Should have failed w/ empty class name for StreamApplication"); - } catch (ConfigException ce) { - // expected - } + streamApp = 
TaskFactoryUtil.createStreamApplication(config); + assertNull(streamApp); config = new MapConfig(new HashMap<>()); - try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); - fail(String.format("Should have failed w/ non-existing entry for %s", StreamApplication.APP_CLASS_CONFIG)); - } catch (ConfigException ce) { - // expected - } + streamApp = TaskFactoryUtil.createStreamApplication(config); + assertNull(streamApp); } @Test - public void testStreamOperatorTaskClassWithTaskClass() { + public void testCreateStreamApplicationWithTaskClass() throws Exception { Config config = new MapConfig(new HashMap<String, String>() { { this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamApplication"); } }); - Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); - assertTrue(retFactory instanceof StreamTaskFactory); - assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask); + StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); + assertNotNull(streamApp); config = new MapConfig(new HashMap<String, String>() { { @@ -142,7 +137,7 @@ public class TestTaskFactoryUtil { } }); try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + TaskFactoryUtil.createStreamApplication(config); fail("should have failed with invalid config"); } catch (ConfigException ce) { // expected @@ -155,7 +150,7 @@ public class TestTaskFactoryUtil { } }); try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + TaskFactoryUtil.createStreamApplication(config); fail("should have failed with invalid config"); } catch (ConfigException ce) { // expected @@ -163,7 +158,7 @@ public class TestTaskFactoryUtil { } @Test - public void testStreamTaskClassWithInvalidStreamGraphBuilder() { + public void testStreamTaskClassWithInvalidStreamApplication() throws Exception { Config config = new MapConfig(new HashMap<String, String>() { { @@ -171,8 +166,8 @@ public class 
TestTaskFactoryUtil { } }); try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); - fail("should have failed with invalid config"); + TaskFactoryUtil.createStreamApplication(config); + fail("Should have failed w/ no.such.class"); } catch (ConfigException ce) { // expected } @@ -183,7 +178,8 @@ public class TestTaskFactoryUtil { this.put(StreamApplication.APP_CLASS_CONFIG, ""); } }); - Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); + Object retFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, mockRunner); assertTrue(retFactory instanceof StreamTaskFactory); assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask); @@ -194,7 +190,7 @@ public class TestTaskFactoryUtil { } }); try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + TaskFactoryUtil.createStreamApplication(config); fail("Should have failed w/ no class not found"); } catch (ConfigException cne) { // expected @@ -208,7 +204,7 @@ public class TestTaskFactoryUtil { this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); } }); - Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + Object retFactory = TaskFactoryUtil.createTaskFactory(config, null, null); assertTrue(retFactory instanceof AsyncStreamTaskFactory); assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask); @@ -218,7 +214,7 @@ public class TestTaskFactoryUtil { } }); try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + TaskFactoryUtil.createTaskFactory(config, null, null); fail("Should have failed w/ no.such.class"); } catch (ConfigException cfe) { // expected @@ -226,7 +222,7 @@ public class TestTaskFactoryUtil { } @Test - public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws ClassNotFoundException { + public void 
testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws Exception { Config config = new MapConfig(new HashMap<String, String>() { { @@ -234,7 +230,7 @@ public class TestTaskFactoryUtil { } }); try { - TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + TaskFactoryUtil.createStreamApplication(config); fail("Should have failed w/ no.such.class"); } catch (ConfigException cfe) { // expected @@ -246,7 +242,8 @@ public class TestTaskFactoryUtil { this.put(StreamApplication.APP_CLASS_CONFIG, ""); } }); - Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); + Object retFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, mockRunner); assertTrue(retFactory instanceof AsyncStreamTaskFactory); assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask); @@ -256,7 +253,8 @@ public class TestTaskFactoryUtil { this.put(StreamApplication.APP_CLASS_CONFIG, null); } }); - retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner); + streamApp = TaskFactoryUtil.createStreamApplication(config); + retFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, mockRunner); assertTrue(retFactory instanceof AsyncStreamTaskFactory); assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask); } http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index cf93928..a72a59a 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -214,30 +214,6 
@@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } @Test - def testUncaughtExceptionHandler { - var caughtException = false - val exceptionHandler = new UncaughtExceptionHandler { - def uncaughtException(t: Thread, e: Throwable) { - caughtException = true - } - } - try { - SamzaContainer.safeMain(() => null, exceptionHandler) - } catch { - case _: Exception => - // Expect some random exception from SamzaContainer because we haven't - // set any environment variables for container ID, etc. - } - assertFalse(caughtException) - val t = new Thread(new Runnable { - def run = throw new RuntimeException("Uncaught exception in another thread. Catch this.") - }) - t.start - t.join - assertTrue(caughtException) - } - - @Test def testErrorInTaskInitShutsDownTask { val task = new StreamTask with InitableTask with ClosableTask { var wasShutdown = false http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-shell/src/main/bash/run-container.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/run-container.sh b/samza-shell/src/main/bash/run-container.sh index b75e668..bc10338 100755 --- a/samza-shell/src/main/bash/run-container.sh +++ b/samza-shell/src/main/bash/run-container.sh @@ -25,4 +25,4 @@ # Set container name system property for use in Log4J [[ $JAVA_OPTS != *-Dsamza.container.name* && ! 
-z "$SAMZA_CONTAINER_ID" ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-container-$SAMZA_CONTAINER_ID" -exec $(dirname $0)/run-class.sh org.apache.samza.container.SamzaContainer "$@" \ No newline at end of file +exec $(dirname $0)/run-class.sh org.apache.samza.runtime.LocalContainerRunner "$@" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/9d52e996/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java index f58bd8f..a1ad363 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java @@ -69,7 +69,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness { // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a // TopicExistsException since StreamProcessor auto-creates them. createTopics(inputTopic, outputTopic); - final StreamProcessor processor = new StreamProcessor(1, new MapConfig(configs), new HashMap<>()); + final StreamProcessor processor = new StreamProcessor(1, new MapConfig(configs), new HashMap<>(), IdentityStreamTask::new); produceMessages(inputTopic, messageCount); run(processor, endLatch); @@ -132,7 +132,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness { configMap.remove("task.class"); final Config configs = new MapConfig(configMap); - StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>()); + StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>(), (StreamTaskFactory) null); run(processor, endLatch); }
