fixed some merge issues

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/90fa985e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/90fa985e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/90fa985e

Branch: refs/heads/0.13.2
Commit: 90fa985ece6f07dc2520e526c323b67064c8d02a
Parents: 8ee61a6
Author: Boris S <[email protected]>
Authored: Wed Oct 4 14:50:26 2017 -0700
Committer: Boris S <[email protected]>
Committed: Wed Oct 4 14:50:26 2017 -0700

----------------------------------------------------------------------
 .../samza/checkpoint/kafka/KafkaCheckpointManager.scala      | 8 +-------
 .../checkpoint/kafka/KafkaCheckpointManagerFactory.scala     | 2 +-
 2 files changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/90fa985e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 1e22763..f66097c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -29,7 +29,7 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.container.TaskName
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{StreamSpec, SystemAdmin, _}
+import org.apache.samza.system.{SystemAdmin, _}
 import org.apache.samza.util._
 import org.apache.samza.{Partition, SamzaException}
 
@@ -272,11 +272,5 @@ class KafkaCheckpointManager(
 
   }
 
-  override def clearCheckpoints = {
-    info("Clear checkpoint stream %s in system %s" format (checkpointTopic, systemName))
-    val spec = StreamSpec.createCheckpointStreamSpec(checkpointTopic, systemName)
-    systemAdmin.clearStream(spec)
-  }
-
   override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/90fa985e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 402248f..9e8aefb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -100,7 +100,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
 
     new KafkaCheckpointManager(
       clientId,
-      KafkaUtil.getCheckpointTopic(jobName, jobId, config),
+      KafkaUtil.getCheckpointTopic(jobName, jobId),
       systemName,
       kafkaConfig.getCheckpointReplicationFactor.get.toInt,
       socketTimeout,

Reply via email to