This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a1afcdd989c [HUDI-7115] Add in new options for the bigquery sync 
(#10125)
a1afcdd989c is described below

commit a1afcdd989ce2d634290d1bd9e099a17057e6b4d
Author: Tim Brown <t...@onehouse.ai>
AuthorDate: Tue Nov 21 14:58:12 2023 -0600

    [HUDI-7115] Add in new options for the bigquery sync (#10125)
    
    - Add in new options for the bigquery sync
---
 hudi-gcp/pom.xml                                   |  3 +-
 .../hudi/gcp/bigquery/BigQuerySyncConfig.java      | 20 ++++++++
 .../apache/hudi/gcp/bigquery/BigQuerySyncTool.java | 23 +++++----
 .../gcp/bigquery/HoodieBigQuerySyncClient.java     | 58 +++++++++++++++++++---
 .../hudi/gcp/bigquery/TestBigQuerySyncConfig.java  |  2 +-
 .../hudi/gcp/bigquery/TestBigQuerySyncTool.java    | 12 ++---
 .../gcp/bigquery/TestBigQuerySyncToolArgs.java     |  8 ++-
 .../gcp/bigquery/TestHoodieBigQuerySyncClient.java | 26 ++++++----
 8 files changed, 114 insertions(+), 38 deletions(-)

diff --git a/hudi-gcp/pom.xml b/hudi-gcp/pom.xml
index b1cfb8076a6..2c308fbf424 100644
--- a/hudi-gcp/pom.xml
+++ b/hudi-gcp/pom.xml
@@ -36,7 +36,7 @@ See 
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>libraries-bom</artifactId>
-        <version>25.1.0</version>
+        <version>26.15.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
@@ -70,7 +70,6 @@ See 
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
     <dependency>
       <groupId>com.google.cloud</groupId>
       <artifactId>google-cloud-pubsub</artifactId>
-      <version>${google.cloud.pubsub.version}</version>
     </dependency>
 
     <dependency>
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index 94510ca8dfa..ed8895ca217 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -122,6 +122,20 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
       .markAdvanced()
       .withDocumentation("Fetch file listing from Hudi's metadata");
 
+  public static final ConfigProperty<Boolean> 
BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.require_partition_filter")
+      .defaultValue(false)
+      .sinceVersion("0.14.1")
+      .markAdvanced()
+      .withDocumentation("If true, configure table to require a partition 
filter to be specified when querying the table");
+
+  public static final ConfigProperty<String> 
BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.big_lake_connection_id")
+      .noDefaultValue()
+      .sinceVersion("0.14.1")
+      .markAdvanced()
+      .withDocumentation("The Big Lake connection ID to use");
+
   public BigQuerySyncConfig(Properties props) {
     super(props);
     setDefaults(BigQuerySyncConfig.class.getName());
@@ -147,6 +161,10 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
     public String sourceUri;
     @Parameter(names = {"--source-uri-prefix"}, description = "Name of the 
source uri gcs path prefix of the table", required = false)
     public String sourceUriPrefix;
+    @Parameter(names = {"--big-lake-connection-id"}, description = "The Big 
Lake connection ID to use when creating the table if using the manifest file 
approach.")
+    public String bigLakeConnectionId;
+    @Parameter(names = {"--require-partition-filter"}, description = "If true, 
configure table to require a partition filter to be specified when querying the 
table")
+    public Boolean requirePartitionFilter;
 
     public boolean isHelp() {
       return hoodieSyncConfigParams.isHelp();
@@ -164,6 +182,8 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
       props.setPropertyIfNonNull(BIGQUERY_SYNC_SYNC_BASE_PATH.key(), 
hoodieSyncConfigParams.basePath);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_PARTITION_FIELDS.key(), 
StringUtils.join(",", hoodieSyncConfigParams.partitionFields));
       
props.setPropertyIfNonNull(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), 
hoodieSyncConfigParams.useFileListingFromMetadata);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(), 
bigLakeConnectionId);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(), 
requirePartitionFilter);
       return props;
     }
   }
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
index 19c8449f8fa..28c071e5231 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
@@ -122,7 +122,7 @@ public class BigQuerySyncTool extends HoodieSyncTool {
   }
 
   private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {
-    LOG.info("Sync hoodie table " + snapshotViewName + " at base path " + 
bqSyncClient.getBasePath());
+    LOG.info("Sync hoodie table {} at base path {}", snapshotViewName, 
bqSyncClient.getBasePath());
 
     if (!bqSyncClient.datasetExists()) {
       throw new HoodieBigQuerySyncException("Dataset not found: " + 
config.getString(BIGQUERY_SYNC_DATASET_NAME));
@@ -132,19 +132,21 @@ public class BigQuerySyncTool extends HoodieSyncTool {
     Schema latestSchema = bqSchemaResolver.getTableSchema(metaClient, 
partitionFields);
     if (config.getBoolean(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE)) {
       manifestFileWriter.writeManifestFile(true);
-      if (!tableExists(bqSyncClient, tableName)) {
-        bqSyncClient.createTableUsingBqManifestFile(
+      // if table does not exist, create it using the manifest file
+      // if table exists but is not yet using manifest file or needs to be 
recreated with the big-lake connection ID, update it to use manifest file
+      if (bqSyncClient.tableNotExistsOrDoesNotMatchSpecification(tableName)) {
+        bqSyncClient.createOrUpdateTableUsingBqManifestFile(
             tableName,
             manifestFileWriter.getManifestSourceUri(true),
             config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
             latestSchema);
-        LOG.info("Completed table " + tableName + " creation using the 
manifest file");
+        LOG.info("Completed table {} creation using the manifest file", 
tableName);
       } else {
         bqSyncClient.updateTableSchema(tableName, latestSchema, 
partitionFields);
-        LOG.info("Synced schema for " + tableName);
+        LOG.info("Synced schema for {}", tableName);
       }
 
-      LOG.info("Sync table complete for " + tableName);
+      LOG.info("Sync table complete for {}", tableName);
       return;
     }
 
@@ -152,7 +154,7 @@ public class BigQuerySyncTool extends HoodieSyncTool {
 
     if (!tableExists(bqSyncClient, manifestTableName)) {
       bqSyncClient.createManifestTable(manifestTableName, 
manifestFileWriter.getManifestSourceUri(false));
-      LOG.info("Manifest table creation complete for " + manifestTableName);
+      LOG.info("Manifest table creation complete for {}", manifestTableName);
     }
 
     if (!tableExists(bqSyncClient, versionsTableName)) {
@@ -161,16 +163,15 @@ public class BigQuerySyncTool extends HoodieSyncTool {
           config.getString(BIGQUERY_SYNC_SOURCE_URI),
           config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
           config.getSplitStrings(BIGQUERY_SYNC_PARTITION_FIELDS));
-      LOG.info("Versions table creation complete for " + versionsTableName);
+      LOG.info("Versions table creation complete for {}", versionsTableName);
     }
 
     if (!tableExists(bqSyncClient, snapshotViewName)) {
       bqSyncClient.createSnapshotView(snapshotViewName, versionsTableName, 
manifestTableName);
-      LOG.info("Snapshot view creation complete for " + snapshotViewName);
+      LOG.info("Snapshot view creation complete for {}", snapshotViewName);
     }
 
-    // TODO: Implement automatic schema evolution when you add a new column.
-    LOG.info("Sync table complete for " + snapshotViewName);
+    LOG.info("Sync table complete for {}", snapshotViewName);
   }
 
   @Override
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index a5462b5669e..af56194214d 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -22,6 +22,7 @@ package org.apache.hudi.gcp.bigquery;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.sync.common.HoodieSyncClient;
+import org.apache.hudi.sync.common.util.ManifestFileWriter;
 
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
@@ -51,9 +52,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;
 
 public class HoodieBigQuerySyncClient extends HoodieSyncClient {
 
@@ -61,14 +64,18 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
 
   protected final BigQuerySyncConfig config;
   private final String projectId;
+  private final String bigLakeConnectionId;
   private final String datasetName;
+  private final boolean requirePartitionFilter;
   private transient BigQuery bigquery;
 
   public HoodieBigQuerySyncClient(final BigQuerySyncConfig config) {
     super(config);
     this.config = config;
     this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+    this.bigLakeConnectionId = 
config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
     this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
+    this.requirePartitionFilter = 
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
     this.createBigQueryConnection();
   }
 
@@ -78,7 +85,9 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     this.config = config;
     this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
     this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
+    this.requirePartitionFilter = 
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
     this.bigquery = bigquery;
+    this.bigLakeConnectionId = 
config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
   }
 
   private void createBigQueryConnection() {
@@ -94,19 +103,22 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     }
   }
 
-  public void createTableUsingBqManifestFile(String tableName, String 
bqManifestFileUri, String sourceUriPrefix, Schema schema) {
+  public void createOrUpdateTableUsingBqManifestFile(String tableName, String 
bqManifestFileUri, String sourceUriPrefix, Schema schema) {
     try {
       String withClauses = String.format("( %s )", 
BigQuerySchemaResolver.schemaToSqlString(schema));
       String extraOptions = "enable_list_inference=true,";
       if (!StringUtils.isNullOrEmpty(sourceUriPrefix)) {
         withClauses += " WITH PARTITION COLUMNS";
-        extraOptions += String.format(" hive_partition_uri_prefix=\"%s\",", 
sourceUriPrefix);
+        extraOptions += String.format(" hive_partition_uri_prefix=\"%s\", 
require_hive_partition_filter=%s,", sourceUriPrefix, requirePartitionFilter);
+      }
+      if (!StringUtils.isNullOrEmpty(bigLakeConnectionId)) {
+        withClauses += String.format(" WITH CONNECTION `%s`", 
bigLakeConnectionId);
       }
 
       String query =
           String.format(
-              "CREATE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
-              + "uris=[\"%s\"], format=\"PARQUET\", 
file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
+              "CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
+                  + "uris=[\"%s\"], format=\"PARQUET\", 
file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
               projectId,
               datasetName,
               tableName,
@@ -125,7 +137,7 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
       if (queryJob == null) {
         LOG.error("Job for table creation no longer exists");
       } else if (queryJob.getStatus().getError() != null) {
-        LOG.error("Job for table creation failed: " + 
queryJob.getStatus().getError().toString());
+        LOG.error("Job for table creation failed: {}", 
queryJob.getStatus().getError().toString());
       } else {
         LOG.info("External table created using manifest file.");
       }
@@ -176,13 +188,21 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
         .collect(Collectors.toList());
     updatedTableFields.addAll(schema.getFields());
     Schema finalSchema = Schema.of(updatedTableFields);
-    if (definition.getSchema() != null && 
definition.getSchema().equals(finalSchema)) {
+    boolean sameSchema = definition.getSchema() != null && 
definition.getSchema().equals(finalSchema);
+    boolean samePartitionFilter = partitionFields.isEmpty()
+        || (requirePartitionFilter == 
(definition.getHivePartitioningOptions().getRequirePartitionFilter() != null && 
definition.getHivePartitioningOptions().getRequirePartitionFilter()));
+    if (sameSchema && samePartitionFilter) {
       return; // No need to update schema.
     }
+    ExternalTableDefinition.Builder builder = definition.toBuilder();
+    builder.setSchema(finalSchema);
+    builder.setAutodetect(false);
+    if (definition.getHivePartitioningOptions() != null) {
+      
builder.setHivePartitioningOptions(definition.getHivePartitioningOptions().toBuilder().setRequirePartitionFilter(requirePartitionFilter).build());
+    }
     Table updatedTable = existingTable.toBuilder()
-        
.setDefinition(definition.toBuilder().setSchema(finalSchema).setAutodetect(false).build())
+        .setDefinition(builder.build())
         .build();
-
     bigquery.update(updatedTable);
   }
 
@@ -264,6 +284,28 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     return table != null && table.exists();
   }
 
+  /**
+   * Checks for the existence of a table that uses the manifest file approach and matches other requirements.
+   * @param tableName name of the table
+   * @return true if the table does not exist, if it exists but does not use the manifest file,
+   * or if a Big Lake connection ID is configured but missing from the table definition. False otherwise.
+   */
+  public boolean tableNotExistsOrDoesNotMatchSpecification(String tableName) {
+    TableId tableId = TableId.of(projectId, datasetName, tableName);
+    Table table = bigquery.getTable(tableId);
+    if (table == null || !table.exists()) {
+      return true;
+    }
+    ExternalTableDefinition externalTableDefinition = table.getDefinition();
+    boolean manifestDoesNotExist =
+        externalTableDefinition.getSourceUris() == null
+            || externalTableDefinition.getSourceUris().stream().noneMatch(uri 
-> uri.contains(ManifestFileWriter.ABSOLUTE_PATH_MANIFEST_FOLDER_NAME));
+    if 
(!StringUtils.isNullOrEmpty(config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID)))
 {
+      // If bigLakeConnectionId is present and connectionId is not present in 
table definition, we need to replace the table.
+      return manifestDoesNotExist || externalTableDefinition.getConnectionId() 
== null;
+    }
+    return manifestDoesNotExist;
+  }
+
   @Override
   public void close() {
     bigquery = null;
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
index b5d812cce10..2c17749158f 100644
--- 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
@@ -33,11 +33,11 @@ import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATA
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
-import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
index 5edbdac1c2e..ff7abdb6870 100644
--- 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
@@ -76,13 +76,13 @@ public class TestBigQuerySyncTool {
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), 
"datestr,type");
     
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
     when(mockBqSyncClient.datasetExists()).thenReturn(true);
-    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
+    
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(true);
     Path manifestPath = new Path("file:///local/path");
     
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
     when(mockBqSchemaResolver.getTableSchema(any(), 
eq(Arrays.asList("datestr", "type")))).thenReturn(schema);
     BigQuerySyncTool tool = new BigQuerySyncTool(properties, 
mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
     tool.syncHoodieTable();
-    verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, 
manifestPath.toUri().getPath(), prefix, schema);
+    
verify(mockBqSyncClient).createOrUpdateTableUsingBqManifestFile(TEST_TABLE, 
manifestPath.toUri().getPath(), prefix, schema);
     verify(mockManifestFileWriter).writeManifestFile(true);
   }
 
@@ -91,13 +91,13 @@ public class TestBigQuerySyncTool {
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(),
 "true");
     
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
     when(mockBqSyncClient.datasetExists()).thenReturn(true);
-    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
+    
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(true);
     Path manifestPath = new Path("file:///local/path");
     
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
     when(mockBqSchemaResolver.getTableSchema(any(), 
eq(Collections.emptyList()))).thenReturn(schema);
     BigQuerySyncTool tool = new BigQuerySyncTool(properties, 
mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
     tool.syncHoodieTable();
-    verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, 
manifestPath.toUri().getPath(), null, schema);
+    
verify(mockBqSyncClient).createOrUpdateTableUsingBqManifestFile(TEST_TABLE, 
manifestPath.toUri().getPath(), null, schema);
     verify(mockManifestFileWriter).writeManifestFile(true);
   }
 
@@ -109,7 +109,7 @@ public class TestBigQuerySyncTool {
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), 
"datestr,type");
     
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
     when(mockBqSyncClient.datasetExists()).thenReturn(true);
-    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
+    
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(false);
     Path manifestPath = new Path("file:///local/path");
     
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
     List<String> partitionFields = Arrays.asList("datestr", "type");
@@ -125,7 +125,7 @@ public class TestBigQuerySyncTool {
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(),
 "true");
     
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
     when(mockBqSyncClient.datasetExists()).thenReturn(true);
-    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
+    
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(false);
     Path manifestPath = new Path("file:///local/path");
     
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
     when(mockBqSchemaResolver.getTableSchema(any(), 
eq(Collections.emptyList()))).thenReturn(schema);
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
index 0a4ba6fd61a..3f8ee6b9966 100644
--- 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
@@ -23,10 +23,12 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Properties;
 
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
@@ -50,8 +52,10 @@ public class TestBigQuerySyncToolArgs {
         "--source-uri-prefix", "gs://foobartable/",
         "--base-path", "gs://foobartable",
         "--partitioned-by", "year,month,day",
+        "--big-lake-connection-id", "connection-id",
         "--use-bq-manifest-file",
-        "--use-file-listing-from-metadata"
+        "--use-file-listing-from-metadata",
+        "--require-partition-filter"
     };
     cmd.parse(args);
 
@@ -66,5 +70,7 @@ public class TestBigQuerySyncToolArgs {
     assertEquals("year,month,day", 
props.getProperty(BIGQUERY_SYNC_PARTITION_FIELDS.key()));
     assertEquals("true", 
props.getProperty(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key()));
     assertEquals("true", 
props.getProperty(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key()));
+    assertEquals("true", 
props.getProperty(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key()));
+    assertEquals("connection-id", 
props.getProperty(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key()));
   }
 }
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
index af2167f0f16..37b2800b563 100644
--- 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
@@ -58,6 +58,7 @@ public class TestHoodieBigQuerySyncClient {
   private static String basePath;
   private final BigQuery mockBigQuery = mock(BigQuery.class);
   private HoodieBigQuerySyncClient client;
+  private Properties properties;
 
   @BeforeAll
   static void setupOnce() throws Exception {
@@ -71,16 +72,19 @@ public class TestHoodieBigQuerySyncClient {
 
   @BeforeEach
   void setup() {
-    Properties properties = new Properties();
+    properties = new Properties();
     properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(), 
PROJECT_ID);
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), 
TEST_DATASET);
     properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), 
tempDir.toString());
-    BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
-    client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
 "true");
   }
 
   @Test
   void createTableWithManifestFile_partitioned() throws Exception {
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(),
 "my-project.us.bl_connection");
+    BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
+    client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+
     Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
     ArgumentCaptor<JobInfo> jobInfoCaptor = 
ArgumentCaptor.forClass(JobInfo.class);
     Job mockJob = mock(Job.class);
@@ -90,17 +94,21 @@ public class TestHoodieBigQuerySyncClient {
     JobStatus mockJobStatus = mock(JobStatus.class);
     when(mockJobFinished.getStatus()).thenReturn(mockJobStatus);
     when(mockJobStatus.getError()).thenReturn(null);
-    client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, 
SOURCE_PREFIX, schema);
+    client.createOrUpdateTableUsingBqManifestFile(TEST_TABLE, 
MANIFEST_FILE_URI, SOURCE_PREFIX, schema);
 
     QueryJobConfiguration configuration = 
jobInfoCaptor.getValue().getConfiguration();
     assertEquals(configuration.getQuery(),
-        String.format("CREATE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING ) 
WITH PARTITION COLUMNS OPTIONS (enable_list_inference=true, "
-            + "hive_partition_uri_prefix=\"%s\", uris=[\"%s\"], 
format=\"PARQUET\", "
-            + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", 
PROJECT_ID, TEST_DATASET, TEST_TABLE, SOURCE_PREFIX, MANIFEST_FILE_URI));
+        String.format("CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` ( `field` 
STRING ) WITH PARTITION COLUMNS WITH CONNECTION `my-project.us.bl_connection` "
+                + "OPTIONS (enable_list_inference=true, 
hive_partition_uri_prefix=\"%s\", "
+                + "require_hive_partition_filter=true, uris=[\"%s\"], 
format=\"PARQUET\", file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
+            PROJECT_ID, TEST_DATASET, TEST_TABLE, SOURCE_PREFIX, 
MANIFEST_FILE_URI));
   }
 
   @Test
   void createTableWithManifestFile_nonPartitioned() throws Exception {
+    BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
+    client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+
     Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
     ArgumentCaptor<JobInfo> jobInfoCaptor = 
ArgumentCaptor.forClass(JobInfo.class);
     Job mockJob = mock(Job.class);
@@ -110,11 +118,11 @@ public class TestHoodieBigQuerySyncClient {
     JobStatus mockJobStatus = mock(JobStatus.class);
     when(mockJobFinished.getStatus()).thenReturn(mockJobStatus);
     when(mockJobStatus.getError()).thenReturn(null);
-    client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, "", 
schema);
+    client.createOrUpdateTableUsingBqManifestFile(TEST_TABLE, 
MANIFEST_FILE_URI, "", schema);
 
     QueryJobConfiguration configuration = 
jobInfoCaptor.getValue().getConfiguration();
     assertEquals(configuration.getQuery(),
-        String.format("CREATE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING ) 
OPTIONS (enable_list_inference=true, uris=[\"%s\"], format=\"PARQUET\", "
+        String.format("CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` ( `field` 
STRING ) OPTIONS (enable_list_inference=true, uris=[\"%s\"], 
format=\"PARQUET\", "
             + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", 
PROJECT_ID, TEST_DATASET, TEST_TABLE, MANIFEST_FILE_URI));
   }
 }

Reply via email to