[ 
https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17079185#comment-17079185
 ] 

Maximilian Michels commented on BEAM-9656:
------------------------------------------

I'm sorry you are experiencing problems. I think we are on the verge though of 
ironing out the last problems, so I think it is worth to stick around. We have 
other companies using the Flink portable runner successfully in production 
(e.g. Lyft) with huge data volume. Every use case is different though and it 
takes a couple of tries to catch all the production issues.

{quote}
I did succeed to have beam.io.fileio.WriteToFiles to work with your flag + 
fixed size window, though there's a lot of latency moving files to their final 
destination (I have a fixed window of 60 seconds and it finally moves files 
much much later, this is reduced with smaller windows) and it stages much stuff 
in temp files, but this works!
{quote}

Maybe WriteToFiles is not optimal for your use case. However, the result of the 
transform should be the same for all runners, since they share the same 
transform code.

{quote}
quite difficult to debug (cannot retrieve logs simply, on most crashes 
container restart and you can't properly retrieve exceptions in history) 
{quote}

Too bad you ran into this regression. I mentioned in [this 
comment|https://issues.apache.org/jira/browse/BEAM-9645?focusedCommentId=17074672&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17074672]
 that you could use the {{--retain_docker_images}} flag, did you try that?

{quote}
Since you mentions the impulse transform, I'm not sure of what it does in the 
process?
{quote}

In portability (=framework for Python and other languages), the impulse source 
is the bootstrapping transform which kicks off the pipeline execution. I've 
filed the issue BEAM-9733 which will be included in the 2.21.0 release.

> Reading from pubsub in portable FlinkRunner (ambigious ReadFromPubSub 
> transform)
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-9656
>                 URL: https://issues.apache.org/jira/browse/BEAM-9656
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.19.0
>         Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
>            Reporter: Léopold Boudard
>            Priority: Major
>         Attachments: Capture d’écran 2020-04-07 à 16.34.07.png
>
>
> Hi,
> I'm trying to get streaming with pubsub in flinkrunner working, though I get 
> following issue on a dummy test pipeline
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize 
> UnboundedSourcejava.lang.IllegalArgumentException: unable to deserialize 
> UnboundedSource at 
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>  at 
> org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>  at 
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>  at 
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>  at 
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>  at 
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>  at 
> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>  at 
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>  at 
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>  at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>  at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>  at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>  at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:844)Caused by: 
> java.io.IOException: FAILED_TO_UNCOMPRESS(5) at 
> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98) at 
> org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at 
> org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474) at 
> org.xerial.snappy.Snappy.uncompress(Snappy.java:513) at 
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147) at 
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99) at 
> org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59) at 
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>  ... 14 moreERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
> {code:java}
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
>     bounds_to_get = (
>             p | 'LoadJson' >> beam.io.ReadFromPubSub(
>                     topic=known_args.input_topic
>             )
>             | beam.Map(lambda x: json.loads(x))
>     )
> {code}
> submitted on a flink cluster with following params:
> {code:java}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m 
> listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={} 
> --flink_version 1.9  --output gs://... --input_topic 
> projects/pubsub-public-data/topics/taxirides-realtime  --streaming{code}
> I've tried same on both DirectRunner and DataflowRunner and it seems to work. 
> I don't quite understand the underlying error on traceback.
> Could you advise on this issue please?
> Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to