[
https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17079131#comment-17079131
]
Léopold Boudard edited comment on BEAM-9656 at 4/9/20, 10:12 AM:
-----------------------------------------------------------------
Hi [~mxm], thanks for the advice.
I could not get beam.io.WriteToText to work at all in a streaming context.
I did get beam.io.fileio.WriteToFiles to work with your flag plus a
fixed-size window. There is a lot of latency before files are moved to their
final destination (with a fixed window of 60 seconds, files are moved much,
much later; smaller windows reduce this), and it stages a lot of data in
temp files, but this works!
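For reference, here is a minimal runnable sketch of the WriteToFiles + fixed-window setup described above. A bounded in-memory source stands in for Pub/Sub so the snippet runs locally on the DirectRunner; the output directory, element values, and timestamps are made up for illustration:
{code:python}
import glob
import tempfile

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.transforms.window import FixedWindows, TimestampedValue

# In the real pipeline the source is ReadFromPubSub with --streaming set;
# here a bounded Create with hand-assigned timestamps stands in for it.
out_dir = tempfile.mkdtemp()

with beam.Pipeline() as p:
    (
        p
        | beam.Create([(0, 'a'), (30, 'b'), (90, 'c')])
        # Assign event timestamps (in seconds) so windowing applies.
        | beam.Map(lambda kv: TimestampedValue(kv[1], kv[0]))
        # 60-second fixed windows, as in the setup above.
        | beam.WindowInto(FixedWindows(60))
        # At least one finalized file per non-empty window under out_dir.
        | fileio.WriteToFiles(path=out_dir,
                              sink=lambda dest: fileio.TextSink())
    )

final_files = sorted(glob.glob(out_dir + '/*'))
{code}
Elements at t=0 and t=30 land in the [0, 60) window and the element at t=90 in [60, 120), so the run finalizes files for two separate windows.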
Also, the issue I mentioned in my second comment (could not import the
external PubsubSource without starting the expansion server beforehand) was
still present when I attempted to upgrade to 2.22 and passed the proper
flag. I think the expansion service doesn't start before pipeline.run() is
called (which I guess happens on context-manager exit), while I was
initializing the transform inside a pipeline context manager, and it seems
to try to expand it immediately, hence the failure:
{code:python}
with Pipeline(runner=runner, options=options) as p:
    p | 'Read files from path' >> ReadFromPubSub(...)
{code}
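For comparison, the same structure without the context manager makes the submission point explicit; the context manager's __exit__ is what calls run(). A trivial Create/Map stands in for the external transform so this sketch runs on the DirectRunner:
{code:python}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Build the pipeline without a context manager; nothing is submitted yet.
options = PipelineOptions()
p = beam.Pipeline(options=options)
p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)

# Submission happens only here -- inside a `with` block this call is made
# implicitly on exit, after all transforms have been applied.
result = p.run()
state = result.wait_until_finish()
{code}
If I understand correctly, though, external transforms contact the expansion service when they are applied (at the `|` step), not at run(), which would explain why the failure happens during construction inside the `with` block.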
On our end, we'll stop the Flink experiments for now, as we find them not
quite stable/predictable and quite difficult to debug (logs cannot be
retrieved simply; on most crashes the container restarts and exceptions
can't properly be retrieved from the history) compared to the current state
of the direct and Dataflow runners.
It looks like there is not much left before this becomes more usable in a
Flink context, and our experiments were rather positive for batch. Hope we
can help at some point in the development.
Since you mention the Impulse transform, I'm not sure what it does in this
process?
Thanks!
> Reading from pubsub in portable FlinkRunner (ambiguous ReadFromPubSub transform)
> --------------------------------------------------------------------------------
>
> Key: BEAM-9656
> URL: https://issues.apache.org/jira/browse/BEAM-9656
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.19.0
> Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
> Reporter: Léopold Boudard
> Priority: Major
> Attachments: Capture d’écran 2020-04-07 à 16.34.07.png
>
>
> Hi,
> I'm trying to get streaming from Pub/Sub working in the FlinkRunner, but I
> get the following error on a dummy test pipeline:
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>     at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>     ... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
> {code:python}
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
>     bounds_to_get = (
>         p
>         | 'LoadJson' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
>         | beam.Map(json.loads)
>     )
> {code}
> Submitted on a Flink cluster with the following parameters:
> {code:java}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m
> listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={}
> --flink_version 1.9 --output gs://... --input_topic
> projects/pubsub-public-data/topics/taxirides-realtime --streaming{code}
> I've tried the same pipeline on both DirectRunner and DataflowRunner and it
> works there. I don't quite understand the underlying error in the
> traceback. Could you advise on this issue, please?
> Thanks!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)