Repository: samza
Updated Branches:
  refs/heads/master 9a8099cda -> 93219c78d


SAMZA-1547; Parse the default value of the grouper-factory config in KafkaCheckpointManager

- Additionally, updated all unit-tests.

Author: Jagadish <[email protected]>

Reviewers: Prateek M <[email protected]>

Closes #394 from vjagadish1989/kcm-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93219c78
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93219c78
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93219c78

Branch: refs/heads/master
Commit: 93219c78d6f473d2c7ad4d8702a25f704ee61a7a
Parents: 9a8099c
Author: Jagadish <[email protected]>
Authored: Fri Dec 22 13:58:33 2017 -0800
Committer: Jagadish <[email protected]>
Committed: Fri Dec 22 13:58:33 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala | 2 +-
 .../apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/93219c78/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 217b2b6..e1187c5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -59,7 +59,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
   val checkpointTopic: String = checkpointSpec.getPhysicalName
  val checkpointSsp = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
   val checkpointKeySerde = new KafkaCheckpointLogKeySerde
-  val expectedGrouperFactory = config.get(JobConfig.SSP_GROUPER_FACTORY)
+  val expectedGrouperFactory = new JobConfig(config).getSystemStreamPartitionGrouperFactory
 
   val systemProducer = systemFactory.getProducer(checkpointSystem, config, metricsRegistry)
   val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry)

http://git-wip-us.apache.org/repos/asf/samza/blob/93219c78/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index dcf4068..ec9f3a0 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -150,7 +150,6 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     new MapConfig(new ImmutableMap.Builder[String, String]()
       .put(JobConfig.JOB_NAME, "some-job-name")
       .put(JobConfig.JOB_ID, "i001")
-      .put(JobConfig.SSP_GROUPER_FACTORY, sspGrouperFactoryName)
       .put(s"systems.$checkpointSystemName.samza.factory", classOf[KafkaSystemFactory].getCanonicalName)
       .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokers)
       .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect)

Reply via email to