+dev <[email protected]>

On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <[email protected]> wrote:

> +Heejong Lee <[email protected]> +Chamikara Jayalath
> <[email protected]>
>
> Do you know if your trial record has an empty key or value?
> If so, then you hit a bug: it looks like support for this use case was
> missed.
>
> Heejong and Cham,
> According to the Javadoc, ByteArrayDeserializer and other Deserializers
> can return null[1, 2], and we aren't using
> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
> non-XLang KafkaIO does this correctly in its regular coder inference
> logic[4]. I filed BEAM-10529[5].
>
> 1:
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
> 2:
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
> 3:
> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
> 4:
> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
> 5: https://issues.apache.org/jira/browse/BEAM-10529
>
>
> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <[email protected]> wrote:
>
>> Hi,
>>
>> I am trying to build a streaming beam pipeline in python which should
>> capture messages from kafka and then execute further stages of data
>> fetching from other sources and aggregation. The step-by-step process of
>> what I have built till now is:
>>
>>    1. Running Kafka instance on localhost:9092
>>
>>    ./bin/kafka-server-start.sh ./config/server.properties
>>
>>    2. Run beam-flink job server using docker
>>
>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>
>>    3. Run beam-kafka pipeline
>>
>> import apache_beam as beam
>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>
>> if __name__ == '__main__':
>>     options = PipelineOptions([
>>         "--job_endpoint=localhost:8099",
>>         "--environment_type=LOOPBACK",
>>         "--streaming",
>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>     ])
>>
>>     options = options.view_as(StandardOptions)
>>     options.streaming = True
>>
>>     pipeline = beam.Pipeline(options=options)
>>
>>     result = (
>>         pipeline
>>
>>         | "Read from kafka" >> ReadFromKafka(
>>             consumer_config={
>>                 "bootstrap.servers": 'localhost:9092',
>>             },
>>             topics=['mytopic'],
>>             expansion_service='localhost:8097',
>>         )
>>
>>         | beam.Map(print)
>>     )
>>
>>     pipeline.run()
>>
>>
>>    4. Publish new message using kafka-console-producer.sh
>>
>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>> >tryme
>>
>> After publishing this trial message, the beam pipeline receives the
>> message but crashes with this error:
>>
>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>     at org.apache.beam
>>
>> Regards,
>>
>> Ayush Sharma.
>>
>>
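
To illustrate the NullableCoder point in the quoted reply, here is a minimal
Python sketch of the idea. It is not Beam code: ByteArrayCoder, NullableCoder
and CoderError below are hypothetical toy classes that only mimic the behavior
described in the thread, and the one-byte presence marker is an assumption
about how such a wrapper could be laid out. The sketch also assumes the trial
record was published without a key, so its key deserializes to null.

```python
from typing import Optional

PRESENT = b"\x01"  # marker byte: an encoded value follows
ABSENT = b"\x00"   # marker byte: the value is null


class CoderError(Exception):
    """Hypothetical stand-in for Beam's CoderException."""


class ByteArrayCoder:
    """Toy coder for non-null bytes; rejects None like the failing pipeline."""

    def encode(self, value: Optional[bytes]) -> bytes:
        if value is None:
            raise CoderError("cannot encode a null byte[]")
        return bytes(value)

    def decode(self, data: bytes) -> bytes:
        return bytes(data)


class NullableCoder:
    """Toy wrapper that adds a presence marker in front of the inner encoding."""

    def __init__(self, inner: ByteArrayCoder) -> None:
        self.inner = inner

    def encode(self, value: Optional[bytes]) -> bytes:
        if value is None:
            return ABSENT
        return PRESENT + self.inner.encode(value)

    def decode(self, data: bytes) -> Optional[bytes]:
        marker = data[:1]
        if marker == ABSENT:
            return None
        if marker != PRESENT:
            raise CoderError("unexpected presence marker")
        return self.inner.decode(data[1:])


# A record with a null key and a value, as assumed for the trial message.
key, value = None, b"tryme"

nullable = NullableCoder(ByteArrayCoder())
encoded_key = nullable.encode(key)      # succeeds: only the marker is written
encoded_value = nullable.encode(value)  # marker followed by the payload

try:
    ByteArrayCoder().encode(key)        # the plain coder refuses a null key
except CoderError as err:
    plain_coder_error = str(err)
```

With the wrapper, the null key round-trips as None and the value is unchanged,
while the plain coder fails with the same message seen in the stack trace.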
