liferoad opened a new issue, #34792:
URL: https://github.com/apache/beam/issues/34792

   ### What happened?
   
   In the example below, a BigQuery export generated 50K files on GCS. While splitting the read, a single Dataflow worker calls GCS IO once per exported file to check that the file exists, and this can take around one hour:
   ```
   Operation ongoing in bundle process_bundle-8807319671714420869-40 for PTransform{name=GenerateDataset/ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitWithSizing-ptransform-38, state=process-msecs} for at least 476.93 seconds without outputting or completing.
   Current Traceback:
     File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
       self._bootstrap_inner()
   
     File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
       self.run()
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
       self._work_item.run()
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
       self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 387, in task
       self._execute(
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
       response = task()
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 388, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 658, in do_instruction
       return getattr(self, request_type)(
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 696, in process_bundle
       bundle_processor.process_bundle(instruction_id))
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1274, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
       self.output(decoded_value)
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1735, in process
       for part, size in self.restriction_provider.split_and_size(
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 334, in split_and_size
       for part in self.split(element, restriction):
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/iobase.py", line 1631, in split
       for source_bundle in source_bundles:
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 827, in split
       source = self._create_source(path, self.export_result.coder)
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 787, in _create_source
       return create_avro_source(path)
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filebasedsource.py", line 127, in __init__
       self._validate()
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/options/value_provider.py", line 193, in _f
       return fnc(self, *args, **kwargs)
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filebasedsource.py", line 190, in _validate
       match_result = FileSystems.match([pattern], limits=[1])[0]
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filesystems.py", line 240, in match
       return filesystem.match(patterns, limits)
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filesystem.py", line 774, in match
       result.append(_match(pattern, limit))
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filesystem.py", line 749, in _match
       if self.exists(pattern):
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/gcsfilesystem.py", line 282, in exists
       return self._gcsIO().exists(path)
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/gcsio.py", line 472, in exists
       self._gcs_object(path)
   
     File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/gcsio.py", line 549, in _gcs_object
       blob = bucket.get_blob(blob_name, retry=self._storage_client_retry)
   
     File "/usr/local/lib/python3.10/contextlib.py", line 79, in inner
       return func(*args, **kwds)
   
     File "/usr/local/lib/python3.10/site-packages/google/cloud/storage/bucket.py", line 1343, in get_blob
       blob.reload(
   
     File "/usr/local/lib/python3.10/site-packages/google/cloud/storage/_helpers.py", line 303, in reload
       api_response = client._get_resource(
   
     File "/usr/local/lib/python3.10/site-packages/google/cloud/storage/client.py", line 474, in _get_resource
       return self._connection.api_request(
   
     File "/usr/local/lib/python3.10/site-packages/google/cloud/storage/_http.py", line 90, in api_request
       return call()
   
     File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
       return retry_target(
   
     File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
       result = target()
   
     File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 482, in api_request
       response = self._make_request(
   
     File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 341, in _make_request
       return self._do_request(
   
     File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 379, in _do_request
       return self.http.request(
   
     File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/google/auth/transport/requests.py", line 538, in request
       response = super(AuthorizedSession, self).request(
   
     File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
       resp = self.send(prep, **send_kwargs)
   
     File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 703, in send
       r = adapter.send(request, **kwargs)
   
     File "/usr/local/lib/python3.10/site-packages/requests/adapters.py", line 667, in send
       resp = conn.urlopen(
   
     File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/urllib3/connectionpool.py", line 789, in urlopen
       response = self._make_request(
   
     File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/urllib3/connectionpool.py", line 536, in _make_request
       response = conn.getresponse()
   
     File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/urllib3/connection.py", line 507, in getresponse
       httplib_response = super().getresponse()
   
     File "/usr/local/lib/python3.10/http/client.py", line 1375, in getresponse
       response.begin()
   
     File "/usr/local/lib/python3.10/http/client.py", line 318, in begin
       version, status, reason = self._read_status()
   
     File "/usr/local/lib/python3.10/http/client.py", line 279, in _read_status
       line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
   
     File "/usr/local/lib/python3.10/socket.py", line 717, in readinto
       return self._sock.recv_into(b)
   
     File "/usr/local/lib/python3.10/ssl.py", line 1307, in recv_into
       return self.read(nbytes, buffer)
   
     File "/usr/local/lib/python3.10/ssl.py", line 1163, in read
       return self._sslobj.read(len, buffer)
   ```
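To make the scaling explicit, here is a rough, hypothetical model of the split path in the traceback above. `ReadFromBigQuery` creates one Avro source per exported file, and each source's `_validate()` ends in one `exists` call, which is one GCS metadata request. The names `CountingFileSystem`, `split_export` and `implied_latency_ms`, and the `gs://example-bucket/...` paths, are illustrative only and are not Beam APIs. If the checks run one after another and account for most of the roughly one hour, 50K files works out to about 72 ms per request.

```python
# Rough model of the split path in the traceback (illustrative, not Beam code):
# one source per exported file, and each source's validation performs one
# existence check, i.e. one GCS metadata request.


class CountingFileSystem:
    """Hypothetical stand-in for GCS that only counts existence checks."""

    def __init__(self) -> None:
        self.exists_calls = 0

    def exists(self, path: str) -> bool:
        self.exists_calls += 1
        return True  # assume every exported file is present


def split_export(paths, fs):
    """Hypothetical split: validate each file, then yield one source per file."""
    for path in paths:
        if not fs.exists(path):  # one request per file, issued sequentially
            raise FileNotFoundError(path)
        yield path


def implied_latency_ms(num_files: int, total_seconds: float) -> float:
    """Average time per check if the checks run one after another."""
    return total_seconds * 1000.0 / num_files


fs = CountingFileSystem()
paths = [f"gs://example-bucket/export/file-{i:012d}.avro" for i in range(50_000)]
sources = list(split_export(paths, fs))
print(fs.exists_calls)  # 50000
print(implied_latency_ms(50_000, 3600.0))  # 72.0
```

Under this model, the number of metadata requests grows linearly with the number of export files, so the split time is dominated by per-request latency rather than by the amount of data.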
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [x] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [x] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.