ruodingt opened a new issue, #23598: URL: https://github.com/apache/beam/issues/23598
### What happened? Thanks for reviewing my issue:) I am trying to use the `test_client` arg while declaring a `WriteToBigQuery` instance, in the hope that I can make the BQ client work with the BigQuery emulator. I did notice this line: https://github.com/apache/beam/blob/786ba8b54c023d6f7a24cbf8321a46f329ca7027/sdks/python/apache_beam/io/gcp/bigquery.py#L1443 which led to the assumption that `test_client` is an instance of `BigqueryV2`. Here is the pipeline code: ``` def test_write_big_query(): project_id = "test-project" dataset_id = 'poc-test-dataset' table_1_id = 'poc-test-table-1' pipeline_options = PipelineOptions(save_main_session=True) tp = TestPipeline(options=pipeline_options) x = tp | Create([ {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())}, {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())} ]) from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2 from apache_beam.internal.http_client import get_new_http from apache_beam.internal.gcp import auth test_client_bq = BigqueryV2(... # I tried different versions here while trying to make this work x = x | WriteToBigQuery(table=table_resolve, dataset=dataset_id, project=project_id, test_client=test_client_bq, method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False) tp.run() ``` I keep running into trouble while trying different ways to construct a working `BigqueryV2` instance, but it keeps failing... 
### Experiment 1 `test_client_bq` is defined as : ``` test_client_bq = BigqueryV2(url='http://localhost:9050/bigquery/v2/') ``` <details> <summary>error msg</summary> ``` unit/test_io.py:301 (test_write_big_query) encoded = b'QlpoOTFBWSZTWURKFI8ABwF/6v////ve///1v///ov////pz5K4AIABIAEU1YAgfelh9D32+fON6ALc55NTzVnlLoaSGphoU2p6ZRP0DU0ynqPE0ymEa...T9hSQuqjjMVZ+10rH+O4z7uw6tc2/eQw6eVNTFVY9LZqrermFypf6cBJfsdjNmqjKJjfaP/RsCy366TEl+MGJiyaqHmRJ+5hGaEiD/4u5IpwoSCIlCkeA=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) if use_zlib: s = zlib.decompress(c) else: s = bz2.decompress(c) del c # Free up some possibly large and no-longer-needed memory. with _pickle_lock: try: > return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ str = b'\x80\x04\x956\x10\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.' 
ignore = None, kwds = {}, file = <_io.BytesIO object at 0x13392df90> def loads(str, ignore=None, **kwds): """unpickle an object from a string""" file = StringIO(str) > return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ file = <_io.BytesIO object at 0x13392df90>, ignore = None, kwds = {} def load(file, ignore=None, **kwds): """unpickle an object from a file""" > return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <dill._dill.Unpickler object at 0x133a08630> def load(self): #NOTE: if settings change, need to update attributes > obj = StockUnpickler.load(self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = [], sequence = ['*/*'] def extend(self, sequence): """Validate extension of list.""" > self.__field.validate(sequence) E AttributeError: 'FieldList' object has no attribute '_FieldList__field' /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147: AttributeError During handling of the above exception, another exception occurred: def test_write_big_query(): project_id = "test-project" dataset_id = 'poc-test-dataset' table_1_id = 'poc-test-table-1' pipeline_options = PipelineOptions(save_main_session=True) tp = TestPipeline(options=pipeline_options) x = tp | Create([ {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())}, {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())} ]) from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2 from apache_beam.internal.http_client import get_new_http from apache_beam.internal.gcp 
import auth # from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/') # bq_test_client = BigQueryWrapper(client=bqc_) test_client_bq = BigqueryV2( url='http://localhost:9050/bigquery/v2/' # http=get_new_http(), # credentials=auth.get_service_credentials(None), # response_encoding='utf8', # additional_http_headers={ # "user-agent": "apache-beam-x" # } ) > x = x | WriteToBigQuery(table=table_resolve, dataset=dataset_id, project=project_id, test_client=test_client_bq, method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False) test_io.py:332: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308: in expand outputs = pcoll | _StreamToBigQuery( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) 
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031: in expand | 'StreamInsertRows' >> ParDo( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416: in __init__ super().__init__(fn, *args, **kwargs) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864: in __init__ self.fn = pickler.loads(pickler.dumps(self.fn)) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51: in loads return desired_pickle_lib.loads( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289: in loads return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: in loads return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: in load return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: in load obj = StockUnpickler.load(self) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = [], sequence = ['*/*'] def extend(self, sequence): """Validate extension of list.""" > self.__field.validate(sequence) E AttributeError: 'FieldList' object has no attribute '_FieldList__field' /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147: AttributeError ``` </details> --- ### Experiment 2 since I see this line: https://github.com/apache/beam/blob/786ba8b54c023d6f7a24cbf8321a46f329ca7027/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L332 I put `test_client_bq` as : ``` bigquery.BigqueryV2( http=get_new_http(), 
credentials=auth.get_service_credentials(None), response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-%s" % apache_beam.__version__ }) ``` <details> <summary>error</summary> ``` unit/test_io.py:301 (test_write_big_query) encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) if use_zlib: s = zlib.decompress(c) else: s = bz2.decompress(c) del c # Free up some possibly large and no-longer-needed memory. with _pickle_lock: try: > return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ str = b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.' 
ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133cd2a40> def loads(str, ignore=None, **kwds): """unpickle an object from a string""" file = StringIO(str) > return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ file = <_io.BytesIO object at 0x133cd2a40>, ignore = None, kwds = {} def load(file, ignore=None, **kwds): """unpickle an object from a file""" > return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <dill._dill.Unpickler object at 0x133c90500> def load(self): #NOTE: if settings change, need to update attributes > obj = StockUnpickler.load(self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca9880> attr = '__setstate__' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca9880> attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = 
<apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca9880> attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) E RecursionError: maximum recursion depth exceeded /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: RecursionError !!! Recursion detected (same locals & position) During handling of the above exception, another exception occurred: self = <ParDo(PTransform) label=[ParDo(BigQueryWriteFn)] at 0x133ca9190> fn = <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0> args = (), kwargs = {} def __init__(self, fn, *args, **kwargs): # type: (WithTypeHints, *Any, **Any) -> None if isinstance(fn, type) and issubclass(fn, WithTypeHints): # Don't treat Fn class objects as callables. raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__)) self.fn = self.make_fn(fn, bool(args or kwargs)) # Now that we figure out the label, initialize the super-class. super().__init__() if (any(isinstance(v, pvalue.PCollection) for v in args) or any(isinstance(v, pvalue.PCollection) for v in kwargs.values())): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' 'PCollection is to be used.') self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args( args, kwargs, pvalue.AsSideInput) self.raw_side_inputs = args, kwargs # Prevent name collisions with fns of the form '<function <lambda> at ...>' self._cached_fn = self.fn # Ensure fn and side inputs are picklable for remote execution. 
try: > self.fn = pickler.loads(pickler.dumps(self.fn)) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" > return desired_pickle_lib.loads( encoded, enable_trace=enable_trace, use_zlib=use_zlib) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) if use_zlib: s = zlib.decompress(c) else: s = bz2.decompress(c) del c # Free up some possibly large and no-longer-needed memory. 
with _pickle_lock: try: return dill.loads(s) except Exception: # pylint: disable=broad-except if enable_trace: dill.dill._trace(True) # pylint: disable=protected-access > return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ str = b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.' ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133d2d4f0> def loads(str, ignore=None, **kwds): """unpickle an object from a string""" file = StringIO(str) > return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ file = <_io.BytesIO object at 0x133d2d4f0>, ignore = None, kwds = {} def load(file, ignore=None, **kwds): """unpickle an object from a file""" > return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <dill._dill.Unpickler object at 0x133c90630> def load(self): #NOTE: if settings change, need to update attributes > obj = StockUnpickler.load(self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca33d0> attr = '__setstate__' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) 
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca33d0> attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca33d0> attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) E RecursionError: maximum recursion depth exceeded while calling a Python object /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: RecursionError !!! 
Recursion detected (same locals & position) During handling of the above exception, another exception occurred: def test_write_big_query(): project_id = "test-project" dataset_id = 'poc-test-dataset' table_1_id = 'poc-test-table-1' pipeline_options = PipelineOptions(save_main_session=True) tp = TestPipeline(options=pipeline_options) x = tp | Create([ {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())}, {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())} ]) from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2 from apache_beam.internal.http_client import get_new_http from apache_beam.internal.gcp import auth # from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/') # bq_test_client = BigQueryWrapper(client=bqc_) # test_client_bq = BigqueryV2( # url='http://localhost:9050/bigquery/v2/' # # http=get_new_http(), # # credentials=auth.get_service_credentials(None), # # response_encoding='utf8', # # additional_http_headers={ # # "user-agent": "apache-beam-x" # # } # ) test_client_bq = BigqueryV2( http=get_new_http(), credentials=auth.get_service_credentials(None), response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-2.41.0" }) > x = x | WriteToBigQuery(table=table_resolve, dataset=dataset_id, project=project_id, test_client=test_client_bq, method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False) test_io.py:340: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in 
apply return m(transform, input, options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308: in expand outputs = pcoll | _StreamToBigQuery( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031: in expand | 'StreamInsertRows' >> ParDo( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416: in __init__ super().__init__(fn, *args, **kwargs) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <ParDo(PTransform) label=[ParDo(BigQueryWriteFn)] at 0x133ca9190> fn = <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0> args = (), kwargs = {} def __init__(self, fn, *args, **kwargs): # type: (WithTypeHints, *Any, **Any) -> None if isinstance(fn, type) and issubclass(fn, WithTypeHints): # Don't treat Fn class objects as callables. raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__)) self.fn = self.make_fn(fn, bool(args or kwargs)) # Now that we figure out the label, initialize the super-class. 
super().__init__() if (any(isinstance(v, pvalue.PCollection) for v in args) or any(isinstance(v, pvalue.PCollection) for v in kwargs.values())): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' 'PCollection is to be used.') self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args( args, kwargs, pvalue.AsSideInput) self.raw_side_inputs = args, kwargs # Prevent name collisions with fns of the form '<function <lambda> at ...>' self._cached_fn = self.fn # Ensure fn and side inputs are picklable for remote execution. try: self.fn = pickler.loads(pickler.dumps(self.fn)) except RuntimeError as e: > raise RuntimeError('Unable to pickle fn %s: %s' % (self.fn, e)) E RuntimeError: Unable to pickle fn <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0>: maximum recursion depth exceeded while calling a Python object /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:866: RuntimeError ``` </details> --- ### ENV <details> <summary>pip3 freeze</summary> ``` allure-pytest==2.9.45 allure-python-commons==2.9.45 anyio==3.6.1 apache-beam==2.41.0 appdirs==1.4.4 attrs==22.1.0 avro==1.11.0 bigquery-schema-generator==1.5 cachetools==4.2.4 certifi==2022.6.15 charset-normalizer==2.1.1 click==8.1.3 cloudevents==1.2.0 cloudpickle==2.1.0 confluent-kafka==1.8.2 coverage==6.4.4 crcmod==1.7 deprecation==2.1.0 dill==0.3.1.1 docopt==0.6.2 fastapi==0.79.0 fastavro==1.6.0 fasteners==0.18 google-api-core==2.10.0 google-apitools==0.5.32 google-auth==2.11.0 google-auth-httplib2==0.1.0 google-cloud-bigquery==2.34.4 google-cloud-bigquery-storage==2.13.2 google-cloud-bigtable==1.7.2 google-cloud-core==2.3.2 google-cloud-datastore==1.15.5 google-cloud-dlp==3.9.2 google-cloud-language==1.3.2 google-cloud-pubsub==2.13.9 google-cloud-pubsublite==1.5.0 google-cloud-recommendations-ai==0.7.1 google-cloud-spanner==1.19.3 
google-cloud-videointelligence==1.16.3 google-cloud-vision==1.0.2 google-crc32c==1.5.0 google-resumable-media==2.4.0 googleapis-common-protos==1.56.4 greenlet==1.1.3 grpc-google-iam-v1==0.12.4 grpcio==1.48.1 grpcio-gcp==0.2.2 grpcio-status==1.48.1 h11==0.13.0 hdfs==2.7.0 httplib2==0.20.4 idna==3.3 importlib-metadata==4.12.0 libcst==0.4.7 more-itertools==8.14.0 MouseInfo==0.1.3 mypy-extensions==0.4.3 numpy==1.22.4 oauth2client==4.1.3 orjson==3.7.12 overrides==6.5.0 packaging==21.3 pandas==1.4.4 Pillow==9.2.0 pluggy==0.13.1 proto-plus==1.22.0 protobuf==3.20.3 py==1.11.0 pyarrow==7.0.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 PyAutoGUI==0.9.53 pydantic==1.10.1 pydot==1.4.2 pyee==8.2.2 PyGetWindow==0.0.9 PyJWT==2.4.0 pymongo==3.12.3 PyMsgBox==1.0.9 pyobjc==8.5.1 pyobjc-core==8.5.1 pyobjc-framework-Accessibility==8.5.1 pyobjc-framework-Accounts==8.5.1 pyobjc-framework-AddressBook==8.5.1 pyobjc-framework-AdServices==8.5.1 pyobjc-framework-AdSupport==8.5.1 pyobjc-framework-AppleScriptKit==8.5.1 pyobjc-framework-AppleScriptObjC==8.5.1 pyobjc-framework-ApplicationServices==8.5.1 pyobjc-framework-AppTrackingTransparency==8.5.1 pyobjc-framework-AudioVideoBridging==8.5.1 pyobjc-framework-AuthenticationServices==8.5.1 pyobjc-framework-AutomaticAssessmentConfiguration==8.5.1 pyobjc-framework-Automator==8.5.1 pyobjc-framework-AVFoundation==8.5.1 pyobjc-framework-AVKit==8.5.1 pyobjc-framework-BusinessChat==8.5.1 pyobjc-framework-CalendarStore==8.5.1 pyobjc-framework-CallKit==8.5.1 pyobjc-framework-CFNetwork==8.5.1 pyobjc-framework-ClassKit==8.5.1 pyobjc-framework-CloudKit==8.5.1 pyobjc-framework-Cocoa==8.5.1 pyobjc-framework-Collaboration==8.5.1 pyobjc-framework-ColorSync==8.5.1 pyobjc-framework-Contacts==8.5.1 pyobjc-framework-ContactsUI==8.5.1 pyobjc-framework-CoreAudio==8.5.1 pyobjc-framework-CoreAudioKit==8.5.1 pyobjc-framework-CoreBluetooth==8.5.1 pyobjc-framework-CoreData==8.5.1 pyobjc-framework-CoreHaptics==8.5.1 pyobjc-framework-CoreLocation==8.5.1 
pyobjc-framework-CoreMedia==8.5.1 pyobjc-framework-CoreMediaIO==8.5.1 pyobjc-framework-CoreMIDI==8.5.1 pyobjc-framework-CoreML==8.5.1 pyobjc-framework-CoreMotion==8.5.1 pyobjc-framework-CoreServices==8.5.1 pyobjc-framework-CoreSpotlight==8.5.1 pyobjc-framework-CoreText==8.5.1 pyobjc-framework-CoreWLAN==8.5.1 pyobjc-framework-CryptoTokenKit==8.5.1 pyobjc-framework-DataDetection==8.5.1 pyobjc-framework-DeviceCheck==8.5.1 pyobjc-framework-DictionaryServices==8.5.1 pyobjc-framework-DiscRecording==8.5.1 pyobjc-framework-DiscRecordingUI==8.5.1 pyobjc-framework-DiskArbitration==8.5.1 pyobjc-framework-DVDPlayback==8.5.1 pyobjc-framework-EventKit==8.5.1 pyobjc-framework-ExceptionHandling==8.5.1 pyobjc-framework-ExecutionPolicy==8.5.1 pyobjc-framework-ExternalAccessory==8.5.1 pyobjc-framework-FileProvider==8.5.1 pyobjc-framework-FileProviderUI==8.5.1 pyobjc-framework-FinderSync==8.5.1 pyobjc-framework-FSEvents==8.5.1 pyobjc-framework-GameCenter==8.5.1 pyobjc-framework-GameController==8.5.1 pyobjc-framework-GameKit==8.5.1 pyobjc-framework-GameplayKit==8.5.1 pyobjc-framework-ImageCaptureCore==8.5.1 pyobjc-framework-IMServicePlugIn==8.5.1 pyobjc-framework-InputMethodKit==8.5.1 pyobjc-framework-InstallerPlugins==8.5.1 pyobjc-framework-InstantMessage==8.5.1 pyobjc-framework-Intents==8.5.1 pyobjc-framework-IntentsUI==8.5.1 pyobjc-framework-IOSurface==8.5.1 pyobjc-framework-iTunesLibrary==8.5.1 pyobjc-framework-KernelManagement==8.5.1 pyobjc-framework-LatentSemanticMapping==8.5.1 pyobjc-framework-LaunchServices==8.5.1 pyobjc-framework-libdispatch==8.5.1 pyobjc-framework-LinkPresentation==8.5.1 pyobjc-framework-LocalAuthentication==8.5.1 pyobjc-framework-LocalAuthenticationEmbeddedUI==8.5.1 pyobjc-framework-MailKit==8.5.1 pyobjc-framework-MapKit==8.5.1 pyobjc-framework-MediaAccessibility==8.5.1 pyobjc-framework-MediaLibrary==8.5.1 pyobjc-framework-MediaPlayer==8.5.1 pyobjc-framework-MediaToolbox==8.5.1 pyobjc-framework-Metal==8.5.1 pyobjc-framework-MetalKit==8.5.1 
pyobjc-framework-MetalPerformanceShaders==8.5.1 pyobjc-framework-MetalPerformanceShadersGraph==8.5.1 pyobjc-framework-MetricKit==8.5.1 pyobjc-framework-MLCompute==8.5.1 pyobjc-framework-ModelIO==8.5.1 pyobjc-framework-MultipeerConnectivity==8.5.1 pyobjc-framework-NaturalLanguage==8.5.1 pyobjc-framework-NetFS==8.5.1 pyobjc-framework-Network==8.5.1 pyobjc-framework-NetworkExtension==8.5.1 pyobjc-framework-NotificationCenter==8.5.1 pyobjc-framework-OpenDirectory==8.5.1 pyobjc-framework-OSAKit==8.5.1 pyobjc-framework-OSLog==8.5.1 pyobjc-framework-PassKit==8.5.1 pyobjc-framework-PencilKit==8.5.1 pyobjc-framework-Photos==8.5.1 pyobjc-framework-PhotosUI==8.5.1 pyobjc-framework-PreferencePanes==8.5.1 pyobjc-framework-PushKit==8.5.1 pyobjc-framework-Quartz==8.5.1 pyobjc-framework-QuickLookThumbnailing==8.5.1 pyobjc-framework-ReplayKit==8.5.1 pyobjc-framework-SafariServices==8.5.1 pyobjc-framework-SceneKit==8.5.1 pyobjc-framework-ScreenCaptureKit==8.5.1 pyobjc-framework-ScreenSaver==8.5.1 pyobjc-framework-ScreenTime==8.5.1 pyobjc-framework-ScriptingBridge==8.5.1 pyobjc-framework-SearchKit==8.5.1 pyobjc-framework-Security==8.5.1 pyobjc-framework-SecurityFoundation==8.5.1 pyobjc-framework-SecurityInterface==8.5.1 pyobjc-framework-ServiceManagement==8.5.1 pyobjc-framework-ShazamKit==8.5.1 pyobjc-framework-Social==8.5.1 pyobjc-framework-SoundAnalysis==8.5.1 pyobjc-framework-Speech==8.5.1 pyobjc-framework-SpriteKit==8.5.1 pyobjc-framework-StoreKit==8.5.1 pyobjc-framework-SyncServices==8.5.1 pyobjc-framework-SystemConfiguration==8.5.1 pyobjc-framework-SystemExtensions==8.5.1 pyobjc-framework-UniformTypeIdentifiers==8.5.1 pyobjc-framework-UserNotifications==8.5.1 pyobjc-framework-UserNotificationsUI==8.5.1 pyobjc-framework-VideoSubscriberAccount==8.5.1 pyobjc-framework-VideoToolbox==8.5.1 pyobjc-framework-Virtualization==8.5.1 pyobjc-framework-Vision==8.5.1 pyobjc-framework-WebKit==8.5.1 pyparsing==3.0.9 pyperclip==1.8.2 pyppeteer==1.0.2 PyRect==0.2.0 PyScreeze==0.1.28 
pytest==5.4.3 pytest-order==1.0.1 python-dateutil==2.8.2 pytweening==1.0.4 pytz==2022.2.1 PyYAML==6.0 regex==2022.9.13 requests==2.28.1 rsa==4.9 rubicon-objc==0.4.2 six==1.16.0 sniffio==1.3.0 SQLAlchemy==1.4.39 sqlparse==0.4.2 starlette==0.19.1 tqdm==4.64.1 typing-inspect==0.8.0 typing_extensions==4.3.0 urllib3==1.26.11 uvicorn==0.18.2 wcwidth==0.2.5 websockets==10.3 zipp==3.8.1 zstandard==0.18.0 ``` </details> --- ### Other context <details> <summary>code to set up bigquery emulator</summary> `docker-compose` to set up the emulator ``` version: '3.6' services: bigquery-emulator: hostname: bigquery container_name: bigquery image: ghcr.io/goccy/bigquery-emulator:latest ports: - 9050:9050 command: --project=${PROJECT_ID} --port=${BIGQUERY_PORT} ``` The python code to setup dataset/table ``` def set_up_bq_instance(): from google.cloud.bigquery import TableReference, DatasetReference from google.api_core.client_options import ClientOptions from google.auth.credentials import AnonymousCredentials from google.cloud import bigquery project_id = "test-project" client_options = ClientOptions(api_endpoint="http://localhost:9050") bq_test_client = bq_test_client = bigquery.Client( project=project_id, client_options=client_options, credentials=AnonymousCredentials(), ) dataset_ref = DatasetReference(project=project_id, dataset_id='poc-test-dataset') try: bq_test_client.create_dataset(dataset=dataset_ref, exists_ok=True, retry=None) except InternalServerError: pass table_ref_1 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-1') table_ref_2 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-2') schema_1 = [ bigquery.SchemaField("type2", "STRING", mode="REQUIRED"), bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("id2", "STRING", mode="REQUIRED"), ] schema_2 = [ bigquery.SchemaField("type2", "STRING", mode="REQUIRED"), bigquery.SchemaField("full_addr", "STRING", mode="REQUIRED"), bigquery.SchemaField("id2", 
"STRING", mode="REQUIRED"), ] table_1 = bigquery.Table(table_ref=table_ref_1, schema=schema_1) table_2 = bigquery.Table(table_ref=table_ref_2, schema=schema_2) try: _ = bq_test_client.create_table(table_1, exists_ok=True, retry=None) except InternalServerError: pass try: _ = bq_test_client.create_table(table_2, exists_ok=True, retry=None) except InternalServerError: pass list_dataset = list(bq_test_client.list_datasets()) list_table = list(bq_test_client.list_tables(dataset_ref)) return bq_test_client ``` </details> ### Issue Priority Priority: 2 ### Issue Component Component: io-py-gcp -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
