ruodingt opened a new issue, #23598:
URL: https://github.com/apache/beam/issues/23598

   ### What happened?
   
   Thanks for reviewing my issue:)
   
   I am trying to use the `test_client` arg while declaring a `WriteToBigQuery` 
instance, hoping I can make the BQ client work with a BigQuery emulator. 
   
   I did notice this one:
   
https://github.com/apache/beam/blob/786ba8b54c023d6f7a24cbf8321a46f329ca7027/sdks/python/apache_beam/io/gcp/bigquery.py#L1443
 
   
   which led to an assumption: `test_client` is an instance of `BigqueryV2`
   
   
   Here is the pipeline code:
   
   ```
   def test_write_big_query():
       project_id = "test-project"
       dataset_id = 'poc-test-dataset'
       table_1_id = 'poc-test-table-1'
   
       pipeline_options = PipelineOptions(save_main_session=True)
       tp = TestPipeline(options=pipeline_options)
   
       x = tp | Create([
           {'type2': 'a', "full_name": 'firstname lastname', 'id2': 
str(uuid.uuid4())},
           {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': 
str(uuid.uuid4())}
       ])
   
       from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2
       from apache_beam.internal.http_client import get_new_http
       from apache_beam.internal.gcp import auth
   
       test_client_bq = BigqueryV2(... # I tried different versions her and 
trying to make this work
   
       x = x | WriteToBigQuery(table=table_resolve,
                               dataset=dataset_id,
                               project=project_id,
                               test_client=test_client_bq,
                               method=WriteToBigQuery.Method.STREAMING_INSERTS, 
validate=False)
   
       tp.run()
   
   ```
   
   I keep running into trouble while trying different ways to construct a working 
`BigqueryV2` instance, but it keeps failing...
   
   ### Experiment 1
   
   `test_client_bq` is defined as :
   
   ```
   test_client_bq = BigqueryV2(url='http://localhost:9050/bigquery/v2/')
   ```
   
   <details>
   <summary>error msg</summary>
   
   ```
   unit/test_io.py:301 (test_write_big_query)
   encoded = 
b'QlpoOTFBWSZTWURKFI8ABwF/6v////ve///1v///ov////pz5K4AIABIAEU1YAgfelh9D32+fON6ALc55NTzVnlLoaSGphoU2p6ZRP0DU0ynqPE0ymEa...T9hSQuqjjMVZ+10rH+O4z7uw6tc2/eQw6eVNTFVY9LZqrermFypf6cBJfsdjNmqjKJjfaP/RsCy366TEl+MGJiyaqHmRJ+5hGaEiD/4u5IpwoSCIlCkeA='
   enable_trace = True, use_zlib = False
   
       def loads(encoded, enable_trace=True, use_zlib=False):
         """For internal use only; no backwards-compatibility guarantees."""
       
         c = base64.b64decode(encoded)
       
         if use_zlib:
           s = zlib.decompress(c)
         else:
           s = bz2.decompress(c)
       
         del c  # Free up some possibly large and no-longer-needed memory.
       
         with _pickle_lock:
           try:
   >         return dill.loads(s)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   str = 
b'\x80\x04\x956\x10\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.'
   ignore = None, kwds = {}, file = <_io.BytesIO object at 0x13392df90>
   
       def loads(str, ignore=None, **kwds):
           """unpickle an object from a string"""
           file = StringIO(str)
   >       return load(file, ignore, **kwds)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   file = <_io.BytesIO object at 0x13392df90>, ignore = None, kwds = {}
   
       def load(file, ignore=None, **kwds):
           """unpickle an object from a file"""
   >       return Unpickler(file, ignore=ignore, **kwds).load()
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <dill._dill.Unpickler object at 0x133a08630>
   
       def load(self): #NOTE: if settings change, need to update attributes
   >       obj = StockUnpickler.load(self)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = [], sequence = ['*/*']
   
       def extend(self, sequence):
           """Validate extension of list."""
   >       self.__field.validate(sequence)
   E       AttributeError: 'FieldList' object has no attribute 
'_FieldList__field'
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147:
 AttributeError
   
   During handling of the above exception, another exception occurred:
   
       def test_write_big_query():
           project_id = "test-project"
           dataset_id = 'poc-test-dataset'
           table_1_id = 'poc-test-table-1'
       
           pipeline_options = PipelineOptions(save_main_session=True)
           tp = TestPipeline(options=pipeline_options)
       
           x = tp | Create([
               {'type2': 'a', "full_name": 'firstname lastname', 'id2': 
str(uuid.uuid4())},
               {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': 
str(uuid.uuid4())}
           ])
       
           from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2
           from apache_beam.internal.http_client import get_new_http
           from apache_beam.internal.gcp import auth
           # from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
           # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/')
           # bq_test_client = BigQueryWrapper(client=bqc_)
       
           test_client_bq = BigqueryV2(
               url='http://localhost:9050/bigquery/v2/'
               # http=get_new_http(),
               # credentials=auth.get_service_credentials(None),
               # response_encoding='utf8',
               # additional_http_headers={
               #     "user-agent": "apache-beam-x"
               # }
           )
       
   >       x = x | WriteToBigQuery(table=table_resolve,
                                   dataset=dataset_id,
                                   project=project_id,
                                   test_client=test_client_bq,
                                   
method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False)
   
   test_io.py:332: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137:
 in __or__
       return self.pipeline.apply(ptransform, self)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709:
 in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185:
 in apply
       return m(transform, input, options)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215:
 in apply_PTransform
       return transform.expand(input)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308:
 in expand
       outputs = pcoll | _StreamToBigQuery(
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137:
 in __or__
       return self.pipeline.apply(ptransform, self)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709:
 in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185:
 in apply
       return m(transform, input, options)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215:
 in apply_PTransform
       return transform.expand(input)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031:
 in expand
       | 'StreamInsertRows' >> ParDo(
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416:
 in __init__
       super().__init__(fn, *args, **kwargs)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864:
 in __init__
       self.fn = pickler.loads(pickler.dumps(self.fn))
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51:
 in loads
       return desired_pickle_lib.loads(
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289:
 in loads
       return dill.loads(s)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275:
 in loads
       return load(file, ignore, **kwds)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270:
 in load
       return Unpickler(file, ignore=ignore, **kwds).load()
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472:
 in load
       obj = StockUnpickler.load(self)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = [], sequence = ['*/*']
   
       def extend(self, sequence):
           """Validate extension of list."""
   >       self.__field.validate(sequence)
   E       AttributeError: 'FieldList' object has no attribute 
'_FieldList__field'
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147:
 AttributeError
   
   
   ```
   </details>
   
   
   ---
   
   ### Experiment 2
   
   since I see this line:
   
   
https://github.com/apache/beam/blob/786ba8b54c023d6f7a24cbf8321a46f329ca7027/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L332
   
   
   I put `test_client_bq` as :
   
   ```
   bigquery.BigqueryV2(
           http=get_new_http(),
           credentials=auth.get_service_credentials(None),
           response_encoding='utf8',
           additional_http_headers={
               "user-agent": "apache-beam-%s" % apache_beam.__version__
           })
   ```
   
   <details>
   
   <summary>error</summary>
   
   ```
   unit/test_io.py:301 (test_write_big_query)
   encoded = 
b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A='
   enable_trace = True, use_zlib = False
   
       def loads(encoded, enable_trace=True, use_zlib=False):
         """For internal use only; no backwards-compatibility guarantees."""
       
         c = base64.b64decode(encoded)
       
         if use_zlib:
           s = zlib.decompress(c)
         else:
           s = bz2.decompress(c)
       
         del c  # Free up some possibly large and no-longer-needed memory.
       
         with _pickle_lock:
           try:
   >         return dill.loads(s)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   str = 
b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.'
   ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133cd2a40>
   
       def loads(str, ignore=None, **kwds):
           """unpickle an object from a string"""
           file = StringIO(str)
   >       return load(file, ignore, **kwds)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   file = <_io.BytesIO object at 0x133cd2a40>, ignore = None, kwds = {}
   
       def load(file, ignore=None, **kwds):
           """unpickle an object from a file"""
   >       return Unpickler(file, ignore=ignore, **kwds).load()
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <dill._dill.Unpickler object at 0x133c90500>
   
       def load(self): #NOTE: if settings change, need to update attributes
   >       obj = StockUnpickler.load(self)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 
0x133ca9880>
   attr = '__setstate__'
   
       def __getattr__(self, attr):
         """Delegate attribute access to underlying google-auth credentials."""
   >     return getattr(self._google_auth_credentials, attr)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 
0x133ca9880>
   attr = '_google_auth_credentials'
   
       def __getattr__(self, attr):
         """Delegate attribute access to underlying google-auth credentials."""
   >     return getattr(self._google_auth_credentials, attr)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 
0x133ca9880>
   attr = '_google_auth_credentials'
   
       def __getattr__(self, attr):
         """Delegate attribute access to underlying google-auth credentials."""
   >     return getattr(self._google_auth_credentials, attr)
   E     RecursionError: maximum recursion depth exceeded
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119:
 RecursionError
   !!! Recursion detected (same locals & position)
   
   During handling of the above exception, another exception occurred:
   
   self = <ParDo(PTransform) label=[ParDo(BigQueryWriteFn)] at 0x133ca9190>
   fn = <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0>
   args = (), kwargs = {}
   
       def __init__(self, fn, *args, **kwargs):
         # type: (WithTypeHints, *Any, **Any) -> None
         if isinstance(fn, type) and issubclass(fn, WithTypeHints):
           # Don't treat Fn class objects as callables.
           raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__))
         self.fn = self.make_fn(fn, bool(args or kwargs))
         # Now that we figure out the label, initialize the super-class.
         super().__init__()
       
         if (any(isinstance(v, pvalue.PCollection) for v in args) or
             any(isinstance(v, pvalue.PCollection) for v in kwargs.values())):
           raise error.SideInputError(
               'PCollection used directly as side input argument. Specify '
               'AsIter(pcollection) or AsSingleton(pcollection) to indicate how 
the '
               'PCollection is to be used.')
         self.args, self.kwargs, self.side_inputs = 
util.remove_objects_from_args(
             args, kwargs, pvalue.AsSideInput)
         self.raw_side_inputs = args, kwargs
       
         # Prevent name collisions with fns of the form '<function <lambda> at 
...>'
         self._cached_fn = self.fn
       
         # Ensure fn and side inputs are picklable for remote execution.
         try:
   >       self.fn = pickler.loads(pickler.dumps(self.fn))
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   encoded = 
b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A='
   enable_trace = True, use_zlib = False
   
       def loads(encoded, enable_trace=True, use_zlib=False):
         """For internal use only; no backwards-compatibility guarantees."""
       
   >     return desired_pickle_lib.loads(
             encoded, enable_trace=enable_trace, use_zlib=use_zlib)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   encoded = 
b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A='
   enable_trace = True, use_zlib = False
   
       def loads(encoded, enable_trace=True, use_zlib=False):
         """For internal use only; no backwards-compatibility guarantees."""
       
         c = base64.b64decode(encoded)
       
         if use_zlib:
           s = zlib.decompress(c)
         else:
           s = bz2.decompress(c)
       
         del c  # Free up some possibly large and no-longer-needed memory.
       
         with _pickle_lock:
           try:
             return dill.loads(s)
           except Exception:  # pylint: disable=broad-except
             if enable_trace:
               dill.dill._trace(True)  # pylint: disable=protected-access
   >           return dill.loads(s)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   str = 
b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.'
   ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133d2d4f0>
   
       def loads(str, ignore=None, **kwds):
           """unpickle an object from a string"""
           file = StringIO(str)
   >       return load(file, ignore, **kwds)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   file = <_io.BytesIO object at 0x133d2d4f0>, ignore = None, kwds = {}
   
       def load(file, ignore=None, **kwds):
           """unpickle an object from a file"""
   >       return Unpickler(file, ignore=ignore, **kwds).load()
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <dill._dill.Unpickler object at 0x133c90630>
   
       def load(self): #NOTE: if settings change, need to update attributes
   >       obj = StockUnpickler.load(self)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 
0x133ca33d0>
   attr = '__setstate__'
   
       def __getattr__(self, attr):
         """Delegate attribute access to underlying google-auth credentials."""
   >     return getattr(self._google_auth_credentials, attr)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 
0x133ca33d0>
   attr = '_google_auth_credentials'
   
       def __getattr__(self, attr):
         """Delegate attribute access to underlying google-auth credentials."""
   >     return getattr(self._google_auth_credentials, attr)
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 
0x133ca33d0>
   attr = '_google_auth_credentials'
   
       def __getattr__(self, attr):
         """Delegate attribute access to underlying google-auth credentials."""
   >     return getattr(self._google_auth_credentials, attr)
   E     RecursionError: maximum recursion depth exceeded while calling a 
Python object
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119:
 RecursionError
   !!! Recursion detected (same locals & position)
   
   During handling of the above exception, another exception occurred:
   
       def test_write_big_query():
           project_id = "test-project"
           dataset_id = 'poc-test-dataset'
           table_1_id = 'poc-test-table-1'
       
           pipeline_options = PipelineOptions(save_main_session=True)
           tp = TestPipeline(options=pipeline_options)
       
           x = tp | Create([
               {'type2': 'a', "full_name": 'firstname lastname', 'id2': 
str(uuid.uuid4())},
               {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': 
str(uuid.uuid4())}
           ])
       
           from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2
           from apache_beam.internal.http_client import get_new_http
           from apache_beam.internal.gcp import auth
           # from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
           # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/')
           # bq_test_client = BigQueryWrapper(client=bqc_)
       
           # test_client_bq = BigqueryV2(
           #     url='http://localhost:9050/bigquery/v2/'
           #     # http=get_new_http(),
           #     # credentials=auth.get_service_credentials(None),
           #     # response_encoding='utf8',
           #     # additional_http_headers={
           #     #     "user-agent": "apache-beam-x"
           #     # }
           # )
       
           test_client_bq = BigqueryV2(
               http=get_new_http(),
               credentials=auth.get_service_credentials(None),
               response_encoding='utf8',
               additional_http_headers={
                   "user-agent": "apache-beam-2.41.0"
               })
       
   >       x = x | WriteToBigQuery(table=table_resolve,
                                   dataset=dataset_id,
                                   project=project_id,
                                   test_client=test_client_bq,
                                   
method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False)
   
   test_io.py:340: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137:
 in __or__
       return self.pipeline.apply(ptransform, self)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709:
 in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185:
 in apply
       return m(transform, input, options)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215:
 in apply_PTransform
       return transform.expand(input)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308:
 in expand
       outputs = pcoll | _StreamToBigQuery(
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137:
 in __or__
       return self.pipeline.apply(ptransform, self)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709:
 in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185:
 in apply
       return m(transform, input, options)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215:
 in apply_PTransform
       return transform.expand(input)
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031:
 in expand
       | 'StreamInsertRows' >> ParDo(
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416:
 in __init__
       super().__init__(fn, *args, **kwargs)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <ParDo(PTransform) label=[ParDo(BigQueryWriteFn)] at 0x133ca9190>
   fn = <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0>
   args = (), kwargs = {}
   
       def __init__(self, fn, *args, **kwargs):
         # type: (WithTypeHints, *Any, **Any) -> None
         if isinstance(fn, type) and issubclass(fn, WithTypeHints):
           # Don't treat Fn class objects as callables.
           raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__))
         self.fn = self.make_fn(fn, bool(args or kwargs))
         # Now that we figure out the label, initialize the super-class.
         super().__init__()
       
         if (any(isinstance(v, pvalue.PCollection) for v in args) or
             any(isinstance(v, pvalue.PCollection) for v in kwargs.values())):
           raise error.SideInputError(
               'PCollection used directly as side input argument. Specify '
               'AsIter(pcollection) or AsSingleton(pcollection) to indicate how 
the '
               'PCollection is to be used.')
         self.args, self.kwargs, self.side_inputs = 
util.remove_objects_from_args(
             args, kwargs, pvalue.AsSideInput)
         self.raw_side_inputs = args, kwargs
       
         # Prevent name collisions with fns of the form '<function <lambda> at 
...>'
         self._cached_fn = self.fn
       
         # Ensure fn and side inputs are picklable for remote execution.
         try:
           self.fn = pickler.loads(pickler.dumps(self.fn))
         except RuntimeError as e:
   >       raise RuntimeError('Unable to pickle fn %s: %s' % (self.fn, e))
   E       RuntimeError: Unable to pickle fn 
<apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0>: maximum 
recursion depth exceeded while calling a Python object
   
   
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:866:
 RuntimeError
   
   
   ```
   
   </details>
   
   
   ---
   
   
   ### ENV
   
   <details>
   
   <summary>pip3 freeze</summary>
   
   ```
   allure-pytest==2.9.45
   allure-python-commons==2.9.45
   anyio==3.6.1
   apache-beam==2.41.0
   appdirs==1.4.4
   attrs==22.1.0
   avro==1.11.0
   bigquery-schema-generator==1.5
   cachetools==4.2.4
   certifi==2022.6.15
   charset-normalizer==2.1.1
   click==8.1.3
   cloudevents==1.2.0
   cloudpickle==2.1.0
   confluent-kafka==1.8.2
   coverage==6.4.4
   crcmod==1.7
   deprecation==2.1.0
   dill==0.3.1.1
   docopt==0.6.2
   fastapi==0.79.0
   fastavro==1.6.0
   fasteners==0.18
   google-api-core==2.10.0
   google-apitools==0.5.32
   google-auth==2.11.0
   google-auth-httplib2==0.1.0
   google-cloud-bigquery==2.34.4
   google-cloud-bigquery-storage==2.13.2
   google-cloud-bigtable==1.7.2
   google-cloud-core==2.3.2
   google-cloud-datastore==1.15.5
   google-cloud-dlp==3.9.2
   google-cloud-language==1.3.2
   google-cloud-pubsub==2.13.9
   google-cloud-pubsublite==1.5.0
   google-cloud-recommendations-ai==0.7.1
   google-cloud-spanner==1.19.3
   google-cloud-videointelligence==1.16.3
   google-cloud-vision==1.0.2
   google-crc32c==1.5.0
   google-resumable-media==2.4.0
   googleapis-common-protos==1.56.4
   greenlet==1.1.3
   grpc-google-iam-v1==0.12.4
   grpcio==1.48.1
   grpcio-gcp==0.2.2
   grpcio-status==1.48.1
   h11==0.13.0
   hdfs==2.7.0
   httplib2==0.20.4
   idna==3.3
   importlib-metadata==4.12.0
   libcst==0.4.7
   more-itertools==8.14.0
   MouseInfo==0.1.3
   mypy-extensions==0.4.3
   numpy==1.22.4
   oauth2client==4.1.3
   orjson==3.7.12
   overrides==6.5.0
   packaging==21.3
   pandas==1.4.4
   Pillow==9.2.0
   pluggy==0.13.1
   proto-plus==1.22.0
   protobuf==3.20.3
   py==1.11.0
   pyarrow==7.0.0
   pyasn1==0.4.8
   pyasn1-modules==0.2.8
   PyAutoGUI==0.9.53
   pydantic==1.10.1
   pydot==1.4.2
   pyee==8.2.2
   PyGetWindow==0.0.9
   PyJWT==2.4.0
   pymongo==3.12.3
   PyMsgBox==1.0.9
   pyobjc==8.5.1
   pyobjc-core==8.5.1
   pyobjc-framework-Accessibility==8.5.1
   pyobjc-framework-Accounts==8.5.1
   pyobjc-framework-AddressBook==8.5.1
   pyobjc-framework-AdServices==8.5.1
   pyobjc-framework-AdSupport==8.5.1
   pyobjc-framework-AppleScriptKit==8.5.1
   pyobjc-framework-AppleScriptObjC==8.5.1
   pyobjc-framework-ApplicationServices==8.5.1
   pyobjc-framework-AppTrackingTransparency==8.5.1
   pyobjc-framework-AudioVideoBridging==8.5.1
   pyobjc-framework-AuthenticationServices==8.5.1
   pyobjc-framework-AutomaticAssessmentConfiguration==8.5.1
   pyobjc-framework-Automator==8.5.1
   pyobjc-framework-AVFoundation==8.5.1
   pyobjc-framework-AVKit==8.5.1
   pyobjc-framework-BusinessChat==8.5.1
   pyobjc-framework-CalendarStore==8.5.1
   pyobjc-framework-CallKit==8.5.1
   pyobjc-framework-CFNetwork==8.5.1
   pyobjc-framework-ClassKit==8.5.1
   pyobjc-framework-CloudKit==8.5.1
   pyobjc-framework-Cocoa==8.5.1
   pyobjc-framework-Collaboration==8.5.1
   pyobjc-framework-ColorSync==8.5.1
   pyobjc-framework-Contacts==8.5.1
   pyobjc-framework-ContactsUI==8.5.1
   pyobjc-framework-CoreAudio==8.5.1
   pyobjc-framework-CoreAudioKit==8.5.1
   pyobjc-framework-CoreBluetooth==8.5.1
   pyobjc-framework-CoreData==8.5.1
   pyobjc-framework-CoreHaptics==8.5.1
   pyobjc-framework-CoreLocation==8.5.1
   pyobjc-framework-CoreMedia==8.5.1
   pyobjc-framework-CoreMediaIO==8.5.1
   pyobjc-framework-CoreMIDI==8.5.1
   pyobjc-framework-CoreML==8.5.1
   pyobjc-framework-CoreMotion==8.5.1
   pyobjc-framework-CoreServices==8.5.1
   pyobjc-framework-CoreSpotlight==8.5.1
   pyobjc-framework-CoreText==8.5.1
   pyobjc-framework-CoreWLAN==8.5.1
   pyobjc-framework-CryptoTokenKit==8.5.1
   pyobjc-framework-DataDetection==8.5.1
   pyobjc-framework-DeviceCheck==8.5.1
   pyobjc-framework-DictionaryServices==8.5.1
   pyobjc-framework-DiscRecording==8.5.1
   pyobjc-framework-DiscRecordingUI==8.5.1
   pyobjc-framework-DiskArbitration==8.5.1
   pyobjc-framework-DVDPlayback==8.5.1
   pyobjc-framework-EventKit==8.5.1
   pyobjc-framework-ExceptionHandling==8.5.1
   pyobjc-framework-ExecutionPolicy==8.5.1
   pyobjc-framework-ExternalAccessory==8.5.1
   pyobjc-framework-FileProvider==8.5.1
   pyobjc-framework-FileProviderUI==8.5.1
   pyobjc-framework-FinderSync==8.5.1
   pyobjc-framework-FSEvents==8.5.1
   pyobjc-framework-GameCenter==8.5.1
   pyobjc-framework-GameController==8.5.1
   pyobjc-framework-GameKit==8.5.1
   pyobjc-framework-GameplayKit==8.5.1
   pyobjc-framework-ImageCaptureCore==8.5.1
   pyobjc-framework-IMServicePlugIn==8.5.1
   pyobjc-framework-InputMethodKit==8.5.1
   pyobjc-framework-InstallerPlugins==8.5.1
   pyobjc-framework-InstantMessage==8.5.1
   pyobjc-framework-Intents==8.5.1
   pyobjc-framework-IntentsUI==8.5.1
   pyobjc-framework-IOSurface==8.5.1
   pyobjc-framework-iTunesLibrary==8.5.1
   pyobjc-framework-KernelManagement==8.5.1
   pyobjc-framework-LatentSemanticMapping==8.5.1
   pyobjc-framework-LaunchServices==8.5.1
   pyobjc-framework-libdispatch==8.5.1
   pyobjc-framework-LinkPresentation==8.5.1
   pyobjc-framework-LocalAuthentication==8.5.1
   pyobjc-framework-LocalAuthenticationEmbeddedUI==8.5.1
   pyobjc-framework-MailKit==8.5.1
   pyobjc-framework-MapKit==8.5.1
   pyobjc-framework-MediaAccessibility==8.5.1
   pyobjc-framework-MediaLibrary==8.5.1
   pyobjc-framework-MediaPlayer==8.5.1
   pyobjc-framework-MediaToolbox==8.5.1
   pyobjc-framework-Metal==8.5.1
   pyobjc-framework-MetalKit==8.5.1
   pyobjc-framework-MetalPerformanceShaders==8.5.1
   pyobjc-framework-MetalPerformanceShadersGraph==8.5.1
   pyobjc-framework-MetricKit==8.5.1
   pyobjc-framework-MLCompute==8.5.1
   pyobjc-framework-ModelIO==8.5.1
   pyobjc-framework-MultipeerConnectivity==8.5.1
   pyobjc-framework-NaturalLanguage==8.5.1
   pyobjc-framework-NetFS==8.5.1
   pyobjc-framework-Network==8.5.1
   pyobjc-framework-NetworkExtension==8.5.1
   pyobjc-framework-NotificationCenter==8.5.1
   pyobjc-framework-OpenDirectory==8.5.1
   pyobjc-framework-OSAKit==8.5.1
   pyobjc-framework-OSLog==8.5.1
   pyobjc-framework-PassKit==8.5.1
   pyobjc-framework-PencilKit==8.5.1
   pyobjc-framework-Photos==8.5.1
   pyobjc-framework-PhotosUI==8.5.1
   pyobjc-framework-PreferencePanes==8.5.1
   pyobjc-framework-PushKit==8.5.1
   pyobjc-framework-Quartz==8.5.1
   pyobjc-framework-QuickLookThumbnailing==8.5.1
   pyobjc-framework-ReplayKit==8.5.1
   pyobjc-framework-SafariServices==8.5.1
   pyobjc-framework-SceneKit==8.5.1
   pyobjc-framework-ScreenCaptureKit==8.5.1
   pyobjc-framework-ScreenSaver==8.5.1
   pyobjc-framework-ScreenTime==8.5.1
   pyobjc-framework-ScriptingBridge==8.5.1
   pyobjc-framework-SearchKit==8.5.1
   pyobjc-framework-Security==8.5.1
   pyobjc-framework-SecurityFoundation==8.5.1
   pyobjc-framework-SecurityInterface==8.5.1
   pyobjc-framework-ServiceManagement==8.5.1
   pyobjc-framework-ShazamKit==8.5.1
   pyobjc-framework-Social==8.5.1
   pyobjc-framework-SoundAnalysis==8.5.1
   pyobjc-framework-Speech==8.5.1
   pyobjc-framework-SpriteKit==8.5.1
   pyobjc-framework-StoreKit==8.5.1
   pyobjc-framework-SyncServices==8.5.1
   pyobjc-framework-SystemConfiguration==8.5.1
   pyobjc-framework-SystemExtensions==8.5.1
   pyobjc-framework-UniformTypeIdentifiers==8.5.1
   pyobjc-framework-UserNotifications==8.5.1
   pyobjc-framework-UserNotificationsUI==8.5.1
   pyobjc-framework-VideoSubscriberAccount==8.5.1
   pyobjc-framework-VideoToolbox==8.5.1
   pyobjc-framework-Virtualization==8.5.1
   pyobjc-framework-Vision==8.5.1
   pyobjc-framework-WebKit==8.5.1
   pyparsing==3.0.9
   pyperclip==1.8.2
   pyppeteer==1.0.2
   PyRect==0.2.0
   PyScreeze==0.1.28
   pytest==5.4.3
   pytest-order==1.0.1
   python-dateutil==2.8.2
   pytweening==1.0.4
   pytz==2022.2.1
   PyYAML==6.0
   regex==2022.9.13
   requests==2.28.1
   rsa==4.9
   rubicon-objc==0.4.2
   six==1.16.0
   sniffio==1.3.0
   SQLAlchemy==1.4.39
   sqlparse==0.4.2
   starlette==0.19.1
   tqdm==4.64.1
   typing-inspect==0.8.0
   typing_extensions==4.3.0
   urllib3==1.26.11
   uvicorn==0.18.2
   wcwidth==0.2.5
   websockets==10.3
   zipp==3.8.1
   zstandard==0.18.0
   ```
   </details>
   
   ---
   
   ### Other context
   
   
   <details>
   
   <summary>code to set up bigquery emulator</summary>
   
   `docker-compose` to set up the emulator
   ```
   version: '3.6'
   services:
     bigquery-emulator:
       hostname: bigquery
       container_name: bigquery
       image: ghcr.io/goccy/bigquery-emulator:latest
       ports:
         - 9050:9050
       command: --project=${PROJECT_ID} --port=${BIGQUERY_PORT}
   ```
   
   The python code to setup dataset/table
   ```
   def set_up_bq_instance():
       from google.cloud.bigquery import TableReference, DatasetReference
   
       from google.api_core.client_options import ClientOptions
       from google.auth.credentials import AnonymousCredentials
       from google.cloud import bigquery
   
       project_id = "test-project"
   
        client_options = ClientOptions(api_endpoint="http://localhost:9050")
        bq_test_client = bigquery.Client(
           project=project_id,
           client_options=client_options,
           credentials=AnonymousCredentials(),
       )
   
   
        dataset_ref = DatasetReference(project=project_id, dataset_id='poc-test-dataset')
   
       try:
           bq_test_client.create_dataset(dataset=dataset_ref,
                                         exists_ok=True,
                                         retry=None)
       except InternalServerError:
           pass
   
        table_ref_1 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-1')
        table_ref_2 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-2')
   
       schema_1 = [
           bigquery.SchemaField("type2", "STRING", mode="REQUIRED"),
           bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
           bigquery.SchemaField("id2", "STRING", mode="REQUIRED"),
       ]
   
       schema_2 = [
           bigquery.SchemaField("type2", "STRING", mode="REQUIRED"),
           bigquery.SchemaField("full_addr", "STRING", mode="REQUIRED"),
           bigquery.SchemaField("id2", "STRING", mode="REQUIRED"),
       ]
   
       table_1 = bigquery.Table(table_ref=table_ref_1, schema=schema_1)
       table_2 = bigquery.Table(table_ref=table_ref_2, schema=schema_2)
   
       try:
           _ = bq_test_client.create_table(table_1, exists_ok=True, retry=None)
       except InternalServerError:
           pass
   
       try:
           _ = bq_test_client.create_table(table_2, exists_ok=True, retry=None)
       except InternalServerError:
           pass
   
       list_dataset = list(bq_test_client.list_datasets())
       list_table = list(bq_test_client.list_tables(dataset_ref))
       return bq_test_client
   ```
   </details>
   
   
   
   
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: io-py-gcp


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to