Hi Rion,

Thanks for the explanation. I can see the case now. To my knowledge, a Splittable DoFn cannot help in this case, and if you want the watermarks from your sources to be separated, I believe you have to have them in separate pipelines. I don't think we support a per-key watermark feature in one pipeline.
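For concreteness, a minimal sketch of what "separate pipelines" could look like, assuming one topic per source (the "logs-<source>" naming is invented here) and reusing the Log, WatermarkPolicy, options, and dataSources names from Rion's pipeline quoted below:

    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.coders.AvroCoder
    import org.apache.beam.sdk.io.kafka.KafkaIO
    import org.apache.kafka.common.serialization.StringDeserializer

    // One pipeline (and topic) per data source, so each source's watermark
    // is derived only from its own records rather than shared per-partition.
    dataSources.forEach { dataSource ->
        val pipeline = Pipeline.create(options)
        pipeline
            .apply("Read ${dataSource.name} from Kafka",
                KafkaIO.read<String, Log>()
                    .withBootstrapServers(options.brokerUrl)
                    .withTopic("logs-${dataSource.name}") // hypothetical per-source topic
                    .withKeyDeserializer(StringDeserializer::class.java)
                    .withValueDeserializerAndCoder(
                        SpecificAvroDeserializer<Log>()::class.java,
                        AvroCoder.of(Log::class.java))
                    .withTimestampPolicyFactory { _, previous -> WatermarkPolicy(previous) }
                    .withoutMetadata())
        // ...source-specific windowing and file writes, as in the pipeline below...
        pipeline.run() // each source runs as its own job with an independent watermark
    }

This trades the single-topic layout for the per-source topics Rion is hoping to avoid, but it is the only way each source's watermark advances independently.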
On Mon, Feb 1, 2021 at 12:26 PM Rion Williams <[email protected]> wrote:

> Hi again Boyuan,
>
> Close, I believe. I'll describe the scenario a bit more specifically.
> Basically, I have a Kafka topic with 10 partitions, and each of these
> contains records for various combinations of tenants and sources that
> come in interspersed across these partitions. This pipeline applies some
> windowing downstream; however, I think for that to work properly the
> pipelines would need to be segregated in some fashion, so data coming in
> for one tenant or source doesn't interfere with windowing for another.
>
> The pipeline itself looks like this:
>
> val pipeline = Pipeline.create(options)
>
> // Partition events according to their data sources
> val partitionedEvents = pipeline
>     .apply("Read Events from Kafka",
>         KafkaIO
>             .read<String, Log>()
>             .withBootstrapServers(options.brokerUrl)
>             .withTopic(options.logsTopic)
>             .withKeyDeserializer(StringDeserializer::class.java)
>             .withValueDeserializerAndCoder(
>                 SpecificAvroDeserializer<Log>()::class.java,
>                 AvroCoder.of(Log::class.java)
>             )
>             .withReadCommitted()
>             .commitOffsetsInFinalize()
>             .withTimestampPolicyFactory { _, previousWatermark ->
>                 WatermarkPolicy(previousWatermark) }
>             .withConsumerConfigUpdates(
>                 ImmutableMap.of<String, Any>(
>                     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
>                     ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
>                     "schema.registry.url", options.schemaRegistryUrl
>                 )
>             ).withoutMetadata()
>     )
>     .apply("Log Events", ParDo.of(Logs.log()))
>     .apply("Rekey Logs by Tenant", ParDo.of(Logs.key()))
>     .apply("Partition Logs by Source",
>         Partition.of(dataSources.size, Events.partition<KV<String, Log>>(dataSources)))
>
> dataSources.forEach { dataSource ->
>     // Store a reference to the data source name to avoid serialization issues
>     val sourceName = dataSource.name
>
>     // Apply source-specific windowing strategies
>     partitionedEvents[dataSource.partition]
>         .apply("Building Windows for $sourceName",
>             SourceSpecificWindow.of<KV<String, Log>>(dataSource))
>         .apply("Group Windowed Logs by Key for $sourceName", GroupByKey.create())
>         .apply("Log After Windowing for $sourceName",
>             ParDo.of(Logs.logAfterWindowing()))
>         .apply(
>             "Writing Windowed Logs to Files for $sourceName",
>             FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
>                 .withNumShards(1)
>                 .by { row -> "${row.key}/${sourceName}" }
>                 .withDestinationCoder(StringUtf8Coder.of())
>                 .via(Contextful.fn(SerializableFunction { logs ->
>                     Files.stringify(logs.value) }), TextIO.sink())
>                 .to(options.output)
>                 .withNaming { partition -> Files.name(partition) }
>         )
> }
>
> pipeline.run().waitUntilFinish()
>
> Sorry - I know that's a lot, but in a nutshell I'm attempting to:
>
> - Read from a multi-tenant/source topic (10 partitions)
> - Partition those events by source
> - Window events according to their defined source (using event-time
>   fields within the records)
> - Write out files, on windows closing, to the appropriate tenant/source
>   directory
>
> At present, it seems that because the WatermarkPolicy is only capable of
> keeping a separate watermark per partition, and since it is using an
> event-time property to handle that, multiple tenant/source combinations
> could impact one another, causing windows to close unexpectedly/early,
> data to be missed, etc.
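The WatermarkPolicy referenced in the snippet above is Rion's own class and is not shown in the thread. A rough sketch of what such a TimestampPolicy could look like, assuming Log exposes an epoch-millis timestamp field (an assumption made here, not confirmed by the thread):

    import org.apache.beam.sdk.io.kafka.KafkaRecord
    import org.apache.beam.sdk.io.kafka.TimestampPolicy
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow
    import org.joda.time.Instant
    import java.util.Optional

    // Sketch only: derive timestamps from an event-time field on Log and
    // advance a single watermark for the whole partition. Because every
    // tenant/source in the partition shares this watermark, a fast tenant
    // can drag the watermark past a slow one -- the problem described above.
    class WatermarkPolicy(previousWatermark: Optional<Instant>) : TimestampPolicy<String, Log>() {
        private var currentWatermark: Instant =
            previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE)

        override fun getTimestampForRecord(
            ctx: PartitionContext,
            record: KafkaRecord<String, Log>
        ): Instant {
            val eventTime = Instant(record.kv.value.timestamp) // assumed field on Log
            if (eventTime.isAfter(currentWatermark)) currentWatermark = eventTime
            return eventTime
        }

        override fun getWatermark(ctx: PartitionContext): Instant = currentWatermark
    }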
> This is why I believe that perhaps an SDF that was evaluated prior to
> reading from Kafka could allow me to treat each of these tenant-source
> pairs as separate pipelines without a major architectural overhaul.
>
> Is this something that an SDF might excel at, or is there some other
> mechanism that I might consider to accomplish this?
>
> On Mon, Feb 1, 2021 at 1:09 PM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Rion,
>>
>> Let's say that you have a topic with 3 partitions, and what you want to
>> do is to read from these 3 partitions while each partition maintains its
>> own watermark, instead of having one watermark over all 3 partitions. Do
>> I understand this correctly?
>>
>> If so, I think you need separate pipelines. If you only want to know
>> which records come from which partitions, ReadFromKafkaDoFn emits a KV
>> pair where the KafkaSourceDescriptor is the key and the KafkaRecord is
>> the value.
>>
>> On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <[email protected]> wrote:
>>
>>> Hi Boyuan,
>>>
>>> Do you know if it's possible to do something similar to this with a
>>> single topic, essentially treating records with the same keys as their
>>> own distinct pipelines? The challenge I'm encountering in splitting
>>> things downstream ends up being related to watermarking at the
>>> partition level (via a WatermarkPolicy), and I essentially need to
>>> track watermarks for records with a particular key independently.
>>>
>>> I'd assumed that would need to be done prior to reading from Kafka,
>>> which is where the SDF would come in.
>>>
>>> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <[email protected]> wrote:
>>>
>>> Hi Rion,
>>>
>>> It sounds like ReadFromKafkaDoFn
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
>>> could be one of the solutions. It takes a KafkaSourceDescriptor
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>
>>> (basically a topic + partition) as input and emits KafkaRecords. Your
>>> pipeline can then look like:
>>>
>>> testPipeline
>>>     .apply(your source that generates KafkaSourceDescriptor)
>>>     .apply(ParDo.of(ReadFromKafkaDoFn))
>>>     .apply(other parts)
>>>
>>> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <[email protected]> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I'm currently in a situation where I have a single Kafka topic with
>>>> data across multiple partitions, covering data from multiple sources.
>>>> I'm trying to see if there's a way I'd be able to accomplish reading
>>>> from these different sources as different pipelines, and whether a
>>>> Splittable DoFn can do this.
>>>>
>>>> Basically, what I'd like to do is, for a given key on a record, treat
>>>> this as a separate pipeline from Kafka:
>>>>
>>>> testPipeline
>>>>     .apply(
>>>>         /*
>>>>          Apply some function here to tell Kafka how to describe how to
>>>>          split up the sources that I want to read from
>>>>          */
>>>>     )
>>>>     .apply("Read from Kafka", KafkaIO.read(...))
>>>>     .apply("Remaining Pipeline Omitted for Brevity")
>>>>
>>>> Is it possible to do this? I'm trying to avoid a major architectural
>>>> change that would require multiple separate topics by source; however,
>>>> if I can guarantee that a given key (and its associated watermark) are
>>>> treated separately, that would be ideal.
>>>>
>>>> Any advice or recommendations for a strategy that might work would be
>>>> helpful!
>>>>
>>>> Thanks,
>>>>
>>>> Rion
>>>>
>>>
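For reference, Boyuan's ReadFromKafkaDoFn suggestion in the thread above is exposed publicly as KafkaIO.readSourceDescriptors(). A minimal sketch against the ten-partition topic from the thread; note that the exact KafkaSourceDescriptor.of(...) arity differs between Beam releases, so this should be checked against your version:

    import org.apache.beam.sdk.io.kafka.KafkaIO
    import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor
    import org.apache.beam.sdk.transforms.Create
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer

    // One descriptor per topic-partition; ReadFromKafkaDoFn reads each as
    // its own restriction. Watermarks are still per descriptor (i.e. per
    // partition), not per tenant/source key -- hence the conclusion above
    // that per-key watermarks require separate pipelines.
    val descriptors = (0 until 10).map { partition ->
        KafkaSourceDescriptor.of(
            TopicPartition(options.logsTopic, partition),
            null, null, // start offset / start read time: use consumer defaults
            null, null, // stop offset / stop read time: never stop
            null        // bootstrap servers: taken from the config below
        )
    }

    pipeline
        .apply("Create Source Descriptors", Create.of(descriptors))
        .apply("Read from Kafka",
            KafkaIO.readSourceDescriptors<String, Log>()
                .withBootstrapServers(options.brokerUrl)
                .withKeyDeserializer(StringDeserializer::class.java)
                .withValueDeserializer(SpecificAvroDeserializer<Log>()::class.java))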
