formatting
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ceb0f6ae Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ceb0f6ae Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ceb0f6ae Branch: refs/heads/NewKafkaSystemConsumer Commit: ceb0f6aef45822191e29b6f43b9df76168c161e8 Parents: 332a048 Author: Boris S <[email protected]> Authored: Wed Sep 5 14:13:47 2018 -0700 Committer: Boris S <[email protected]> Committed: Wed Sep 5 14:13:47 2018 -0700 ---------------------------------------------------------------------- .../scala/org/apache/samza/job/local/ThreadJobFactory.scala | 3 ++- .../org/apache/samza/system/kafka/KafkaConsumerProxy.java | 9 +-------- .../samza/system/kafka/KafkaSystemConsumerMetrics.scala | 8 ++++---- .../org/apache/samza/validation/YarnJobValidationTool.java | 2 +- .../samza/job/yarn/TestSamzaYarnAppMasterService.scala | 4 +++- 5 files changed, 11 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 15aa5a6..0d71303 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -50,7 +50,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager) val coordinator = JobModelManager(coordinatorStreamManager.getConfig, changelogStreamManager.readPartitionMapping()) - coordinatorStreamManager.stop() + val jobModel = coordinator.jobModel val taskPartitionMappings: mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]() @@ -116,6 +116,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { threadJob } finally { coordinator.stop + coordinatorStreamManager.stop() jmxServer.stop } } http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index a6272cd..7232a0a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -99,20 +99,14 @@ public class KafkaConsumerProxy<K, V> { "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName); consumerPollThread.start(); - System.out.println("THREAD: starting" + consumerPollThread.getName()); - - // we need to wait until the thread starts while (!isRunning) { try { consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - LOG.info("WTH"); + LOG.info("Got InterruptedException", e); } } - new Exception().printStackTrace(System.out); - System.out.println("THREAD: started" + consumerPollThread.getName()); - } else { LOG.debug("Tried to start an already started LiKafkaConsumerProxy (%s). Ignoring.", this.toString()); } @@ -220,7 +214,6 @@ public class KafkaConsumerProxy<K, V> { kafkaConsumer.resume(topicPartitionsToPause); } } catch (InvalidOffsetException e) { - LOG.error("LiKafkaConsumer with invalidOffsetException", e); // If the consumer has thrown this exception it means that auto reset is not set for this consumer. // So we just rethrow. LOG.error("Caught InvalidOffsetException in pollConsumer", e); http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index 415bd38..7dce261 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -47,10 +47,10 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr } def registerClientProxy(clientName: String) { - clientBytesRead.put(clientName, newCounter("%s-%s-bytes-read" format clientName)) - clientReads.put((clientName), newCounter("%s-%s-messages-read" format clientName)) - clientSkippedFetchRequests.put((clientName), newCounter("%s-%s-skipped-fetch-requests" format clientName)) - topicPartitions.put(clientName, newGauge("%s-%s-topic-partitions" format clientName, 0)) + clientBytesRead.put(clientName, newCounter("%s-bytes-read" format clientName)) + clientReads.put((clientName), newCounter("%s-messages-read" format clientName)) + clientSkippedFetchRequests.put((clientName), newCounter("%s-skipped-fetch-requests" format clientName)) + topicPartitions.put(clientName, newGauge("%s-topic-partitions" format clientName, 0)) } // java friendlier interfaces http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java index 0b405f0..b30b896 100644 --- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java +++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java @@ -157,7 +157,7 @@ public class YarnJobValidationTool { coordinatorStreamManager.start(); coordinatorStreamManager.bootstrap(); ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager); - JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()); + JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()); validator.init(config); Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY); for (Map.Entry<String, String> entry : jmxUrls.entrySet()) { http://git-wip-us.apache.org/repos/asf/samza/blob/ceb0f6ae/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala index da23b91..1ad4522 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala @@ -106,7 +106,9 @@ class TestSamzaYarnAppMasterService { coordinatorStreamManager.start coordinatorStreamManager.bootstrap val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager) - JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping()) + val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogPartitionManager.readPartitionMapping()) + coordinatorStreamManager.stop() + jobModelManager } private def getDummyConfig: Config = new MapConfig(Map[String, String](
