Léopold Boudard created BEAM-9656:
-------------------------------------
Summary: Reading from pubsub in portable FlinkRunner
Key: BEAM-9656
URL: https://issues.apache.org/jira/browse/BEAM-9656
Project: Beam
Issue Type: Bug
Components: runner-flink
Affects Versions: 2.19.0
Environment: flink 1.9
Reporter: Léopold Boudard
Hi,
I'm trying to get Pub/Sub streaming working with the FlinkRunner, but I get the
following error on a dummy test pipeline:
{code:java}
java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
    at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
{code}
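{code:python}
import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumed parsing (not shown in the report): --input_topic stays with the script, other flags go to PipelineOptions.
parser = argparse.ArgumentParser()
parser.add_argument('--input_topic', required=True)
known_args, pipeline_args = parser.parse_known_args()

options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=options) as p:
    bounds_to_get = (
        p
        | 'LoadJson' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
        | beam.Map(json.loads)
    )
{code}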
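The `beam.Map` step only runs `json.loads` on each element. With `with_attributes` left at its default, `ReadFromPubSub` emits each message payload as `bytes`, and `json.loads` accepts UTF-8 bytes directly (Python 3.6+), so that step can be checked without any runner. A minimal stdlib-only sketch; the helper name `parse_message` and the sample payload are hypothetical:

{code:python}
import json
from typing import Any


def parse_message(data: bytes) -> Any:
    """Decode one Pub/Sub payload, as the pipeline's beam.Map(json.loads) step does."""
    # json.loads accepts bytes and detects UTF-8/16/32 encoding (Python 3.6+).
    return json.loads(data)


# Hypothetical sample payload; the topic's real message schema is not shown here.
sample = b'{"id": "abc", "count": 1}'
record = parse_message(sample)
print(record["count"])  # prints 1
{code}

If this step behaves locally, that fits the trace above, which fails while the Flink runner is still translating the pipeline (`FlinkStreamingPortablePipelineTranslator.translateUnboundedRead`), before any element is read.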
The pipeline was submitted to a Flink cluster with the following parameters:
{code:bash}
GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m listing_beam_pipeline.test_runner \
  --runner FlinkRunner \
  --flink_master={} \
  --flink_version 1.9 \
  --output gs://... \
  --input_topic projects/pubsub-public-data/topics/taxirides-realtime \
  --streaming
{code}
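The snippet takes `known_args.input_topic` from the script's own flags and hands the remaining `pipeline_args` to `PipelineOptions`. The stdlib-only sketch below assumes that split is done with `argparse`'s `parse_known_args` (the actual parsing in `listing_beam_pipeline.test_runner` is not shown in this report) and checks which flags from the command above would reach the pipeline options. The helper name `split_args` is hypothetical, and `--flink_master` and `--output` are left out because their values are elided above.

{code:python}
import argparse
from typing import List, Tuple


def split_args(argv: List[str]) -> Tuple[argparse.Namespace, List[str]]:
    """Separate the script's own flags from the flags meant for PipelineOptions."""
    # allow_abbrev=False stops argparse from matching pipeline flags by prefix.
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--input_topic", required=True)
    # parse_known_args returns (namespace, leftovers) instead of failing on unknown flags.
    return parser.parse_known_args(argv)


argv = [
    "--runner", "FlinkRunner",
    "--flink_version", "1.9",
    "--input_topic", "projects/pubsub-public-data/topics/taxirides-realtime",
    "--streaming",
]
known_args, pipeline_args = split_args(argv)
print(known_args.input_topic)
print(pipeline_args)  # ['--runner', 'FlinkRunner', '--flink_version', '1.9', '--streaming']
{code}

Under this assumption `--runner`, `--flink_version` and `--streaming` all end up in `pipeline_args`, so the runner and streaming settings reach `PipelineOptions` unchanged.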
I've tried the same pipeline on both the DirectRunner and the DataflowRunner, and
it seems to work there. I don't understand the underlying error in the traceback.
Could you advise on this issue, please?
Thanks!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)