[
https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074069#comment-17074069
]
Léopold Boudard commented on BEAM-9656:
---------------------------------------
Thanks [~mxm], though it's not working yet. From my understanding, the expansion service (which, I guess, registers the given transform?) needs to be up beforehand:
{code:java}
GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master=35.233.103.103:41695 --flink_version 1.9 --output gs://... --input_topic ... --streaming
Traceback (most recent call last):
  File "/Users/leopold/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/Users/leopold/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/test_runner.py", line 81, in <module>
    run()
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/test_runner.py", line 67, in run
    known_args.input_topic).with_output_types(bytes)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 989, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 549, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pipeline.py", line 536, in apply
    return self.apply(transform, pvalueish)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pipeline.py", line 577, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 195, in apply
    return m(transform, input, options)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
    return transform.expand(input)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/io/external/gcp/pubsub.py", line 101, in expand
    self.expansion_service))
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pvalue.py", line 135, in apply
    return self.pipeline.apply(*arglist, **kwargs)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pipeline.py", line 577, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 195, in apply
    return m(transform, input, options)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 225, in apply_PTransform
    return transform.expand(input)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/transforms/external.py", line 326, in expand
    channel).Expand(request)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "failed to connect to all addresses"
    debug_error_string = "{"created":"@1585858798.118025000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3941,"referenced_errors":[{"created":"@1585858798.118023000","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":393,"grpc_status":14}]}"
{code}
At this point neither the job server nor the expansion service has been started yet; with the FlinkRunner option the expansion service only seems to come up later, together with the job server. Are there any flags I'm missing?
I've looked at some examples, but I'm not sure how I should start the expansion service, or whether I need to at all, e.g.
[https://github.com/apache/beam/blob/b91560cc354da471e3de502aad78dd059997a3d0/sdks/python/apache_beam/examples/wordcount_xlang.py]
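For reference, here is roughly what I'm trying now. This is only a sketch of my understanding: I'm assuming I can start the expansion service myself beforehand (e.g. by running beam-runners-flink-1.9-job-server-2.19.0.jar by hand), and that the cross-language transform accepts that service's address via its expansion_service argument; the localhost:8097 port is just a guess on my side.
{code:python}
import argparse

import apache_beam as beam
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions
# cross-language Pub/Sub read, as opposed to the native beam.io.ReadFromPubSub
from apache_beam.io.external.gcp.pubsub import ReadFromPubSub

parser = argparse.ArgumentParser()
parser.add_argument('--input_topic')
known_args, pipeline_args = parser.parse_known_args()

options = PipelineOptions(pipeline_args)
with Pipeline(options=options) as p:
    messages = (
        p
        | 'ReadFromPubSub' >> ReadFromPubSub(
            topic=known_args.input_topic,
            # assumption: point the transform at an expansion service that is
            # already running before the pipeline is constructed
            expansion_service='localhost:8097',
        ).with_output_types(bytes)
    )
{code}
Is that roughly the intended workflow, or should the FlinkRunner bring the expansion service up on its own before pipeline construction?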
Thanks for helping!
> Reading from pubsub in portable FlinkRunner (ambiguous ReadFromPubSub
> transform)
> --------------------------------------------------------------------------------
>
> Key: BEAM-9656
> URL: https://issues.apache.org/jira/browse/BEAM-9656
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.19.0
> Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
> Reporter: Léopold Boudard
> Priority: Major
>
> Hi,
> I'm trying to get streaming from Pub/Sub working on the FlinkRunner, but I get
> the following error on a dummy test pipeline:
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>     at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>     ... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
> {code:python}
> import json
>
> import apache_beam as beam
> from apache_beam import Pipeline
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # pipeline_args / known_args come from the script's argument parsing
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
>     bounds_to_get = (
>         p
>         | 'LoadJson' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
>         | beam.Map(lambda x: json.loads(x))
>     )
> {code}
> submitted to a Flink cluster with the following parameters:
> {code:java}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m
> listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={}
> --flink_version 1.9 --output gs://... --input_topic
> projects/pubsub-public-data/topics/taxirides-realtime --streaming
> {code}
> I've tried the same pipeline on both the DirectRunner and the DataflowRunner and it works there.
> I don't quite understand the underlying error in the traceback.
> Could you please advise on this issue?
> Thanks!