Raj Subramani created BEAM-6580:
-----------------------------------
Summary: Beam 2.10.0 RC1
Key: BEAM-6580
URL: https://issues.apache.org/jira/browse/BEAM-6580
Project: Beam
Issue Type: Test
Components: io-ideas
Affects Versions: 2.10.0
Reporter: Raj Subramani
Assignee: Eugene Kirpichov
I was testing beam.io.WriteToText on RC1 of 2.10.0.
This release candidate addresses writing to CMEK GCS buckets (which currently
fails because the copy from staging to the CMEK bucket fails - the remedy was
to change "copy" to "rewrite").
As per the changes in this commit:
[https://github.com/apache/beam/commit/e6e85edbeade3a4c038aca85821d2b265ac33909#diff-246c5b6386cc533fbabca3b397ca3c17]
StorageObjectsCopyRequest
was being replaced with
StorageObjectsRewriteRequest
I downloaded RC1 of 2.10 and tried the following code:
def runPipeline(self):
options =
pipeline.PipelineOptions(self.__pipelineargs).view_as(TestPipelineOptions)
with (beam.Pipeline(options=options)) as p:
textPColl = p | \
'read text from GCS bucket' >>
beam.io.ReadFromText(options.testfile)
textPColl | 'write to bucket' >> beam.io.WriteToText(options.outputfile)
But I got the following exception:
INFO:root:2019-02-04T09:17:13.452Z: JOB_MESSAGE_ERROR: Traceback (most recent
call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in
dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in
dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in
apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in
apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in
apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in
apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in
apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in
apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in
apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in
apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in
apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line
1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py",
line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(),
HttpBadRequestError(), HttpBadRequestError()] [while running 'write to
bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
INFO:root:2019-02-04T09:17:18.376Z: JOB_MESSAGE_ERROR: Traceback (most recent
call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in
dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in
dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in
apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in
apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in
apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in
apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in
apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in
apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in
apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in
apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in
apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line
1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py",
line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(),
HttpBadRequestError(), HttpBadRequestError()] [while running 'write to
bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
INFO:root:2019-02-04T09:17:23.304Z: JOB_MESSAGE_ERROR: Traceback (most recent
call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in
dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in
dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in
apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in
apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in
apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in
apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in
apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in
apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in
apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in
apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in
apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line
1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py",
line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(),
HttpBadRequestError(), HttpBadRequestError()] [while running 'write to
bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
INFO:root:2019-02-04T09:17:28.056Z: JOB_MESSAGE_ERROR: Traceback (most recent
call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in
dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in
dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in
apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in
apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in
apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in
apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in
apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in
apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in
apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in
apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in
apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line
1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py",
line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(),
HttpBadRequestError(), HttpBadRequestError()] [while running 'write to
bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
INFO:root:2019-02-04T09:17:28.086Z: JOB_MESSAGE_DEBUG: Executing failure step
failure15
INFO:root:2019-02-04T09:17:28.101Z: JOB_MESSAGE_ERROR: Workflow failed. Causes:
S13:write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite failed., A work
item was attempted 4 times without success. Each time the worker eventually
lost contact with the service. The work item was attempted on:
rajsubtest-02040113-nvra-harness-t62t,
rajsubtest-02040113-nvra-harness-t62t,
rajsubtest-02040113-nvra-harness-t62t,
rajsubtest-02040113-nvra-harness-t62t
INFO:root:2019-02-04T09:17:28.794Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:root:2019-02-04T09:17:28.834Z: JOB_MESSAGE_DEBUG: Starting worker pool
teardown.
INFO:root:2019-02-04T09:17:28.847Z: JOB_MESSAGE_BASIC: Stopping worker pool...
INFO:root:2019-02-04T09:18:21.451Z: JOB_MESSAGE_DETAILED: Autoscaling: Resized
worker pool from 1 to 0.
INFO:root:2019-02-04T09:18:21.468Z: JOB_MESSAGE_DETAILED: Autoscaling: Would
further reduce the number of workers but reached the minimum number allowed for
the job.
INFO:root:2019-02-04T09:18:21.496Z: JOB_MESSAGE_BASIC: Worker pool stopped.
INFO:root:2019-02-04T09:18:21.514Z: JOB_MESSAGE_DEBUG: Tearing down pending
resources...
INFO:root:Job 2019-02-04_01_13_57-9915685959768429617 is in state
JOB_STATE_FAILED
Traceback (most recent call last):
File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 53, in
<module>
main()
File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 11, in main
testdf.runPipeline()
File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 29, in
runPipeline
textPColl | 'write to bucket' >> beam.io.WriteToText(options.outputfile)
File
"C:\Users\subrdhar\beam_2_10_RC1_env\lib\site-packages\apache_beam\pipeline.py",
line 425, in __exit__
self.run().wait_until_finish()
File
"C:\Users\subrdhar\beam_2_10_RC1_env\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py",
line 1186, in wait_until_finish
(self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow
pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in
dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in
dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in
dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in
apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in
apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in
apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in
apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in
apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in
apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in
apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in
apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in
apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line
1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py",
line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(),
HttpBadRequestError(), HttpBadRequestError()] [while running 'write to
bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
I presume this is still a work in progress.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)