Hi everyone,

For the last few days I have been trying to run a streaming pipeline (code on GitHub
<https://github.com/matthiasa4/beam-demo>) on the Flink Runner.

I am running a Flink cluster locally (v1.5.6
<https://flink.apache.org/downloads.html>).
I have built the SDK Harness Container: *./gradlew
:beam-sdks-python-container:docker*
and started the JobServer: *./gradlew
:beam-runners-flink_2.11-job-server:runShadow
-PflinkMasterUrl=localhost:8081*.

I run my pipeline with: *env/bin/python streaming_pipeline.py
--runner=PortableRunner --job_endpoint=localhost:8099 --output xxx
--input_subscription xxx --output_subscription xxx*

All of this is running inside an Ubuntu (Bionic) VM in VirtualBox.

The job submits fine, but unfortunately fails after a few seconds with the
error attached.

Is there anything I am missing or doing wrong?

Many thanks.
Best,
Matthias
TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
        at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:624)
        at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
        at 
org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:679)
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:673)
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:378)
        at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
        ... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
Error received from SDK harness for instruction 5: Traceback (most recent call 
last):
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 148, in _execute
    response = task()
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 256, in do_instruction
    request.instruction_id)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 272, in process_bundle
    bundle_processor.process_bundle(instruction_id)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 489, in process_bundle
    ].process_encoded(data.data)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 126, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 182, in 
apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 183, in 
apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 89, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 497, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 498, in 
apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 678, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 684, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 722, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 682, in 
apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 419, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File 
"/home/matthias/Coding/GDE-demo-beam/env/local/lib/python2.7/site-packages/apache_beam/io/iobase.py",
 line 859, in split_source
AttributeError: '_PubSubSource' object has no attribute 'estimate_size' [while 
running 'Read from PubSub/Read/Split']

        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:621)
        ... 13 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for 
instruction 5: Traceback (most recent call last):
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 148, in _execute
    response = task()
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 256, in do_instruction
    request.instruction_id)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 272, in process_bundle
    bundle_processor.process_bundle(instruction_id)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 489, in process_bundle
    ].process_encoded(data.data)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 126, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 182, in 
apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 183, in 
apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 89, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 497, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 498, in 
apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 678, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 684, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 722, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 682, in 
apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 419, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File 
"/home/matthias/Coding/GDE-demo-beam/env/local/lib/python2.7/site-packages/apache_beam/io/iobase.py",
 line 859, in split_source
AttributeError: '_PubSubSource' object has no attribute 'estimate_size' [while 
running 'Read from PubSub/Read/Split']

        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
        at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
        at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
        at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
        at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        ... 3 more

Reply via email to