Repository: samza
Updated Branches:
  refs/heads/master f6c99a4c4 -> c5557140d


SAMZA-1738: Merge in some minor additions from Linkedin branch

Author: Cameron Lee <[email protected]>

Reviewers: Yi Pan <[email protected]>

Closes #549 from cameronlee314/sync_li_trunk


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c5557140
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c5557140
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c5557140

Branch: refs/heads/master
Commit: c5557140d454d5e6ddfc4e8eeaae112178a1d54f
Parents: f6c99a4
Author: Cameron Lee <[email protected]>
Authored: Thu Jun 28 09:10:44 2018 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Thu Jun 28 09:10:44 2018 -0700

----------------------------------------------------------------------
 .../executors/KeyBasedExecutorService.java      | 174 +++++++++++++++++++
 .../org/apache/samza/config/TaskConfig.scala    |   4 -
 .../org/apache/samza/container/RunLoop.scala    |   5 +-
 .../MockCoordinatorStreamSystemFactory.java     |   8 +-
 .../executors/TestKeyBasedExecutorService.java  |  84 +++++++++
 .../apache/samza/system/kafka/BrokerProxy.scala |   8 +-
 .../system/kafka/KafkaSystemConsumer.scala      |  17 +-
 .../kafka/KafkaSystemConsumerMetrics.scala      |   6 +-
 .../util/ClientUtilTopicMetadataStore.scala     |   3 +
 .../scala/org/apache/samza/util/KafkaUtil.scala |   6 +-
 .../system/kafka/TestKafkaSystemAdmin.scala     |   2 +-
 .../system/kafka/TestKafkaSystemConsumer.scala  |   2 +-
 .../apache/samza/rest/SamzaRestApplication.java |   1 -
 .../org/apache/samza/rest/SamzaRestService.java |  15 +-
 .../org/apache/samza/config/YarnConfig.java     |  10 --
 15 files changed, 303 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java
 
b/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java
new file mode 100644
index 0000000..a7c19d2
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.executors;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * This class supports submitting {@link Runnable} tasks with an ordering key, 
so that tasks submitted with the
+ * same key will always be executed in order, but tasks across different keys 
can be executed in parallel and out of
+ * order.
+ * Ordering is achieved by hashing the key objects to threads by their {@link 
#hashCode()} method.
+ * Ordering is guaranteed only when using the {@link #submitOrdered(Object, 
Runnable)} method. None of the
+ * {@link #submit} and {@link #execute(Runnable)} method(s) guarantee the 
ordering semantics.
+ */
+public class KeyBasedExecutorService extends AbstractExecutorService {
+  final String threadPoolNamePrefix;
+  final ExecutorService[] executors;
+  final Random rand = new Random();
+  final int numThreads;
+
+  public KeyBasedExecutorService(int numThreads) {
+    this("KeyBasedExecutor", numThreads);
+  }
+
+  /**
+   * Constructs an instance of a KeyBasedExecutorService that manages the 
underlying threads
+   *
+   * @param threadPoolNamePrefix String identifier for this ExecutorService. 
It forms the prefix for each of the
+   *                             underlying thread pool executors
+   * @param numThreads Total number of threads required, mainly dependent on 
the key set size and the degree of
+   *                   parallelism. Highest level of parallelism can be 
achieved by setting the
+   *                   number of threads = key set size.
+   * @throws IllegalArgumentException if numThreads {@literal <}= 0
+   */
+  public KeyBasedExecutorService(String threadPoolNamePrefix,
+      int numThreads) {
+    if (numThreads <= 0) {
+      throw new IllegalArgumentException("numThreads has to be greater than 0 
in KeyBasedExecutor!");
+    }
+    this.numThreads = numThreads;
+    this.threadPoolNamePrefix = threadPoolNamePrefix;
+    this.executors = new ExecutorService[numThreads];
+    final ThreadFactory defaultThreadFactory = 
Executors.defaultThreadFactory();
+
+    for (int i = 0; i < numThreads; i++) {
+      final ExecutorService threadPoolExecutorPerQueue = 
Executors.newSingleThreadExecutor(
+          new ThreadFactoryBuilder()
+              .setThreadFactory(defaultThreadFactory)
+              .setNameFormat(this.threadPoolNamePrefix + "-" + i + "-%d")
+              .build()
+      );
+      executors[i] = threadPoolExecutorPerQueue;
+    }
+  }
+
+  protected ExecutorService chooseRandomExecutor() {
+    if (executors.length == 1) {
+      return executors[0];
+    }
+    return executors[rand.nextInt(executors.length)];
+  }
+
+  protected ExecutorService chooseExecutor(Object object) {
+    if (executors.length == 1) {
+      return executors[0];
+    }
+    return executors[signSafeMod(object.hashCode(), executors.length)];
+  }
+
+  private static int signSafeMod(long dividend, int divisor) {
+    int mod = (int) (dividend % divisor);
+    if (mod < 0) {
+      mod += divisor;
+    }
+    return mod;
+  }
+
+  @Override
+  public void shutdown() {
+    for (int i = 0; i < executors.length; i++) {
+      executors[i].shutdown();
+    }
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    List<Runnable> unexecutedRunnables = new ArrayList<>();
+    for (int i = 0; i < executors.length; i++) {
+      List<Runnable> unexecutedRunnablesPerQueue = executors[i].shutdownNow();
+      if (unexecutedRunnablesPerQueue != null && 
unexecutedRunnablesPerQueue.size() > 0) {
+        unexecutedRunnables.addAll(unexecutedRunnablesPerQueue);
+      }
+    }
+    return unexecutedRunnables;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    boolean ret = true;
+    for (int i = 0; i < executors.length; i++) {
+      ret = ret && executors[i].isShutdown();
+    }
+    return ret;
+  }
+
+  @Override
+  public boolean isTerminated() {
+    boolean ret = true;
+    for (int i = 0; i < executors.length; i++) {
+      ret = ret && executors[i].isTerminated();
+    }
+    return ret;
+  }
+
+  /**
+   * Awaits termination of each of the underlying threads
+   *
+   * Note: This can potentially block longer than the given timeout, since the 
timeout applies for each of the
+   * underlying threads.
+   *
+   * @param timeout time to wait for each thread to terminate
+   * @param unit unit of time for specifying timeout
+   * @return Returns True, if all threads terminate successfully within their 
timeout. False, otherwise.
+   * @throws InterruptedException thrown when the current executing thread is 
interrupted
+   */
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    boolean ret = true;
+    for (int i = 0; i < executors.length; i++) {
+      ret = ret && executors[i].awaitTermination(timeout, unit);
+    }
+    return ret;
+  }
+
+  public Future<?> submitOrdered(Object key, Runnable task) {
+    return chooseExecutor(key).submit(task);
+  }
+
+  /**
+   * Executes the given {@link Runnable} task in a randomly chosen thread-pool
+   * @param command An instance of the {@link Runnable} task
+   */
+  @Override
+  public void execute(Runnable command) {
+    chooseRandomExecutor().execute(command);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 206eb8f..ab11785 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -101,10 +101,6 @@ class TaskConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
     case _ => None
   }
 
-  def getLifecycleListeners(): Option[String] = 
getOption(TaskConfig.LIFECYCLE_LISTENERS)
-
-  def getLifecycleListenerClass(name: String): Option[String] = 
getOption(TaskConfig.LIFECYCLE_LISTENER format name)
-
   def getTaskClass = getOption(TaskConfig.TASK_CLASS)
 
   def getCommandClass = getOption(TaskConfig.COMMAND_BUILDER)

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index a738616..b082a95 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -91,7 +91,10 @@ class RunLoop (
       window
       commit
       val totalNs = clock() - loopStartTime
-      metrics.utilization.set(activeNs.toFloat / totalNs)
+
+      if (totalNs != 0) {
+        metrics.utilization.set(activeNs.toFloat / totalNs)
+      }
       activeNs = 0L
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index e537a91..7b7d41f 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -48,6 +48,10 @@ public class MockCoordinatorStreamSystemFactory implements 
SystemFactory {
   private static SystemConsumer mockConsumer = null;
   private static boolean useCachedConsumer = false;
 
+  public MockCoordinatorStreamSystemFactory() {
+    disableMockConsumerCache();
+  }
+
   public static void enableMockConsumerCache() {
     mockConsumer = null;
     useCachedConsumer = true;
@@ -74,8 +78,8 @@ public class MockCoordinatorStreamSystemFactory implements 
SystemFactory {
    *               ch:source:taskname -> changelogPartition for changelog
    *               Everything else is processed as normal config
    */
+  @Override
   public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
-
     if (useCachedConsumer && mockConsumer != null) {
       return mockConsumer;
     }
@@ -104,6 +108,7 @@ public class MockCoordinatorStreamSystemFactory implements 
SystemFactory {
   /**
    * Returns a MockCoordinatorSystemProducer.
    */
+  @Override
   public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
     return new MockSystemProducer(null);
   }
@@ -124,6 +129,7 @@ public class MockCoordinatorStreamSystemFactory implements 
SystemFactory {
    * Returns a single partition admin that pretends to create a coordinator
    * stream.
    */
+  @Override
   public SystemAdmin getAdmin(String systemName, Config config) {
     return new MockSystemAdmin();
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java
 
b/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java
new file mode 100644
index 0000000..fbd0f92
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.executors;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TestKeyBasedExecutorService {
+
+  @Test
+  public void testSubmitOrdered() {
+    KeyBasedExecutorService executorService = new 
KeyBasedExecutorService("test", 2);
+    ConcurrentLinkedQueue<Integer> resultFromThread0 = new 
ConcurrentLinkedQueue<>();
+    ConcurrentLinkedQueue<Integer> resultFromThread1 = new 
ConcurrentLinkedQueue<>();
+
+    final CountDownLatch shutdownLatch = new CountDownLatch(10);
+
+    for (int i = 0; i < 10; i++) {
+      final int currentStep = i;
+      executorService.submitOrdered(currentStep, new Runnable() {
+        @Override
+        public void run() {
+          String threadName = Thread.currentThread().getName();
+          Pattern compiledPattern = Pattern.compile("test-(.+)-0");
+          Matcher matcher = compiledPattern.matcher(threadName);
+          if (matcher.find()) {
+            String threadPoolNumber = matcher.group(1);
+            if ("0".equals(threadPoolNumber)) {
+              resultFromThread0.add(currentStep);
+            } else if ("1".equals(threadPoolNumber)) {
+              resultFromThread1.add(currentStep);
+            }
+            shutdownLatch.countDown();
+          }
+        }
+      });
+    }
+    try {
+      shutdownLatch.await(2, TimeUnit.SECONDS);
+      executorService.shutdown();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    Assert.assertEquals(5, resultFromThread0.size());
+    Assert.assertEquals(5, resultFromThread1.size());
+
+    Iterator<Integer> iterator = resultFromThread0.iterator();
+    int i = 0;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(i, iterator.next().intValue());
+      i += 2;
+    }
+    iterator = resultFromThread1.iterator();
+    i = 1;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(i, iterator.next().intValue());
+      i += 2;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index e5482a9..423b68a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -207,6 +207,7 @@ class BrokerProxy(
    * TopicAndPartition.
    */
   def abdicateAll {
+    info("Abdicating all topic partitions.")
     val immutableNextOffsetsCopy = nextOffsets.toMap
     immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
   }
@@ -234,7 +235,10 @@ class BrokerProxy(
       warn("Got non-recoverable error codes during multifetch. Throwing an 
exception to trigger reconnect. Errors: %s" format 
remainingErrors.mkString(","))
       KafkaUtil.maybeThrowException(e.exception) })
 
-    notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp))
+    notLeaderOrUnknownTopic.foreach(e => {
+      warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for 
%s. Abdicating" format(e.code, e.tp))
+      abdicate(e.tp)
+    })
 
     offsetOutOfRangeErrors.foreach(e => {
       warn("Received OffsetOutOfRange exception for %s. Current offset = %s" 
format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in 
the interim")))
@@ -245,7 +249,7 @@ class BrokerProxy(
         nextOffsets.replace(e.tp, newOffset)
       } catch {
         // UnknownTopic or NotLeader are routine events and handled via 
abdication.  All others, bail.
-        case _ @ (_:UnknownTopicOrPartitionException | _: 
NotLeaderForPartitionException) => warn("Received 
(UnknownTopicOr|NotLeaderFor)Partition exception. Abdicating")
+        case _ @ (_:UnknownTopicOrPartitionException | _: 
NotLeaderForPartitionException) => warn("Received 
(UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" 
format(e.code, e.tp))
                                                                                
              abdicate(e.tp)
       }
     })

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 4cebb82..fd84c4a 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -33,6 +33,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import kafka.consumer.ConsumerConfig
 import org.apache.samza.util.TopicMetadataStore
+import kafka.api.PartitionMetadata
 import kafka.api.TopicMetadata
 import org.apache.samza.util.ExponentialSleepStrategy
 import java.util.concurrent.ConcurrentHashMap
@@ -167,17 +168,19 @@ private[kafka] class KafkaSystemConsumer(
   }
 
   protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+    info("Creating new broker proxy for host: %s and port: %s" format(host, 
port))
     new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, 
bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
   }
 
-  protected def getHostPort(topicMetadata: TopicMetadata, partition: Int): 
Option[(String, Int)] = {
+  protected def getPartitionMetadata(topicMetadata: TopicMetadata, partition: 
Int): Option[PartitionMetadata] = {
+    topicMetadata.partitionsMetadata.find(_.partitionId == partition)
+  }
+
+  protected def getLeaderHostPort(partitionMetadata: 
Option[PartitionMetadata]): Option[(String, Int)] = {
     // Whatever we do, we can't say Broker, even though we're
     // manipulating it here. Broker is a private type and Scala doesn't seem
     // to care about that as long as you don't explicitly declare its type.
-    val brokerOption = topicMetadata
-      .partitionsMetadata
-      .find(_.partitionId == partition)
-      .flatMap(_.leader)
+    val brokerOption = partitionMetadata.flatMap(_.leader)
 
     brokerOption match {
       case Some(broker) => Some(broker.host, broker.port)
@@ -207,8 +210,10 @@ private[kafka] class KafkaSystemConsumer(
             // critical section. If we don't, then notAValidEvent it.
             topicPartitionsAndOffsets.get(head) match {
               case Some(nextOffset) =>
-                getHostPort(topicMetadata(head.topic), head.partition) match {
+                val partitionMetadata = 
getPartitionMetadata(topicMetadata(head.topic), head.partition)
+                getLeaderHostPort(partitionMetadata) match {
                   case Some((host, port)) =>
+                    debug("Got partition metadata for %s: %s" format(head, 
partitionMetadata.get))
                     val brokerProxy = brokerProxies.getOrElseUpdate((host, 
port), createBrokerProxy(host, port))
                     brokerProxy.addTopicPartition(head, Option(nextOffset))
                     brokerProxy.start

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index ff945da..51545a0 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -82,10 +82,10 @@ class KafkaSystemConsumerMetrics(val systemName: String = 
"unknown", val registr
     reads.get(topicAndPartition).inc;
   }
   def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
-    reads.get(topicAndPartition).inc(inc);
+    bytesRead.get(topicAndPartition).inc(inc);
   }
-  def incBrokerBytesReads(host: String, port: Int, inc: Long) {
-    brokerReads.get((host,port)).inc(inc)
+  def incBrokerBytesReads(host: String, port: Int, incBytes: Long) {
+    brokerBytesRead.get((host,port)).inc(incBytes)
   }
   def incBrokerSkippedFetchRequests(host: String, port: Int) {
     brokerSkippedFetchRequests.get((host,port)).inc()

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
index 0f91622..4cbdc7f 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
@@ -34,7 +34,10 @@ class ClientUtilTopicMetadataStore(brokersListString: 
String, clientId: String,
 
   def getTopicInfo(topics: Set[String]) = {
     val currCorrId = corrID.getAndIncrement
+
+    debug("Fetching topic metadata.")
     val response: TopicMetadataResponse = 
ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)
+    debug("Got topic metadata response: %s" format(response))
 
     if (response.correlationId != currCorrId) {
       throw new SamzaException("CorrelationID did not match for request on 
topics %s (sent %d, got %d)" format (topics, currCorrId, 
response.correlationId))

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala 
b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 5b0137a..601ffa2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -19,20 +19,16 @@
 
 package org.apache.samza.util
 
-import java.util.Properties
 import java.util.concurrent.atomic.AtomicLong
 import kafka.admin.AdminUtils
 import kafka.utils.ZkUtils
 import org.apache.kafka.common.PartitionInfo
-import org.apache.samza.config.ApplicationConfig.ApplicationMode
-import org.apache.samza.config.{ApplicationConfig, Config, ConfigException}
+import org.apache.samza.config.{Config, ConfigException}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.execution.StreamManager
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import kafka.common.ErrorMapping
-import org.apache.kafka.common.errors.TopicExistsException
-import org.apache.samza.system.kafka.TopicMetadataCache
 
 object KafkaUtil extends Logging {
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index a533acc..cd511f2 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -134,7 +134,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
 
   def createSystemAdmin(coordinatorStreamProperties: Properties, 
coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, 
ChangelogInfo]): KafkaSystemAdmin = {
     new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => 
ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties,
-      coordinatorStreamReplicationFactor, 10000, 
ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, 
topicMetaInformation, Map())
+      coordinatorStreamReplicationFactor, 10000, 
ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, 
topicMetaInformation, Map(), false)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 4dd170f..8656d10 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -72,7 +72,7 @@ class TestKafkaSystemConsumer {
     var hosts = List[String]()
     var getHostPortCount = 0
     val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, 
metadataStore, clientId) {
-      override def getHostPort(topicMetadata: TopicMetadata, partition: Int): 
Option[(String, Int)] = {
+      override def getLeaderHostPort(partitionMetadata: 
Option[PartitionMetadata]): Option[(String, Int)] = {
         // Generate a unique host every time getHostPort is called.
         getHostPortCount += 1
         Some("localhost-%s" format getHostPortCount, 0)

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java 
b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
index 45b6a39..f7d5823 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
@@ -19,7 +19,6 @@
 package org.apache.samza.rest;
 
 import java.util.Collection;
-import java.util.List;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.rest.resources.DefaultResourceFactory;

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java 
b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
index 2f940e3..b7e8b5a 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
@@ -66,9 +66,9 @@ public class SamzaRestService {
   private final Map<String, MetricsReporter> metricsReporters;
 
   public SamzaRestService(Server server,
-                          ReadableMetricsRegistry metricsRegistry,
-                          Map<String, MetricsReporter> metricsReporters,
-                          ServletContextHandler context) {
+      ReadableMetricsRegistry metricsRegistry,
+      Map<String, MetricsReporter> metricsReporters,
+      ServletContextHandler context) {
     this.server = server;
     this.metricsRegistry = metricsRegistry;
     this.metricsReporters = metricsReporters;
@@ -92,9 +92,10 @@ public class SamzaRestService {
       ReadableMetricsRegistry metricsRegistry = new MetricsRegistryMap();
       log.info("Creating new SamzaRestService with config: {}", config);
       MetricsConfig metricsConfig = new MetricsConfig(config);
-      Map<String, MetricsReporter> metricsReporters = 
MetricsReporterLoader.getMetricsReporters(metricsConfig, 
Util.getLocalHost().getHostName());
+      Map<String, MetricsReporter> metricsReporters = 
MetricsReporterLoader.getMetricsReporters(metricsConfig,
+          Util.getLocalHost().getHostName());
       SamzaRestService restService = new SamzaRestService(new 
Server(config.getPort()), metricsRegistry, metricsReporters,
-                                                          new 
ServletContextHandler(ServletContextHandler.SESSIONS));
+          new ServletContextHandler(ServletContextHandler.SESSIONS));
 
       // Add applications
       SamzaRestApplication samzaRestApplication = new 
SamzaRestApplication(config);
@@ -108,8 +109,8 @@ public class SamzaRestService {
       ScheduledExecutorService schedulingService = 
Executors.newScheduledThreadPool(1, threadFactory);
       schedulingProvider = new 
ScheduledExecutorSchedulingProvider(schedulingService);
       SamzaMonitorService monitorService = new SamzaMonitorService(config,
-                                                                   
metricsRegistry,
-                                                                   
schedulingProvider);
+          metricsRegistry,
+          schedulingProvider);
       monitorService.start();
 
       restService.runBlocking();

http://git-wip-us.apache.org/repos/asf/samza/blob/c5557140/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index aa4bc3e..466b8cf 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -90,12 +90,6 @@ public class YarnConfig extends MapConfig {
   private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000;
 
   /**
-   * Flag to indicate if host-affinity is enabled for the job or not
-   */
-  public static final String HOST_AFFINITY_ENABLED = 
"yarn.samza.host-affinity.enabled";
-  private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
-
-  /**
    * Principal used to log in on a Kerberized secure cluster
    */
   public static final String YARN_KERBEROS_PRINCIPAL = 
"yarn.kerberos.principal";
@@ -177,10 +171,6 @@ public class YarnConfig extends MapConfig {
     return getInt(CONTAINER_REQUEST_TIMEOUT_MS, 
DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS);
   }
 
-  public boolean getHostAffinityEnabled() {
-    return getBoolean(HOST_AFFINITY_ENABLED, DEFAULT_HOST_AFFINITY_ENABLED);
-  }
-
   public String getYarnKerberosPrincipal() {
     return get(YARN_KERBEROS_PRINCIPAL, null);
   }

Reply via email to