[
https://issues.apache.org/jira/browse/BEAM-13171?focusedWorklogId=721133&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-721133
]
ASF GitHub Bot logged work on BEAM-13171:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Feb/22 19:34
Start Date: 04/Feb/22 19:34
Worklog Time Spent: 10m
Work Description: nbali edited a comment on pull request #15951:
URL: https://github.com/apache/beam/pull/15951#issuecomment-1030289511
@aromanenko-dev I will move it there if it takes longer, but right now I
think I'm pretty close to the end, so closing it out here might make more sense.
So... to my own surprise, I managed to make it work. I had to trigger the
"unified worker" path in the `DataflowRunner`, and now with a
`withStopReadTime` call the streaming job actually stops. _(If the necessity of
such experiment flags was obvious to everybody and it was assumed I already had
them set, then mea culpa.)_
https://github.com/apache/beam/blob/163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L2273-L2279
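For illustration, a minimal sketch of the kind of experiment-flag lookup the runner does when deciding whether the unified worker is in play. This is a simplified stand-in, not the actual `DataflowRunner` code; the flag names (`use_runner_v2`, `use_unified_worker`) are assumptions based on the linked source:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class UnifiedWorkerCheck {
  // Simplified stand-in for the runner's experiment lookup; the real runner
  // reads the experiments list from the pipeline options. The flag names
  // below are assumptions for illustration.
  static boolean useUnifiedWorker(List<String> experiments) {
    return experiments.contains("use_runner_v2")
        || experiments.contains("use_unified_worker");
  }

  public static void main(String[] args) {
    // With the experiment set, the unified-worker path is taken.
    System.out.println(useUnifiedWorker(Arrays.asList("use_runner_v2")));
    // Without it, the legacy path is taken.
    System.out.println(useUnifiedWorker(Collections.emptyList()));
  }
}
```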
So after all this, I think the remaining problem is: if you don't use the
"unified worker", you configure your `KafkaIO.read*` with options that require
`KafkaIO.Read.ReadFromKafkaViaSDF`, and you launch it with an unbounded
`PCollection` / as a streaming job, then the SDF-only functionality is
silently dropped, without any warning.
https://github.com/apache/beam/blob/163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L505-L519
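A hypothetical simplification of that override decision, just to make the silent-fallback behavior concrete (the method name and shape are mine, not the runner's):

```java
public class KafkaReadPathChoice {
  // Hypothetical simplification of the override decision linked above:
  // on a streaming job without the unified worker, the SDF-based Kafka read
  // is swapped for the legacy UnboundedSource path, and SDF-only settings
  // are silently ignored.
  static String chooseKafkaReadPath(boolean streaming, boolean unifiedWorker) {
    if (streaming && !unifiedWorker) {
      return "legacy"; // stopReadTime / checkStopReadingFn have no effect here
    }
    return "sdf";
  }

  public static void main(String[] args) {
    System.out.println(chooseKafkaReadPath(true, false));
    System.out.println(chooseKafkaReadPath(true, true));
  }
}
```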
So my idea is to add fail-fast checks to the corresponding
`PTransformOverrideFactory` - aka `KafkaIO.Read.KAFKA_READ_OVERRIDE`. We have
the `AppliedPTransform` available, which contains the `ReadFromKafkaViaSDF`,
which contains the `Read<> kafkaRead`, which contains the custom properties
(`checkStopReadingFn`, `stopReadTime`, etc - _I'm not sure if there is anything
else_) that are only handled by the SDF. So if any of those properties is
non-null, we should fail. Am I on the right path here?
https://github.com/apache/beam/blob/163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1339-L1348
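Roughly what I have in mind for the fail-fast check, as a self-contained sketch. The real check would live in the override factory and pull these values off the `Read<K, V>` inside `ReadFromKafkaViaSDF`; the method name and parameter shapes here are assumptions for illustration:

```java
import java.time.Instant;

public class SdfOnlyPropertyCheck {
  // Sketch of the proposed fail-fast check. In the real code these values
  // would come from the Read<K, V> reached through the AppliedPTransform
  // handed to the PTransformOverrideFactory; here they are plain parameters.
  static void validateLegacyOverride(Instant stopReadTime, Object checkStopReadingFn) {
    if (stopReadTime != null || checkStopReadingFn != null) {
      throw new IllegalArgumentException(
          "stopReadTime/checkStopReadingFn are only honored by the SDF-based "
              + "Kafka read; they would be silently ignored on this path.");
    }
  }

  public static void main(String[] args) {
    // No SDF-only settings: the legacy override is fine.
    validateLegacyOverride(null, null);
    // SDF-only setting present: fail fast instead of silently dropping it.
    try {
      validateLegacyOverride(Instant.parse("2022-02-04T00:00:00Z"), null);
    } catch (IllegalArgumentException e) {
      System.out.println("failed fast as intended");
    }
  }
}
```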
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 721133)
Time Spent: 5h 40m (was: 5.5h)
> Support for stopReadTime on KafkaIO SDF
> ----------------------------------------
>
> Key: BEAM-13171
> URL: https://issues.apache.org/jira/browse/BEAM-13171
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Reporter: Mostafa Aghajani
> Assignee: Mostafa Aghajani
> Priority: P2
> Fix For: 2.36.0
>
> Time Spent: 5h 40m
> Remaining Estimate: 0h
>
> There is already support for startReadTime using SDF when the Kafka
> version supports it.
> I want to add support for stopReadTime so we can extract messages from
> Kafka only up to a point in time, after which the task finishes.
> One use case: when you want to re-process (re-read) only a period of time
> from a Kafka topic in your pipeline.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)