Hi Paul,

A StartpointTool can be written along the same lines as CheckpointTool (see "Manipulating checkpoints manually": <https://samza.apache.org/learn/documentation/latest/container/checkpointing.html>). `org.apache.samza.checkpoint.CheckpointTool` takes two input files, config.properties and offset.properties. The first identifies the job and its config; the second supplies the new offsets. A StartpointTool needs the same two input files, except that the second one maps each SSP (SystemStreamPartition) to its new Startpoint.
// Attaching a barebones tool based on CheckpointTool code.
// regards, Manasa
//
// StartpointTool.scala

import java.io.FileInputStream
import java.util.{Locale, Properties}

import joptsimple.OptionSet
import org.apache.samza.Partition
import org.apache.samza.config.Config
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.execution.JobPlanner
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.startpoint.{Startpoint, StartpointManager, StartpointOldest, StartpointSpecific, StartpointTimestamp, StartpointUpcoming}
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{CommandLine, ConfigUtil, Logging}

import scala.collection.JavaConverters._

/**
 * Command-line entry point that writes Startpoints for a job's input SSPs,
 * modelled on org.apache.samza.checkpoint.CheckpointTool.
 *
 * Usage mirrors CheckpointTool: the usual job config options plus
 * `--startpoints <path>`, a properties file whose entries look like
 * {{{
 *   systems.<system>.streams.<stream>.partitions.<partition>=<startpoint>
 * }}}
 * where `<startpoint>` is one of `oldest`, `upcoming`, `timestamp:<value>`
 * or `offset:<value>`, e.g.
 * {{{
 *   systems.kafka.streams.PageViewEvent.partitions.0=offset:12345
 * }}}
 * NOTE(review): the timestamp unit is whatever StartpointTimestamp expects
 * (believed to be epoch milliseconds) — confirm against its javadoc.
 */
object StartpointTool {
  type SSPToStartpointMap = Map[SystemStreamPartition, Startpoint]

  // Full-match pattern for an SSP key; the stream group is greedy so stream
  // names containing dots are still captured whole.
  private val SspKey = """systems\.([^.]+)\.streams\.(.+)\.partitions\.([0-9]+)""".r

  /** Parses one startpoint spec ("oldest", "upcoming", "timestamp:<v>", "offset:<v>"). */
  private def parseStartpoint(spec: String): Startpoint = {
    val parts = spec.trim.split(":", 2)
    (parts(0).trim.toLowerCase(Locale.ROOT), parts.lift(1).map(_.trim)) match {
      case ("oldest", None)           => new StartpointOldest
      case ("upcoming", None)         => new StartpointUpcoming
      case ("timestamp", Some(value)) => new StartpointTimestamp(value.toLong)
      case ("offset", Some(value))    => new StartpointSpecific(value)
      case _ =>
        throw new IllegalArgumentException(
          s"Unrecognised startpoint '$spec'; expected oldest, upcoming, timestamp:<timestamp> or offset:<offset>")
    }
  }

  class SPToolCommandLine extends CommandLine with Logging {
    val startpointsOpt =
      parser.accepts("startpoints", "Path to a properties file mapping " +
                                    "systems.<system>.streams.<stream>.partitions.<n> to a startpoint " +
                                    "(oldest, upcoming, timestamp:<timestamp> or offset:<offset>).")
            .withRequiredArg
            .ofType(classOf[String])
            .describedAs("path")

    // Empty (not null) until a startpoints file is parsed, so run() is safe either way.
    var newOffsets: SSPToStartpointMap = Map.empty

    /**
     * Converts the startpoints properties into an SSP -> Startpoint map.
     * Keys that are not SSP keys are logged and skipped; a malformed
     * startpoint value throws IllegalArgumentException/NumberFormatException.
     */
    def parseOffsets(propertiesFile: Properties): SSPToStartpointMap =
      propertiesFile.stringPropertyNames.asScala.toSeq.flatMap { key =>
        val value = propertiesFile.getProperty(key)
        key match {
          case SspKey(system, stream, partition) =>
            val ssp = new SystemStreamPartition(system, stream, new Partition(partition.toInt))
            Some(ssp -> parseStartpoint(value))
          case _ =>
            warn(s"Ignoring unrecognised property: $key = $value")
            None
        }
      }.toMap

    /** Loads the job config, then (if given) the startpoints file into `newOffsets`. */
    override def loadConfig(options: OptionSet): Config = {
      val config = super.loadConfig(options)
      if (options.has(startpointsOpt)) {
        val properties = new Properties
        val in = new FileInputStream(options.valueOf(startpointsOpt))
        try properties.load(in) finally in.close()
        newOffsets = parseOffsets(properties)
      }
      config
    }
  }

  /** Builds a tool backed by an initialised coordinator stream store for `config`. */
  def apply(config: Config, offsets: SSPToStartpointMap): StartpointTool = {
    val metadataStore = new CoordinatorStreamStore(config, new MetricsRegistryMap())
    metadataStore.init()
    new StartpointTool(offsets, metadataStore, config)
  }

  def main(args: Array[String]): Unit = {
    // Same config pipeline as CheckpointTool.main.
    val cmdline = new SPToolCommandLine
    val options = cmdline.parser.parse(args: _*)
    val userConfig = cmdline.loadConfig(options)
    val jobConfig = JobPlanner.generateSingleJobConfig(userConfig)
    val rewrittenConfig = ConfigUtil.rewriteConfig(jobConfig)
    StartpointTool(rewrittenConfig, cmdline.newOffsets).run()
  }
}

/**
 * Writes each (SSP, Startpoint) pair in `newOffsets` through a StartpointManager.
 * Takes ownership of `coordinatorStreamStore` and closes it when `run()` finishes.
 */
class StartpointTool(newOffsets: StartpointTool.SSPToStartpointMap,
                     coordinatorStreamStore: CoordinatorStreamStore,
                     userDefinedConfig: Config) extends Logging {
  def run(): Unit = {
    try {
      val startpointManager = new StartpointManager(coordinatorStreamStore)
      startpointManager.start()
      try {
        // The SSP-only overload is used: per the original note, a task name is not needed here.
        newOffsets.foreach { case (ssp, startpoint) =>
          info(s"Writing startpoint $startpoint for $ssp")
          startpointManager.writeStartpoint(ssp, startpoint)
        }
      } finally {
        startpointManager.stop()
      }
    } finally {
      coordinatorStreamStore.close()
    }
  }
}

/*
 * On Thu, Dec 17, 2020 at 8:05 AM Paul <tso...@gmail.com> wrote:
 * > Hi All,
 * >
 * > I am using the Samza low level API and would like to be able to manipulate
 * > the offsets for my Kafka input topics to be able to reprocess data from a
 * > previous point in time.
 * >
 * > It looks like the functionality to do this was introduced by SEP-18. Does
 * > anyone know of any code example showing how to use this feature?
 * >
 * > Thanks.
 */