Is there any workaround to this issue?

On Mon, Jul 20, 2020 at 5:33 PM ayush sharma <[email protected]> wrote:

> Thank you for the suggestions. I tried using FlinkRunner, and
> setting environment_type to either DOCKER or LOOPBACK gives an error:
> java.lang.UnsupportedOperationException: The ActiveBundle does not have a
> registered bundle checkpoint handler.
>
> I found that this issue has already been reported (
> https://issues.apache.org/jira/browse/BEAM-6868) and have upvoted it.
> Thank you for the prompt responses; I look forward to using this
> feature in the future.
>
> Regards,
> Ayush.
>
> On Sat, Jul 18, 2020 at 3:14 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>>
>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <[email protected]>
>> wrote:
>>
>>> Thank you guys for the reply. I am really stuck and cannot proceed
>>> further.
>>> Yes, the previously published trial message had a null key.
>>> But when I send a key:value pair through the producer using
>>>
>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>> mytopic --property "parse.key=true" --property "key.separator=:"
>>> > tryKey:tryValue
>>>
>>> I do not get any error, but Beam does not print the received message.
>>> Here is what my pipeline looks like:
>>> result = (
>>>     pipeline
>>>     | "Read from kafka" >> ReadFromKafka(
>>>         consumer_config={
>>>             "bootstrap.servers": 'localhost:9092',
>>>         },
>>>         topics=['mytopic'],
>>>         expansion_service='localhost:8097',
>>>     )
>>>     | "print" >> beam.Map(print)
>>> )
>>>
>> I suspect DirectRunner in LOOPBACK mode might not work for
>> cross-language transforms today. Please note that the cross-language
>> transforms framework is fairly new [1] and we are adding support for
>> various runners and environment configurations.
>> Can you try with Flink in DOCKER mode?
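>>
>> For reference, here is a minimal sketch of that configuration, assuming
>> a recent Beam Python SDK and the Flink job server you already started
>> (the flag names are standard portable-runner options; the endpoints are
>> just carried over from your setup):
>>
>> # Hypothetical pipeline options for running on Flink with the default
>> # DOCKER environment instead of LOOPBACK.
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> options = PipelineOptions([
>>     "--runner=PortableRunner",
>>     "--job_endpoint=localhost:8099",
>>     "--environment_type=DOCKER",
>>     "--streaming",
>> ])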
>>
>>
>>> If this is not the way to make Beam and Kafka communicate, then please
>>> share a working example that shows how a message published to Kafka
>>> gets received by Beam while streaming.
>>>
>>
>> I'm adding an example, but I've only tested it with Dataflow so far. I
>> hope to test that example with more runners and add additional
>> instructions there.
>> https://github.com/apache/beam/pull/12188
>>
>> Thanks,
>> Cham
>>
>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>
>>>
>>> Regards,
>>> Ayush Sharma
>>>
>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <
>>> [email protected]> wrote:
>>>
>>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>>> be updated to support this. You should not run into this error if you
>>>> publish keys and values that are not null.
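>>>>
>>>> As a sketch, a non-null record can also be published programmatically.
>>>> This assumes the third-party kafka-python client (not something this
>>>> thread uses; any producer that sets an explicit key would work):
>>>>
>>>> # Publish a record whose key and value are both non-null.
>>>> from kafka import KafkaProducer
>>>>
>>>> producer = KafkaProducer(bootstrap_servers="localhost:9092")
>>>> producer.send("mytopic", key=b"tryKey", value=b"tryValue")
>>>> producer.flush()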
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> +dev <[email protected]>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> +Heejong Lee <[email protected]> +Chamikara Jayalath
>>>>>> <[email protected]>
>>>>>>
>>>>>> Do you know if your trial record has an empty key or value?
>>>>>> If so, then you hit a bug; it seems we missed supporting this use
>>>>>> case.
>>>>>>
>>>>>> Heejong and Cham,
>>>>>> It looks like ByteArrayDeserializer and other Deserializers can
>>>>>> return null per their Javadoc [1, 2], and we aren't using
>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion [3]. Note that the
>>>>>> non-XLang KafkaIO handles this correctly in its regular coder inference
>>>>>> logic [4]. I filed BEAM-10529 [5]. A small sketch of the idea follows
>>>>>> the links below.
>>>>>>
>>>>>> 1:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 2:
>>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 3:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>> 4:
>>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
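>>>>>>
>>>>>> As a hedged illustration of the idea (shown with the Python SDK's
>>>>>> coders, since the actual change belongs in the Java KafkaIO
>>>>>> expansion): a nullable wrapper encodes None by tagging presence
>>>>>> explicitly, which a plain bytes coder cannot do (compare "cannot
>>>>>> encode a null byte[]").
>>>>>>
>>>>>> # Sketch only: NullableCoder round-trips None; BytesCoder would not.
>>>>>> from apache_beam.coders.coders import BytesCoder, NullableCoder
>>>>>>
>>>>>> coder = NullableCoder(BytesCoder())
>>>>>> assert coder.decode(coder.encode(None)) is None
>>>>>> assert coder.decode(coder.encode(b"tryValue")) == b"tryValue"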
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to build a streaming Beam pipeline in Python which
>>>>>>> should capture messages from Kafka and then execute further stages of
>>>>>>> data fetching from other sources and aggregation. The step-by-step
>>>>>>> process of what I have built so far is:
>>>>>>>
>>>>>>>    1. Run a Kafka instance on localhost:9092
>>>>>>>
>>>>>>>       ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>
>>>>>>>    2. Run the beam-flink job server using docker
>>>>>>>
>>>>>>>       docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>
>>>>>>>    3. Run the beam-kafka pipeline
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>     options = PipelineOptions([
>>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>>         "--environment_type=LOOPBACK",
>>>>>>>         "--streaming",
>>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>     ])
>>>>>>>
>>>>>>>     options = options.view_as(StandardOptions)
>>>>>>>     options.streaming = True
>>>>>>>
>>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>>
>>>>>>>     result = (
>>>>>>>         pipeline
>>>>>>>
>>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>>             consumer_config={
>>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>>             },
>>>>>>>             topics=['mytopic'],
>>>>>>>             expansion_service='localhost:8097',
>>>>>>>         )
>>>>>>>
>>>>>>>         | beam.Map(print)
>>>>>>>     )
>>>>>>>
>>>>>>>     pipeline.run()
>>>>>>>
>>>>>>>
>>>>>>>    4. Publish a new message using kafka-console-producer.sh
>>>>>>>
>>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>> > tryme
>>>>>>>
>>>>>>> After publishing this trial message, the beam pipeline receives the
>>>>>>> message but crashes with this error:
>>>>>>>
>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>>     at org.apache.beam
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Ayush Sharma.
>>>>>>>
>>>>>>>
