Hi again Boyuan,

Close, I believe. I'll describe the scenario a bit more specifically.
Basically, I have a Kafka topic with 10 partitions and each of these
contains records for various combinations of tenants and sources that come
in interspersed across these partitions. This pipeline applies some
windowing downstream, however I think for that to work properly the
pipelines would need to be segregated in some fashion so data coming in for
one tenant or source doesn't interfere with windowing for another.

The pipeline itself looks like this:

val pipeline = Pipeline.create(options)

// Partition Events according to their data sources
val partitionedEvents = pipeline
    .apply("Read Events from Kafka",
        KafkaIO
            .read<String, Log>()
            .withBootstrapServers(options.brokerUrl)
            .withTopic(options.logsTopic)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializerAndCoder(
                SpecificAvroDeserializer<Log>()::class.java,
                AvroCoder.of(Log::class.java)
            )
            .withReadCommitted()
            .commitOffsetsInFinalize()
            .withTimestampPolicyFactory { _, previousWatermark ->
WatermarkPolicy(previousWatermark) }
            .withConsumerConfigUpdates(
            ImmutableMap.of<String, Any>(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
                "schema.registry.url", options.schemaRegistryUrl
            )
        ).withoutMetadata()
    )
    .apply("Log Events", ParDo.of(Logs.log()))
    .apply("Rekey Logs by Tenant", ParDo.of(Logs.key()))
    .apply("Partition Logs by Source", Partition.of(sources.size,
Events.partition<KV<String, Log>>(sources)))

dataSources.forEach { dataSource ->
    // Store a reference to the data source name to avoid serialization issues
    val sourceName = dataSource.name

    // Apply source-specific windowing strategies
    partitionedLogs[dataSource.partition]
        .apply("Building Windows for $sourceName",
SourceSpecificWindow.of<KV<String, Log>>(dataSource))
        .apply("Group Windowed Logs by Key for $sourceName",
GroupByKey.create())
        .apply("Log After Windowing for $sourceName",
ParDo.of(Logs.logAfterWindowing()))
        .apply(
            "Writing Windowed Logs to Files for $sourceName",
            FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
                .withNumShards(1)
                .by { row -> "${row.key}/${sourceName}" }
                .withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.fn(SerializableFunction { logs ->
Files.stringify(logs.value) }), TextIO.sink())
                .to(options.output)
                .withNaming { partition -> Files.name(partition)}
        )
}

pipeline.run().waitUntilFinish()

Sorry - I know that's a lot, but in a nutshell I'm attempting to:

   - Read from a multi-tenant/source topic (10 partitions)
   - Partition those events by source
   - Window events according to their defined source (according to
   event-time fields within the records)
   - Write out files on windows closing to the appropriate tenant/source
   directory

At present, it seems that because the WatermarkPolicy is only capable of
keeping a separate watermark per partition and since it is using an
event-time property to handle that, that multiple tenants/source
combinations could impact others, cause windows to close unexpected/early,
data to be missed, etc. This is why I believe that perhaps a SDF that was
evaluated prior to reading from Kafka could allow me to treat each of these
tenant-source pairs as separate pipelines without a major architectural
overhaul.

Is this something that an SDF might excel at or is there some other
mechanism that I might consider to accomplish this?



On Mon, Feb 1, 2021 at 1:09 PM Boyuan Zhang <[email protected]> wrote:

> Hi Rion,
>
> Let's say that you have topic with 3 partitions and what you want to do is
> to read from these 3 partitions and each partition maintains its own
> watermark instead of having a watermark over these 3 partitions. Do I
> understand this correctly?
>
>  If so, I think you need separated pipelines. If you only want to know
> which records come from which partitions, ReadFromKafkaDoFn emits a KV pair
> where the KafkaSourceDescriptor is the key and KafkaRecord is the value.
>
> On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <[email protected]>
> wrote:
>
>> Hi Boyuan,
>>
>> Do you know if it’s possible to do something similar to this with a
>> single topic, essentially treat records with the same keys as their own
>> distinct pipelines. The challenge I’m encountering for splitting things
>> downstream ends up being related to watermarking at the partition-level
>> (via a WatermarkPolicy) and I essentially need to track watermarking or
>> treat records with a particular key the same/independently.
>>
>> I’d assumed that would need to be done prior to reading from Kafka, which
>> is where the SDF would come in.
>>
>> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <[email protected]> wrote:
>>
>> 
>> Hi Rion,
>>
>> It sounds like ReadFromKafkaDoFn
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
>> could be one of the solutions. It takes KafkaSourceDescritpor
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>(basically
>> it's a topic + partition) as input and emit KafkaRecords. Then your
>> pipeline can look like:
>> testPipeline
>>   .apply(your source that generates KafkaSourceDescriptor)
>>   .apply(ParDo.of(ReadFromKafkaDoFn))
>>   .apply(other parts)
>>
>> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <[email protected]>
>> wrote:
>>
>>> Hey all,
>>>
>>> I'm currently in a situation where I have a single Kafka topic with data
>>> across multiple partitions and covers data from multiple sources. I'm
>>> trying to see if there's a way that I'd be able to accomplish reading from
>>> these different sources as different pipelines and if a Splittable DoFn can
>>> do this.
>>>
>>> Basically - what I'd like to do is for a given key on a record, treat
>>> this as a separate pipeline from Kafka:
>>>
>>> testPipeline
>>>     .apply(
>>>         /*
>>>             Apply some function here to tell Kafka how to describe how to 
>>> split up
>>>             the sources that I want to read from
>>>          */
>>>     )
>>>     .apply("Ready from Kafka", KafkaIO.read(...))
>>>     .apply("Remaining Pipeline Omitted for Brevity"
>>>
>>> Is it possible to do this? I'm trying to avoid a major architectural
>>> change that would require multiple separate topics by source, however if I
>>> can guarantee that a given key (and it's associated watermark) are treated
>>> separately, that would be ideal.
>>>
>>> Any advice or recommendations for a strategy that might work would be
>>> helpful!
>>>
>>> Thanks,
>>>
>>> Rion
>>>
>>

Reply via email to