[
https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077305#comment-17077305
]
Léopold Boudard commented on BEAM-9656:
---------------------------------------
[~mxm], it did indeed work once I customized the image, thanks for your advice.
Pubsub reading seems to work \o/! However, I'm now having an issue writing :(.
I've tried this very simple pipeline:
{code:java}
(p
 | 'Read files from path' >> ReadFromPubSub(
       # known_args.input_topic,
       subscription=known_args.input_subscription,
       expansion_service='localhost:8097',
       with_attributes=False)
 | 'parse' >> beam.Map(json_decode)
 | 'encode' >> beam.Map(json_encode)
 | 'write to file' >> WriteToText(known_args.output))
{code}
Even without the encode/decode steps, it never writes anything; it seems to be
stuck at write to file/Write/WriteImpl/\{WriteBundles, Pair,
WindowInto(WindowIntoFn)} -> ToKeyedWorkItem (I did try to impose a global
window, without success).
From what I understand, it is keyed/windowed by the source impulse trigger? I
can see some emissions from Impulse, though it never seems to write in the end.
Also, some of our containers sometimes crashed directly on errors without any
error reporting, which made debugging difficult.
I understand that this part is still in progress and experimental, but we were
benchmarking solutions for migrating our Python streaming pipelines, we were
quite drawn to the combined Beam/Flink feature set, and our experiments in
batch mode were quite successful until we tried streaming mode :P.
Thanks for all your help on this and for your work on the project!
!Capture d’écran 2020-04-07 à 16.34.07.png!
> Reading from pubsub in portable FlinkRunner (ambiguous ReadFromPubSub
> transform)
> --------------------------------------------------------------------------------
>
> Key: BEAM-9656
> URL: https://issues.apache.org/jira/browse/BEAM-9656
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.19.0
> Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
> Reporter: Léopold Boudard
> Priority: Major
> Attachments: Capture d’écran 2020-04-07 à 16.34.07.png
>
>
> Hi,
> I'm trying to get streaming from pubsub on the FlinkRunner working, but I get
> the following error on a dummy test pipeline:
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize
> UnboundedSourcejava.lang.IllegalArgumentException: unable to deserialize
> UnboundedSource at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> at
> org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
> at
> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
> at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
> at
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.base/java.lang.Thread.run(Thread.java:844)Caused by:
> java.io.IOException: FAILED_TO_UNCOMPRESS(5) at
> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98) at
> org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at
> org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474) at
> org.xerial.snappy.Snappy.uncompress(Snappy.java:513) at
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147) at
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99) at
> org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59) at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> ... 14 moreERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
> {code:java}
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
>     bounds_to_get = (
>         p
>         | 'LoadJson' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
>         | beam.Map(lambda x: json.loads(x))
>     )
> {code}
> submitted to a Flink cluster with the following parameters:
> {code:java}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m \
>     listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={} \
>     --flink_version 1.9 --output gs://... --input_topic \
>     projects/pubsub-public-data/topics/taxirides-realtime --streaming
> {code}
> I've tried the same pipeline on both the DirectRunner and the DataflowRunner,
> and it seems to work there.
> I don't quite understand the underlying error in the traceback.
> Could you please advise on this issue?
> Thanks!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)