added configs for EI3 testing, thread naming, cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0a2b63a4 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0a2b63a4 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0a2b63a4 Branch: refs/heads/samza-standalone Commit: 0a2b63a44ab45a7c3a9b6d8743a5839e7d08cba4 Parents: e37f910 Author: navina <[email protected]> Authored: Fri Dec 23 17:02:45 2016 -0800 Committer: navina <[email protected]> Committed: Fri Dec 23 17:02:45 2016 -0800 ---------------------------------------------------------------------- .../apache/samza/processor/SamzaContainerController.java | 10 ++++++++-- .../java/org/apache/samza/processor/StreamProcessor.java | 1 + .../org/apache/samza/zk/ScheduleAfterDebounceTime.java | 4 +++- .../main/java/org/apache/samza/zk/ZkJobCoordinator.java | 1 + samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java | 8 ++++++++ 5 files changed, 21 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java index 9352f27..fb227d0 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java +++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java @@ -1,5 +1,6 @@ package org.apache.samza.processor; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; import org.apache.samza.config.TaskConfigJava; @@ -45,8 +46,10 @@ public class SamzaContainerController { public SamzaContainerController ( Object taskFactory, long containerShutdownMs, + String processorId, Map<String, MetricsReporter> metricsReporterMap) { - this.executorService = Executors.newSingleThreadExecutor(); + this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("p" + processorId + "-container-thread-%d").build()); this.taskFactory = taskFactory; this.metricsReporterMap = metricsReporterMap; if (containerShutdownMs == -1) { @@ -73,6 +76,7 @@ public class SamzaContainerController { if (new ClusterManagerConfig(config).getHostAffinityEnabled()) { localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), config); } + log.info("About to create container: " + containerModel.getContainerId()); container = SamzaContainer$.MODULE$.apply( containerModel.getContainerId(), containerModel, @@ -82,6 +86,7 @@ public class SamzaContainerController { new JmxServer(), Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap), taskFactory); + log.info("About to start container: " + containerModel.getContainerId()); containerFuture = executorService.submit(() -> container.run()); } @@ -109,7 +114,8 @@ public class SamzaContainerController { container.shutdown(); try { - containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS); + if(containerFuture != null) + containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException e) { log.error("Ran into problems while trying to stop the container in the processor!", e); } catch (TimeoutException e) { http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 61795e1..596f8f4 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -121,6 +121,7 @@ public class StreamProcessor { this.containerController = new SamzaContainerController( taskFactory, new TaskConfigJava(updatedConfig).getShutdownMs(), + String.valueOf(processorId), customMetricsReporters); this.jobCoordinator = Util. http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java index 1854de6..e217dab 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java @@ -1,5 +1,6 @@ package org.apache.samza.zk; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; @@ -19,7 +20,8 @@ public class ScheduleAfterDebounceTime { public static final int DEBOUNCE_TIME_MS = 2000; - private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build()); private final Map<String, ScheduledFuture> futureHandles = new HashMap<>(); public ScheduleAfterDebounceTime () { http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 560e19b..f661547 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -95,6 +95,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { @Override public void stop() { zkController.stop(); + containerController.shutdown(); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/0a2b63a4/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 0be2b04..de8c213 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -185,7 +185,15 @@ public class ZkUtils { } public void close() { + try { + zkConnnection.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } zkClient.close(); + + if(debounceTimer != null) + debounceTimer.stopScheduler(); } public void deleteRoot() {
