Raj Subramani created BEAM-6580:
-----------------------------------

             Summary: Beam 2.10.0 RC1
                 Key: BEAM-6580
                 URL: https://issues.apache.org/jira/browse/BEAM-6580
             Project: Beam
          Issue Type: Test
          Components: io-ideas
    Affects Versions: 2.10.0
            Reporter: Raj Subramani
            Assignee: Eugene Kirpichov


I was testing beam.io.WriteToText on RC1 of 2.10. This release addresses writing to CMEK-protected GCS buckets, which currently fails because the copy from the staging location to the CMEK bucket fails; the remedy was to change "copy" to "rewrite".

As per the changes in this commit:

[https://github.com/apache/beam/commit/e6e85edbeade3a4c038aca85821d2b265ac33909#diff-246c5b6386cc533fbabca3b397ca3c17]

StorageObjectsCopyRequest was replaced with StorageObjectsRewriteRequest.
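For illustration, a rewrite-based copy at the JSON API level looks roughly like the sketch below (using google-api-python-client; the function and variable names are placeholders). Unlike objects.copy, objects.rewrite can re-encrypt object data, which is why it works when the destination bucket has a default CMEK key, and it may need several calls chained through rewriteToken for large objects:

from googleapiclient import discovery

service = discovery.build('storage', 'v1')

def rewrite_object(src_bucket, src_name, dst_bucket, dst_name):
    # objects.rewrite copies (and, if needed, re-encrypts) in chunks; keep
    # calling with the returned rewriteToken until done is True.
    token = None
    while True:
        resp = service.objects().rewrite(
            sourceBucket=src_bucket, sourceObject=src_name,
            destinationBucket=dst_bucket, destinationObject=dst_name,
            rewriteToken=token, body={}).execute()
        if resp['done']:
            return resp['resource']
        token = resp['rewriteToken']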

 

I downloaded RC1 of 2.10 and tried the following code:

def runPipeline(self):
    options = pipeline.PipelineOptions(self.__pipelineargs).view_as(TestPipelineOptions)
    with beam.Pipeline(options=options) as p:
        # Read from the source bucket...
        textPColl = p | 'read text from GCS bucket' >> beam.io.ReadFromText(options.testfile)
        # ...and write the results to the CMEK-protected output bucket.
        textPColl | 'write to bucket' >> beam.io.WriteToText(options.outputfile)
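TestPipelineOptions is not shown above; for completeness, a minimal definition it presumably resembles is below (only the two option names used in the snippet, testfile and outputfile, come from the code; the rest is an assumption):

from apache_beam.options.pipeline_options import PipelineOptions

class TestPipelineOptions(PipelineOptions):
    # Assumed shape of the custom options class referenced above; only the
    # two options the snippet reads are declared here.
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--testfile', help='GCS path to read from')
        parser.add_argument('--outputfile', help='GCS path prefix to write to')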

But I got the following exception:

INFO:root:2019-02-04T09:17:13.452Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 172, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 724, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 535, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_per_window(
  File "apache_beam/runners/common.py", line 604, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 755, in apache_beam.runners.common._OutputProcessor.process_outputs
    def process_outputs(self, windowed_input_element, results):
  File "apache_beam/runners/common.py", line 770, in apache_beam.runners.common._OutputProcessor.process_outputs
    for result in results:
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 1077, in <genexpr>
    window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 321, in finalize_write
    'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), HttpBadRequestError(), HttpBadRequestError()] [while running 'write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
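
The HttpBadRequestError here appears to be the 400 that GCS returns when finalize_write copies the temporary files into the CMEK-protected destination. A quick way to confirm the destination bucket enforces a default KMS key (a sketch using the google-cloud-storage client; the bucket name is a placeholder):

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket('my-cmek-bucket')  # placeholder name
# A non-empty default_kms_key_name means new objects are CMEK-encrypted,
# so server-side copies must go through objects.rewrite.
print(bucket.default_kms_key_name)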

(The same JOB_MESSAGE_ERROR traceback was logged three more times, at 09:17:18.376Z, 09:17:23.304Z, and 09:17:28.056Z, for the retried work-item attempts.)

INFO:root:2019-02-04T09:17:28.086Z: JOB_MESSAGE_DEBUG: Executing failure step failure15
INFO:root:2019-02-04T09:17:28.101Z: JOB_MESSAGE_ERROR: Workflow failed. Causes: S13:write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
 rajsubtest-02040113-nvra-harness-t62t,
 rajsubtest-02040113-nvra-harness-t62t,
 rajsubtest-02040113-nvra-harness-t62t,
 rajsubtest-02040113-nvra-harness-t62t
INFO:root:2019-02-04T09:17:28.794Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:root:2019-02-04T09:17:28.834Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:root:2019-02-04T09:17:28.847Z: JOB_MESSAGE_BASIC: Stopping worker pool...
INFO:root:2019-02-04T09:18:21.451Z: JOB_MESSAGE_DETAILED: Autoscaling: Resized worker pool from 1 to 0.
INFO:root:2019-02-04T09:18:21.468Z: JOB_MESSAGE_DETAILED: Autoscaling: Would further reduce the number of workers but reached the minimum number allowed for the job.
INFO:root:2019-02-04T09:18:21.496Z: JOB_MESSAGE_BASIC: Worker pool stopped.
INFO:root:2019-02-04T09:18:21.514Z: JOB_MESSAGE_DEBUG: Tearing down pending resources...
INFO:root:Job 2019-02-04_01_13_57-9915685959768429617 is in state JOB_STATE_FAILED
Traceback (most recent call last):
  File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 53, in <module>
    main()
  File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 11, in main
    testdf.runPipeline()
  File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 29, in runPipeline
    textPColl | 'write to bucket' >> beam.io.WriteToText(options.outputfile)
  File "C:\Users\subrdhar\beam_2_10_RC1_env\lib\site-packages\apache_beam\pipeline.py", line 425, in __exit__
    self.run().wait_until_finish()
  File "C:\Users\subrdhar\beam_2_10_RC1_env\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 1186, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
(The embedded error is the same worker traceback shown above, ending in:)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), HttpBadRequestError(), HttpBadRequestError()] [while running 'write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']

I presume this is still a work in progress.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
