I tried enabling auto commit with the Flink and direct runners individually,
using the code below.
With the Flink runner, irrespective of the environment type ("DOCKER" or
"LOOPBACK"), the pipeline crashes with:
ERROR: java.lang.UnsupportedOperationException: The ActiveBundle does not
have a registered bundle checkpoint handler.
And the direct runner (no args in PipelineOptions()) does not react at all
when the published Kafka message arrives.
# USAGE:
#   1. python sof_kafka_read_v2.py --runner flink
#   2. python sof_kafka_read_v2.py --runner flink --environment-type DOCKER
#   3. python sof_kafka_read_v2.py
import argparse

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import (
    PipelineOptions, SetupOptions, StandardOptions)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--runner', type=str, default="direct")
    parser.add_argument('--environment-type', type=str, default="LOOPBACK")
    args = parser.parse_args()
    runner = args.runner
    environment_type = args.environment_type

    if runner == "direct":
        options = PipelineOptions()
    elif runner == "flink":
        options = PipelineOptions([
            "--runner=FlinkRunner",
            "--flink_version=1.9",
            "--flink_master=localhost:8081",
            "--environment_type=" + environment_type,
            "--job_name=try_kafka",
            "--streaming",
        ])
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    pipeline = beam.Pipeline(options=options)
    result = (
        pipeline
        # Cross-language Kafka read via the Java expansion service on :8097
        | "Read from kafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": 'localhost:9092',
                "enable.auto.commit": 'true',
            },
            topics=['mytopic'],
            expansion_service='localhost:8097',
        )
        | "print" >> beam.Map(print)
    )
    res = pipeline.run()
    res.wait_until_finish()
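
One thing I am unsure about with the auto-commit attempt itself: Kafka
commits offsets on behalf of a consumer group, so enable.auto.commit
presumably needs a group.id in the consumer config as well. A sketch of the
config I plan to try (the group name is just a placeholder):

    consumer_config={
        "bootstrap.servers": 'localhost:9092',
        "group.id": 'beam-test-group',       # placeholder; offsets are committed per group
        "enable.auto.commit": 'true',
        "auto.commit.interval.ms": '5000',   # commit cadence in milliseconds
    },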
On Wed, Jul 22, 2020 at 7:39 PM Robert Bradshaw <[email protected]> wrote:
> On Sat, Jul 18, 2020 at 12:08 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>>
>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <[email protected]>
>> wrote:
>>
>>> Thank you guys for the reply. I am really stuck and cannot proceed
>>> further.
>>> Yes, the previously published trial message had a null key.
>>> But when I send a key:value pair through the producer using
>>>
>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>> mytopic --property "parse.key=true" --property "key.separator=:"
>>> > tryKey:tryValue
>>>
>>> I do not get any error, but Beam does not print the received message.
>>> Here is what my pipeline looks like:
>>> result = (
>>>     pipeline
>>>     | "Read from kafka" >> ReadFromKafka(
>>>         consumer_config={
>>>             "bootstrap.servers": 'localhost:9092',
>>>         },
>>>         topics=['mytopic'],
>>>         expansion_service='localhost:8097',
>>>     )
>>>     | "print" >> beam.Map(print)
>>> )
>>>
>>>
>> I suspect DirectRunner in LOOPBACK mode might not be working for
>> cross-language transforms today.
>>
>
> When running a streaming pipeline, the DirectRunner falls back to the old
> runner that does not support cross-language transforms.
> https://issues.apache.org/jira/browse/BEAM-7514
>
>> Please note that the cross-language transforms framework is fairly new [1]
>> and we are adding support for various runners and environment
>> configurations. Can you try with Flink in DOCKER mode?
>>
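For reference, a minimal sketch of Flink-in-DOCKER options, assuming a Flink
1.10 job server (the flink_version flag must match the job server image
actually in use):

    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.10",
        "--flink_master=localhost:8081",
        "--environment_type=DOCKER",
        "--streaming",
    ])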
>>
>>> If this is not the right way to make Beam and Kafka communicate, please
>>> share a working example that shows a message published to Kafka being
>>> received by Beam while streaming.
>>>
>>
>> I'm adding an example, but so far I've only tested it with Dataflow. I
>> hope to test that example on more runners and add additional instructions
>> there.
>> https://github.com/apache/beam/pull/12188
>>
>> Thanks,
>> Cham
>>
>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>
>>>
>>> Regards,
>>> Ayush Sharma
>>>
>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <
>>> [email protected]> wrote:
>>>
>>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>>> be updated to support this. You should not run into this error if you
>>>> publish keys and values that are not null.
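A keyed record can also be published from Python rather than the console
producer; a minimal sketch, assuming the kafka-python package is installed:

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    # Both key and value are non-null bytes, avoiding the null-key coder bug
    producer.send('mytopic', key=b'tryKey', value=b'tryValue')
    producer.flush()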
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> +dev <[email protected]>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> +Heejong Lee <[email protected]> +Chamikara Jayalath
>>>>>> <[email protected]>
>>>>>>
>>>>>> Do you know if your trial record has an empty key or value?
>>>>>> If so, then you hit a bug; it seems this use case was missed.
>>>>>>
>>>>>> Heejong and Cham,
>>>>>> It looks like, per the Javadoc, ByteArrayDeserializer and other
>>>>>> deserializers can return null [1, 2], yet we aren't using
>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion [3]. Note that
>>>>>> the non-XLang KafkaIO handles this correctly in its regular coder
>>>>>> inference logic [4]. I filed BEAM-10529 [5].
>>>>>>
>>>>>> 1:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 2:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 3:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>> 4:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
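As an aside, the Python SDK has the same nullable-wrapper concept. A minimal
sketch of the difference, assuming NullableCoder is importable from
apache_beam.coders.coders in the installed Beam version:

    from apache_beam.coders import BytesCoder
    from apache_beam.coders.coders import NullableCoder

    nullable = NullableCoder(BytesCoder())
    # None round-trips once the byte coder is wrapped as nullable
    assert nullable.decode(nullable.encode(None)) is None
    # Without the wrapper, encoding a null key fails, analogous to the
    # Java CoderException in the stack trace below
    BytesCoder().encode(None)  # raises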
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to build a streaming Beam pipeline in Python that
>>>>>>> captures messages from Kafka and then executes further stages of data
>>>>>>> fetching from other sources and aggregation. What I have built so
>>>>>>> far, step by step:
>>>>>>>
>>>>>>> 1. Run a Kafka instance on localhost:9092
>>>>>>>
>>>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>
>>>>>>> 2. Run the Beam Flink job server using docker
>>>>>>>
>>>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>
>>>>>>> 3. Run the beam-kafka pipeline
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>> from apache_beam.options.pipeline_options import (
>>>>>>>     PipelineOptions, StandardOptions)
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>     options = PipelineOptions([
>>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>>         "--environment_type=LOOPBACK",
>>>>>>>         "--streaming",
>>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>     ])
>>>>>>>
>>>>>>>     options = options.view_as(StandardOptions)
>>>>>>>     options.streaming = True
>>>>>>>
>>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>>
>>>>>>>     result = (
>>>>>>>         pipeline
>>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>>             consumer_config={
>>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>>             },
>>>>>>>             topics=['mytopic'],
>>>>>>>             expansion_service='localhost:8097',
>>>>>>>         )
>>>>>>>         | beam.Map(print)
>>>>>>>     )
>>>>>>>
>>>>>>>     pipeline.run()
>>>>>>>
>>>>>>>
>>>>>>> 4. Publish a new message using kafka-console-producer.sh
>>>>>>>
>>>>>>>    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>>    > tryme
>>>>>>>
>>>>>>> After publishing this trial message, the Beam pipeline receives the
>>>>>>> message but crashes with this error:
>>>>>>>
>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>>>>> org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>> at
>>>>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>> at
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>> at
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>> at
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>> at
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>> at
>>>>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>> at
>>>>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
>>>>>>> Source)
>>>>>>> at
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>> at
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>> at
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>> at org.apache.beam
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Ayush Sharma.
>>>>>>>
>>>>>>>