reuvenlax commented on a change in pull request #13771:
URL: https://github.com/apache/beam/pull/13771#discussion_r581344849



##########
File path: sdks/java/extensions/google-cloud-platform-core/build.gradle
##########
@@ -17,6 +17,7 @@
  */
 
 import groovy.json.JsonOutput
+import org.apache.beam.gradle.GrpcVendoring_1_26_0

Review comment:
       Reverted

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
##########
@@ -411,6 +411,21 @@ public static TableReference parseTableSpec(String tableSpec) {
     return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
   }
 
+  public static TableReference parseTableUrn(String tableUrn) {

Review comment:
       Yes - this is used in the follow-on PR. Added a unit test in this PR.
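
       For reference, a minimal sketch of what such a unit test could look like (hypothetical test body - the actual test in this PR may differ; it assumes parseTableUrn rejects malformed input with IllegalArgumentException, as the hunk below shows):

       import static org.junit.Assert.assertEquals;
       import static org.junit.Assert.assertThrows;

       import com.google.api.services.bigquery.model.TableReference;
       import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
       import org.junit.Test;

       public class BigQueryHelpersUrnTest {
         @Test
         public void parsesFullTableUrn() {
           TableReference ref = BigQueryHelpers.parseTableUrn("projects/p/datasets/d/tables/t");
           assertEquals("p", ref.getProjectId());
           assertEquals("d", ref.getDatasetId());
           assertEquals("t", ref.getTableId());
         }

         @Test
         public void rejectsColonDelimitedSpec() {
           assertThrows(
               IllegalArgumentException.class, () -> BigQueryHelpers.parseTableUrn("p:d.t"));
         }
       }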

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
##########
@@ -411,6 +411,21 @@ public static TableReference parseTableSpec(String tableSpec) {
     return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
   }
 
+  public static TableReference parseTableUrn(String tableUrn) {
+    Matcher match = BigQueryIO.TABLE_URN_SPEC.matcher(tableUrn);
+    if (!match.matches()) {
+      throw new IllegalArgumentException(
+          "Table reference is not in 
projects/[project_id]/datasets/[dataset_id]/tables/[table_id] "

Review comment:
       Vortex expects this format and doesn't accept the project:dataset.table form.
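
       To make the accepted form concrete, a small standalone sketch; the regex is a hypothetical stand-in for BigQueryIO.TABLE_URN_SPEC, not the actual pattern:

       import java.util.regex.Matcher;
       import java.util.regex.Pattern;

       public class TableUrnFormatExample {
         // Hypothetical stand-in for BigQueryIO.TABLE_URN_SPEC.
         private static final Pattern TABLE_URN_SPEC =
             Pattern.compile(
                 "projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)");

         public static void main(String[] args) {
           Matcher m =
               TABLE_URN_SPEC.matcher("projects/my-project/datasets/my_dataset/tables/my_table");
           System.out.println(m.matches()); // true
           // The colon/dot table spec form is rejected:
           System.out.println(
               TABLE_URN_SPEC.matcher("my-project:my_dataset.my_table").matches()); // false
         }
       }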

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
##########
@@ -74,6 +88,17 @@
 })
 public class BigQueryUtils {
 
+  public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)

Review comment:
       done

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -74,6 +88,17 @@
 })
 public class BigQueryUtils {
 
+  public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)

Review comment:
       Added a comment. Vortex takes protocol buffers as elements, and the protocol buffer descriptor must match the table schema.
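
       As a sketch of that contract (getDescriptorFromTableSchema is the method from this PR; the two-column schema below is illustrative only):

       import com.google.api.services.bigquery.model.TableFieldSchema;
       import com.google.api.services.bigquery.model.TableSchema;
       import com.google.protobuf.Descriptors.Descriptor;
       import com.google.protobuf.DynamicMessage;
       import java.util.Arrays;
       import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;

       public class DescriptorContractExample {
         public static void main(String[] args) throws Exception {
           // An illustrative table schema with one STRING and one INT64 column.
           TableSchema schema =
               new TableSchema()
                   .setFields(
                       Arrays.asList(
                           new TableFieldSchema().setName("name").setType("STRING"),
                           new TableFieldSchema().setName("count").setType("INT64")));
           // The generated descriptor's fields mirror the table's columns one-to-one.
           Descriptor descriptor = BigQueryUtils.getDescriptorFromTableSchema(schema);
           // Elements handed to Vortex are protobuf messages built against that descriptor.
           DynamicMessage element =
               DynamicMessage.newBuilder(descriptor)
                   .setField(descriptor.findFieldByName("name"), "foo")
                   .setField(descriptor.findFieldByName("count"), 42L)
                   .build();
           System.out.println(element);
         }
       }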

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -405,6 +430,185 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
     return fromTableFieldSchema(tableSchema.getFields(), options);
   }
 
+  public static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
+    return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
+  }
+
+  public static DescriptorProto descriptorSchemaFromTableFieldSchemas(
+      Iterable<TableFieldSchema> tableFieldSchemas) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));

Review comment:
       done

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -405,6 +430,185 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
     return fromTableFieldSchema(tableSchema.getFields(), options);
   }
 
+  public static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
+    return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
+  }
+
+  public static DescriptorProto descriptorSchemaFromTableFieldSchemas(
+      Iterable<TableFieldSchema> tableFieldSchemas) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+    int i = 1;
+    for (TableFieldSchema fieldSchema : tableFieldSchemas) {
+      fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder);
+    }
+    return descriptorBuilder.build();
+  }
+
+  public static void fieldDescriptorFromTableField(
+      TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
+    FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setName(fieldSchema.getName());
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+    switch (fieldSchema.getType()) {
+      case "STRING":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_STRING);
+        break;
+      case "BYTES":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BYTES);
+        break;
+      case "INT64":
+      case "INTEGER":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "FLOAT64":
+      case "FLOAT":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_FLOAT);
+        break;
+      case "BOOL":
+      case "BOOLEAN":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BOOL);
+        break;
+      case "TIMESTAMP":
+      case "TIME":
+      case "DATETIME":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "DATE":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT32);
+        break;
+      case "STRUCT":
+      case "RECORD":
+        DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
+        descriptorBuilder.addNestedType(nested);
+        fieldDescriptorBuilder =
+            fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Converting BigQuery type " + fieldSchema.getType() + " to Beam type is unsupported");
+    }
+
+    Optional<Mode> fieldMode = Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf);
+    if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
+    } else if (!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
+    } else {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+    }
+    descriptorBuilder.addField(fieldDescriptorBuilder.build());
+  }
+
+  public static DynamicMessage messageFromTableRow(Descriptor descriptor, TableRow tableRow) {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
+      Object value =
+          messageValueFromFieldValue(fieldDescriptor, tableRow.get(fieldDescriptor.getName()));
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  public static Object messageValueFromFieldValue(FieldDescriptor fieldDescriptor, Object bqValue) {
+    if (bqValue == null) {
+      if (fieldDescriptor.isOptional()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field " + fieldDescriptor.getName());
+      }
+    }
+    return toProtoValue(fieldDescriptor, bqValue);
+  }
+
+  private static final Map<FieldDescriptor.Type, Function<String, Object>> JSON_PROTO_PARSERS =
+      ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
+          .put(FieldDescriptor.Type.INT32, Integer::valueOf)
+          .put(FieldDescriptor.Type.INT64, Long::valueOf)
+          .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
+          .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
+          .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
+          .put(FieldDescriptor.Type.STRING, str -> str)
+          .put(
+              FieldDescriptor.Type.BYTES,
+              b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
+          .build();
+
+  private static Object toProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
+    if (jsonBQValue instanceof String) {
+      Function<String, Object> mapper = JSON_PROTO_PARSERS.get(fieldDescriptor.getType());
+      if (mapper != null) {
+        return mapper.apply((String) jsonBQValue);
+      }
+    } else if (jsonBQValue instanceof Integer) {
+      switch (fieldDescriptor.getJavaType()) {
+        case INT:
+          return Integer.valueOf((int) jsonBQValue);
+        case LONG:
+          return Long.valueOf((int) jsonBQValue);
+        default:
+          throw new RuntimeException("foo");

Review comment:
       done

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -405,6 +430,185 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
     return fromTableFieldSchema(tableSchema.getFields(), options);
   }
 
+  public static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
+    return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
+  }
+
+  public static DescriptorProto descriptorSchemaFromTableFieldSchemas(
+      Iterable<TableFieldSchema> tableFieldSchemas) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+    int i = 1;
+    for (TableFieldSchema fieldSchema : tableFieldSchemas) {
+      fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder);
+    }
+    return descriptorBuilder.build();
+  }
+
+  public static void fieldDescriptorFromTableField(
+      TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
+    FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setName(fieldSchema.getName());
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+    switch (fieldSchema.getType()) {
+      case "STRING":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_STRING);
+        break;
+      case "BYTES":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BYTES);
+        break;
+      case "INT64":
+      case "INTEGER":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "FLOAT64":
+      case "FLOAT":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_FLOAT);
+        break;
+      case "BOOL":
+      case "BOOLEAN":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BOOL);
+        break;
+      case "TIMESTAMP":
+      case "TIME":
+      case "DATETIME":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "DATE":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT32);
+        break;
+      case "STRUCT":
+      case "RECORD":
+        DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
+        descriptorBuilder.addNestedType(nested);
+        fieldDescriptorBuilder =
+            fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Converting BigQuery type " + fieldSchema.getType() + " to Beam type is unsupported");
+    }
+
+    Optional<Mode> fieldMode = Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf);
+    if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
+    } else if (!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
+    } else {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+    }
+    descriptorBuilder.addField(fieldDescriptorBuilder.build());
+  }
+
+  public static DynamicMessage messageFromTableRow(Descriptor descriptor, TableRow tableRow) {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
+      Object value =
+          messageValueFromFieldValue(fieldDescriptor, tableRow.get(fieldDescriptor.getName()));
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  public static Object messageValueFromFieldValue(FieldDescriptor fieldDescriptor, Object bqValue) {
+    if (bqValue == null) {
+      if (fieldDescriptor.isOptional()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field " + fieldDescriptor.getName());
+      }
+    }
+    return toProtoValue(fieldDescriptor, bqValue);
+  }
+
+  private static final Map<FieldDescriptor.Type, Function<String, Object>> JSON_PROTO_PARSERS =
+      ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
+          .put(FieldDescriptor.Type.INT32, Integer::valueOf)
+          .put(FieldDescriptor.Type.INT64, Long::valueOf)
+          .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
+          .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
+          .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
+          .put(FieldDescriptor.Type.STRING, str -> str)
+          .put(
+              FieldDescriptor.Type.BYTES,
+              b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
+          .build();
+
+  private static Object toProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
+    if (jsonBQValue instanceof String) {
+      Function<String, Object> mapper = JSON_PROTO_PARSERS.get(fieldDescriptor.getType());
+      if (mapper != null) {
+        return mapper.apply((String) jsonBQValue);
+      }
+    } else if (jsonBQValue instanceof Integer) {
+      switch (fieldDescriptor.getJavaType()) {
+        case INT:
+          return Integer.valueOf((int) jsonBQValue);
+        case LONG:
+          return Long.valueOf((int) jsonBQValue);
+        default:
+          throw new RuntimeException("foo");
+      }
+    } else if (jsonBQValue instanceof List) {
+      return ((List<Object>) jsonBQValue)
+          .stream()
+              .map(v -> ((Map<String, Object>) v).get("v"))
+              .map(v -> toProtoValue(fieldDescriptor, v))
+              .collect(toList());
+    } else if (jsonBQValue instanceof AbstractMap) {
+      // This will handle nested rows.
+      TableRow tr = new TableRow();
+      tr.putAll((AbstractMap<String, Object>) jsonBQValue);
+      return messageFromTableRow(fieldDescriptor.getMessageType(), tr);
+    } else if (jsonBQValue instanceof TableRow) {

Review comment:
       Removed. It was a dead conditional - since TableRow inherits from AbstractMap, this branch would never be hit.
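
       A quick demonstration of the inheritance claim (TableRow extends GenericJson, which ultimately extends AbstractMap, so the AbstractMap branch always matches first):

       import com.google.api.services.bigquery.model.TableRow;
       import java.util.AbstractMap;

       public class DeadBranchExample {
         public static void main(String[] args) {
           TableRow row = new TableRow().set("v", 1);
           // Every TableRow is an AbstractMap, so an earlier `instanceof AbstractMap`
           // check captures it before an `instanceof TableRow` check can ever run.
           System.out.println(row instanceof AbstractMap); // true
         }
       }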

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.services.bigquery.model.EncryptionConfiguration;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+
+public class CreateTableHelpers {
+  /**
+   * The list of tables created so far, so we don't try the creation each time.
+   *
+   * <p>TODO: We should put a bound on memory usage of this. Use guava cache instead.
+   */
+  private static Set<String> createdTables =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  static TableDestination possiblyCreateTable(

Review comment:
       Completely refactoring this.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.services.bigquery.model.EncryptionConfiguration;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+
+public class CreateTableHelpers {
+  /**
+   * The list of tables created so far, so we don't try the creation each time.
+   *
+   * <p>TODO: We should put a bound on memory usage of this. Use guava cache instead.
+   */
+  private static Set<String> createdTables =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  static TableDestination possiblyCreateTable(
+      DoFn<?, ?>.ProcessContext context,
+      TableDestination tableDestination,
+      Supplier<TableSchema> schemaSupplier,
+      CreateDisposition createDisposition,
+      Coder<?> tableDestinationCoder,
+      String kmsKey,
+      BigQueryServices bqServices) {
+    checkArgument(
+        tableDestination.getTableSpec() != null,
+        "DynamicDestinations.getTable() must return a TableDestination "
+            + "with a non-null table spec, but %s returned %s for destination %s,"
+            + "which has a null table spec",
+        tableDestination);
+    boolean destinationCoderSupportsClustering =
+        !(tableDestinationCoder instanceof TableDestinationCoderV2);
+    checkArgument(
+        tableDestination.getClustering() == null || destinationCoderSupportsClustering,
+        "DynamicDestinations.getTable() may only return destinations with clustering configured"
+            + " if a destination coder is supplied that supports clustering, but %s is configured"
+            + " to use TableDestinationCoderV2. Set withClustering() on BigQueryIO.write() and, "
+            + " if you provided a custom DynamicDestinations instance, override"
+            + " getDestinationCoder() to return TableDestinationCoderV3.");
+    TableReference tableReference = tableDestination.getTableReference().clone();
+    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+      tableReference.setProjectId(
+          context.getPipelineOptions().as(BigQueryOptions.class).getProject());
+      tableDestination = tableDestination.withTableReference(tableReference);
+    }
+    if (createDisposition == CreateDisposition.CREATE_NEVER) {
+      return tableDestination;
+    }
+
+    String tableSpec = BigQueryHelpers.stripPartitionDecorator(tableDestination.getTableSpec());
+    if (!createdTables.contains(tableSpec)) {
+      // Another thread may have succeeded in creating the table in the meanwhile, so
+      // check again. This check isn't needed for correctness, but we add it to prevent
+      // every thread from attempting a create and overwhelming our BigQuery quota.
+      synchronized (createdTables) {
+        if (!createdTables.contains(tableSpec)) {
+          tryCreateTable(
+              context,
+              schemaSupplier,
+              tableDestination,
+              createDisposition,
+              tableSpec,
+              kmsKey,
+              bqServices);
+        }
+      }
+    }
+    return tableDestination;
+  }
+
+  @SuppressWarnings({"nullness"})
+  private static void tryCreateTable(

Review comment:
       Refactoring this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

