[
https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548511#comment-17548511
]
Danny McCormick commented on BEAM-9656:
---------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/20143
> Reading from pubsub in portable FlinkRunner (ambiguous ReadFromPubSub
> transform)
> --------------------------------------------------------------------------------
>
> Key: BEAM-9656
> URL: https://issues.apache.org/jira/browse/BEAM-9656
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.19.0
> Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
> Reporter: Léopold Boudard
> Priority: P3
> Attachments: Capture d’écran 2020-04-07 à 16.34.07.png
>
>
> Hi,
> I'm trying to get streaming from Pub/Sub working on the FlinkRunner, but I hit
> the following error on a dummy test pipeline:
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
> 	at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> 	at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
> 	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
> 	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
> 	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
> 	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
> 	at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
> 	at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
> 	at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
> 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> 	at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> 	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> 	at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> 	at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> 	at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
> 	at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
> 	at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
> 	at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
> 	at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> 	... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
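For context on the root cause: Beam's `SerializableUtils` Snappy-compresses the Java-serialized `UnboundedSource`, and `FAILED_TO_UNCOMPRESS` means the Flink job server received bytes that are not a valid compressed blob (typically a serialization-format or version mismatch rather than data corruption). A minimal stdlib sketch of that failure mode, using `zlib` as a stand-in since Snappy bindings are a third-party package:

```python
import zlib

def try_decompress(payload: bytes):
    """Return the decompressed bytes, or None if the payload is not a
    valid compressed blob (the analogue of FAILED_TO_UNCOMPRESS)."""
    try:
        return zlib.decompress(payload)
    except zlib.error:
        return None

# A correctly compressed payload round-trips...
assert try_decompress(zlib.compress(b"serialized-source")) == b"serialized-source"
# ...but bytes that were never compressed in the expected format fail
# immediately, as the serialized source blob does in the trace above.
assert try_decompress(b"not-a-compressed-blob") is None
```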
> {code:python}
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
>     bounds_to_get = (
>         p
>         | 'LoadJson' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
>         | beam.Map(lambda x: json.loads(x))
>     )
> {code}
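For reference, the `Map` step itself is straightforward: `ReadFromPubSub` yields each message payload as raw bytes, and `json.loads` accepts bytes directly on Python 3.6+. A minimal stdlib sketch of that parsing step in isolation (the sample payload is hypothetical):

```python
import json

def parse_message(payload: bytes) -> dict:
    # Same logic as the beam.Map(lambda x: json.loads(x)) stage above:
    # json.loads handles a UTF-8 bytes payload without an explicit decode.
    return json.loads(payload)

# Hypothetical taxiride-style payload for illustration.
parsed = parse_message(b'{"ride_status": "enroute"}')
```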
> submitted to a Flink cluster with the following parameters:
> {code:bash}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m listing_beam_pipeline.test_runner \
>     --runner FlinkRunner \
>     --flink_master={} \
>     --flink_version 1.9 \
>     --output gs://... \
>     --input_topic projects/pubsub-public-data/topics/taxirides-realtime \
>     --streaming
> {code}
> I've tried the same pipeline on both DirectRunner and DataflowRunner, and it
> works there. I don't quite understand the underlying error in the traceback.
> Could you advise on this issue, please?
> Thanks!
--
This message was sent by Atlassian Jira
(v8.20.7#820007)