[
https://issues.apache.org/jira/browse/BEAM-11998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17418819#comment-17418819
]
Jonathan Hourany edited comment on BEAM-11998 at 9/22/21, 8:44 PM:
-------------------------------------------------------------------
For anyone else facing this issue, I was able to get a pipeline working using
[~chamikara]'s suggestions (see example below), but it requires that either
`max_num_records` or `max_read_time` be set. Once either of those limits is
reached, however, the connection is closed, which doesn't bode well for
production use.
{code:python}
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka

# Kafka consumer settings; replace the placeholder address with your broker.
CONSUMER_CONFIG = {
    "bootstrap.servers": "your-ip:your-port",
    "auto.offset.reset": "earliest",
}
CONSUMER_TOPICS = ["test-topic"]

pipeline_options = beam.options.pipeline_options.PipelineOptions(
    runner="FlinkRunner",
    streaming=True,
    experiments=["use_deprecated_read"],
)

with beam.Pipeline(options=pipeline_options) as pipeline:
    consumed = (
        pipeline
        | ReadFromKafka(
            consumer_config=CONSUMER_CONFIG,
            topics=CONSUMER_TOPICS,
            # Workaround: one of max_num_records / max_read_time must be set.
            max_num_records=5,
        )
        | beam.Map(print)
    )
{code}
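The requirement that at least one of the two limits be set can be checked before the pipeline is built. The following is only a minimal sketch: `kafka_read_kwargs` is a hypothetical helper introduced here for illustration (it is not part of Beam), and it only assembles keyword arguments using the parameter names that appear in the comment above.

```python
from typing import Any, Dict, List, Optional


def kafka_read_kwargs(
    consumer_config: Dict[str, str],
    topics: List[str],
    max_num_records: Optional[int] = None,
    max_read_time: Optional[int] = None,
) -> Dict[str, Any]:
    """Assemble ReadFromKafka keyword arguments (hypothetical helper).

    Raises ValueError when neither limit is given, because the workaround
    described above only works when at least one of them is set.
    """
    if max_num_records is None and max_read_time is None:
        raise ValueError(
            "set max_num_records or max_read_time for this workaround"
        )
    kwargs: Dict[str, Any] = {
        "consumer_config": consumer_config,
        "topics": topics,
    }
    # Only forward the limits that were actually provided.
    if max_num_records is not None:
        kwargs["max_num_records"] = max_num_records
    if max_read_time is not None:
        kwargs["max_read_time"] = max_read_time
    return kwargs
```

Assuming the names from the example above, the result would be passed along as `ReadFromKafka(**kafka_read_kwargs(CONSUMER_CONFIG, CONSUMER_TOPICS, max_read_time=60))`. The same caveat applies: once the limit is reached, the read stops.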
> Portable runners should be able to issue checkpoints to Splittable DoFn
> -----------------------------------------------------------------------
>
> Key: BEAM-11998
> URL: https://issues.apache.org/jira/browse/BEAM-11998
> Project: Beam
> Issue Type: New Feature
> Components: cross-language, runner-flink, runner-spark
> Reporter: Boyuan Zhang
> Priority: P2
> Attachments: read.png
>
>
> To execute an unbounded Splittable DoFn over the Fn API in streaming mode
> properly, portable runners should regularly issue a split
> (ProcessBundleSplitRequest with fraction_of_remainder > 0) or simply a
> checkpoint (ProcessBundleSplitRequest with fraction_of_remainder == 0) to
> the SDK, so that the current bundle finishes processing instead of running
> forever.
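The difference between a split and a checkpoint in the description above can be illustrated on a plain offset range. This is a simplified sketch, not Beam's implementation: `OffsetRange` and `try_split` here are stand-in names written for this example, and the rounding rule is an assumption. With `fraction_of_remainder == 0` the primary keeps only what has already been claimed and everything else is handed back as the residual, which is what lets the current bundle finish.

```python
import math
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class OffsetRange:
    start: int  # inclusive
    stop: int   # exclusive


def try_split(
    restriction: OffsetRange,
    last_claimed: Optional[int],
    fraction_of_remainder: float,
) -> Optional[Tuple[OffsetRange, OffsetRange]]:
    """Return (primary, residual), or None if there is nothing to hand back."""
    # First offset that has not been claimed yet.
    next_offset = (
        restriction.start if last_claimed is None else last_claimed + 1
    )
    remaining = restriction.stop - next_offset
    if remaining <= 0:
        return None
    # The primary keeps this share of the unclaimed work (rounded down).
    keep = math.floor(remaining * fraction_of_remainder)
    split_at = next_offset + keep
    if split_at >= restriction.stop:
        return None  # the primary would keep everything, so no split occurs
    primary = OffsetRange(restriction.start, split_at)
    residual = OffsetRange(split_at, restriction.stop)
    return primary, residual
```

For a range of offsets 0 to 100 with offset 9 claimed last, a fraction of 0 gives a primary of [0, 10) and a residual of [10, 100), while a fraction of 0.5 moves the split point to 55.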
--
This message was sent by Atlassian Jira
(v8.3.4#803005)