On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <[email protected]> wrote:
> Thank you guys for the reply. I am really stuck and cannot proceed
> further.
> Yes, the previously published trial message had a null key.
> But when I send a key:value pair through the producer using
>
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> mytopic --property "parse.key=true" --property "key.separator=:"
> > tryKey:tryValue
>
> I do not get any error, but Beam does not print the received message. Here
> is what my pipeline looks like:
> result = (
>     pipeline
>
>     | "Read from kafka" >> ReadFromKafka(
>         consumer_config={
>             "bootstrap.servers": 'localhost:9092',
>         },
>         topics=['mytopic'],
>         expansion_service='localhost:8097',
>     )
>
>     | "print" >> beam.Map(print)
> )
>
>
I suspect the DirectRunner in LOOPBACK mode might not work with
cross-language transforms today. Please note that the cross-language
transforms framework is fairly new [1], and we are still adding support for
various runners and environment configurations.
Can you try with Flink in DOCKER mode?
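
To make that suggestion concrete, here is a minimal sketch of the flags I
have in mind. It assumes the Flink job server from your setup is still
listening on localhost:8099; the helper name is made up for illustration.
Note the explicit --runner flag: as far as I know, the Python SDK falls back
to the DirectRunner when no runner is given.

```python
def portable_flink_docker_args(job_endpoint="localhost:8099"):
    """Build pipeline flags for a portable job server with a DOCKER SDK harness.

    The helper name and the default endpoint are assumptions for illustration.
    """
    return [
        "--runner=PortableRunner",         # submit to the job server, not the DirectRunner
        "--job_endpoint=" + job_endpoint,  # the Flink job server started via docker
        "--environment_type=DOCKER",       # instead of LOOPBACK
        "--streaming",
    ]


if __name__ == "__main__":
    # With Beam installed, this list would be passed to PipelineOptions(...).
    print(portable_flink_docker_args())
```

The rest of the pipeline stays as you have it; only the options list changes.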
> If this is not the way we make beam and kafka communicate then please
> share a working example which showcases how a message published in kafka
> gets received by beam while streaming.
>
I'm adding an example, but so far I've only tested it with Dataflow. I hope
to test that example with more runners and add further instructions
there.
https://github.com/apache/beam/pull/12188
Thanks,
Cham
[1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>
> Regards,
> Ayush Sharma
>
> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>> be updated to support this. You should not run into this error if you
>> publish keys and values that are not null.
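
For context on why the first trial message had a null key, here is a toy
model of how I understand the console producer to treat each input line. It
is not Kafka's actual code: the function name is hypothetical, and the
behavior (tab as the default separator, splitting at the first separator,
failing when none is found) is stated from memory.

```python
def parse_console_line(line, parse_key=False, key_separator="\t"):
    """Toy model of the console producer's line handling (an assumption).

    Returns a (key, value) tuple; key is None when parse.key is not enabled.
    """
    if not parse_key:
        # Without parse.key=true the whole line is the value and the key is null.
        return None, line
    key, sep, value = line.partition(key_separator)
    if not sep:
        raise ValueError("no key separator found in line: %r" % line)
    return key, value


if __name__ == "__main__":
    print(parse_console_line("tryme"))
    print(parse_console_line("tryKey:tryValue", parse_key=True, key_separator=":"))
```

So a bare "tryme" line is sent with a null key, while "tryKey:tryValue" with
parse.key=true and key.separator=: carries both a key and a value.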
>>
>>
>>
>>
>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <[email protected]> wrote:
>>
>>> +dev <[email protected]>
>>>
>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <[email protected]> wrote:
>>>
>>>> +Heejong Lee <[email protected]> +Chamikara Jayalath
>>>> <[email protected]>
>>>>
>>>> Do you know if your trial record has an empty key or value?
>>>> If so, then you hit a bug; it seems we missed supporting this
>>>> use case.
>>>>
>>>> Heejong and Cham,
>>>> It looks like, per the Javadoc, ByteArrayDeserializer and other
>>>> Deserializers can return null [1, 2], and we aren't using
>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion [3]. Note that the
>>>> non-XLang KafkaIO does this correctly in its regular coder inference
>>>> logic [4]. I filed BEAM-10529 [5].
>>>>
>>>> 1:
>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>> 2:
>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>> 3:
>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>> 4:
>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
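
To illustrate the difference Luke describes, here is a small pure-Python
sketch of the idea behind wrapping a byte[] coder in a nullable coder. It is
not Beam's implementation: the function names are hypothetical, and the
fixed 4-byte length prefix is a simplification (Beam's ByteArrayCoder uses a
different length encoding). The one-byte null/present marker is how I recall
NullableCoder working.

```python
from typing import Optional


def encode_bytes(value: Optional[bytes]) -> bytes:
    """Stand-in for a non-nullable byte[] coder: rejects None."""
    if value is None:
        raise ValueError("cannot encode a null byte[]")
    # Simplified framing: 4-byte big-endian length, then the payload.
    return len(value).to_bytes(4, "big") + value


def encode_nullable(value: Optional[bytes]) -> bytes:
    """Nullable wrapper: one marker byte, then the inner encoding if present."""
    if value is None:
        return b"\x00"
    return b"\x01" + encode_bytes(value)


def decode_nullable(data: bytes) -> Optional[bytes]:
    """Inverse of encode_nullable."""
    if data[0] == 0:
        return None
    length = int.from_bytes(data[1:5], "big")
    return data[5:5 + length]
```

With the plain coder, a record whose key is null fails at encode time, which
matches the "cannot encode a null byte[]" error in the stack trace; with the
nullable wrapper, the null key round-trips as None.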
>>>>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to build a streaming Beam pipeline in Python which should
>>>>> capture messages from Kafka and then execute further stages of data
>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>> what I have built so far is:
>>>>>
>>>>> 1. Run a Kafka instance on localhost:9092:
>>>>>
>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>
>>>>> 2. Run the Beam Flink job server using Docker:
>>>>>
>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>
>>>>> 3. Run the Beam Kafka pipeline:
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>
>>>>> if __name__ == '__main__':
>>>>>     options = PipelineOptions([
>>>>>         "--job_endpoint=localhost:8099",
>>>>>         "--environment_type=LOOPBACK",
>>>>>         "--streaming",
>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>     ])
>>>>>
>>>>>     options = options.view_as(StandardOptions)
>>>>>     options.streaming = True
>>>>>
>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>
>>>>>     result = (
>>>>>         pipeline
>>>>>
>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>             consumer_config={
>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>             },
>>>>>             topics=['mytopic'],
>>>>>             expansion_service='localhost:8097',
>>>>>         )
>>>>>
>>>>>         | beam.Map(print)
>>>>>     )
>>>>>
>>>>>     pipeline.run()
>>>>>
>>>>>
>>>>> 4. Publish a new message using kafka-console-producer.sh:
>>>>>
>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>>>> mytopic
>>>>> > tryme
>>>>>
>>>>> After publishing this trial message, the Beam pipeline receives the
>>>>> message but crashes with this error:
>>>>>
>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>>> org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>> at
>>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>> at
>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>> at
>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>> at
>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>> at
>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>> at
>>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>> at
>>>>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
>>>>> Source)
>>>>> at
>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>> at
>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>> at
>>>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>> at org.apache.beam
>>>>>
>>>>> Regards,
>>>>>
>>>>> Ayush Sharma.
>>>>>
>>>>>