[ 
https://issues.apache.org/jira/browse/BEAM-11998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17418819#comment-17418819
 ] 

Jonathan Hourany edited comment on BEAM-11998 at 9/22/21, 8:44 PM:
-------------------------------------------------------------------

For anyone else facing this issue, I was able to get a pipeline working using 
[~chamikara]'s suggestions (see example below), but it requires that either 
`max_num_records` or `max_read_time` be set. Once either of those limits is 
reached, however, the connection is closed, which doesn't bode well for 
production use. 

 
{code:python}
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka


CONSUMER_CONFIG = {
    "bootstrap.servers": "your-ip:your-port",
    "auto.offset.reset": "earliest",
}

CONSUMER_TOPICS = ["test-topic"]

pipeline_options = beam.options.pipeline_options.PipelineOptions(
    runner="FlinkRunner",
    streaming=True,
    experiments=["use_deprecated_read"]
)


with beam.Pipeline(options=pipeline_options) as pipeline:
    consumed = (
        pipeline
        | ReadFromKafka(
            consumer_config=CONSUMER_CONFIG,
            topics=CONSUMER_TOPICS,
            # Either max_num_records or max_read_time must be set for this to work.
            max_num_records=5,
        )
        | beam.Map(print)
    )
{code}
 


> Portable runners should be able to issue checkpoints to Splittable DoFn
> -----------------------------------------------------------------------
>
>                 Key: BEAM-11998
>                 URL: https://issues.apache.org/jira/browse/BEAM-11998
>             Project: Beam
>          Issue Type: New Feature
>          Components: cross-language, runner-flink, runner-spark
>            Reporter: Boyuan Zhang
>            Priority: P2
>         Attachments: read.png
>
>
> To execute an unbounded Splittable DoFn over the Fn API in streaming mode 
> properly, portable runners should regularly issue a split 
> (ProcessBundleSplitRequest with fraction_of_remainder > 0) or simply a 
> checkpoint (ProcessBundleSplitRequest with fraction_of_remainder == 0) to the 
> SDK, so that the current bundle finishes processing instead of running forever.
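
To make the split/checkpoint distinction in the description concrete, here is a toy model in plain Python. It is only a sketch and does not use Beam's API: `Restriction` and `split_remainder` are hypothetical names, and the rounding rule is an assumption. It shows how `fraction_of_remainder` could decide where the current bundle stops (the primary) and what gets handed back to the runner (the residual).

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Restriction:
    """Hypothetical half-open offset range [start, stop) owned by one bundle."""
    start: int
    stop: int


def split_remainder(
    restriction: Restriction,
    last_claimed: int,
    fraction_of_remainder: float,
) -> Optional[Tuple[Restriction, Restriction]]:
    """Split the unprocessed offsets into (primary, residual).

    fraction_of_remainder == 0 models a checkpoint: the bundle keeps only
    what it has already claimed and everything else becomes the residual.
    A value > 0 models a split: the bundle keeps that fraction of the
    remaining offsets. Returns None when nothing is left to hand back.
    """
    if not 0.0 <= fraction_of_remainder <= 1.0:
        raise ValueError("fraction_of_remainder must be within [0, 1]")

    next_offset = last_claimed + 1
    remaining = restriction.stop - next_offset
    if remaining <= 0:
        return None  # the bundle has already claimed everything

    # Assumed rounding rule: round the kept portion down.
    split_at = next_offset + int(remaining * fraction_of_remainder)
    if split_at >= restriction.stop:
        return None  # the residual would be empty, so there is no split

    primary = Restriction(restriction.start, split_at)
    residual = Restriction(split_at, restriction.stop)
    return primary, residual


# Checkpoint after offset 9 has been claimed: the bundle ends right away.
checkpoint = split_remainder(Restriction(0, 100), last_claimed=9,
                             fraction_of_remainder=0.0)

# Split: the bundle keeps half of the 90 remaining offsets.
split = split_remainder(Restriction(0, 100), last_claimed=9,
                        fraction_of_remainder=0.5)
```

In this model a checkpoint is just the limiting case of a split. For an unbounded source such as Kafka the `stop` offset has no natural end, so without a periodic request of this kind the primary never finishes.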



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
