[ https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074069#comment-17074069 ]

Léopold Boudard commented on BEAM-9656:
---------------------------------------

Thanks [~mxm], though it's not working yet. From my understanding, it needs the expansion service to be up beforehand (which, I guess, is what registers the given transform?):
{code:java}
GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master=35.233.103.103:41695 --flink_version 1.9 --output gs://... --input_topic ... --streaming

Traceback (most recent call last):
  File "/Users/leopold/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/Users/leopold/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/test_runner.py", line 81, in <module>
    run()
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/test_runner.py", line 67, in run
    known_args.input_topic).with_output_types(bytes)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pipeline.py", line 536, in apply
    return self.apply(transform, pvalueish)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pipeline.py", line 577, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 195, in apply
    return m(transform, input, options)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
    return transform.expand(input)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/io/external/gcp/pubsub.py", line 101, in expand
    self.expansion_service))
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pvalue.py", line 135, in apply
    return self.pipeline.apply(*arglist, **kwargs)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pipeline.py", line 577, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 195, in apply
    return m(transform, input, options)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
    return transform.expand(input)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/transforms/external.py", line 326, in expand
    channel).Expand(request)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "failed to connect to all addresses"
    debug_error_string = "{"created":"@1585858798.118025000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3941,"referenced_errors":[{"created":"@1585858798.118023000","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":393,"grpc_status":14}]}"
{code}
 

At this point, though, it hasn't started the job server or the expansion service.
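
As a sanity check, I could probe whether anything is listening at the expansion address before building the pipeline; a minimal sketch, assuming localhost:8097 is the address the external transform ends up dialing (an assumption on my part, I haven't verified the default):
{code:java}
# Sketch only, not a fix: check whether anything is listening at the expansion
# service address before building the pipeline, so the UNAVAILABLE error above
# is easier to interpret. 'localhost:8097' is an assumed address, not verified.
import grpc

expansion_address = 'localhost:8097'  # assumed endpoint

channel = grpc.insecure_channel(expansion_address)
try:
    # Blocks until the channel is ready or the timeout expires.
    grpc.channel_ready_future(channel).result(timeout=10)
    print('Expansion service reachable at %s' % expansion_address)
except grpc.FutureTimeoutError:
    print('Nothing listening at %s' % expansion_address)
finally:
    channel.close()
{code}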

The expansion service seems to be started together with the job server later on, when the FlinkRunner option is used.

Are there any flags I might be missing?

I've checked some examples, though I'm not sure how I should start the expansion service, or whether I need to at all:

[https://github.com/apache/beam/blob/b91560cc354da471e3de502aad78dd059997a3d0/sdks/python/apache_beam/examples/wordcount_xlang.py]
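
For reference, this is roughly the shape I'm attempting; a minimal sketch assuming that the cross-language transform in apache_beam.io.external.gcp.pubsub is the right one to use here and that an expansion service is already reachable at localhost:8097 (both assumptions on my part):
{code:java}
# Minimal sketch (not a confirmed fix): assumes an expansion service is already
# running at localhost:8097 (an address picked for illustration) and that the
# cross-language Pub/Sub read is the transform to use on the portable FlinkRunner.
from apache_beam import Pipeline
from apache_beam.io.external.gcp.pubsub import ReadFromPubSub as ExternalReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=35.233.103.103:41695',
    '--flink_version=1.9',
    '--streaming',
])

with Pipeline(options=options) as p:
    messages = (
        p
        | 'ReadFromPubSub' >> ExternalReadFromPubSub(
            topic='projects/pubsub-public-data/topics/taxirides-realtime',
            # Point at the expansion service explicitly rather than relying
            # on whatever the default endpoint is.
            expansion_service='localhost:8097',
        ).with_output_types(bytes)
    )
{code}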

 

Thanks for helping!

 

> Reading from pubsub in portable FlinkRunner (ambiguous ReadFromPubSub 
> transform)
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-9656
>                 URL: https://issues.apache.org/jira/browse/BEAM-9656
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.19.0
>         Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
>            Reporter: Léopold Boudard
>            Priority: Major
>
> Hi,
> I'm trying to get streaming with Pub/Sub in the FlinkRunner working, though I get the following issue on a dummy test pipeline:
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>  at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>  at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>  at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>  at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>  at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>  at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>  at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>  at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>  at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
>  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>  at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>  at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>  at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>  at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>  at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>  at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>  at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>  at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>  ... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
> {code:java}
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
>     bounds_to_get = (
>             p | 'LoadJson' >> beam.io.ReadFromPubSub(
>                     topic=known_args.input_topic
>             )
>             | beam.Map(lambda x: json.loads(x))
>     )
> {code}
> submitted to a Flink cluster with the following params:
> {code:java}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={} --flink_version 1.9 --output gs://... --input_topic projects/pubsub-public-data/topics/taxirides-realtime --streaming
> {code}
> I've tried the same on both DirectRunner and DataflowRunner, and it seems to work there. I don't quite understand the underlying error in the traceback.
> Could you advise on this issue, please?
> Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
