satybald commented on pull request #15185:
URL: https://github.com/apache/beam/pull/15185#issuecomment-892147868


   hey @vachan-shetty, I've tried out the new BQ Storage PTransform and found some issues. Let me know what you think of them:
   
   The pipeline code:
   
   ```
   # The dataset_latencies dataset is 3.2TB.
   import apache_beam as beam

   with beam.Pipeline(options=options) as pipeline:
       (
           pipeline
           | ReadBQStorage(project="GCP_PROJECT", dataset="telemetry",
                           table="dataset_latencies")
           | 'Count all elements' >> beam.combiners.Count.Globally()
           | beam.Log()
       )
   ```
   
   The pipeline (let's call it **V1**) ran for 8 hours with 50 workers (machine type: n2-standard-8) on Dataflow Runner V1. Experiment options:
   ```
   ['use_beam_bq_sink', 'disable_runner_v2', 'shuffle_mode=service', 
'use_monitoring_state_manager', 'enable_execution_details_collection', 
'use_fastavro']
   ```
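
   For completeness, a rough sketch of how the worker settings and experiments were passed (the region and temp bucket below are assumed placeholders, and some of the flags listed above may also be attached automatically by the service):
   ```
   from apache_beam.options.pipeline_options import PipelineOptions

   options = PipelineOptions(
       runner="DataflowRunner",
       project="GCP_PROJECT",                 # placeholder, as in the pipeline above
       region="us-central1",                  # assumed
       temp_location="gs://YOUR_BUCKET/tmp",  # assumed
       num_workers=50,
       machine_type="n2-standard-8",
       experiments=[
           "use_beam_bq_sink",
           "disable_runner_v2",
           "shuffle_mode=service",
           "use_fastavro",
       ],
   )
   ```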
   
   It also logged the following exceptions:
   
   ```
   Error message from worker: Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 
107, in next
       return six.next(self._wrapped)
     File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, 
in __next__
       return self._next()
     File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 706, 
in _next
       raise self
   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC 
that terminated with:
        status = StatusCode.UNKNOWN
        details = "No status received"
        debug_error_string = 
"{"created":"@1627723826.888538925","description":"No status 
received","file":"src/core/lib/surface/call.cc","file_line":1081,"grpc_status":2}"
   >
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 
649, in do_work
       work_executor.execute()
     File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", 
line 179, in execute
       op.start()
     File "dataflow_worker/native_operations.py", line 38, in 
dataflow_worker.native_operations.NativeReadOperation.start
     File "dataflow_worker/native_operations.py", line 39, in 
dataflow_worker.native_operations.NativeReadOperation.start
     File "dataflow_worker/native_operations.py", line 44, in 
dataflow_worker.native_operations.NativeReadOperation.start
     File "dataflow_worker/native_operations.py", line 48, in 
dataflow_worker.native_operations.NativeReadOperation.start
     File 
"/usr/local/lib/python3.7/site-packages/terra/beam/io/beam_storage_api.py", 
line 294, in __next__
       self.read_rows_response = next(self.read_rows_iterator, None)
     File 
"/usr/local/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1/reader.py",
 line 134, in __iter__
       for message in self._wrapped:
     File 
"/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 
110, in next
       six.raise_from(exceptions.from_grpc_error(exc), exc)
     File "<string>", line 3, in raise_from
   google.api_core.exceptions.Unknown: None No status received
   ```
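
   The failure surfaces in the `read_rows` iterator inside our wrapper (`beam_storage_api.py`, line 294). One mitigation I'm considering on our side is re-opening the stream at the last delivered offset when the UNKNOWN error hits. A rough sketch, assuming the stream is read via `BigQueryReadClient.read_rows` (which may not match how the PR's transform actually reads it):
   ```
   from google.api_core import exceptions
   from google.cloud import bigquery_storage_v1


   def read_rows_with_resume(client, stream_name, max_resumes=3):
       """Yield ReadRowsResponse messages, re-opening the stream on transient errors."""
       offset = 0
       resumes = 0
       while True:
           try:
               # read_rows(name, offset=...) re-opens the ReadStream at a row offset.
               for response in client.read_rows(stream_name, offset=offset):
                   offset += response.row_count
                   yield response
               return
           except (exceptions.Unknown, exceptions.ServiceUnavailable):
               resumes += 1
               if resumes > max_resumes:
                   raise
   ```
   Here `stream_name` would be the `name` of each `ReadStream` in the read session, and `client` a `bigquery_storage_v1.BigQueryReadClient()`.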
   
   
   Pipeline **V2** ran with Dataflow Runner V2, experiment options:
   ```
   ['use_runner_v2', 'use_beam_bq_sink', 'shuffle_mode=service', 
'use_monitoring_state_manager', 'enable_execution_details_collection', 
'use_unified_worker', 'beam_fn_api', 'use_fastavro', 
'use_multiple_sdk_containers']
   ```
   
   It ran for 10 hours and produced the following exception stack trace:
   
   ```
   Error message from worker: Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 
107, in next
       return six.next(self._wrapped)
     File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, 
in __next__
       return self._next()
     File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 706, 
in _next
       raise self
   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC 
that terminated with:
        status = StatusCode.UNKNOWN
        details = "No status received"
        debug_error_string = 
"{"created":"@1627723816.248565623","description":"No status 
received","file":"src/core/lib/surface/call.cc","file_line":1081,"grpc_status":2}"
   >
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 289, in _execute
       response = task()
     File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 362, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 607, in do_instruction
       getattr(request, request_type), request.instruction_id)
     File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 644, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1001, in process_bundle
       element.data)
     File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 229, in process_encoded
       self.output(decoded_value)
     File "apache_beam/runners/worker/operations.py", line 356, in 
apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 358, in 
apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 220, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 828, in 
apache_beam.runners.worker.operations.SdfProcessSizedElements.process
     File "apache_beam/runners/worker/operations.py", line 837, in 
apache_beam.runners.worker.operations.SdfProcessSizedElements.process
     File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
     File "apache_beam/runners/common.py", line 742, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 880, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "apache_beam/runners/common.py", line 1368, in 
apache_beam.runners.common._OutputProcessor.process_outputs
     File 
"/usr/local/lib/python3.7/site-packages/terra/beam/io/beam_storage_api.py", 
line 294, in __next__
       self.read_rows_response = next(self.read_rows_iterator, None)
     File 
"/usr/local/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1/reader.py",
 line 134, in __iter__
       for message in self._wrapped:
     File 
"/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 
110, in next
       six.raise_from(exceptions.from_grpc_error(exc), exc)
     File "<string>", line 3, in raise_from
   google.api_core.exceptions.Unknown: None No status received
   
   ```
   
   There was also a pipeline **V3** that ran `SELECT * FROM dataset` for 1 hour on Dataflow Runner V1 with AVRO export, roughly as sketched below.
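
   A sketch of that V3 export-path pipeline via `ReadFromBigQuery` (the exact query, temp bucket, and options are assumed placeholders):
   ```
   import apache_beam as beam

   with beam.Pipeline(options=options) as pipeline:
       (
           pipeline
           | beam.io.ReadFromBigQuery(
               query="SELECT * FROM dataset",  # table reference as described above
               use_standard_sql=True,
               gcs_location="gs://YOUR_BUCKET/tmp",            # assumed temp bucket
               method=beam.io.ReadFromBigQuery.Method.EXPORT,  # query-extract-read path
           )
           | 'Count all elements' >> beam.combiners.Count.Globally()
           | beam.Map(print)
       )
   ```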
   
   
   My questions:
   * Do you know what might cause the gRPC exceptions above?
   * Why might there be such a big performance difference between the BigQuery Storage API client and the regular QUERY-EXTRACT-READ transform?
   
   
   Let me know if you need more information or have any suggestions on how to debug the BQ Storage client pipeline further.
   

