[
https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077041#comment-17077041
]
Léopold Boudard commented on BEAM-9656:
---------------------------------------
Hi, [~mxm],
One last question: I've rebuilt job-server.jar with the above modifications, but
whenever I submit a job using this jar, the Flink cluster tries to start a
container tagged apachebeam/java_sdk:2.19.0-SNAPSHOT, and the job therefore fails.
I tried to customize the image with {{--environment_config
eu.gcr.io/ma-dev2/benchmark_beam:latest}}, as described in
[https://beam.apache.org/documentation/runtime/environments/] and
[https://beam.apache.org/documentation/runtime/sdk-harness-config/],
but it doesn't seem to work. I also tried the {{--worker_harness_container_image}}
setting, since I figured it might be the harness layer (the part responsible for
portable Fn API communication) that runs in a separate container, but that doesn't
do the trick either.
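For reference, the Beam SDK harness configuration page pairs {{--environment_config}} with {{--environment_type}}: with the DOCKER environment type (the documented default), the config value is the container image URL. Below is a minimal sketch of assembling those pipeline options for a portable Flink submission. The helper name flink_docker_args and the Flink master address localhost:8081 are hypothetical placeholders. The sketch only shows how the flags fit together for the Python SDK harness; it does not establish which option, if any, controls the apachebeam/java_sdk container mentioned above.

```python
# Hypothetical helper (not part of Beam): build argv-style pipeline options
# for FlinkRunner with a custom Docker SDK harness image.
def flink_docker_args(flink_master, image, flink_version="1.9"):
    """Return a list of pipeline option strings for a Docker SDK harness."""
    return [
        "--runner=FlinkRunner",
        f"--flink_master={flink_master}",
        f"--flink_version={flink_version}",
        # DOCKER is the documented default; stating it explicitly makes the
        # pairing with --environment_config obvious.
        "--environment_type=DOCKER",
        # For the DOCKER environment type, environment_config is the image URL.
        f"--environment_config={image}",
        "--streaming",
    ]


if __name__ == "__main__":
    # localhost:8081 is a placeholder address, not taken from the report.
    args = flink_docker_args("localhost:8081",
                             "eu.gcr.io/ma-dev2/benchmark_beam:latest")
    print(" ".join(args))
```

The resulting list could be passed to PipelineOptions in place of the command-line flags.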
Could you advise on this last issue?
Thanks!
> Reading from pubsub in portable FlinkRunner (ambiguous ReadFromPubSub
> transform)
> --------------------------------------------------------------------------------
>
> Key: BEAM-9656
> URL: https://issues.apache.org/jira/browse/BEAM-9656
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.19.0
> Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
> Reporter: Léopold Boudard
> Priority: Major
>
> Hi,
> I'm trying to get Pub/Sub streaming working on the FlinkRunner, but I get the
> following error on a dummy test pipeline:
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>     at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>     ... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
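> {code:python}
> import argparse
> import json
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
>
> parser = argparse.ArgumentParser()
> parser.add_argument('--input_topic', required=True)
> known_args, pipeline_args = parser.parse_known_args()
>
> options = PipelineOptions(pipeline_args)
> with beam.Pipeline(options=options) as p:
>     bounds_to_get = (
>         p
>         | 'LoadJson' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
>         | 'ParseJson' >> beam.Map(json.loads))
> {code}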
> I submitted it to a Flink cluster with the following parameters:
> {code:bash}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m listing_beam_pipeline.test_runner \
>     --runner FlinkRunner --flink_master={} --flink_version 1.9 \
>     --output gs://... \
>     --input_topic projects/pubsub-public-data/topics/taxirides-realtime \
>     --streaming
> {code}
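To make explicit which of these flags reach PipelineOptions, here is a stdlib-only sketch of how {{argparse.parse_known_args}} would split the command line above. It assumes, hypothetically, that the test_runner script's parser defines only {{--input_topic}} and {{--output}}; the helper name split_args is also hypothetical. Everything the parser does not recognize ({{--runner}}, {{--flink_master}}, {{--flink_version}}, {{--streaming}}) ends up in pipeline_args. The elided values "{}" and "gs://..." are kept exactly as in the report.

```python
import argparse
import shlex

# Flags copied from the submission command above, elisions left as-is.
COMMAND_FLAGS = (
    "--runner FlinkRunner --flink_master={} --flink_version 1.9 "
    "--output gs://... --input_topic "
    "projects/pubsub-public-data/topics/taxirides-realtime --streaming"
)


def split_args(argv):
    """Split argv into script-level args and the leftovers for PipelineOptions.

    Assumption: the script's own parser defines only --input_topic and
    --output; every other flag is passed through untouched.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", required=True)
    parser.add_argument("--output")
    return parser.parse_known_args(argv)


if __name__ == "__main__":
    known_args, pipeline_args = split_args(shlex.split(COMMAND_FLAGS))
    print("topic:", known_args.input_topic)
    print("pipeline args:", pipeline_args)
```

Because parse_known_args keeps unrecognized flags in their original order, the runner-related options are forwarded to PipelineOptions unchanged.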
> I've tried the same pipeline on both the DirectRunner and the DataflowRunner, and it
> seems to work there. I don't quite understand the underlying error in the traceback.
> Could you advise on this issue, please?
> Thanks!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)