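Fixed some merge issues: removed the clearCheckpoints override (and the now-unused StreamSpec import) from KafkaCheckpointManager, and dropped the config argument from the KafkaUtil.getCheckpointTopic call in KafkaCheckpointManagerFactory.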
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/90fa985e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/90fa985e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/90fa985e Branch: refs/heads/0.13.2 Commit: 90fa985ece6f07dc2520e526c323b67064c8d02a Parents: 8ee61a6 Author: Boris S <[email protected]> Authored: Wed Oct 4 14:50:26 2017 -0700 Committer: Boris S <[email protected]> Committed: Wed Oct 4 14:50:26 2017 -0700 ---------------------------------------------------------------------- .../samza/checkpoint/kafka/KafkaCheckpointManager.scala | 8 +------- .../checkpoint/kafka/KafkaCheckpointManagerFactory.scala | 2 +- 2 files changed, 2 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/90fa985e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 1e22763..f66097c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -29,7 +29,7 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} import org.apache.samza.container.TaskName import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.system.{StreamSpec, SystemAdmin, _} +import org.apache.samza.system.{SystemAdmin, _} import org.apache.samza.util._ import org.apache.samza.{Partition, SamzaException} @@ -272,11 +272,5 @@ class KafkaCheckpointManager( } - override def 
clearCheckpoints = { - info("Clear checkpoint stream %s in system %s" format (checkpointTopic, systemName)) - val spec = StreamSpec.createCheckpointStreamSpec(checkpointTopic, systemName) - systemAdmin.clearStream(spec) - } - override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic) } http://git-wip-us.apache.org/repos/asf/samza/blob/90fa985e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 402248f..9e8aefb 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -100,7 +100,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin new KafkaCheckpointManager( clientId, - KafkaUtil.getCheckpointTopic(jobName, jobId, config), + KafkaUtil.getCheckpointTopic(jobName, jobId), systemName, kafkaConfig.getCheckpointReplicationFactor.get.toInt, socketTimeout,
