I tried enabling auto commit using flink and direct runners individually
using the code below.
Irrespective of environment-type ["DOCKER", "LOOPBACK"] for flink runner,
the pipeline crashes with ERROR: java.lang.UnsupportedOperationException:
The ActiveBundle does not have a registered bundle checkpoint handler.
And direct runner (no args in PipelineOptions()) does not respond to the
reception of the published kafka message.

# USAGE:
# 1. python sof_kafka_read_v2.py --runner flink
# 2. python sof_kafka_read_v2.py --runner flink --environment-type DOCKER
# 3. python sof_kafka_read_v2.py

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions,
StandardOptions, SetupOptions
import argparse

if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--runner',
        type=str,
        default="direct",
    )
    parser.add_argument(
        '--environment-type',
        type=str,
        default="LOOPBACK",
    )

    args = parser.parse_args()

    runner = args.runner
    environment_type = args.environment_type

    if runner == "direct":
        options = PipelineOptions()
    elif runner == "flink":
        options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_master=localhost:8081",
        "--environment_type=" + environment_type,
        "--job_name=try_kafka",
        "--streaming",
    ])

    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    pipeline = beam.Pipeline(options=options)

    result = (
        pipeline

        | "Read from kafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": 'localhost:9092',
                "enable.auto.commit": 'true',
            },
            topics=['mytopic'],
            expansion_service='localhost:8097',
        )

        | "print" >> beam.Map(print)
    )

    res = pipeline.run()
    res.wait_until_finish()



On Wed, Jul 22, 2020 at 7:39 PM Robert Bradshaw <[email protected]> wrote:

> On Sat, Jul 18, 2020 at 12:08 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>>
>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <[email protected]>
>> wrote:
>>
>>> Thank you guys for the reply. I am really stuck and could not proceed
>>> further.
>>> Yes, the previous trial published message had null key.
>>> But when I send key:value pair through producer using
>>>
>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>> mytopic --property *"parse.key=true" --property "key.separator=:"*
>>> > tryKey:tryValue
>>>
>>> I do not get any error but beam does not print the received message.
>>> Here is how my pipeline looks like,
>>> result = (
>>>         pipeline
>>>
>>>         | "Read from kafka" >> ReadFromKafka(
>>>             consumer_config={
>>>                 "bootstrap.servers": 'localhost:9092',
>>>             },
>>>             topics=['mytopic'],
>>>             expansion_service='localhost:8097',
>>>
>>>         | "print" >> beam.Map(print)
>>>         )
>>>
>>>
>> I suspect DirectRunner in LOOPBACK mode might not be working for
>> cross-language transforms today.
>>
>
> When running a Streaming pipeline, the DirectRuner falls back to the old
> runner that does not support cross-language.
> https://issues.apache.org/jira/browse/BEAM-7514
>
> Please note that cross-language transforms framework is fairly new [1] and
>> we are adding support for various runners and environment configurations.
>> Can you try with Flink in DOCKER mode ?
>>
>>
>>> If this is not the way we make beam and kafka communicate then please
>>> share a working example which showcases how a message published in kafka
>>> gets received by beam while streaming.
>>>
>>
>> I'm adding an example but I've only tested this with Dataflow yet. I hope
>> to test that example for more runners and add additional instructions
>> there.
>> https://github.com/apache/beam/pull/12188
>>
>> Thanks,
>> Cham
>>
>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>
>>>
>>> Regards,
>>> Ayush Sharma
>>>
>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <
>>> [email protected]> wrote:
>>>
>>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>>> be updated to support this. You should not run into this error if you
>>>> publish keys and values that are not null.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> +dev <[email protected]>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> +Heejong Lee <[email protected]> +Chamikara Jayalath
>>>>>> <[email protected]>
>>>>>>
>>>>>> Do you know if your trial record has an empty key or value?
>>>>>> If so, then you hit a bug and it seems as though there was a miss
>>>>>> supporting this usecase.
>>>>>>
>>>>>> Heejong and Cham,
>>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>>> Deserializers can return null[1, 2] and we aren't using
>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>>>> logic[4]. I flied BEAM-10529[5]
>>>>>>
>>>>>> 1:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 2:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 3:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>> 4:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to build a streaming beam pipeline in python which
>>>>>>> should capture messages from kafka and then execute further stages of 
>>>>>>> data
>>>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>>>> what I have built till now is:
>>>>>>>
>>>>>>>    1.
>>>>>>>
>>>>>>>    Running Kafka instance on localhost:9092
>>>>>>>
>>>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>    2.
>>>>>>>
>>>>>>>    Run beam-flink job server using docker
>>>>>>>
>>>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>    3.
>>>>>>>
>>>>>>>    Run beam-kafka pipeline
>>>>>>>
>>>>>>> import apache_beam as beamfrom apache_beam.io.external.kafka import 
>>>>>>> ReadFromKafka, WriteToKafkafrom apache_beam.options.pipeline_options 
>>>>>>> import PipelineOptions, StandardOptions
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>     options = PipelineOptions([
>>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>>         "--environment_type=LOOPBACK",
>>>>>>>         "--streaming",
>>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>     ])
>>>>>>>
>>>>>>>     options = options.view_as(StandardOptions)
>>>>>>>     options.streaming = True
>>>>>>>
>>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>>
>>>>>>>     result = (
>>>>>>>         pipeline
>>>>>>>
>>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>>             consumer_config={
>>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>>             },
>>>>>>>             topics=['mytopic'],
>>>>>>>             expansion_service='localhost:8097',
>>>>>>>         )
>>>>>>>
>>>>>>>         | beam.Map(print)
>>>>>>>     )
>>>>>>>
>>>>>>>     pipeline.run()
>>>>>>>
>>>>>>>
>>>>>>>    1. Publish new message using kafka-producer.sh
>>>>>>>
>>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
>>>>>>> mytopic>tryme
>>>>>>>
>>>>>>> After publishing this trial message, the beam pipeline perceives the
>>>>>>> message but crashes giving this error:
>>>>>>>
>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: 
>>>>>>> org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>>     at 
>>>>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>     at 
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>>     at 
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>>     at 
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>>     at 
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>>     at 
>>>>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>>     at 
>>>>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
>>>>>>>  Source)
>>>>>>>     at 
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>>     at 
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>>     at 
>>>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>>     at org.apache.beam
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Ayush Sharma.
>>>>>>>
>>>>>>>

Reply via email to