+dev

On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik wrote:
> +Heejong Lee +Chamikara Jayalath
>
> Do you know if your test record has an empty (null) key or value?
> If so, you hit a bug: it looks like support for this use case was
> missed.
>
> Heejong and Cham,
> It looks like the Javadoc for ByteArrayDeserializer and other
> Deserializers says they can return null[1, 2], but we aren't using
> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
> non-XLang KafkaIO does this correctly in its regular coder inference
> logic[4]. I filed BEAM-10529[5].
>
> 1: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
> 2: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
> 3: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
> 4: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
> 5: https://issues.apache.org/jira/browse/BEAM-10529
>
>
> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma wrote:
>
>> Hi,
>>
>> I am trying to build a streaming Beam pipeline in Python that reads
>> messages from Kafka and then runs further stages that fetch data from
>> other sources and aggregate it. Here is what I have built so far, step
>> by step:
>>
>> 1. Run a Kafka instance on localhost:9092:
>>
>>    ./bin/kafka-server-start.sh ./config/server.properties
>>
>> 2. Run the Beam Flink job server using Docker:
>>
>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>
>> 3. Run the Beam Kafka pipeline:
>>
>>    import apache_beam as beam
>>    from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>
>>    if __name__ == '__main__':
>>        options = PipelineOptions([
>>            "--job_endpoint=localhost:8099",
>>            "--environment_type=LOOPBACK",
>>            "--streaming",
>>            "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>        ])
>>
>>        options = options.view_as(StandardOptions)
>>        options.streaming = True
>>
>>        pipeline = beam.Pipeline(options=options)
>>
>>        result = (
>>            pipeline
>>            | "Read from kafka" >> ReadFromKafka(
>>                consumer_config={
>>                    "bootstrap.servers": 'localhost:9092',
>>                },
>>                topics=['mytopic'],
>>                expansion_service='localhost:8097',
>>            )
>>            | beam.Map(print)
>>        )
>>
>>        pipeline.run()
>>
>> 4. Publish a new message using kafka-console-producer.sh:
>>
>>    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>    >tryme
>>
>> After I publish this test message, the Beam pipeline receives it but
>> crashes with this error:
>>
>>    RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>        at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>        at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>        at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>        at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>        at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>        at org.apache.beam
>>
>> Regards,
>>
>> Ayush Sharma.
>>
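To make the failure mode described above concrete, here is a minimal, standard-library-only Python sketch. StrictBytesCoder, NullableBytesCoder and encode_record are hypothetical names for illustration, not Beam APIs. The strict coder rejects None the way ByteArrayCoder rejects a null byte[]; the nullable wrapper prefixes a one-byte presence marker, which is roughly (not exactly) how NullableCoder handles nulls. The sample record assumes a null key, which is what kafka-console-producer.sh sends by default unless it is told to parse keys from its input.

```python
# Illustrative sketch only: these classes are hypothetical stand-ins that
# model the coder behavior discussed in this thread; they are NOT Beam's API.
# Beam's nested/outer-context length prefixing is deliberately ignored.
from typing import Optional, Tuple


class CoderException(Exception):
    """Raised when a value cannot be encoded (mirrors the Java error text)."""


class StrictBytesCoder:
    """Encodes bytes as-is and, like ByteArrayCoder, rejects None."""

    def encode(self, value: Optional[bytes]) -> bytes:
        if value is None:
            raise CoderException("cannot encode a null byte[]")
        return bytes(value)

    def decode(self, data: bytes) -> bytes:
        return data


class NullableBytesCoder:
    """Wraps an inner coder with a one-byte presence marker.

    0x00 means null; 0x01 means the inner encoding follows.
    """

    def __init__(self, inner: StrictBytesCoder) -> None:
        self.inner = inner

    def encode(self, value: Optional[bytes]) -> bytes:
        if value is None:
            return b"\x00"
        return b"\x01" + self.inner.encode(value)

    def decode(self, data: bytes) -> Optional[bytes]:
        if data[:1] == b"\x00":
            return None
        # Everything after the marker byte is the inner encoding.
        return self.inner.decode(data[1:])


def encode_record(coder, record: Tuple[Optional[bytes], bytes]) -> Tuple[bytes, bytes]:
    """Encodes a (key, value) record using the same coder for both parts."""
    key, value = record
    return coder.encode(key), coder.encode(value)


# Assumed record shape: no key (None) and the value typed at the producer prompt.
record = (None, b"tryme")

try:
    encode_record(StrictBytesCoder(), record)
except CoderException as err:
    print("strict coder failed:", err)

nullable = NullableBytesCoder(StrictBytesCoder())
encoded_key, encoded_value = encode_record(nullable, record)
print(encoded_key, encoded_value)
print(nullable.decode(encoded_key), nullable.decode(encoded_value))
```

The presence marker costs one extra byte per element, which is the trade-off that lets a null key round-trip instead of crashing the pipeline.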
