Repository: samza Updated Branches: refs/heads/master bc0a47b7c -> 1dfc5cecd
Misc. minor cleanup. 1. Added a meaningful name for the container thread pool threads. 2. Made the thread names for framework threads consistent. 3. Made a couple of monitoring/metrics threads daemon. 4. Fixed a few checkstyle warnings about missing param/throws documentation. Author: Prateek Maheshwari <[email protected]> Reviewers: Jagadish <[email protected]>, Jacob M <[email protected]> Closes #433 from prateekm/container-thread-pool-name Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1dfc5cec Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1dfc5cec Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1dfc5cec Branch: refs/heads/master Commit: 1dfc5cecdfa358851f01b4ef105ccb0f62e9891c Parents: bc0a47b Author: Prateek Maheshwari <[email protected]> Authored: Sat Mar 3 16:30:59 2018 -0800 Committer: Jagadish <[email protected]> Committed: Sat Mar 3 16:30:59 2018 -0800 ---------------------------------------------------------------------- .../disk/PollingScanDiskSpaceMonitor.java | 20 ++++------ .../container/host/StatisticsMonitorImpl.java | 20 ++-------- .../stream/CoordinatorStreamManager.java | 2 +- .../apache/samza/container/SamzaContainer.scala | 11 ++++-- .../org/apache/samza/metrics/JvmMetrics.scala | 24 +++++------- .../reporter/MetricsSnapshotReporter.scala | 31 ++++++---------- .../filereader/FileReaderSystemConsumer.scala | 27 ++++++-------- .../apache/samza/util/DaemonThreadFactory.scala | 39 -------------------- .../samza/util/TestDaemonThreadFactory.scala | 37 ------------------- .../apache/samza/system/kafka/BrokerProxy.scala | 12 +----- .../interfaces/RelSchemaProviderFactory.java | 3 +- .../interfaces/SamzaRelConverterFactory.java | 7 ++-- .../apache/samza/tools/GenerateKafkaEvents.java | 4 +- .../job/yarn/YarnClusterResourceManager.java | 2 +- .../yarn/SamzaAppMasterSecurityManager.scala | 29 +++++++++------ .../yarn/SamzaContainerSecurityManager.scala | 34 
++++++++++------- 16 files changed, 101 insertions(+), 201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java index 75e461d..2ae8545 100644 --- a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java +++ b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java @@ -18,6 +18,8 @@ */ package org.apache.samza.container.disk; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,9 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * An implementation of {@link DiskSpaceMonitor} that polls for disk usage based on a specified @@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger; public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor { private enum State { INIT, RUNNING, STOPPED } - private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl(); private static final Logger log = LoggerFactory.getLogger(PollingScanDiskSpaceMonitor.class); // Note: we use this as a set where the value is always Boolean.TRUE. 
@@ -57,7 +56,11 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor { private final Object lock = new Object(); private final ScheduledExecutorService schedulerService = - Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("Samza PollingScanDiskSpaceMonitor Thread-%d") + .setDaemon(true) + .build()); private final Set<Path> watchPaths; private final long pollingIntervalMillis; @@ -197,13 +200,4 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor { } } } - - private static class ThreadFactoryImpl implements ThreadFactory { - private static final String PREFIX = "Samza-" + PollingScanDiskSpaceMonitor.class.getSimpleName() + "-"; - private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(); - - public Thread newThread(Runnable runnable) { - return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement()); - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java index 3dfdf36..23704b4 100644 --- a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java @@ -18,6 +18,8 @@ */ package org.apache.samza.container.host; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,9 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import 
java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * An implementation of {@link SystemStatisticsMonitor} for unix and mac platforms. Users can implement their own @@ -42,8 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger; * This class is thread-safe. */ public class StatisticsMonitorImpl implements SystemStatisticsMonitor { - - private static final ThreadFactory THREAD_FACTORY = new StatisticsMonitorThreadFactory(); private static final Logger LOG = LoggerFactory.getLogger(StatisticsMonitorImpl.class); /** @@ -60,7 +58,8 @@ public class StatisticsMonitorImpl implements SystemStatisticsMonitor { // Single threaded executor to handle callback invocations. private final ScheduledExecutorService schedulerService = - Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("Samza StatisticsMonitor Thread-%d").setDaemon(true).build()); // Use this as a set with value always set to True private final ConcurrentMap<StatisticsMonitorImpl.Listener, Boolean> listenerSet = new ConcurrentHashMap<>(); @@ -174,15 +173,4 @@ public class StatisticsMonitorImpl implements SystemStatisticsMonitor { } } } - - // A convenience class that provides named threads - private static class StatisticsMonitorThreadFactory implements ThreadFactory { - private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(); - private static final String PREFIX = "Samza-StatisticsMonitor-Thread-"; - - @Override - public Thread newThread(Runnable runnable) { - return new Thread(runnable, PREFIX + INSTANCE_COUNT.getAndIncrement()); - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java index f6e68b5..4edc1a5 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamManager.java @@ -79,7 +79,7 @@ public class CoordinatorStreamManager { /** * Register source with the coordinator stream. * - * @param source + * @param source source to register with the coordinator stream */ public void register(String source) { if (coordinatorStreamConsumer != null) { http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 9b18044..bef5b41 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -26,6 +26,7 @@ import java.util import java.util.Base64 import java.util.concurrent.{ExecutorService, Executors, TimeUnit} +import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics @@ -405,10 +406,14 @@ object SamzaContainer extends Logging { val threadPoolSize = config.getThreadPoolSize info("Got thread pool size: " + threadPoolSize) - val taskThreadPool = if (!singleThreadMode && threadPoolSize > 0) - Executors.newFixedThreadPool(threadPoolSize) - else + + val taskThreadPool = if (!singleThreadMode && 
threadPoolSize > 0) { + Executors.newFixedThreadPool(threadPoolSize, + new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build()) + } else { null + } + val finalTaskFactory = TaskFactoryUtil.finalizeTaskFactory( taskFactory, http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala index f26bd2c..7cc1452 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala @@ -19,23 +19,18 @@ package org.apache.samza.metrics -import scala.collection._ -import scala.collection.JavaConverters._ -import java.lang.management.ManagementFactory +import com.google.common.util.concurrent.ThreadFactoryBuilder +import com.sun.management.OperatingSystemMXBean +import com.sun.management.UnixOperatingSystemMXBean +import org.apache.samza.util.Logging + import java.lang.Thread.State._ +import java.lang.management.ManagementFactory import java.util.concurrent.Executors import java.util.concurrent.TimeUnit -import com.sun.management.{OperatingSystemMXBean, UnixOperatingSystemMXBean} -import org.apache.samza.util.Logging -import org.apache.samza.util.DaemonThreadFactory - -/** - * Companion object for class JvmMetrics encapsulating various constants - */ -object JvmMetrics { - val JVM_METRICS_THREAD_NAME_PREFIX = "JVM-METRICS" -} +import scala.collection.JavaConverters._ +import scala.collection._ /** * Straight up ripoff of Hadoop's metrics2 JvmMetrics class. 
@@ -49,7 +44,8 @@ class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runna val threadMXBean = ManagementFactory.getThreadMXBean() val osMXBean = ManagementFactory.getOperatingSystemMXBean() var gcBeanCounters = Map[String, (Counter, Counter)]() - val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(JvmMetrics.JVM_METRICS_THREAD_NAME_PREFIX)) + val executor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("Samza JvmMetrics Thread-%d").setDaemon(true).build()) // jvm metrics val gMemNonHeapUsedM = newGauge("mem-non-heap-used-mb", 0.0F) http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala index 945ae47..65ca49c 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala @@ -19,30 +19,20 @@ package org.apache.samza.metrics.reporter +import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.samza.metrics._ +import org.apache.samza.serializers.Serializer +import org.apache.samza.system.OutgoingMessageEnvelope +import org.apache.samza.system.SystemProducer +import org.apache.samza.system.SystemStream +import org.apache.samza.util.Logging + import java.util.HashMap import java.util.Map -import scala.collection.JavaConverters._ -import org.apache.samza.util.Logging -import org.apache.samza.metrics.Counter -import org.apache.samza.metrics.Gauge -import org.apache.samza.metrics.Timer -import org.apache.samza.metrics.MetricsReporter -import 
org.apache.samza.metrics.MetricsVisitor -import org.apache.samza.metrics.ReadableMetricsRegistry import java.util.concurrent.Executors -import org.apache.samza.util.DaemonThreadFactory import java.util.concurrent.TimeUnit -import org.apache.samza.serializers.Serializer -import org.apache.samza.system.SystemProducer -import org.apache.samza.system.SystemStream -import org.apache.samza.system.OutgoingMessageEnvelope -/** - * Companion object for class MetricsSnapshotReporter encapsulating various constants - */ -object MetricsSnapshotReporter { - val METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX = "METRIC-SNAPSHOT-REPORTER" -} +import scala.collection.JavaConverters._ /** * MetricsSnapshotReporter is a generic metrics reporter that sends metrics to a stream. @@ -66,7 +56,8 @@ class MetricsSnapshotReporter( serializer: Serializer[MetricsSnapshot] = null, clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging { - val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(MetricsSnapshotReporter.METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX)) + val executor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build()) val resetTime = clock() var registries = List[(String, ReadableMetricsRegistry)]() http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala index 84dd6b4..ad7577c 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala +++ 
b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala @@ -19,24 +19,20 @@ package org.apache.samza.system.filereader -import org.apache.samza.util.BlockingEnvelopeMap +import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemStreamPartition -import scala.collection.mutable.Map +import org.apache.samza.util.BlockingEnvelopeMap +import org.apache.samza.util.Logging + import java.io.RandomAccessFile -import org.apache.samza.system.IncomingMessageEnvelope -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.Executors import java.util.concurrent.ExecutorService -import org.apache.samza.util.DaemonThreadFactory -import org.apache.samza.util.Logging +import java.util.concurrent.Executors +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.mutable.Map -object FileReaderSystemConsumer { - /** - * prefix for the file reader system thread names - */ - val FILE_READER_SYSTEM_THREAD_PREFIX = "filereader-" -} class FileReaderSystemConsumer( systemName: String, @@ -77,8 +73,9 @@ class FileReaderSystemConsumer( * start one thread for each file reader */ override def start { - pool = Executors.newFixedThreadPool(systemStreamPartitionAndStartingOffset.size, new DaemonThreadFactory(FileReaderSystemConsumer.FILE_READER_SYSTEM_THREAD_PREFIX)) - systemStreamPartitionAndStartingOffset.map { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) } + pool = Executors.newFixedThreadPool(systemStreamPartitionAndStartingOffset.size, + new ThreadFactoryBuilder().setNameFormat("Samza FileReader Thread-%d").setDaemon(true).build()) + systemStreamPartitionAndStartingOffset.foreach { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) } } /** 
http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala deleted file mode 100644 index d2015ab..0000000 --- a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.util - -import java.util.concurrent.ThreadFactory - - -object ThreadNamePrefix { - val SAMZA_THREAD_NAME_PREFIX = "SAMZA-" -} - -class DaemonThreadFactory(name: String) extends ThreadFactory { - - def newThread(r: Runnable) = { - val thread = new Thread(r) - thread.setDaemon(true) - if (name.nonEmpty) { - thread.setName(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+name) - } - thread - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala deleted file mode 100644 index ee56e20..0000000 --- a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.util - -import org.junit.Assert._ -import org.junit.Test - -class TestDaemonThreadFactory { - @Test - def testDaemonThreadFactoryCanCreatThreadGivenName() { - val testThreadName = "JvmMetrics" - val dtf = new DaemonThreadFactory(testThreadName) - val threadWithName = dtf.newThread(new Runnable { - def run() { - // Not testing this particular method - } - }) - assertEquals(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX + testThreadName, threadWithName.getName) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 8a6618d..e5482a9 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -31,19 +31,11 @@ import kafka.consumer.ConsumerConfig import kafka.message.MessageSet import org.apache.samza.SamzaException import org.apache.samza.util.ExponentialSleepStrategy +import org.apache.samza.util.KafkaUtil import org.apache.samza.util.Logging -import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX import scala.collection.JavaConverters._ import scala.collection.concurrent -import org.apache.samza.util.KafkaUtil - -/** - * Companion object for class JvmMetrics encapsulating various constants - */ -object BrokerProxy { - val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-" -} /** * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing @@ -294,7 +286,7 @@ class BrokerProxy( if (!thread.isAlive) { info("Starting " + toString) thread.setDaemon(true) - thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + 
thread.getName) + thread.setName("Samza BrokerProxy " + thread.getName) thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler { override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e) }) http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java index c614cdf..f9e7cd6 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java @@ -26,7 +26,8 @@ public interface RelSchemaProviderFactory { /** * Create a {@link RelSchemaProvider} given the config - * @param config Config needed to create the {@link RelSchemaProvider} + * @param systemStream the system stream to create the {@link RelSchemaProvider} for + * @param config config needed to create the {@link RelSchemaProvider} * @return {@link RelSchemaProvider} object created. 
*/ RelSchemaProvider create(SystemStream systemStream, Config config); http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java index f239df6..0a6f275 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java @@ -31,9 +31,10 @@ public interface SamzaRelConverterFactory { /** * Create a {@link SamzaRelConverter}. This method is called when the framework wants to create the * {@link SamzaRelConverter} corresponding to the system. - * @param config - * config that is used to create the object - * @return Returns the object created. + * @param systemStream the systemStream to create a converter for + * @param relSchemaProvider the relational schema provider + * @param config config that is used to create the object + * @return the object created. 
*/ SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config); } http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java ---------------------------------------------------------------------- diff --git a/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java b/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java index 6c30eee..2beef06 100644 --- a/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/GenerateKafkaEvents.java @@ -172,11 +172,11 @@ public class GenerateKafkaEvents { /** * Encode an Avro record into byte array * - * @param clazz The class type of the Avro record + * @param clazz The class type of the avro record * @param record the instance of the avro record * @param <T> The type of the avro record. * @return encoded bytes - * @throws java.io.IOException + * @throws IOException on I/O errors encoding the avro record */ public static <T> byte[] encodeAvroSpecificRecord(Class<T> clazz, T record) throws IOException { DatumWriter<T> msgDatumWriter = new SpecificDatumWriter<>(clazz); http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 9be8475..695b35a 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -539,7 +539,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement * @param samzaContainerId id of the samza 
Container to run (passed as a command line parameter to the process) * @param container the samza container to run. * @param cmdBuilder the command builder that encapsulates the command, and the context - * + * @throws IOException on IO exceptions running the container */ public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws IOException { String containerIdStr = ConverterUtils.toString(container.getId()); http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala index 8dba96a..185cda0 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala @@ -19,20 +19,22 @@ package org.apache.samza.job.yarn -import java.security.PrivilegedExceptionAction -import java.util.concurrent.{TimeUnit, Executors} - +import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.UserGroupInformation import org.apache.samza.SamzaException -import org.apache.samza.config.{YarnConfig, Config} -import org.apache.samza.util.{DaemonThreadFactory, Logging} +import org.apache.samza.config.Config +import org.apache.samza.config.YarnConfig import org.apache.samza.container.SecurityManager +import org.apache.samza.util.Logging + +import 
java.security.PrivilegedExceptionAction +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit -object SamzaAppMasterSecurityManager { - val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX" -} /** * The SamzaAppMasterSecurityManager is responsible for renewing and distributing HDFS delegation tokens on a secure YARN @@ -47,8 +49,11 @@ object SamzaAppMasterSecurityManager { * @param hadoopConf the hadoop configuration */ class SamzaAppMasterSecurityManager(config: Config, hadoopConf: Configuration) extends SecurityManager with Logging { - private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory(SamzaAppMasterSecurityManager - .TOKEN_RENEW_THREAD_NAME_PREFIX)) + private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("Samza AMSecurityManager TokenRenewer Thread-%d") + .setDaemon(true) + .build()) def start() = { val yarnConfig = new YarnConfig(config) http://git-wip-us.apache.org/repos/asf/samza/blob/1dfc5cec/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala index 10b971f..fce840c 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala @@ -19,24 +19,30 @@ package org.apache.samza.job.yarn -import java.util.concurrent.{TimeUnit, Executors} - +import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.security.{UserGroupInformation, Credentials} -import org.apache.samza.config.{Config, 
YarnConfig} -import org.apache.samza.util.{Logging, DaemonThreadFactory} +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.UserGroupInformation +import org.apache.samza.config.Config +import org.apache.samza.config.YarnConfig import org.apache.samza.container.SecurityManager +import org.apache.samza.util.Logging -object SamzaContainerSecurityManager { - val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX" - val INITIAL_DELAY_IN_SECONDS = 60 -} +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit class SamzaContainerSecurityManager(config: Config, hadoopConfig: Configuration) extends SecurityManager with Logging { - private val tokenRenewExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(SamzaContainerSecurityManager.TOKEN_RENEW_THREAD_NAME_PREFIX)) - private var lastRefreshTimestamp = 0L + private val InitialDelayInSeconds = 60 + + private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("Samza ContainerSecurityManager TokenRenewer Thread-%d") + .setDaemon(true) + .build()) + private var lastRefreshTimestamp = 0L def start() = { val yarnConfig = new YarnConfig(config) @@ -75,8 +81,8 @@ class SamzaContainerSecurityManager(config: Config, hadoopConfig: Configuration) } } - info(s"Schedule the next fetch in ${renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS} seconds") - tokenRenewExecutor.schedule(tokenRenewRunnable, renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS, TimeUnit.SECONDS) + info(s"Schedule the next fetch in ${renewalInterval + InitialDelayInSeconds} seconds") + tokenRenewExecutor.schedule(tokenRenewRunnable, renewalInterval + InitialDelayInSeconds, TimeUnit.SECONDS) } private def getCredentialsFromHDFS(fs: FileSystem, tokenPath: Path): Credentials = {
