[ 
https://issues.apache.org/jira/browse/BEAM-9002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012243#comment-17012243
 ] 

Ankur Goenka commented on BEAM-9002:
------------------------------------

Based on this error stack, flattening the same PCollection is not working on Dataflow:

 
{
  job: "2020-01-09_13_25_08-2233620052523810338"   
  logger: 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py:171"
   
  message: "Error processing instruction -364. Original traceback is
Traceback (most recent call last):
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 165, in _execute
    response = task()
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 221, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 350, in do_instruction
    request.instruction_id)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 384, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
 line 811, in process_bundle
    data.transform_id].process_encoded(data.data)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
 line 206, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 302, in 
apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 304, in 
apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 178, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 657, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 658, in 
apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 876, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 883, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 937, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 881, in 
apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 670, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 748, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 984, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    def process_outputs(self, windowed_input_element, results):
  File "apache_beam/runners/common.py", line 1024, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 178, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 657, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 658, in 
apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 876, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 883, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 937, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 881, in 
apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 496, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 1024, in 
apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 178, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 657, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 658, in 
apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 876, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 883, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 952, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 881, in 
apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 497, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File 
"/usr/local/google/home/goenka/d/work/beam/beam/sdks/python/apache_beam/transforms/core.py",
 line 1435, in <lambda>
  File 
"/usr/local/google/home/goenka/d/work/beam/beam/sdks/python/apache_beam/testing/util.py",
 line 191, in _equal
BeamAssertException: Failed assert: ['a', 'b', 'a', 'b', 'a', 'b'] == ['b', 
'a'], missing elements ['a', 'b', 'a', 'b'] [while running 
'generatedPtransform-356']

> test_flatten_same_pcollections 
> (apache_beam.transforms.ptransform_test.PTransformTest) does not work in 
> Streaming VR suite on Dataflow
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9002
>                 URL: https://issues.apache.org/jira/browse/BEAM-9002
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Ankur Goenka
>            Priority: Major
>
> Per investigation in https://issues.apache.org/jira/browse/BEAM-8877, the 
> test times out and was recently added to VR test suite.
> [~liumomo315], I will sickbay this test for streaming, could you please help 
> triage the failure?
> Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to