Hi Rion,

Thanks for the explanation. I can see the case now. To my knowledge,
Splittable DoFn cannot help in this case. If you want the watermarks from
different sources to be tracked separately, I believe you have to run them
in separate pipelines. I don't think we support a per-key watermark
feature within one pipeline.
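
To make the issue concrete, here is a rough sketch in plain Kotlin with no
Beam involved. Everything in it is made up for illustration: `Event` and
`lateEvents` are hypothetical names, and "watermark" is simplified to the
largest event time seen so far for a given key. It only shows why records
from different tenants sharing one watermark end up behind it; it is not
something Beam offers within a single pipeline.

```kotlin
// Hypothetical event type for illustration only; not part of Beam or the pipeline below.
data class Event(val tenant: String, val eventTimeMs: Long)

// Returns the events that arrive behind the watermark when one watermark is
// tracked per key produced by keyOf. The "watermark" is simplified to the
// largest event time seen so far for that key (an assumption of this sketch).
fun <K> lateEvents(events: List<Event>, keyOf: (Event) -> K): List<Event> {
    val watermarks = mutableMapOf<K, Long>()
    val late = mutableListOf<Event>()
    for (event in events) {
        val key = keyOf(event)
        val watermark = watermarks[key] ?: Long.MIN_VALUE
        if (event.eventTimeMs < watermark) {
            late.add(event)
        } else {
            watermarks[key] = event.eventTimeMs
        }
    }
    return late
}

fun main() {
    // One Kafka partition carrying two tenants whose event times are far apart.
    val partition = listOf(
        Event("tenant-a", 1_000L),
        Event("tenant-b", 9_000L),
        Event("tenant-a", 2_000L),
        Event("tenant-b", 9_500L),
        Event("tenant-a", 3_000L),
    )

    // A single watermark for the whole partition, as a per-partition policy would keep.
    val sharedLate = lateEvents(partition) { "partition-0" }
    // One watermark per tenant, which is what separate pipelines would give you.
    val perTenantLate = lateEvents(partition) { it.tenant }

    println("late with one watermark per partition: $sharedLate")
    println("late with one watermark per tenant: $perTenantLate")
}
```

With the shared watermark, the two later tenant-a events fall behind
tenant-b's event times and are treated as late; with one watermark per
tenant, nothing is late.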

On Mon, Feb 1, 2021 at 12:26 PM Rion Williams <[email protected]> wrote:

> Hi again Boyuan,
>
> Close, I believe. I'll describe the scenario a bit more specifically.
> Basically, I have a Kafka topic with 10 partitions and each of these
> contains records for various combinations of tenants and sources that come
> in interspersed across these partitions. This pipeline applies some
> windowing downstream, however I think for that to work properly the
> pipelines would need to be segregated in some fashion so data coming in for
> one tenant or source doesn't interfere with windowing for another.
>
> The pipeline itself looks like this:
>
> val pipeline = Pipeline.create(options)
>
> // Partition Events according to their data sources
> val partitionedEvents = pipeline
>     .apply("Read Events from Kafka",
>         KafkaIO
>             .read<String, Log>()
>             .withBootstrapServers(options.brokerUrl)
>             .withTopic(options.logsTopic)
>             .withKeyDeserializer(StringDeserializer::class.java)
>             .withValueDeserializerAndCoder(
>                 SpecificAvroDeserializer<Log>()::class.java,
>                 AvroCoder.of(Log::class.java)
>             )
>             .withReadCommitted()
>             .commitOffsetsInFinalize()
>             .withTimestampPolicyFactory { _, previousWatermark -> WatermarkPolicy(previousWatermark) }
>             .withConsumerConfigUpdates(
>                 ImmutableMap.of<String, Any>(
>                     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
>                     ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
>                     "schema.registry.url", options.schemaRegistryUrl
>                 )
>             )
>             .withoutMetadata()
>     )
>     .apply("Log Events", ParDo.of(Logs.log()))
>     .apply("Rekey Logs by Tenant", ParDo.of(Logs.key()))
>     .apply("Partition Logs by Source", Partition.of(sources.size, Events.partition<KV<String, Log>>(sources)))
>
> dataSources.forEach { dataSource ->
>     // Store a reference to the data source name to avoid serialization issues
>     val sourceName = dataSource.name
>
>     // Apply source-specific windowing strategies
>     partitionedEvents[dataSource.partition]
>         .apply("Building Windows for $sourceName", SourceSpecificWindow.of<KV<String, Log>>(dataSource))
>         .apply("Group Windowed Logs by Key for $sourceName", GroupByKey.create())
>         .apply("Log After Windowing for $sourceName", ParDo.of(Logs.logAfterWindowing()))
>         .apply(
>             "Writing Windowed Logs to Files for $sourceName",
>             FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
>                 .withNumShards(1)
>                 .by { row -> "${row.key}/${sourceName}" }
>                 .withDestinationCoder(StringUtf8Coder.of())
>                 .via(Contextful.fn(SerializableFunction { logs -> Files.stringify(logs.value) }), TextIO.sink())
>                 .to(options.output)
>                 .withNaming { partition -> Files.name(partition)}
>         )
> }
>
> pipeline.run().waitUntilFinish()
>
> Sorry - I know that's a lot, but in a nutshell I'm attempting to:
>
>    - Read from a multi-tenant/source topic (10 partitions)
>    - Partition those events by source
>    - Window events according to their defined source (according to
>    event-time fields within the records)
>    - Write out files on windows closing to the appropriate tenant/source
>    directory
>
> At present, the WatermarkPolicy is only capable of keeping a separate
> watermark per partition, and it uses an event-time property to do so.
> As a result, it seems that records for one tenant/source combination
> could affect the others, causing windows to close unexpectedly or early,
> data to be missed, etc. This is why I believe that perhaps an SDF that was
> evaluated prior to reading from Kafka could allow me to treat each of these
> tenant-source pairs as separate pipelines without a major architectural
> overhaul.
>
> Is this something that an SDF might excel at or is there some other
> mechanism that I might consider to accomplish this?
>
>
>
> On Mon, Feb 1, 2021 at 1:09 PM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Rion,
>>
>> Let's say that you have a topic with 3 partitions and what you want to do
>> is to read from these 3 partitions, with each partition maintaining its own
>> watermark instead of having one watermark over all 3 partitions. Do I
>> understand this correctly?
>>
>> If so, I think you need separate pipelines. If you only want to know
>> which records come from which partitions, ReadFromKafkaDoFn emits a KV pair
>> where the KafkaSourceDescriptor is the key and the KafkaRecord is the value.
>>
>> On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <[email protected]>
>> wrote:
>>
>>> Hi Boyuan,
>>>
>>> Do you know if it’s possible to do something similar to this with a
>>> single topic, essentially treating records with the same key as their own
>>> distinct pipeline? The challenge I’m encountering when splitting things
>>> downstream is related to watermarking at the partition level
>>> (via a WatermarkPolicy). I essentially need to track a watermark per key,
>>> so that records with a particular key are treated independently.
>>>
>>> I’d assumed that would need to be done prior to reading from Kafka,
>>> which is where the SDF would come in.
>>>
>>> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <[email protected]> wrote:
>>>
>>> 
>>> Hi Rion,
>>>
>>> It sounds like ReadFromKafkaDoFn
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
>>> could be one of the solutions. It takes a KafkaSourceDescriptor
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>
>>> (basically a topic + partition) as input and emits KafkaRecords. Then your
>>> pipeline can look like:
>>> testPipeline
>>>   .apply(your source that generates KafkaSourceDescriptor)
>>>   .apply(ParDo.of(ReadFromKafkaDoFn))
>>>   .apply(other parts)
>>>
>>> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <[email protected]>
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I'm currently in a situation where I have a single Kafka topic whose
>>>> partitions carry data from multiple sources. I'm trying to see if there's
>>>> a way to read from these different sources as different pipelines, and
>>>> whether a Splittable DoFn can do this.
>>>>
>>>> Basically - what I'd like to do is for a given key on a record, treat
>>>> this as a separate pipeline from Kafka:
>>>>
>>>> testPipeline
>>>>     .apply(
>>>>         /*
>>>>             Apply some function here to tell Kafka how to split up
>>>>             the sources that I want to read from
>>>>          */
>>>>     )
>>>>     .apply("Read from Kafka", KafkaIO.read(...))
>>>>     .apply("Remaining Pipeline Omitted for Brevity")
>>>>
>>>> Is it possible to do this? I'm trying to avoid a major architectural
>>>> change that would require multiple separate topics by source. However, if I
>>>> can guarantee that a given key (and its associated watermark) is treated
>>>> separately, that would be ideal.
>>>>
>>>> Any advice or recommendations for a strategy that might work would be
>>>> helpful!
>>>>
>>>> Thanks,
>>>>
>>>> Rion
>>>>
>>>
