Repository: samza Updated Branches: refs/heads/master 9a8099cda -> 93219c78d
SAMZA-1547; Parse default value grouper-factory config in KafkaCheckpointMgr - Additionally, updated all unit-tests. Author: Jagadish <[email protected]> Reviewers: Prateek M <[email protected]> Closes #394 from vjagadish1989/kcm-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93219c78 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93219c78 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93219c78 Branch: refs/heads/master Commit: 93219c78d6f473d2c7ad4d8702a25f704ee61a7a Parents: 9a8099c Author: Jagadish <[email protected]> Authored: Fri Dec 22 13:58:33 2017 -0800 Committer: Jagadish <[email protected]> Committed: Fri Dec 22 13:58:33 2017 -0800 ---------------------------------------------------------------------- .../org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala | 2 +- .../apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/93219c78/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 217b2b6..e1187c5 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -59,7 +59,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, val checkpointTopic: String = checkpointSpec.getPhysicalName val checkpointSsp = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0)) val checkpointKeySerde = new KafkaCheckpointLogKeySerde - val expectedGrouperFactory = config.get(JobConfig.SSP_GROUPER_FACTORY) + val expectedGrouperFactory = new JobConfig(config).getSystemStreamPartitionGrouperFactory val systemProducer = systemFactory.getProducer(checkpointSystem, config, metricsRegistry) val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry) http://git-wip-us.apache.org/repos/asf/samza/blob/93219c78/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index dcf4068..ec9f3a0 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -150,7 +150,6 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { new MapConfig(new ImmutableMap.Builder[String, String]() .put(JobConfig.JOB_NAME, "some-job-name") .put(JobConfig.JOB_ID, "i001") - .put(JobConfig.SSP_GROUPER_FACTORY, sspGrouperFactoryName) .put(s"systems.$checkpointSystemName.samza.factory", classOf[KafkaSystemFactory].getCanonicalName) .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokers) .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect)
