[ https://issues.apache.org/jira/browse/BEAM-9656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17079282#comment-17079282 ]

Léopold Boudard commented on BEAM-9656:
---------------------------------------

[~mxm], I did try retain_docker_images, but the same containers still crash and get restarted.

As for IO, I've also tried externalWriteToPubsub, though I ran into issues there as well:
{code:java}
java.lang.Error: Checkpointing failed because bundle failed to finalize.
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.snapshotState(DoFnOperator.java:779)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
	at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
	at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
	at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:701)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:88)
	at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:123)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:745)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.snapshotState(DoFnOperator.java:771)
	... 19 more
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
	at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:345)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:698)
	... 23 more
Caused by: org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:273)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
{code}
 

Also, pushing our data to GCS was our use case, hence WriteToFiles was the right fit.

I think there is an issue with the windowing and the final group-by-destination step that moves files from the temp location to their final location (as mentioned, I could only get it to work with a very limited window, and even then it stages many more files than intended; this works well on DirectRunner/Dataflow). Write throughput to the temp files seems okay, so I don't think it's a performance issue but rather some windowing logic, specific to the Flink context, that I don't get.
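For what it's worth, the fixed-window assignment itself is just arithmetic; this plain-Python sketch (the function name and timestamps are mine, it only mirrors the standard FixedWindows formula, not any Flink-specific behavior) illustrates why shrinking the window multiplies the number of distinct windows, and hence staged destination files:

```python
def fixed_window(timestamp, size, offset=0):
    """Return the [start, end) fixed window containing `timestamp`,
    using the usual start = ts - ((ts - offset) % size) formula."""
    start = timestamp - ((timestamp - offset) % size)
    return (start, start + size)

# With a 60s window, events 5s apart land in the same window, so the
# group-by-destination step can merge them into one final file.
assert fixed_window(120, 60) == (120, 180)
assert fixed_window(125, 60) == (120, 180)

# With a 10s window, the same pair of events splits across two windows,
# each producing its own staged file.
assert fixed_window(120, 10) == (120, 130)
assert fixed_window(135, 10) == (130, 140)
```

So a very limited window producing many more staged files is expected; what I don't get is why only the limited window completes on Flink.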

 

It's good news to hear that this is being used for a production use case with high throughput; I'll recheck the state of things in a few releases.

> Reading from pubsub in portable FlinkRunner (ambiguous ReadFromPubSub 
> transform)
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-9656
>                 URL: https://issues.apache.org/jira/browse/BEAM-9656
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.19.0
>         Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar
>            Reporter: Léopold Boudard
>            Priority: Major
>         Attachments: Capture d’écran 2020-04-07 à 16.34.07.png
>
>
> Hi,
> I'm trying to get streaming with Pub/Sub working on FlinkRunner, though I get 
> the following issue on a dummy test pipeline
> {code:java}
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
> 	at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> 	at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
> 	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
> 	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
> 	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
> 	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
> 	at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
> 	at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
> 	at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
> 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> 	at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> 	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> 	at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> 	at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> 	at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
> 	at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
> 	at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
> 	at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
> 	at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> 	... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> {code}
> {code:java}
> options = PipelineOptions(pipeline_args)
> with Pipeline(options=options) as p:
>     bounds_to_get = (
>             p | 'LoadJson' >> beam.io.ReadFromPubSub(
>                     topic=known_args.input_topic
>             )
>             | beam.Map(lambda x: json.loads(x))
>     )
> {code}
> submitted on a Flink cluster with the following parameters:
> {code:java}
> GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m 
> listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={} 
> --flink_version 1.9  --output gs://... --input_topic 
> projects/pubsub-public-data/topics/taxirides-realtime  --streaming{code}
> I've tried the same on both DirectRunner and DataflowRunner and it seems to 
> work there. I don't quite understand the underlying error in the traceback.
> Could you advise on this issue, please?
> Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)