[ 
https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17079131#comment-17079131
 ] 

Léopold Boudard edited comment on BEAM-9656 at 4/9/20, 10:12 AM:
-----------------------------------------------------------------

Hi [~mxm], thanks for the advice.

I could not get beam.io.WriteToText to work at all in a streaming context.

I did manage to get beam.io.fileio.WriteToFiles working with your flag plus a 
fixed-size window. There is a lot of latency before files are moved to their 
final destination (with a fixed window of 60 seconds the files only get moved 
much later; smaller windows reduce this), and a lot of data is staged in temp 
files, but it works!
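
For reference, the working setup looks roughly like this. It is only a minimal 
sketch: pipeline_args (which would carry the flag you suggested), the topic, 
the output path and the 60-second window are placeholders rather than our 
exact code.

{code:python}
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

# Sketch only: pipeline_args would include the flag you suggested; the topic
# and output path are placeholders.
options = PipelineOptions(pipeline_args, streaming=True)
with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
     | 'Window' >> beam.WindowInto(FixedWindows(60))
     | 'Write' >> fileio.WriteToFiles(path=known_args.output))
{code}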

Also, the issue I mentioned in my second comment (not being able to import the 
external PubsubSource without starting an expansion server beforehand) was 
still present when I attempted to upgrade to 2.22 and pass the proper flag. I 
think the expansion service doesn't start until pipeline.run() is called 
(which I guess happens on context manager exit), while I was applying the 
transform inside the pipeline context manager, so it seems to try to expand it 
immediately and fails.

 
{code:python}
from apache_beam import Pipeline
# Import path for the external Pub/Sub source assumed here; topic as in the
# original report.
from apache_beam.io.external.gcp.pubsub import ReadFromPubSub

with Pipeline(runner=runner, options=options) as p:
    p | 'Read from Pub/Sub' >> ReadFromPubSub(topic=known_args.input_topic)
{code}
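
To make the ordering I mean explicit, here is a rough sketch of building the 
same pipeline without the context manager and calling run() by hand; this is 
only to illustrate when expansion happens relative to run(), not a workaround 
I have verified (the import path and topic are assumptions):

{code:python}
import apache_beam as beam
# Import path for the external Pub/Sub source assumed here.
from apache_beam.io.external.gcp.pubsub import ReadFromPubSub

p = beam.Pipeline(runner=runner, options=options)
# The external transform seems to be expanded already at this point, during
# pipeline construction...
p | 'Read from Pub/Sub' >> ReadFromPubSub(topic=known_args.input_topic)
# ...well before run() is reached; the context manager only calls run() on
# exit.
result = p.run()
result.wait_until_finish()
{code}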
 

On our end, we'll stop the Flink experiments for now, as we find them not 
quite stable/predictable and quite difficult to debug (logs are not easy to 
retrieve; on most crashes the container restarts and you can't properly 
retrieve the exceptions from the history) compared to the current state of the 
direct runner / Dataflow runner.

We can see that it should not take much more before this becomes usable in a 
Flink context, and our experiments were rather positive for batch. I hope we 
can help at some point in the development.

Since you mention the Impulse transform, I'm not sure what role it plays in 
the process?

Thanks!


was (Author: leopold.boudard):
Hi [~mxm], thanks for the advice.

I could not get beam.io.WriteToText to work at all in a streaming context.

I did manage to get beam.io.fileio.WriteToFiles working with your flag plus a 
fixed-size window (though I had multiple issues without it, mainly when trying 
to pass a custom naming function). There is a lot of latency before files are 
moved to their final destination, and a lot of data is staged in temp files, 
but it works!

Also, the issue I mentioned in my second comment (not being able to import the 
external PubsubSource without starting an expansion server beforehand) was 
still present when I attempted to upgrade to 2.22 and pass the proper flag. I 
think the expansion service doesn't start until pipeline.run() is called 
(which I guess happens on context manager exit), while I was applying the 
transform inside the pipeline context manager, so it seems to try to expand it 
immediately and fails.

 
{code:python}
from apache_beam import Pipeline
# Import path for the external Pub/Sub source assumed here; topic as in the
# original report.
from apache_beam.io.external.gcp.pubsub import ReadFromPubSub

with Pipeline(runner=runner, options=options) as p:
    p | 'Read from Pub/Sub' >> ReadFromPubSub(topic=known_args.input_topic)
{code}
 

On our end, we'll stop the Flink experiments for now, as we find them not 
quite stable/predictable and quite difficult to debug (logs are not easy to 
retrieve; on most crashes the container restarts and you can't properly 
retrieve the exceptions from the history) compared to the current state of the 
direct runner / Dataflow runner.

We can see that it should not take much more before this becomes usable in a 
Flink context, and our experiments were rather positive for batch. I hope we 
can help at some point in the development.

Since you mention the Impulse transform, I'm not sure what role it plays in 
the process?

Thanks!

> Reading from pubsub in portable FlinkRunner (ambigious ReadFromPubSub 
> transform)
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-9656
>                 URL: https://issues.apache.org/jira/browse/BEAM-9656
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.19.0
>         Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
>            Reporter: Léopold Boudard
>            Priority: Major
>         Attachments: Capture d’écran 2020-04-07 à 16.34.07.png
>
>
> Hi,
> I'm trying to get streaming with Pub/Sub in the FlinkRunner working, but I 
> get the following issue with a dummy test pipeline:
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>  at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>  at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>  at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>  at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>  at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>  at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>  at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>  at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>  at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
>  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>  at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>  at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>  at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>  at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>  at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>  at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>  at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>  at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>  ... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
> {code:java}
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
>     bounds_to_get = (
>             p | 'LoadJson' >> beam.io.ReadFromPubSub(
>                     topic=known_args.input_topic
>             )
>             | beam.Map(lambda x: json.loads(x))
>     )
> {code}
> submitted on a Flink cluster with the following params:
> {code:java}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m 
> listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={} 
> --flink_version 1.9  --output gs://... --input_topic 
> projects/pubsub-public-data/topics/taxirides-realtime  --streaming{code}
> I've tried the same on both the DirectRunner and the DataflowRunner and it 
> seems to work. I don't quite understand the underlying error in the traceback.
> Could you please advise on this issue?
> Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
