Is there any workaround to this issue?
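Judging by Chamikara's reply further down, the CoderException only fires when a record's key or value is null, so publishing every record with an explicit non-null key and value looks like a reasonable interim workaround until BEAM-10529 is fixed. A minimal producer sketch of that idea, assuming the kafka-python package (any client that can set a key should do):

    # Interim workaround sketch (assumes the kafka-python package): always
    # publish a non-null key and value so XLang KafkaIO never has to encode
    # a null byte[] (the failure tracked in BEAM-10529).
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    # Both key and value are explicit, non-null bytes here.
    producer.send('mytopic', key=b'tryKey', value=b'tryValue')
    producer.flush()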
On Mon, Jul 20, 2020 at 5:33 PM ayush sharma <[email protected]> wrote:

> Thank you for the suggestions. I tried using FlinkRunner, and setting
> environment_type to either DOCKER or LOOPBACK gives an error:
>
>     java.lang.UnsupportedOperationException: The ActiveBundle does not
>     have a registered bundle checkpoint handler.
>
> I found that this issue has already been reported
> (https://issues.apache.org/jira/browse/BEAM-6868) and have upvoted it.
> Thank you for the prompt responses; I am looking forward to using this
> feature in the future.
>
> Regards,
> Ayush.
>
> On Sat, Jul 18, 2020 at 3:14 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <[email protected]>
>> wrote:
>>
>>> Thank you guys for the reply. I am really stuck and could not proceed
>>> further. Yes, the trial message published previously had a null key.
>>> But when I send a key:value pair through the producer using
>>>
>>>     ./bin/kafka-console-producer.sh --broker-list localhost:9092 \
>>>         --topic mytopic --property "parse.key=true" \
>>>         --property "key.separator=:"
>>>     > tryKey:tryValue
>>>
>>> I do not get any error, but Beam does not print the received message.
>>> Here is what my pipeline looks like:
>>>
>>>     result = (
>>>         pipeline
>>>         | "Read from kafka" >> ReadFromKafka(
>>>             consumer_config={
>>>                 "bootstrap.servers": 'localhost:9092',
>>>             },
>>>             topics=['mytopic'],
>>>             expansion_service='localhost:8097',
>>>         )
>>>         | "print" >> beam.Map(print)
>>>     )
>>
>> I suspect DirectRunner in LOOPBACK mode might not be working for
>> cross-language transforms today. Please note that the cross-language
>> transforms framework is fairly new [1] and we are adding support for
>> various runners and environment configurations. Can you try with Flink
>> in DOCKER mode?
>>
>>> If this is not the way to make Beam and Kafka communicate, then please
>>> share a working example that shows how a message published to Kafka is
>>> received by Beam while streaming.
>>
>> I'm adding an example, but so far I've only tested it with Dataflow. I
>> hope to test that example with more runners and add additional
>> instructions there: https://github.com/apache/beam/pull/12188
>>
>> Thanks,
>> Cham
>>
>> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>>
>>> Regards,
>>> Ayush Sharma
>>>
>>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath
>>> <[email protected]> wrote:
>>>
>>>> Yes, seems like this is due to the key being null. XLang KafkaIO has
>>>> to be updated to support this. You should not run into this error if
>>>> you publish keys and values that are not null.
>>>>
>>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> +dev <[email protected]>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> +Heejong Lee <[email protected]> +Chamikara Jayalath
>>>>>> <[email protected]>
>>>>>>
>>>>>> Do you know if your trial record has an empty key or value? If so,
>>>>>> then you hit a bug, and it seems there was a miss in supporting
>>>>>> this use case.
>>>>>>
>>>>>> Heejong and Cham,
>>>>>> It looks like the Javadoc for ByteArrayDeserializer and other
>>>>>> Deserializers says they can return null [1, 2], and we aren't using
>>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion [3]. Note
>>>>>> that the non-XLang KafkaIO handles this correctly in its regular
>>>>>> coder inference logic [4]. I filed BEAM-10529 [5].
>>>>>>
>>>>>> 1: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 2: https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>>> 3: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>>> 4: https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
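As a side note, the coder behavior Luke describes above can be sketched with the Python SDK's coders. This is only an illustration of the concept, not the fix itself (the actual fix belongs in the Java expansion, and this assumes a Beam Python version that ships NullableCoder):

    # Concept sketch: a plain bytes coder rejects None, while a nullable
    # wrapper makes None encodable. Python analogue of Java's
    # NullableCoder.of(ByteArrayCoder.of()); assumes NullableCoder is
    # available in the installed apache_beam version.
    from apache_beam.coders.coders import BytesCoder, NullableCoder

    NullableCoder(BytesCoder()).encode(None)   # encodes fine
    NullableCoder(BytesCoder()).encode(b'x')   # non-null values still work
    # BytesCoder().encode(None) would fail, which is the Java-side
    # "cannot encode a null byte[]" seen in this thread.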
>>>>>>
>>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to build a streaming Beam pipeline in Python which
>>>>>>> should capture messages from Kafka and then execute further stages
>>>>>>> of data fetching from other sources and aggregation. The
>>>>>>> step-by-step process of what I have built so far is:
>>>>>>>
>>>>>>> 1. Run a Kafka instance on localhost:9092
>>>>>>>
>>>>>>>     ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>>
>>>>>>> 2. Run the beam-flink job server using docker
>>>>>>>
>>>>>>>     docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>>
>>>>>>> 3. Run the beam-kafka pipeline
>>>>>>>
>>>>>>>     import apache_beam as beam
>>>>>>>     from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>>>     from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>>
>>>>>>>     if __name__ == '__main__':
>>>>>>>         options = PipelineOptions([
>>>>>>>             "--job_endpoint=localhost:8099",
>>>>>>>             "--environment_type=LOOPBACK",
>>>>>>>             "--streaming",
>>>>>>>             "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>>         ])
>>>>>>>
>>>>>>>         options = options.view_as(StandardOptions)
>>>>>>>         options.streaming = True
>>>>>>>
>>>>>>>         pipeline = beam.Pipeline(options=options)
>>>>>>>
>>>>>>>         result = (
>>>>>>>             pipeline
>>>>>>>             | "Read from kafka" >> ReadFromKafka(
>>>>>>>                 consumer_config={
>>>>>>>                     "bootstrap.servers": 'localhost:9092',
>>>>>>>                 },
>>>>>>>                 topics=['mytopic'],
>>>>>>>                 expansion_service='localhost:8097',
>>>>>>>             )
>>>>>>>             | beam.Map(print)
>>>>>>>         )
>>>>>>>
>>>>>>>         pipeline.run()
>>>>>>>
>>>>>>> 4. Publish a new message using kafka-console-producer.sh
>>>>>>>
>>>>>>>     ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>>>     > tryme
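One aside on step 3 above: ReadFromKafka appears to hand the Python pipeline (key, value) tuples of raw bytes, so once messages flow, a decode step makes the printed output readable. A sketch under those assumptions (tuple-shaped elements, UTF-8 payloads):

    # Sketch: decode the (key, value) byte tuples that ReadFromKafka appears
    # to emit (assumed element shape; payloads assumed to be UTF-8 text).
    import apache_beam as beam

    def decode_record(record):
        key, value = record
        # The key may be None until BEAM-10529 lands, so guard the decode.
        return (key.decode('utf-8') if key is not None else None,
                value.decode('utf-8'))

    # Usage: ... | ReadFromKafka(...) | beam.Map(decode_record) | beam.Map(print)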
>>>>>>>
>>>>>>> After publishing this trial message, the Beam pipeline receives the
>>>>>>> message but crashes with this error:
>>>>>>>
>>>>>>>     RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>>>>>     org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>>       at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>       at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>>       at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>>       at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>>       at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>>       at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>>       at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>       at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>>       at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>>       at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>>       at org.apache.beam
>>>>>>>
>>>>>>> Regards,
>>>>>>> Ayush Sharma.
