lukecwik opened a new issue, #23676:
URL: https://github.com/apache/beam/issues/23676

   ### What happened?
   
   Example:
   https://ci-beam.apache.org/view/PostCommit/job/beam_PostCommit_Python39/974
   
   ```
   Error Message
   apache_beam.io.filesystem.BeamIOError: Match operation failed with 
exceptions {'gs://dataflow-samples/shakespeare/kinglear.txt': 
RefreshError('Unable to acquire impersonated credentials', '{\n  "error": {\n   
 "code": 400,\n    "message": "Request contains an invalid argument.",\n    
"status": "INVALID_ARGUMENT"\n  }\n}\n')}
   Stacktrace
   self = <apache_beam.examples.wordcount_it_test.WordCountIT 
testMethod=test_wordcount_impersonation_it>
   
       @pytest.mark.it_postcommit
       @pytest.mark.sickbay_direct
       @pytest.mark.sickbay_spark
       @pytest.mark.sickbay_flink
       def test_wordcount_impersonation_it(self):
         """Tests impersonation on dataflow.
       
         For testing impersonation, we use three ingredients:
         - a principal to impersonate
         - a dataflow service account that only that principal is
           allowed to launch jobs as
         - a temp root that only the above two accounts have access to
       
         Jenkins and Dataflow workers both run as GCE default service account.
         So we remove that account from all the above.
         """
         # Credentials need to be reset or this test will fail and credentials
         # from a previous test will be used.
         auth._Credentials._credentials_init = False
       
         ACCOUNT_TO_IMPERSONATE = (
             'allows-impersonation@apache-'
             'beam-testing.iam.gserviceaccount.com')
         RUNNER_ACCOUNT = (
             'impersonation-dataflow-worker@'
             'apache-beam-testing.iam.gserviceaccount.com')
         TEMP_DIR = 'gs://impersonation-test-bucket/temp-it'
         STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it'
         extra_options = {
             'impersonate_service_account': ACCOUNT_TO_IMPERSONATE,
             'service_account_email': RUNNER_ACCOUNT,
             'temp_location': TEMP_DIR,
             'staging_location': STAGING_LOCATION
         }
   >     self._run_wordcount_it(wordcount.run, **extra_options)
   
   apache_beam/examples/wordcount_it_test.py:85: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   apache_beam/examples/wordcount_it_test.py:146: in _run_wordcount_it
       run_wordcount(
   apache_beam/examples/wordcount.py:90: in run
       lines = p | 'Read' >> ReadFromText(known_args.input)
   apache_beam/io/textio.py:765: in __init__
       self._source = self._source_class(
   apache_beam/io/textio.py:131: in __init__
       super().__init__(
   apache_beam/io/filebasedsource.py:127: in __init__
       self._validate()
   apache_beam/options/value_provider.py:193: in _f
       return fnc(self, *args, **kwargs)
   apache_beam/io/filebasedsource.py:188: in _validate
       match_result = FileSystems.match([pattern], limits=[1])[0]
   apache_beam/io/filesystems.py:204: in match
       return filesystem.match(patterns, limits)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.io.gcp.gcsfilesystem.GCSFileSystem object at 
0x7f86bd27bfa0>
   patterns = ['gs://dataflow-samples/shakespeare/kinglear.txt'], limits = [1]
   
       def match(self, patterns, limits=None):
         """Find all matching paths to the patterns provided.
       
         See Also:
           :meth:`translate_pattern`
       
         Patterns ending with '/' or '\\' will be appended with '*'.
       
         Args:
           patterns: list of string for the file path pattern to match against
           limits: list of maximum number of responses that need to be fetched
       
         Returns: list of ``MatchResult`` objects.
       
         Raises:
           ``BeamIOError``: if any of the pattern match operations fail
         """
         if limits is None:
           limits = [None] * len(patterns)
         else:
           err_msg = "Patterns and limits should be equal in length"
           assert len(patterns) == len(limits), err_msg
       
         def _match(pattern, limit):
           """Find all matching paths to the pattern provided."""
           if pattern.endswith('/') or pattern.endswith('\\'):
             pattern += '*'
           # Get the part of the pattern before the first globbing character.
           # For example scheme://path/foo* will become scheme://path/foo for
           # filesystems like GCS, or converted to scheme://path for 
filesystems with
           # directories.
           prefix_or_dir = re.match('^[^[*?]*', pattern).group(0)
       
           file_metadatas = []
           if prefix_or_dir == pattern:
             # Short-circuit calling self.list() if there's no glob pattern to 
match.
             if self.exists(pattern):
               file_metadatas = [self.metadata(pattern)]
           else:
             if self.has_dirs():
               prefix_dirname = self._url_dirname(prefix_or_dir)
               if not prefix_dirname == prefix_or_dir:
                 logger.debug(
                     "Changed prefix_or_dir %r -> %r", prefix_or_dir, 
prefix_dirname)
                 prefix_or_dir = prefix_dirname
       
             logger.debug("Listing files in %r", prefix_or_dir)
             file_metadatas = self._list(prefix_or_dir)
       
           metadata_list = []
           for file_metadata in self.match_files(file_metadatas, pattern):
             if limit is not None and len(metadata_list) >= limit:
               break
             metadata_list.append(file_metadata)
       
           return MatchResult(pattern, metadata_list)
       
         exceptions = {}
         result = []
         for pattern, limit in zip(patterns, limits):
           try:
             result.append(_match(pattern, limit))
           except Exception as e:  # pylint: disable=broad-except
             exceptions[pattern] = e
       
         if exceptions:
   >       raise BeamIOError("Match operation failed", exceptions)
   E       apache_beam.io.filesystem.BeamIOError: Match operation failed with 
exceptions {'gs://dataflow-samples/shakespeare/kinglear.txt': 
RefreshError('Unable to acquire impersonated credentials', '{\n  "error": {\n   
 "code": 400,\n    "message": "Request contains an invalid argument.",\n    
"status": "INVALID_ARGUMENT"\n  }\n}\n')}
   
   apache_beam/io/filesystem.py:787: BeamIOError
   ```
   
   This failure also breaks other tests. The credentials object isn't
[reset](https://github.com/apache/beam/blob/10e15a96383de0bb1167d619e4d1fce5dd5b562b/sdks/python/apache_beam/examples/wordcount_it_test.py#L87)
 at the end of the test, so subsequent integration tests pick up the failing
impersonation credentials.
   
   ### Issue Priority
   
   Priority: 0
   
   ### Issue Component
   
   Component: sdk-py-core


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to