shunping commented on code in PR #30015:
URL: https://github.com/apache/beam/pull/30015#discussion_r1459716006


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py:
##########
@@ -903,6 +903,68 @@ def setUp(self):
     _LOGGER.info(
         "Created dataset %s in project %s", self.dataset_id, self.project)
 
+  @pytest.mark.it_postcommit
+  def test_batch_copy_jobs_with_no_input_schema(self):
+    schema_1 = "col_1:INTEGER"
+    schema_2 = "col_2:INTEGER"
+
+    # create two tables with different schemas
+    # test to make sure this works with dynamic destinations too
+    self.bigquery_client.get_or_create_table(
+        project_id=self.project,
+        dataset_id=self.dataset_id,
+        table_id="output_table_1",
+        schema=bigquery_tools.get_table_schema_from_string(schema_1),
+        create_disposition='CREATE_IF_NEEDED',
+        write_disposition='WRITE_APPEND')
+    self.bigquery_client.get_or_create_table(
+        project_id=self.project,
+        dataset_id=self.dataset_id,
+        table_id="output_table_2",
+        schema=bigquery_tools.get_table_schema_from_string(schema_2),
+        create_disposition='CREATE_IF_NEEDED',
+        write_disposition='WRITE_APPEND')
+
+    # reduce load job size to induce copy jobs
+    bqfl._DEFAULT_MAX_FILE_SIZE = 10
+    bqfl._MAXIMUM_LOAD_SIZE = 20
+    verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % (self.output_table + "_1"),
+            data=[(i, ) for i in range(5)]),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % (self.output_table + "_2"),
+            data=[(i, ) for i in range(5, 10)])
+    ]
+
+    output = self.output_table
+
+    def callable_table(el: dict):
+      dest = output
+      if "col_1" in el:
+        dest += "_1"
+      elif "col_2" in el:
+        dest += "_2"
+      return dest
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=all_of(verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      # 0...4 going to table 1
+      # 5...9 going to table 2
+      items = [{"col_1": i} for i in range(5)]
+      items.extend([{"col_2": i} for i in range(5, 10)])
+      _ = (

Review Comment:
   OK, sounds good to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to