Repository: incubator-samza Updated Branches: refs/heads/master 4d0ad620b -> 7d7ba7fbc
SAMZA-180: Command-line tool for manipulating checkpoints. Reviewed by Chris Riccomini. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7d7ba7fb Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7d7ba7fb Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7d7ba7fb Branch: refs/heads/master Commit: 7d7ba7fbc1300c90ed762d8ee5bde800e72bd776 Parents: 4d0ad62 Author: Martin Kleppmann <[email protected]> Authored: Thu Mar 20 18:36:01 2014 +0000 Committer: Martin Kleppmann <[email protected]> Committed: Wed Apr 2 18:57:21 2014 +0100 ---------------------------------------------------------------------- README.md | 15 ++ build.gradle | 34 +++- gradle/dependency-versions.gradle | 2 + .../samza/checkpoint/CheckpointTool.scala | 164 +++++++++++++++++++ .../scala/org/apache/samza/job/JobRunner.scala | 56 +------ .../org/apache/samza/util/CommandLine.scala | 73 +++++++++ .../samza/checkpoint/TestCheckpointTool.scala | 96 +++++++++++ .../kafka/KafkaCheckpointManager.scala | 2 + samza-shell/src/main/bash/checkpoint-tool.sh | 19 +++ 9 files changed, 412 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 7a4a232..7d0ff10 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,21 @@ To run a single test: ./gradlew clean :samza-test:test -Dtest.single=TestStatefulTask +### Job Management + +To run a job (defined in a properties file): + + ./gradlew samza-shell:runJob -PconfigPath=file:///path/to/job/config.properties + +To inspect a job's latest checkpoint: + + ./gradlew samza-shell:checkpointTool -PconfigPath=file:///path/to/job/config.properties + +To modify a job's checkpoint (assumes that the job is not currently running), give 
it a file with the new offset for each partition, in the format `systems.<system>.streams.<topic>.partitions.<partition>=<offset>`: + + ./gradlew samza-shell:checkpointTool -PconfigPath=file:///path/to/job/config.properties \ + -PnewOffsets=file:///path/to/new/offsets.properties + #### Maven Samza uses Kafka, which is not managed by Maven. To use Kafka as though it were a Maven artifact, Samza installs Kafka into a local repository using the `mvn install` command. You must have Maven installed to build Samza. http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 4775087..75b6c98 100644 --- a/build.gradle +++ b/build.gradle @@ -62,6 +62,8 @@ project(":samza-core_$scalaVersion") { compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion" testCompile "junit:junit:$junitVersion" + testCompile "org.mockito:mockito-all:$mockitoVersion" + testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" // mockito and scalatest back TestCheckpointTool } } @@ -91,7 +93,7 @@ project(":samza-kafka_$scalaVersion") { testCompile files("lib/kafka_$scalaVersion-0.8.1-SNAPSHOT-test.jar") // Logging in tests is good. 
- testRuntime "org.slf4j:slf4j-simple:1.6.2" + testRuntime "org.slf4j:slf4j-simple:$slf4jVersion" } test { @@ -161,11 +163,39 @@ project(":samza-yarn_$scalaVersion") { project(":samza-shell") { apply plugin: 'java' + configurations { + gradleShell // classpath of the runJob and checkpointTool JavaExec tasks below + } + + dependencies { + gradleShell project(":samza-core_$scalaVersion") + gradleShell project(":samza-kafka_$scalaVersion") + gradleShell project(":samza-yarn_$scalaVersion") + gradleShell "org.slf4j:slf4j-simple:$slf4jVersion" + } + task shellTarGz(type: Tar) { compression = Compression.GZIP classifier = 'dist' from 'src/main/bash' } + + // Usage: ./gradlew samza-shell:runJob \ + // -PconfigPath=file:///path/to/job/config.properties + task runJob(type:JavaExec) { + main = 'org.apache.samza.job.JobRunner' + classpath = configurations.gradleShell + if (project.hasProperty('configPath')) args += ['--config-path', configPath] + } + + // Usage: ./gradlew samza-shell:checkpointTool \ + // -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties + task checkpointTool(type:JavaExec) { + main = 'org.apache.samza.checkpoint.CheckpointTool' + classpath = configurations.gradleShell + if (project.hasProperty('configPath')) args += ['--config-path', configPath] + if (project.hasProperty('newOffsets')) args += ['--new-offsets', newOffsets] + } } project(":samza-kv_$scalaVersion") { @@ -207,7 +237,7 @@ project(":samza-test_$scalaVersion") { testCompile files("../samza-kafka/lib/kafka_$scalaVersion-0.8.1-SNAPSHOT-test.jar") testCompile "com.101tec:zkclient:$zkClientVersion" testCompile project(":samza-kafka_$scalaVersion") - testRuntime "org.slf4j:slf4j-simple:1.6.2" + testRuntime "org.slf4j:slf4j-simple:$slf4jVersion" } test { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 
612670d..6e51c04 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -3,6 +3,7 @@ ext { jacksonVersion = "1.8.5" junitVersion = "4.8.1" mockitoVersion = "1.8.4" + scalaTestVersion = "2.1.0" zkClientVersion = "0.3" zookeeperVersion = "3.3.4" metricsVersion = "2.2.0" @@ -10,4 +11,5 @@ ext { commonsHttpClientVersion = "3.1" leveldbVersion = "1.8" yarnVersion = "2.2.0" + slf4jVersion = "1.6.2" // shared by the samza-kafka, samza-shell and samza-test builds } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala new file mode 100644 index 0000000..5735a39 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.checkpoint + +import java.net.URI +import java.util.regex.Pattern +import joptsimple.OptionSet +import org.apache.samza.{Partition, SamzaException} +import org.apache.samza.config.{Config, StreamConfig} +import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.util.{CommandLine, Util} +import scala.collection.JavaConversions._ +import grizzled.slf4j.Logging + +/** + * Command-line tool for inspecting and manipulating the checkpoints for a job. + * This can be used, for example, to force a job to re-process a stream from the + * beginning. + * + * When running this tool, you need to provide the configuration URI of the job you + * want to inspect/manipulate. The tool prints out the latest checkpoint for that + * job (latest offset of each partition of each input stream). + * + * To update the checkpoint, you need to provide a second properties file + * containing the offsets you want. It must use the same format in which the tool + * prints out the latest checkpoint: + * + * systems.<system>.streams.<topic>.partitions.<partition>=<offset> + * + * NOTE: A job only reads its checkpoint when it starts up. Therefore, if you want + * your checkpoint change to take effect, you have to first stop the job, then + * write a new checkpoint, then start it up again. Writing a new checkpoint while + * the job is running may not have any effect. + * + * If you're building Samza from source, you can use the 'checkpointTool' gradle + * task as a short-cut to running this tool. 
+ */ +object CheckpointTool { + /** Format in which SystemStreamPartition is represented in a properties file */ + val SSP_PATTERN = StreamConfig.STREAM_PREFIX + "partitions.%d" + val SSP_REGEX = Pattern.compile("systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)") // groups: system, stream, partition id + + class CheckpointToolCommandLine extends CommandLine with Logging { + val newOffsetsOpt = + parser.accepts("new-offsets", "URI of file (e.g. file:///some/local/path.properties) " + + "containing offsets to write to the job's checkpoint topic. " + + "If not given, this tool prints out the current offsets.") + .withRequiredArg + .ofType(classOf[URI]) + .describedAs("path") + + var newOffsets: Map[SystemStreamPartition, String] = null // stays null unless --new-offsets is given + + def parseOffsets(propertiesFile: Config): Map[SystemStreamPartition, String] = { + propertiesFile.entrySet.flatMap(entry => { + val matcher = SSP_REGEX.matcher(entry.getKey) + if (matcher.matches) { + val partition = new Partition(Integer.parseInt(matcher.group(3))) + val ssp = new SystemStreamPartition(matcher.group(1), matcher.group(2), partition) + Some(ssp -> entry.getValue) + } else { + warn("Warning: ignoring unrecognised property: %s = %s" format (entry.getKey, entry.getValue)) + None + } + }).toMap + } + + override def loadConfig(options: OptionSet) = { + val config = super.loadConfig(options) + if (options.has(newOffsetsOpt)) { + val properties = configFactory.getConfig(options.valueOf(newOffsetsOpt)) + newOffsets = parseOffsets(properties) + } + config + } + } + + def main(args: Array[String]) { + val cmdline = new CheckpointToolCommandLine + val options = cmdline.parser.parse(args: _*) + val config = cmdline.loadConfig(options) + val tool = new CheckpointTool(config, cmdline.newOffsets) + tool.run + } +} + +class CheckpointTool(config: Config, newOffsets: Map[SystemStreamPartition, String]) extends Logging { + val manager = config.getCheckpointManagerFactory match { + case Some(className) => 
+ Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap) + case _ => + throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).") + } + + // The CheckpointManagerFactory needs to perform this same operation when initializing + // the manager. TODO figure out some way of avoiding duplicated work. + val partitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet + + def run { // Logs the current checkpoint; if newOffsets is non-null, also overwrites it. + info("Using %s" format manager) + partitions.foreach(manager.register) + manager.start + try { + logCheckpoint(readLastCheckpoint, "Current checkpoint") + if (newOffsets != null) { + logCheckpoint(newOffsets, "New offset to be written") + writeNewCheckpoint(newOffsets) + } + } finally { + manager.stop // always release the manager: in read-only mode too, and if reading/writing throws + } + if (newOffsets != null) info("Ok, new checkpoint has been written.") + } + + /** Load the most recent checkpoint state for all partitions; partitions without a checkpoint are skipped. */ + def readLastCheckpoint: Map[SystemStreamPartition, String] = { + partitions.flatMap(partition => { + Option(manager.readLastCheckpoint(partition)) // tolerate a null checkpoint (assumed: partition never checkpointed) + .toList.flatMap(checkpoint => checkpoint.getOffsets.toSeq) + .map { case (systemStream, offset) => + new SystemStreamPartition(systemStream, partition) -> offset + } + }).toMap + } + + /** + * Store a new checkpoint for every partition mentioned in newOffsets, replacing + * that partition's whole checkpoint (its streams absent from newOffsets are dropped). + * Partitions that are not mentioned are not changed. 
+ */ + def writeNewCheckpoint(newOffsets: Map[SystemStreamPartition, String]) { + newOffsets.groupBy(_._1.getPartition).foreach { + case (partition, offsets) => + val streamOffsets = offsets.map { case (ssp, offset) => ssp.getSystemStream -> offset }.toMap + val checkpoint = new Checkpoint(streamOffsets) + manager.writeCheckpoint(partition, checkpoint) + } + } + + def logCheckpoint(checkpoint: Map[SystemStreamPartition, String], prefix: String) { + checkpoint.map { case (ssp, offset) => + (CheckpointTool.SSP_PATTERN + " = %s") format (ssp.getSystem, ssp.getStream, ssp.getPartition.getPartitionId, offset) + }.toList.sorted.foreach(line => info(prefix + ": " + line)) // sorted so the output order is stable + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index f3a75af..19c9538 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -19,60 +19,22 @@ package org.apache.samza.job -import java.lang.String -import java.net.URI -import org.apache.samza.job.ApplicationStatus.Running -import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config._ -import grizzled.slf4j.Logging -import joptsimple.OptionParser -import joptsimple.util.KeyValuePair import org.apache.samza.SamzaException +import org.apache.samza.config.{Config, ConfigRewriter} +import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.factories.PropertiesConfigFactory -import scala.Some +import org.apache.samza.job.ApplicationStatus.Running import org.apache.samza.util.Util -import scala.collection.mutable.Buffer +import org.apache.samza.util.CommandLine +import grizzled.slf4j.Logging import 
scala.collection.JavaConversions._ object JobRunner extends Logging { def main(args: Array[String]) { - // Define parameters. - var parser = new OptionParser() - val configFactoryOpt = - parser.accepts("config-factory", "The config factory to use to read your config file.") - .withRequiredArg - .ofType(classOf[java.lang.String]) - .describedAs("com.foo.bar.ClassName") - .defaultsTo(classOf[PropertiesConfigFactory].getName) - val configPathOpt = - parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " + - "If multiple files are given they are all used with later files overriding any values that appear in earlier files.") - .withRequiredArg - .ofType(classOf[URI]) - .describedAs("path") - val configOverrideOpt = - parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.") - .withRequiredArg - .ofType(classOf[KeyValuePair]) - .describedAs("key=value") - var options = parser.parse(args: _*) - - // Verify legitimate parameters. - if (!options.has(configPathOpt)) { - parser.printHelpOn(System.err) - System.exit(-1) - } - - // Set up the job parameters. 
- val configFactoryClass = options.valueOf(configFactoryOpt) - val configPaths = options.valuesOf(configPathOpt) - val configFactory = Class.forName(configFactoryClass).newInstance.asInstanceOf[ConfigFactory] - val configOverrides = options.valuesOf(configOverrideOpt).map(kv => (kv.key, kv.value)).toMap - - val configs: Buffer[java.util.Map[String, String]] = configPaths.map(configFactory.getConfig) - configs += configOverrides - - new JobRunner(new MapConfig(configs)).run + val cmdline = new CommandLine // shared --config-factory / --config-path / --config parsing + val options = cmdline.parser.parse(args: _*) + val config = cmdline.loadConfig(options) // prints usage and exits if --config-path is missing + new JobRunner(config).run } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala new file mode 100644 index 0000000..f26501b --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.util + +import java.net.URI +import joptsimple.{OptionParser, OptionSet} +import joptsimple.util.KeyValuePair +import org.apache.samza.config.{ConfigFactory, MapConfig} +import org.apache.samza.config.factories.PropertiesConfigFactory +import scala.collection.mutable.Buffer +import scala.collection.JavaConversions._ + +/** + * Defines a basic set of command-line options for Samza tasks. Tools can use this + * class directly, or subclass it to add their own options. + */ +class CommandLine { + val parser = new OptionParser() + val configFactoryOpt = + parser.accepts("config-factory", "The config factory to use to read your config file.") + .withRequiredArg + .ofType(classOf[java.lang.String]) + .describedAs("com.foo.bar.ClassName") + .defaultsTo(classOf[PropertiesConfigFactory].getName) + val configPathOpt = + parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " + + "If multiple files are given they are all used with later files overriding any values that appear in earlier files.") + .withRequiredArg + .ofType(classOf[URI]) + .describedAs("path") + val configOverrideOpt = + parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.") + .withRequiredArg + .ofType(classOf[KeyValuePair]) + .describedAs("key=value") + + var configFactory: ConfigFactory = null // assigned by loadConfig; subclasses may reuse it to read extra files after super.loadConfig + + def loadConfig(options: OptionSet) = { + // Verify legitimate parameters. + if (!options.has(configPathOpt)) { + parser.printHelpOn(System.err) + System.exit(-1) // terminates the JVM, so loadConfig never returns without a config path + } + + // Set up the job parameters. 
+ val configFactoryClass = options.valueOf(configFactoryOpt) + val configPaths = options.valuesOf(configPathOpt) + configFactory = Class.forName(configFactoryClass).newInstance.asInstanceOf[ConfigFactory] + val configOverrides = options.valuesOf(configOverrideOpt).map(kv => (kv.key, kv.value)).toMap + + val configs: Buffer[java.util.Map[String, String]] = configPaths.map(configFactory.getConfig) + configs += configOverrides // appended after the files; per the --config help text these take precedence + new MapConfig(configs) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala new file mode 100644 index 0000000..bc54f9e --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.checkpoint + +import org.junit.{Before, Test} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.junit.AssertionsForJUnit +import org.scalatest.mock.MockitoSugar +import scala.collection.JavaConversions._ +import org.apache.samza.Partition +import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory} +import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig} +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStream, SystemStreamMetadata, SystemStreamPartition} +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata + +object TestCheckpointTool { + var checkpointManager: CheckpointManager = null // shared mocks: the factories below are created reflectively from config class names, so they hand these out + var systemConsumer: SystemConsumer = null + var systemProducer: SystemProducer = null + var systemAdmin: SystemAdmin = null + + class MockCheckpointManagerFactory extends CheckpointManagerFactory { + override def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager + } + + class MockSystemFactory extends SystemFactory { + override def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = systemConsumer + override def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = systemProducer + override def getAdmin(systemName: String, config: Config) = systemAdmin + } +} + +class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar { + var config: MapConfig = null + + @Before + def setup { + config = new MapConfig(Map( + TaskConfig.INPUT_STREAMS -> "test.foo", + TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName, + SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName + )) + val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata]( + new Partition(0) 
-> new SystemStreamPartitionMetadata("0", "100", "101"), + new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201") + )) + + TestCheckpointTool.checkpointManager = mock[CheckpointManager] + TestCheckpointTool.systemAdmin = mock[SystemAdmin] + when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo"))) + .thenReturn(Map("foo" -> metadata)) + when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(0))) + .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "1234"))) + when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(1))) + .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "4321"))) + } + + @Test + def testReadLatestCheckpoint { + new CheckpointTool(config, null).run // null newOffsets = read-only mode, so nothing may be written + verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(0)) + verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(1)) + verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any()) + } + + @Test + def testOverwriteCheckpoint { + new CheckpointTool(config, Map( + new SystemStreamPartition("test", "foo", new Partition(0)) -> "42", + new SystemStreamPartition("test", "foo", new Partition(1)) -> "43" + )).run + verify(TestCheckpointTool.checkpointManager) + .writeCheckpoint(new Partition(0), new Checkpoint(Map(new SystemStream("test", "foo") -> "42"))) + verify(TestCheckpointTool.checkpointManager) + .writeCheckpoint(new Partition(1), new Checkpoint(Map(new SystemStream("test", "foo") -> "43"))) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 
e98b50a..fed6eee 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -270,6 +270,8 @@ class KafkaCheckpointManager( } ) } + + override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format (systemName, checkpointTopic) // printed by CheckpointTool's "Using %s" log line } /** http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7d7ba7fb/samza-shell/src/main/bash/checkpoint-tool.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/checkpoint-tool.sh b/samza-shell/src/main/bash/checkpoint-tool.sh new file mode 100755 index 0000000..1a455df --- /dev/null +++ b/samza-shell/src/main/bash/checkpoint-tool.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +exec "$(dirname "$0")"/run-class.sh org.apache.samza.checkpoint.CheckpointTool "$@" # quoted so install paths and arguments containing spaces/globs pass through intact
