Léopold Boudard created BEAM-9656:
-------------------------------------

             Summary: Reading from pubsub in portable FlinkRunner
                 Key: BEAM-9656
                 URL: https://issues.apache.org/jira/browse/BEAM-9656
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
    Affects Versions: 2.19.0
         Environment: flink 1.9
            Reporter: Léopold Boudard


Hi,

I'm trying to get streaming reads from Pub/Sub working on the portable 
FlinkRunner, but I get the following error on a dummy test pipeline:
{code:java}
java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
    at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
{code}
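{code:python}
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# pipeline_args and known_args come from the script's argument parsing (not shown).
options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=options) as p:
    bounds_to_get = (
        p
        | 'LoadJson' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
        # Parse each Pub/Sub payload as JSON.
        | beam.Map(lambda x: json.loads(x))
    )
{code}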
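For reference, the Map step in the pipeline above can be sketched on its own without Beam. This is only a minimal illustration, assuming each message arrives as UTF-8 encoded JSON bytes; the `parse_message` name and the sample payload are made up for the example and are not taken from the real topic.

```python
import json


def parse_message(payload: bytes) -> dict:
    """Decode one raw Pub/Sub payload into a Python dict."""
    # Assumption: the payload is UTF-8 encoded JSON text.
    return json.loads(payload.decode("utf-8"))


# Hypothetical sample payload, not a real message from the topic.
sample = b'{"ride_id": "abc", "passenger_count": 2}'
print(parse_message(sample))
```

This step is independent of the runner, so it should not be related to the deserialization failure in the traceback, which is raised while the pipeline is being translated.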
This was submitted to a Flink cluster with the following parameters:
{code:bash}
GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m listing_beam_pipeline.test_runner \
    --runner FlinkRunner --flink_master={} --flink_version 1.9 \
    --output gs://... \
    --input_topic projects/pubsub-public-data/topics/taxirides-realtime \
    --streaming
{code}
I've tried the same pipeline on both DirectRunner and DataflowRunner, and it 
seems to work there. I don't quite understand the underlying error in the 
traceback.

Could you advise on this issue please?

Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
