liferoad opened a new issue, #34792: URL: https://github.com/apache/beam/issues/34792
### What happened?

In the example below, a BQ export generated 50K GCS files. During the split, a single Dataflow worker has to call the GCS IO to check that each file exists, which can take around one hour:

```
Operation ongoing in bundle process_bundle-8807319671714420869-40 for PTransform{name=GenerateDataset/ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitWithSizing-ptransform-38, state=process-msecs} for at least 476.93 seconds without outputting or completing.
Current Traceback:
  File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 387, in task
    self._execute(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 388, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 658, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 696, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1274, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1735, in process
    for part, size in self.restriction_provider.split_and_size(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 334, in split_and_size
    for part in self.split(element, restriction):
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/iobase.py", line 1631, in split
    for source_bundle in source_bundles:
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 827, in split
    source = self._create_source(path, self.export_result.coder)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 787, in _create_source
    return create_avro_source(path)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filebasedsource.py", line 127, in __init__
    self._validate()
  File "/usr/local/lib/python3.10/site-packages/apache_beam/options/value_provider.py", line 193, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filebasedsource.py", line 190, in _validate
    match_result = FileSystems.match([pattern], limits=[1])[0]
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filesystems.py", line 240, in match
    return filesystem.match(patterns, limits)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filesystem.py", line 774, in match
    result.append(_match(pattern, limit))
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/filesystem.py", line 749, in _match
    if self.exists(pattern):
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/gcsfilesystem.py", line 282, in exists
    return self._gcsIO().exists(path)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/gcsio.py", line 472, in exists
    self._gcs_object(path)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/gcsio.py", line 549, in _gcs_object
    blob = bucket.get_blob(blob_name, retry=self._storage_client_retry)
  File "/usr/local/lib/python3.10/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/storage/bucket.py", line 1343, in get_blob
    blob.reload(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/storage/_helpers.py", line 303, in reload
    api_response = client._get_resource(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/storage/client.py", line 474, in _get_resource
    return self._connection.api_request(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/storage/_http.py", line 90, in api_request
    return call()
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
  File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 482, in api_request
    response = self._make_request(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 341, in _make_request
    return self._do_request(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 379, in _do_request
    return self.http.request(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/google/auth/transport/requests.py", line 538, in request
    response = super(AuthorizedSession, self).request(
  File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/requests/adapters.py", line 667, in send
    resp = conn.urlopen(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/urllib3/connectionpool.py", line 789, in urlopen
    response = self._make_request(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/urllib3/connectionpool.py", line 536, in _make_request
    response = conn.getresponse()
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/urllib3/connection.py", line 507, in getresponse
    httplib_response = super().getresponse()
  File "/usr/local/lib/python3.10/http/client.py", line 1375, in getresponse
    response.begin()
  File "/usr/local/lib/python3.10/http/client.py", line 318, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.10/http/client.py", line 279, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.10/socket.py", line 717, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.10/ssl.py", line 1307, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/lib/python3.10/ssl.py", line 1163, in read
    return self._sslobj.read(len, buffer)
```

### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [x] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [x] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
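A rough sanity check on the figures reported above, as a minimal sketch: 50K existence checks in about one hour implies a certain average latency per check if they run one at a time, which is what the traceback suggests (`split` creates a source per path, and each source's `_validate` ends in a single `get_blob` request). The `FakeStore` class, the file names, and the page size of 1000 are hypothetical stand-ins used only to count round trips. This is not Beam or GCS client code, and it is not a proposed fix.

```python
# Back-of-the-envelope model of the reported slowdown. The 50,000-file count
# and the roughly one-hour duration come from the report; FakeStore, the file
# names, and PAGE_SIZE are hypothetical and exist only for illustration.

FILE_COUNT = 50_000
REPORTED_SECONDS = 3600  # "around one hour"
PAGE_SIZE = 1000  # assumed number of results per listing request

# Average time per existence check if the checks run sequentially.
per_check_ms = REPORTED_SECONDS / FILE_COUNT * 1000


class FakeStore:
    """In-memory stand-in for an object store that counts round trips."""

    def __init__(self, names):
        self._names = set(names)
        self.requests = 0

    def exists(self, name):
        # One round trip per object, like a per-file metadata lookup.
        self.requests += 1
        return name in self._names

    def list_prefix(self, prefix, page_size=PAGE_SIZE):
        # One round trip per page of results.
        matches = sorted(n for n in self._names if n.startswith(prefix))
        pages = max(1, -(-len(matches) // page_size))  # ceiling division
        self.requests += pages
        return matches


names = [f"export/{i:012d}.avro" for i in range(FILE_COUNT)]

# Strategy 1: check every file individually.
sequential = FakeStore(names)
all_present_sequential = all(sequential.exists(n) for n in names)

# Strategy 2: list the shared prefix once and check membership locally.
listed = FakeStore(names)
found = set(listed.list_prefix("export/"))
all_present_listed = all(n in found for n in names)

print(f"implied latency per check: {per_check_ms:.0f} ms")
print(f"round trips, one exists() per file: {sequential.requests}")
print(f"round trips, paged listing: {listed.requests}")
```

Under these assumptions the request count scales with the number of files in the first strategy and with the number of result pages in the second, which is why the per-file check dominates once an export produces tens of thousands of files.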