lukecwik opened a new issue, #23676: URL: https://github.com/apache/beam/issues/23676
### What happened? Example: https://ci-beam.apache.org/view/PostCommit/job/beam_PostCommit_Python39/974 ``` Error Message apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions {'gs://dataflow-samples/shakespeare/kinglear.txt': RefreshError('Unable to acquire impersonated credentials', '{\n "error": {\n "code": 400,\n "message": "Request contains an invalid argument.",\n "status": "INVALID_ARGUMENT"\n }\n}\n')} Stacktrace self = <apache_beam.examples.wordcount_it_test.WordCountIT testMethod=test_wordcount_impersonation_it> @pytest.mark.it_postcommit @pytest.mark.sickbay_direct @pytest.mark.sickbay_spark @pytest.mark.sickbay_flink def test_wordcount_impersonation_it(self): """Tests impersonation on dataflow. For testing impersonation, we use three ingredients: - a principal to impersonate - a dataflow service account that only that principal is allowed to launch jobs as - a temp root that only the above two accounts have access to Jenkins and Dataflow workers both run as GCE default service account. So we remove that account from all the above. """ # Credentials need to be reset or this test will fail and credentials # from a previous test will be used. 
auth._Credentials._credentials_init = False ACCOUNT_TO_IMPERSONATE = ( 'allows-impersonation@apache-' 'beam-testing.iam.gserviceaccount.com') RUNNER_ACCOUNT = ( 'impersonation-dataflow-worker@' 'apache-beam-testing.iam.gserviceaccount.com') TEMP_DIR = 'gs://impersonation-test-bucket/temp-it' STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it' extra_options = { 'impersonate_service_account': ACCOUNT_TO_IMPERSONATE, 'service_account_email': RUNNER_ACCOUNT, 'temp_location': TEMP_DIR, 'staging_location': STAGING_LOCATION } > self._run_wordcount_it(wordcount.run, **extra_options) apache_beam/examples/wordcount_it_test.py:85: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/examples/wordcount_it_test.py:146: in _run_wordcount_it run_wordcount( apache_beam/examples/wordcount.py:90: in run lines = p | 'Read' >> ReadFromText(known_args.input) apache_beam/io/textio.py:765: in __init__ self._source = self._source_class( apache_beam/io/textio.py:131: in __init__ super().__init__( apache_beam/io/filebasedsource.py:127: in __init__ self._validate() apache_beam/options/value_provider.py:193: in _f return fnc(self, *args, **kwargs) apache_beam/io/filebasedsource.py:188: in _validate match_result = FileSystems.match([pattern], limits=[1])[0] apache_beam/io/filesystems.py:204: in match return filesystem.match(patterns, limits) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.io.gcp.gcsfilesystem.GCSFileSystem object at 0x7f86bd27bfa0> patterns = ['gs://dataflow-samples/shakespeare/kinglear.txt'], limits = [1] def match(self, patterns, limits=None): """Find all matching paths to the patterns provided. See Also: :meth:`translate_pattern` Patterns ending with '/' or '\\' will be appended with '*'. Args: patterns: list of string for the file path pattern to match against limits: list of maximum number of responses that need to be fetched Returns: list of ``MatchResult`` objects. 
Raises: ``BeamIOError``: if any of the pattern match operations fail """ if limits is None: limits = [None] * len(patterns) else: err_msg = "Patterns and limits should be equal in length" assert len(patterns) == len(limits), err_msg def _match(pattern, limit): """Find all matching paths to the pattern provided.""" if pattern.endswith('/') or pattern.endswith('\\'): pattern += '*' # Get the part of the pattern before the first globbing character. # For example scheme://path/foo* will become scheme://path/foo for # filesystems like GCS, or converted to scheme://path for filesystems with # directories. prefix_or_dir = re.match('^[^[*?]*', pattern).group(0) file_metadatas = [] if prefix_or_dir == pattern: # Short-circuit calling self.list() if there's no glob pattern to match. if self.exists(pattern): file_metadatas = [self.metadata(pattern)] else: if self.has_dirs(): prefix_dirname = self._url_dirname(prefix_or_dir) if not prefix_dirname == prefix_or_dir: logger.debug( "Changed prefix_or_dir %r -> %r", prefix_or_dir, prefix_dirname) prefix_or_dir = prefix_dirname logger.debug("Listing files in %r", prefix_or_dir) file_metadatas = self._list(prefix_or_dir) metadata_list = [] for file_metadata in self.match_files(file_metadatas, pattern): if limit is not None and len(metadata_list) >= limit: break metadata_list.append(file_metadata) return MatchResult(pattern, metadata_list) exceptions = {} result = [] for pattern, limit in zip(patterns, limits): try: result.append(_match(pattern, limit)) except Exception as e: # pylint: disable=broad-except exceptions[pattern] = e if exceptions: > raise BeamIOError("Match operation failed", exceptions) E apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions {'gs://dataflow-samples/shakespeare/kinglear.txt': RefreshError('Unable to acquire impersonated credentials', '{\n "error": {\n "code": 400,\n "message": "Request contains an invalid argument.",\n "status": "INVALID_ARGUMENT"\n }\n}\n')} 
apache_beam/io/filesystem.py:787: BeamIOError ``` This does cause other tests to fail: because the credentials object isn't [reset](https://github.com/apache/beam/blob/10e15a96383de0bb1167d619e4d1fce5dd5b562b/sdks/python/apache_beam/examples/wordcount_it_test.py#L87) at the end of the test, other integration tests pick up the failing impersonation credential. ### Issue Priority Priority: 0 ### Issue Component Component: sdk-py-core -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
