[
https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17076160#comment-17076160
]
Maximilian Michels commented on BEAM-9656:
------------------------------------------
You are right that starting a Python expansion server won't work because its
meant to expand external Java transforms to the actual Python transforms. With
the external PubSub source, we want to do the opposite; that's why we use the
Java expansion server.
It should not be necessary to start a separate container. The job server and
the expansion server are started when you submit your Python pipeline. You
should append the expansion port argument to your pipeline options. Let me know
if that works.
> Reading from pubsub in portable FlinkRunner (ambigious ReadFromPubSub
> transform)
> --------------------------------------------------------------------------------
>
> Key: BEAM-9656
> URL: https://issues.apache.org/jira/browse/BEAM-9656
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.19.0
> Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
> Reporter: Léopold Boudard
> Priority: Major
>
> Hi,
> I'm trying to get streaming with pubsub in flinkrunner working, though I get
> following issue on a dummy test pipeline
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize
> UnboundedSourcejava.lang.IllegalArgumentException: unable to deserialize
> UnboundedSource at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> at
> org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
> at
> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
> at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
> at
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.base/java.lang.Thread.run(Thread.java:844)Caused by:
> java.io.IOException: FAILED_TO_UNCOMPRESS(5) at
> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98) at
> org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at
> org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474) at
> org.xerial.snappy.Snappy.uncompress(Snappy.java:513) at
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147) at
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99) at
> org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59) at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> ... 14 moreERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
> {code:java}
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
> bounds_to_get = (
> p | 'LoadJson' >> beam.io.ReadFromPubSub(
> topic=known_args.input_topic
> )
> | beam.Map(lambda x: json.loads(x))
> )
> {code}
> submitted on a flink cluster with following params:
> {code:java}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m
> listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={}
> --flink_version 1.9 --output gs://... --input_topic
> projects/pubsub-public-data/topics/taxirides-realtime --streaming{code}
> I've tried same on both DirectRunner and DataflowRunner and it seems to work.
> I don't quite understand the underlying error on traceback.
> Could you advise on this issue please?
> Thanks!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)