formatting

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ceb0f6ae
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ceb0f6ae
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ceb0f6ae

Branch: refs/heads/NewKafkaSystemConsumer
Commit: ceb0f6aef45822191e29b6f43b9df76168c161e8
Parents: 332a048
Author: Boris S <[email protected]>
Authored: Wed Sep 5 14:13:47 2018 -0700
Committer: Boris S <[email protected]>
Committed: Wed Sep 5 14:13:47 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/samza/job/local/ThreadJobFactory.scala | 3 ++-
 .../org/apache/samza/system/kafka/KafkaConsumerProxy.java   | 9 +--------
 .../samza/system/kafka/KafkaSystemConsumerMetrics.scala     | 8 ++++----
 .../org/apache/samza/validation/YarnJobValidationTool.java  | 2 +-
 .../samza/job/yarn/TestSamzaYarnAppMasterService.scala      | 4 +++-
 5 files changed, 11 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 15aa5a6..0d71303 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -50,7 +50,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     val changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager)
 
     val coordinator = JobModelManager(coordinatorStreamManager.getConfig, 
changelogStreamManager.readPartitionMapping())
-    coordinatorStreamManager.stop()
+
     val jobModel = coordinator.jobModel
 
     val taskPartitionMappings: mutable.Map[TaskName, Integer] = 
mutable.Map[TaskName, Integer]()
@@ -116,6 +116,7 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
       threadJob
     } finally {
       coordinator.stop
+      coordinatorStreamManager.stop()
       jmxServer.stop
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index a6272cd..7232a0a 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -99,20 +99,14 @@ public class KafkaConsumerProxy<K, V> {
           "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " 
- " + systemName);
       consumerPollThread.start();
 
-      System.out.println("THREAD: starting" + consumerPollThread.getName());
-
-
       // we need to wait until the thread starts
       while (!isRunning) {
         try {
           consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-          LOG.info("WTH");
+          LOG.info("Got InterruptedException", e);
         }
       }
-      new Exception().printStackTrace(System.out);
-      System.out.println("THREAD: started" + consumerPollThread.getName());
-
     } else {
       LOG.debug("Tried to start an already started LiKafkaConsumerProxy (%s). 
Ignoring.", this.toString());
     }
@@ -220,7 +214,6 @@ public class KafkaConsumerProxy<K, V> {
         kafkaConsumer.resume(topicPartitionsToPause);
       }
     } catch (InvalidOffsetException e) {
-      LOG.error("LiKafkaConsumer with invalidOffsetException", e);
       // If the consumer has thrown this exception it means that auto reset is 
not set for this consumer.
       // So we just rethrow.
       LOG.error("Caught InvalidOffsetException in pollConsumer", e);

http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 415bd38..7dce261 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -47,10 +47,10 @@ class KafkaSystemConsumerMetrics(val systemName: String = 
"unknown", val registr
   }
 
   def registerClientProxy(clientName: String) {
-    clientBytesRead.put(clientName, newCounter("%s-%s-bytes-read" format 
clientName))
-    clientReads.put((clientName), newCounter("%s-%s-messages-read" format 
clientName))
-    clientSkippedFetchRequests.put((clientName), 
newCounter("%s-%s-skipped-fetch-requests" format clientName))
-    topicPartitions.put(clientName, newGauge("%s-%s-topic-partitions" format 
clientName, 0))
+    clientBytesRead.put(clientName, newCounter("%s-bytes-read" format 
clientName))
+    clientReads.put((clientName), newCounter("%s-messages-read" format 
clientName))
+    clientSkippedFetchRequests.put((clientName), 
newCounter("%s-skipped-fetch-requests" format clientName))
+    topicPartitions.put(clientName, newGauge("%s-topic-partitions" format 
clientName, 0))
   }
 
   // java friendlier interfaces

http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 0b405f0..b30b896 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -157,7 +157,7 @@ public class YarnJobValidationTool {
     coordinatorStreamManager.start();
     coordinatorStreamManager.bootstrap();
     ChangelogStreamManager changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager);
-    JobModelManager jobModelManager = 
JobModelManager.apply(coordinatorStreamManager, 
changelogStreamManager.readPartitionMapping());
+    JobModelManager jobModelManager = 
JobModelManager.apply(coordinatorStreamManager.getConfig(), 
changelogStreamManager.readPartitionMapping());
     validator.init(config);
     Map<String, String> jmxUrls = 
jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
     for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index da23b91..1ad4522 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -106,7 +106,9 @@ class TestSamzaYarnAppMasterService {
     coordinatorStreamManager.start
     coordinatorStreamManager.bootstrap
     val changelogPartitionManager = new 
ChangelogStreamManager(coordinatorStreamManager)
-    JobModelManager(coordinatorStreamManager, 
changelogPartitionManager.readPartitionMapping())
+    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, 
changelogPartitionManager.readPartitionMapping())
+    coordinatorStreamManager.stop()
+    jobModelManager
   }
 
   private def getDummyConfig: Config = new MapConfig(Map[String, String](

Reply via email to