On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <[email protected]> wrote:

> Thank you guys for the reply. I am really stuck and could not proceed
> further.
> Yes, the previous trial published message had null key.
> But when I send key:value pair through producer using
>
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> mytopic --property "parse.key=true" --property "key.separator=:"
> > tryKey:tryValue
>
> I do not get any error, but Beam does not print the received message. Here
> is what my pipeline looks like:
> result = (
>         pipeline
>
>         | "Read from kafka" >> ReadFromKafka(
>             consumer_config={
>                 "bootstrap.servers": 'localhost:9092',
>             },
>             topics=['mytopic'],
>             expansion_service='localhost:8097',
>         )
>
>         | "print" >> beam.Map(print)
>         )
>
>
I suspect DirectRunner in LOOPBACK mode might not work for cross-language
transforms today. Please note that the cross-language transforms framework
is fairly new [1], and we are still adding support for various runners and
environment configurations.
Can you try with Flink in DOCKER mode?
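
As a rough sketch of what switching to DOCKER mode could look like on the
Python side: this assumes the Flink job server from the original message is
still listening on localhost:8099. The helper names `portable_flink_args` and
`make_options` are made up for illustration, and the PROCESS-style
`--environment_config` from the original pipeline is left out.

```python
from typing import List


def portable_flink_args(job_endpoint: str = "localhost:8099",
                        environment_type: str = "DOCKER",
                        streaming: bool = True) -> List[str]:
    """Build command-line style flags for a run against a portable job server.

    The default endpoint is an assumption taken from the original message.
    """
    args = [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={environment_type}",
    ]
    if streaming:
        args.append("--streaming")
    return args


def make_options(args: List[str]):
    """Wrap the flags in PipelineOptions (hypothetical helper)."""
    # Imported lazily so the flag-building helper works without Beam installed.
    from apache_beam.options.pipeline_options import PipelineOptions
    return PipelineOptions(args)


if __name__ == "__main__":
    print(portable_flink_args())
```

The resulting options object would then be passed to
`beam.Pipeline(options=...)` in place of the LOOPBACK configuration.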


> If this is not the way we make beam and kafka communicate then please
> share a working example which showcases how a message published in kafka
> gets received by beam while streaming.
>

I'm adding an example, but so far I've only tested it with Dataflow. I hope
to test it with more runners and add further instructions there.
https://github.com/apache/beam/pull/12188

Thanks,
Cham

[1] https://beam.apache.org/roadmap/connectors-multi-sdk/

>
> Regards,
> Ayush Sharma
>
> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>> be updated to support this. You should not run into this error if you
>> publish keys and values that are not null.
>>
>>
>>
>>
>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <[email protected]> wrote:
>>
>>> +dev <[email protected]>
>>>
>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <[email protected]> wrote:
>>>
>>>> +Heejong Lee <[email protected]> +Chamikara Jayalath
>>>> <[email protected]>
>>>>
>>>> Do you know if your trial record has an empty key or value?
>>>> If so, then you hit a bug: it seems support for this use case was
>>>> missed.
>>>>
>>>> Heejong and Cham,
>>>> According to the Javadoc, ByteArrayDeserializer and other
>>>> Deserializers can return null[1, 2], and we aren't using
>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>> non XLang KafkaIO does this correctly in its regular coder inference
>>>> logic[4]. I filed BEAM-10529[5].
>>>>
>>>> 1:
>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>> 2:
>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>> 3:
>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>> 4:
>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>
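
To illustrate the coder issue described above: a coder that only handles
non-null byte arrays has no encoding for a missing key, while a nullable
wrapper writes a presence marker first. The following is a minimal
pure-Python sketch of that idea. The function names and the byte layout are
hypothetical and are not Beam's actual wire format.

```python
from typing import Optional


def encode_bytes(value: bytes) -> bytes:
    """Length-prefixed encoding that, like a plain byte-array coder, rejects None."""
    if value is None:
        raise ValueError("cannot encode a null byte[]")
    return len(value).to_bytes(4, "big") + value


def decode_bytes(data: bytes) -> bytes:
    """Read the 4-byte length prefix, then return that many payload bytes."""
    length = int.from_bytes(data[:4], "big")
    return data[4:4 + length]


def encode_nullable(value: Optional[bytes]) -> bytes:
    """Prefix a one-byte presence marker so that None has a valid encoding."""
    if value is None:
        return b"\x00"
    return b"\x01" + encode_bytes(value)


def decode_nullable(data: bytes) -> Optional[bytes]:
    """Inverse of encode_nullable: check the marker, then decode the payload."""
    if data[:1] == b"\x00":
        return None
    return decode_bytes(data[1:])


if __name__ == "__main__":
    # A record published without a key has key=None and a non-null value.
    print(decode_nullable(encode_nullable(None)))
    print(decode_nullable(encode_nullable(b"tryme")))
```

With only `encode_bytes`, a record with a null key fails in the same way as
the error in the original message; wrapping it lets null keys and values
round-trip.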
>>>>
>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to build a streaming beam pipeline in python which should
>>>>> capture messages from kafka and then execute further stages of data
>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>> what I have built till now is:
>>>>>
>>>>>    1. Running Kafka instance on localhost:9092
>>>>>
>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>
>>>>>    2. Run beam-flink job server using docker
>>>>>
>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>
>>>>>    3. Run beam-kafka pipeline
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>
>>>>> if __name__ == '__main__':
>>>>>     options = PipelineOptions([
>>>>>         "--job_endpoint=localhost:8099",
>>>>>         "--environment_type=LOOPBACK",
>>>>>         "--streaming",
>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>     ])
>>>>>
>>>>>     options = options.view_as(StandardOptions)
>>>>>     options.streaming = True
>>>>>
>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>
>>>>>     result = (
>>>>>         pipeline
>>>>>
>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>             consumer_config={
>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>             },
>>>>>             topics=['mytopic'],
>>>>>             expansion_service='localhost:8097',
>>>>>         )
>>>>>
>>>>>         | beam.Map(print)
>>>>>     )
>>>>>
>>>>>     pipeline.run()
>>>>>
>>>>>
>>>>>    4. Publish new message using kafka-console-producer.sh
>>>>>
>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>> >tryme
>>>>>
>>>>> After publishing this trial message, the Beam pipeline receives the
>>>>> message but crashes with this error:
>>>>>
>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>     at org.apache.beam
>>>>>
>>>>> Regards,
>>>>>
>>>>> Ayush Sharma.
>>>>>
>>>>>
