Repository: samza Updated Branches: refs/heads/master f6c99a4c4 -> c5557140d
SAMZA-1738: Merge in some minor additions from Linkedin branch Author: Cameron Lee <[email protected]> Reviewers: Yi Pan <[email protected]> Closes #549 from cameronlee314/sync_li_trunk Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c5557140 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c5557140 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c5557140 Branch: refs/heads/master Commit: c5557140d454d5e6ddfc4e8eeaae112178a1d54f Parents: f6c99a4 Author: Cameron Lee <[email protected]> Authored: Thu Jun 28 09:10:44 2018 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Jun 28 09:10:44 2018 -0700 ---------------------------------------------------------------------- .../executors/KeyBasedExecutorService.java | 174 +++++++++++++++++++ .../org/apache/samza/config/TaskConfig.scala | 4 - .../org/apache/samza/container/RunLoop.scala | 5 +- .../MockCoordinatorStreamSystemFactory.java | 8 +- .../executors/TestKeyBasedExecutorService.java | 84 +++++++++ .../apache/samza/system/kafka/BrokerProxy.scala | 8 +- .../system/kafka/KafkaSystemConsumer.scala | 17 +- .../kafka/KafkaSystemConsumerMetrics.scala | 6 +- .../util/ClientUtilTopicMetadataStore.scala | 3 + .../scala/org/apache/samza/util/KafkaUtil.scala | 6 +- .../system/kafka/TestKafkaSystemAdmin.scala | 2 +- .../system/kafka/TestKafkaSystemConsumer.scala | 2 +- .../apache/samza/rest/SamzaRestApplication.java | 1 - .../org/apache/samza/rest/SamzaRestService.java | 15 +- .../org/apache/samza/config/YarnConfig.java | 10 -- 15 files changed, 303 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java b/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java new file mode 100644 index 0000000..a7c19d2 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.executors; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + + +/** + * This class supports submitting {@link Runnable} tasks with an ordering key, so that tasks submitted with the + * same key will always be executed in order, but tasks across different keys can be executed in parallel and out of + * order. + * Ordering is achieved by hashing the key objects to threads by their {@link #hashCode()} method. + * Ordering is guaranteed only when using the {@link #submitOrdered(Object, Runnable)} method. None of the + * {@link #submit} and {@link #execute(Runnable)} method(s) guarantee the ordering semantics. + */ +public class KeyBasedExecutorService extends AbstractExecutorService { + final String threadPoolNamePrefix; + final ExecutorService[] executors; + final Random rand = new Random(); + final int numThreads; + + public KeyBasedExecutorService(int numThreads) { + this("KeyBasedExecutor", numThreads); + } + + /** + * Constructs an instance of a KeyBasedExecutorService that manages the underlying threads + * + * @param threadPoolNamePrefix String identifier for this ExecutorService. It forms the prefix for each of the + * underlying thread pool executors + * @param numThreads Total number of threads required, mainly dependent on the key set size and the degree of + * parallelism. Highest level of parallelism can be achieved by setting the + * number of threads = key set size. + * @throws IllegalArgumentException if numThreads {@literal <}= 0 + */ + public KeyBasedExecutorService(String threadPoolNamePrefix, + int numThreads) { + if (numThreads <= 0) { + throw new IllegalArgumentException("numThreads has to be greater than 0 in KeyBasedExecutor!"); + } + this.numThreads = numThreads; + this.threadPoolNamePrefix = threadPoolNamePrefix; + this.executors = new ExecutorService[numThreads]; + final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); + + for (int i = 0; i < numThreads; i++) { + final ExecutorService threadPoolExecutorPerQueue = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setThreadFactory(defaultThreadFactory) + .setNameFormat(this.threadPoolNamePrefix + "-" + i + "-%d") + .build() + ); + executors[i] = threadPoolExecutorPerQueue; + } + } + + protected ExecutorService chooseRandomExecutor() { + if (executors.length == 1) { + return executors[0]; + } + return executors[rand.nextInt(executors.length)]; + } + + protected ExecutorService chooseExecutor(Object object) { + if (executors.length == 1) { + return executors[0]; + } + return executors[signSafeMod(object.hashCode(), executors.length)]; + } + + private static int signSafeMod(long dividend, int divisor) { + int mod = (int) (dividend % divisor); + if (mod < 0) { + mod += divisor; + } + return mod; + } + + @Override + public void shutdown() { + for (int i = 0; i < executors.length; i++) { + executors[i].shutdown(); + } + } + + @Override + public List<Runnable> shutdownNow() { + List<Runnable> unexecutedRunnables = new ArrayList<>(); + for (int i = 0; i < executors.length; i++) { + List<Runnable> unexecutedRunnablesPerQueue = executors[i].shutdownNow(); + if (unexecutedRunnablesPerQueue != null && unexecutedRunnablesPerQueue.size() > 0) { + unexecutedRunnables.addAll(unexecutedRunnablesPerQueue); + } + } + return unexecutedRunnables; + } + + @Override + public boolean isShutdown() { + boolean ret = true; + for (int i = 0; i < executors.length; i++) { + ret = ret && executors[i].isShutdown(); + } + return ret; + } + + @Override + public boolean isTerminated() { + boolean ret = true; + for (int i = 0; i < executors.length; i++) { + ret = ret && executors[i].isTerminated(); + } + return ret; + } + + /** + * Awaits termination of each of the underlying threads + * + * Note: This can potentially block longer than the given timeout, since the timeout applies for each of the + * underlying threads. + * + * @param timeout time to wait for each thread to terminate + * @param unit unit of time for specifying timeout + * @return Returns True, if all threads terminate successfully within their timeout. False, otherwise. + * @throws InterruptedException thrown when the current executing thread is interrupted + */ + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + boolean ret = true; + for (int i = 0; i < executors.length; i++) { + ret = ret && executors[i].awaitTermination(timeout, unit); + } + return ret; + } + + public Future<?> submitOrdered(Object key, Runnable task) { + return chooseExecutor(key).submit(task); + } + + /** + * Executes the given {@link Runnable} task in a randomly chosen thread-pool + * @param command An instance of the {@link Runnable} task + */ + @Override + public void execute(Runnable command) { + chooseRandomExecutor().execute(command); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index 206eb8f..ab11785 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -101,10 +101,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging { case _ => None } - def getLifecycleListeners(): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENERS) - - def getLifecycleListenerClass(name: String): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENER format name) - def getTaskClass = getOption(TaskConfig.TASK_CLASS) def getCommandClass = getOption(TaskConfig.COMMAND_BUILDER) http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index a738616..b082a95 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -91,7 +91,10 @@ class RunLoop ( window commit val totalNs = clock() - loopStartTime - metrics.utilization.set(activeNs.toFloat / totalNs) + + if (totalNs != 0) { + metrics.utilization.set(activeNs.toFloat / totalNs) + } activeNs = 0L } } http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java index e537a91..7b7d41f 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java @@ -48,6 +48,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { private static SystemConsumer mockConsumer = null; private static boolean useCachedConsumer = false; + public MockCoordinatorStreamSystemFactory() { + disableMockConsumerCache(); + } + public static void enableMockConsumerCache() { mockConsumer = null; useCachedConsumer = true; @@ -74,8 +78,8 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { * ch:source:taskname -> changelogPartition for changelog * Everything else is processed as normal config */ + @Override public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { - if (useCachedConsumer && mockConsumer != null) { return mockConsumer; } @@ -104,6 +108,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { /** * Returns a MockCoordinatorSystemProducer. */ + @Override public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { return new MockSystemProducer(null); } @@ -124,6 +129,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { * Returns a single partition admin that pretends to create a coordinator * stream. */ + @Override public SystemAdmin getAdmin(String systemName, Config config) { return new MockSystemAdmin(); } http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java b/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java new file mode 100644 index 0000000..fbd0f92 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.executors; + +import junit.framework.Assert; +import org.junit.Test; + +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class TestKeyBasedExecutorService { + + @Test + public void testSubmitOrdered() { + KeyBasedExecutorService executorService = new KeyBasedExecutorService("test", 2); + ConcurrentLinkedQueue<Integer> resultFromThread0 = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue<Integer> resultFromThread1 = new ConcurrentLinkedQueue<>(); + + final CountDownLatch shutdownLatch = new CountDownLatch(10); + + for (int i = 0; i < 10; i++) { + final int currentStep = i; + executorService.submitOrdered(currentStep, new Runnable() { + @Override + public void run() { + String threadName = Thread.currentThread().getName(); + Pattern compiledPattern = Pattern.compile("test-(.+)-0"); + Matcher matcher = compiledPattern.matcher(threadName); + if (matcher.find()) { + String threadPoolNumber = matcher.group(1); + if ("0".equals(threadPoolNumber)) { + resultFromThread0.add(currentStep); + } else if ("1".equals(threadPoolNumber)) { + resultFromThread1.add(currentStep); + } + shutdownLatch.countDown(); + } + } + }); + } + try { + shutdownLatch.await(2, TimeUnit.SECONDS); + executorService.shutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + Assert.assertEquals(5, resultFromThread0.size()); + Assert.assertEquals(5, resultFromThread1.size()); + + Iterator<Integer> iterator = resultFromThread0.iterator(); + int i = 0; + while (iterator.hasNext()) { + Assert.assertEquals(i, iterator.next().intValue()); + i += 2; + } + iterator = resultFromThread1.iterator(); + i = 1; + while (iterator.hasNext()) { + Assert.assertEquals(i, iterator.next().intValue()); + i += 2; + } + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index e5482a9..423b68a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -207,6 +207,7 @@ class BrokerProxy( * TopicAndPartition. */ def abdicateAll { + info("Abdicating all topic partitions.") val immutableNextOffsetsCopy = nextOffsets.toMap immutableNextOffsetsCopy.keySet.foreach(abdicate(_)) } @@ -234,7 +235,10 @@ class BrokerProxy( warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(",")) KafkaUtil.maybeThrowException(e.exception) }) - notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp)) + notLeaderOrUnknownTopic.foreach(e => { + warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp)) + abdicate(e.tp) + }) offsetOutOfRangeErrors.foreach(e => { warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim"))) @@ -245,7 +249,7 @@ class BrokerProxy( nextOffsets.replace(e.tp, newOffset) } catch { // UnknownTopic or NotLeader are routine events and handled via abdication. All others, bail. - case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception. Abdicating") + case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp)) abdicate(e.tp) } }) http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 4cebb82..fd84c4a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -33,6 +33,7 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.IncomingMessageEnvelope import kafka.consumer.ConsumerConfig import org.apache.samza.util.TopicMetadataStore +import kafka.api.PartitionMetadata import kafka.api.TopicMetadata import org.apache.samza.util.ExponentialSleepStrategy import java.util.concurrent.ConcurrentHashMap @@ -167,17 +168,19 @@ private[kafka] class KafkaSystemConsumer( } protected def createBrokerProxy(host: String, port: Int): BrokerProxy = { + info("Creating new broker proxy for host: %s and port: %s" format(host, port)) new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) } - protected def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = { + protected def getPartitionMetadata(topicMetadata: TopicMetadata, partition: Int): Option[PartitionMetadata] = { + topicMetadata.partitionsMetadata.find(_.partitionId == partition) + } + + protected def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = { // Whatever we do, we can't say Broker, even though we're // manipulating it here. Broker is a private type and Scala doesn't seem // to care about that as long as you don't explicitly declare its type. - val brokerOption = topicMetadata - .partitionsMetadata - .find(_.partitionId == partition) - .flatMap(_.leader) + val brokerOption = partitionMetadata.flatMap(_.leader) brokerOption match { case Some(broker) => Some(broker.host, broker.port) @@ -207,8 +210,10 @@ private[kafka] class KafkaSystemConsumer( // critical section. If we don't, then notAValidEvent it. topicPartitionsAndOffsets.get(head) match { case Some(nextOffset) => - getHostPort(topicMetadata(head.topic), head.partition) match { + val partitionMetadata = getPartitionMetadata(topicMetadata(head.topic), head.partition) + getLeaderHostPort(partitionMetadata) match { case Some((host, port)) => + debug("Got partition metadata for %s: %s" format(head, partitionMetadata.get)) val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port)) brokerProxy.addTopicPartition(head, Option(nextOffset)) brokerProxy.start http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index ff945da..51545a0 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -82,10 +82,10 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr reads.get(topicAndPartition).inc; } def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) { - reads.get(topicAndPartition).inc(inc); + bytesRead.get(topicAndPartition).inc(inc); } - def incBrokerBytesReads(host: String, port: Int, inc: Long) { - brokerReads.get((host,port)).inc(inc) + def incBrokerBytesReads(host: String, port: Int, incBytes: Long) { + brokerBytesRead.get((host,port)).inc(incBytes) } def incBrokerSkippedFetchRequests(host: String, port: Int) { brokerSkippedFetchRequests.get((host,port)).inc() http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala index 0f91622..4cbdc7f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala @@ -34,7 +34,10 @@ class ClientUtilTopicMetadataStore(brokersListString: String, clientId: String, def getTopicInfo(topics: Set[String]) = { val currCorrId = corrID.getAndIncrement + + debug("Fetching topic metadata.") val response: TopicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId) + debug("Got topic metadata response: %s" format(response)) if (response.correlationId != currCorrId) { throw new SamzaException("CorrelationID did not match for request on topics %s (sent %d, got %d)" format (topics, currCorrId, response.correlationId)) http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index 5b0137a..601ffa2 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -19,20 +19,16 @@ package org.apache.samza.util -import java.util.Properties import java.util.concurrent.atomic.AtomicLong import kafka.admin.AdminUtils import kafka.utils.ZkUtils import org.apache.kafka.common.PartitionInfo -import org.apache.samza.config.ApplicationConfig.ApplicationMode -import org.apache.samza.config.{ApplicationConfig, Config, ConfigException} +import org.apache.samza.config.{Config, ConfigException} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.execution.StreamManager import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.kafka.common.errors.ReplicaNotAvailableException import kafka.common.ErrorMapping -import org.apache.kafka.common.errors.TopicExistsException -import org.apache.samza.system.kafka.TopicMetadataCache object KafkaUtil extends Logging { /** http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index a533acc..cd511f2 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -134,7 +134,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = { new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, - coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map()) + coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map(), false) } } http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala index 4dd170f..8656d10 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala @@ -72,7 +72,7 @@ class TestKafkaSystemConsumer { var hosts = List[String]() var getHostPortCount = 0 val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) { - override def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = { + override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = { // Generate a unique host every time getHostPort is called. getHostPortCount += 1 Some("localhost-%s" format getHostPortCount, 0) http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java index 45b6a39..f7d5823 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java @@ -19,7 +19,6 @@ package org.apache.samza.rest; import java.util.Collection; -import java.util.List; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.rest.resources.DefaultResourceFactory; http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java index 2f940e3..b7e8b5a 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java @@ -66,9 +66,9 @@ public class SamzaRestService { private final Map<String, MetricsReporter> metricsReporters; public SamzaRestService(Server server, - ReadableMetricsRegistry metricsRegistry, - Map<String, MetricsReporter> metricsReporters, - ServletContextHandler context) { + ReadableMetricsRegistry metricsRegistry, + Map<String, MetricsReporter> metricsReporters, + ServletContextHandler context) { this.server = server; this.metricsRegistry = metricsRegistry; this.metricsReporters = metricsReporters; @@ -92,9 +92,10 @@ public class SamzaRestService { ReadableMetricsRegistry metricsRegistry = new MetricsRegistryMap(); log.info("Creating new SamzaRestService with config: {}", config); MetricsConfig metricsConfig = new MetricsConfig(config); - Map<String, MetricsReporter> metricsReporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, Util.getLocalHost().getHostName()); + Map<String, MetricsReporter> metricsReporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, + Util.getLocalHost().getHostName()); SamzaRestService restService = new SamzaRestService(new Server(config.getPort()), metricsRegistry, metricsReporters, - new ServletContextHandler(ServletContextHandler.SESSIONS)); + new ServletContextHandler(ServletContextHandler.SESSIONS)); // Add applications SamzaRestApplication samzaRestApplication = new SamzaRestApplication(config); @@ -108,8 +109,8 @@ public class SamzaRestService { ScheduledExecutorService schedulingService = Executors.newScheduledThreadPool(1, threadFactory); schedulingProvider = new ScheduledExecutorSchedulingProvider(schedulingService); SamzaMonitorService monitorService = new SamzaMonitorService(config, - metricsRegistry, - schedulingProvider); + metricsRegistry, + schedulingProvider); monitorService.start(); restService.runBlocking(); http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java index aa4bc3e..466b8cf 100644 --- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java +++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java @@ -90,12 +90,6 @@ public class YarnConfig extends MapConfig { private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000; /** - * Flag to indicate if host-affinity is enabled for the job or not - */ - public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled"; - private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false; - - /** * Principal used to log in on a Kerberized secure cluster */ public static final String YARN_KERBEROS_PRINCIPAL = "yarn.kerberos.principal"; @@ -177,10 +171,6 @@ public class YarnConfig extends MapConfig { return getInt(CONTAINER_REQUEST_TIMEOUT_MS, DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS); } - public boolean getHostAffinityEnabled() { - return getBoolean(HOST_AFFINITY_ENABLED, DEFAULT_HOST_AFFINITY_ENABLED); - } - public String getYarnKerberosPrincipal() { return get(YARN_KERBEROS_PRINCIPAL, null); }
