This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a808f74ce0342f93af131de5edc6cae56b292fd7
Author: Tim Brown <[email protected]>
AuthorDate: Mon Sep 11 06:35:02 2023 -0500

    [HUDI-6728] Update BigQuery manifest sync to support schema evolution 
(#9482)
    
    Adds schema evolution support to the BigQuerySyncTool by converting
    the Hudi schema into the BigQuery Schema format when creating
    and updating the table.
---
 hudi-gcp/pom.xml                                   |  13 +
 .../hudi/gcp/bigquery/BigQuerySchemaResolver.java  | 197 ++++++++++++++
 .../hudi/gcp/bigquery/BigQuerySyncConfig.java      |   3 +-
 .../apache/hudi/gcp/bigquery/BigQuerySyncTool.java |  95 ++++---
 .../gcp/bigquery/HoodieBigQuerySyncClient.java     |  49 +++-
 .../gcp/bigquery/TestBigQuerySchemaResolver.java   | 299 +++++++++++++++++++++
 .../hudi/gcp/bigquery/TestBigQuerySyncTool.java    | 137 ++++++++++
 .../gcp/bigquery/TestHoodieBigQuerySyncClient.java | 119 ++++++++
 .../org/apache/hudi/sync/adb/AdbSyncConfig.java    |   2 +-
 .../apache/hudi/sync/common/HoodieSyncClient.java  |   4 +
 .../hudi/sync/common/util/ManifestFileWriter.java  |  28 +-
 .../sync/common/util/TestManifestFileWriter.java   |   8 +-
 12 files changed, 895 insertions(+), 59 deletions(-)

diff --git a/hudi-gcp/pom.xml b/hudi-gcp/pom.xml
index 202cbc2f8d9..c0a401551de 100644
--- a/hudi-gcp/pom.xml
+++ b/hudi-gcp/pom.xml
@@ -84,6 +84,12 @@ See 
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
       <artifactId>parquet-avro</artifactId>
     </dependency>
 
+    <!-- Avro -->
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
     <!-- Hadoop -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -97,6 +103,13 @@ See 
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-hive-sync</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySchemaResolver.java
 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySchemaResolver.java
new file mode 100644
index 00000000000..035ce604e2b
--- /dev/null
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySchemaResolver.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.exception.HoodieException;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FieldList;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Extracts the BigQuery schema from a Hudi table.
+ */
+class BigQuerySchemaResolver {
+  private static final BigQuerySchemaResolver INSTANCE = new 
BigQuerySchemaResolver(TableSchemaResolver::new);
+
+  private final Function<HoodieTableMetaClient, TableSchemaResolver> 
tableSchemaResolverSupplier;
+
+  @VisibleForTesting
+  BigQuerySchemaResolver(Function<HoodieTableMetaClient, TableSchemaResolver> 
tableSchemaResolverSupplier) {
+    this.tableSchemaResolverSupplier = tableSchemaResolverSupplier;
+  }
+
+  static BigQuerySchemaResolver getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Get the BigQuery schema for the table. If the BigQuery table is 
configured with partitioning, the caller must pass in the partition fields so 
that they are not returned in the schema.
+   * If the partition fields are in the schema, it will cause an error when 
querying the table since BigQuery will treat it as a duplicate column.
+   * @param metaClient Meta client for the Hudi table
+   * @param partitionFields The fields that are used for partitioning in 
BigQuery
+   * @return The BigQuery schema for the table
+   */
+  Schema getTableSchema(HoodieTableMetaClient metaClient, List<String> 
partitionFields) {
+    try {
+      Schema schema = 
convertSchema(tableSchemaResolverSupplier.apply(metaClient).getTableAvroSchema());
+      if (partitionFields.isEmpty()) {
+        return schema;
+      } else {
+        return Schema.of(schema.getFields().stream().filter(field -> 
!partitionFields.contains(field.getName())).collect(Collectors.toList()));
+      }
+    } catch (Exception e) {
+      throw new HoodieBigQuerySyncException("Failed to get table schema", e);
+    }
+  }
+
+  /**
+   * Converts a BigQuery schema to the string representation used in the 
BigQuery SQL command to create the manifest based table.
+   * @param schema The BigQuery schema
+   * @return The string representation of the schema
+   */
+  public static String schemaToSqlString(Schema schema) {
+    return fieldsToSqlString(schema.getFields());
+  }
+
+  private static String fieldsToSqlString(List<Field> fields) {
+    return fields.stream().map(field -> {
+      String mode = field.getMode() == Field.Mode.REQUIRED ? " NOT NULL" : "";
+      String type;
+      if (field.getType().getStandardType() == StandardSQLTypeName.STRUCT) {
+        type = String.format("STRUCT<%s>", 
fieldsToSqlString(field.getSubFields()));
+      } else {
+        type = field.getType().getStandardType().name();
+      }
+      String name = field.getName();
+      if (field.getMode() == Field.Mode.REPEATED) {
+        return String.format("%s ARRAY<%s>", name, type);
+      } else {
+        return String.format("%s %s%s", name, type, mode);
+      }
+    }).collect(Collectors.joining(", "));
+  }
+
+  @VisibleForTesting
+  Schema convertSchema(org.apache.avro.Schema schema) {
+    return Schema.of(getFields(schema));
+  }
+
+  private Field getField(org.apache.avro.Schema fieldSchema, String name, 
boolean nullable) {
+    final Field.Mode fieldMode = nullable ? Field.Mode.NULLABLE : 
Field.Mode.REQUIRED;
+    StandardSQLTypeName standardSQLTypeName;
+    switch (fieldSchema.getType()) {
+      case INT:
+      case LONG:
+        LogicalType logicalType = fieldSchema.getLogicalType();
+        if (logicalType == null) {
+          standardSQLTypeName = StandardSQLTypeName.INT64;
+        } else if (logicalType.equals(LogicalTypes.date())) {
+          standardSQLTypeName = StandardSQLTypeName.DATE;
+        } else if (logicalType.equals(LogicalTypes.timeMillis()) || 
logicalType.equals(LogicalTypes.timeMicros())) {
+          standardSQLTypeName = StandardSQLTypeName.TIME;
+        } else if (logicalType.equals(LogicalTypes.timestampMillis()) || 
logicalType.equals(LogicalTypes.timestampMicros())) {
+          standardSQLTypeName = StandardSQLTypeName.TIMESTAMP;
+          // Older Avro versions lack the local-timestamp logical types, so we match them by name (string comparison) and map them to INT64
+        } else if (logicalType.getName().equals("local-timestamp-millis") || 
logicalType.getName().equals("local-timestamp-micros")) {
+          standardSQLTypeName = StandardSQLTypeName.INT64;
+        } else {
+          throw new IllegalArgumentException("Unexpected logical type in 
schema: " + logicalType);
+        }
+        break;
+      case ENUM:
+      case STRING:
+        standardSQLTypeName = StandardSQLTypeName.STRING;
+        break;
+      case BOOLEAN:
+        standardSQLTypeName = StandardSQLTypeName.BOOL;
+        break;
+      case DOUBLE:
+      case FLOAT:
+        standardSQLTypeName = StandardSQLTypeName.FLOAT64;
+        break;
+      case BYTES:
+      case FIXED:
+        LogicalType bytesLogicalType = fieldSchema.getLogicalType();
+        if (bytesLogicalType == null) {
+          standardSQLTypeName = StandardSQLTypeName.BYTES;
+        } else if (bytesLogicalType instanceof LogicalTypes.Decimal) {
+          standardSQLTypeName = StandardSQLTypeName.NUMERIC;
+        } else {
+          throw new IllegalArgumentException("Unexpected logical type in 
schema: " + bytesLogicalType);
+        }
+        break;
+      case RECORD:
+        return Field.newBuilder(name, StandardSQLTypeName.STRUCT,
+            FieldList.of(getFields(fieldSchema))).setMode(fieldMode).build();
+      case ARRAY:
+        Field arrayField = getField(fieldSchema.getElementType(), "array", 
true);
+        return Field.newBuilder(name, arrayField.getType(), 
arrayField.getSubFields()).setMode(Field.Mode.REPEATED).build();
+      case MAP:
+        Field keyField = Field.newBuilder("key", 
StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build();
+        Field valueField = getField(fieldSchema.getValueType(), "value", 
false);
+        Field keyValueField = Field.newBuilder("key_value", 
StandardSQLTypeName.STRUCT, keyField, 
valueField).setMode(Field.Mode.REPEATED).build();
+        return Field.newBuilder(name, StandardSQLTypeName.STRUCT, 
keyValueField).setMode(Field.Mode.NULLABLE).build();
+      case UNION:
+        List<org.apache.avro.Schema> subTypes = fieldSchema.getTypes();
+        validateUnion(subTypes);
+        org.apache.avro.Schema fieldSchemaFromUnion = 
subTypes.get(0).getType() == org.apache.avro.Schema.Type.NULL ? subTypes.get(1) 
: subTypes.get(0);
+        nullable = true;
+        return getField(fieldSchemaFromUnion, name, nullable);
+      default:
+        throw new RuntimeException("Unexpected field type: " + 
fieldSchema.getType());
+    }
+    return Field.newBuilder(name, 
standardSQLTypeName).setMode(fieldMode).build();
+  }
+
+  private List<Field> getFields(org.apache.avro.Schema schema) {
+    return schema.getFields().stream().map(field -> {
+      final org.apache.avro.Schema fieldSchema;
+      final boolean nullable;
+      if (field.schema().getType() == org.apache.avro.Schema.Type.UNION) {
+        List<org.apache.avro.Schema> subTypes = field.schema().getTypes();
+        validateUnion(subTypes);
+        fieldSchema = subTypes.get(0).getType() == 
org.apache.avro.Schema.Type.NULL ? subTypes.get(1) : subTypes.get(0);
+        nullable = true;
+      } else {
+        fieldSchema = field.schema();
+        nullable = false;
+      }
+      return getField(fieldSchema, field.name(), nullable);
+    }).collect(Collectors.toList());
+  }
+
+  private void validateUnion(List<org.apache.avro.Schema> subTypes) {
+    if (subTypes.size() != 2 || (subTypes.get(0).getType() != 
org.apache.avro.Schema.Type.NULL
+        && subTypes.get(1).getType() != org.apache.avro.Schema.Type.NULL)) {
+      throw new HoodieException("Only unions of a single type and null are 
currently supported");
+    }
+  }
+}
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index 1f99a57b550..8630bacc9c0 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 
@@ -168,7 +169,7 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
       props.setPropertyIfNonNull(BIGQUERY_SYNC_SOURCE_URI.key(), sourceUri);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(), 
sourceUriPrefix);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_SYNC_BASE_PATH.key(), 
hoodieSyncConfigParams.basePath);
-      props.setPropertyIfNonNull(BIGQUERY_SYNC_PARTITION_FIELDS.key(), 
String.join(",", hoodieSyncConfigParams.partitionFields));
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_PARTITION_FIELDS.key(), 
StringUtils.join(",", hoodieSyncConfigParams.partitionFields));
       
props.setPropertyIfNonNull(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), 
hoodieSyncConfigParams.useFileListingFromMetadata);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key(), 
hoodieSyncConfigParams.assumeDatePartitioning);
       return props;
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
index 47aa342dad0..d44c9d533ab 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
@@ -19,25 +19,28 @@
 
 package org.apache.hudi.gcp.bigquery;
 
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.common.util.ManifestFileWriter;
 
 import com.beust.jcommander.JCommander;
+import com.google.cloud.bigquery.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
-import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
-import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA;
 
 /**
@@ -52,34 +55,63 @@ public class BigQuerySyncTool extends HoodieSyncTool {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BigQuerySyncTool.class);
 
-  public final BigQuerySyncConfig config;
-  public final String tableName;
-  public final String manifestTableName;
-  public final String versionsTableName;
-  public final String snapshotViewName;
+  private final BigQuerySyncConfig config;
+  private final String tableName;
+  private final String manifestTableName;
+  private final String versionsTableName;
+  private final String snapshotViewName;
+  private final ManifestFileWriter manifestFileWriter;
+  private final HoodieBigQuerySyncClient bqSyncClient;
+  private final HoodieTableMetaClient metaClient;
+  private final BigQuerySchemaResolver bqSchemaResolver;
 
   public BigQuerySyncTool(Properties props) {
+    // will build file writer, client, etc. from configs
     super(props);
     this.config = new BigQuerySyncConfig(props);
     this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
     this.manifestTableName = tableName + "_manifest";
     this.versionsTableName = tableName + "_versions";
     this.snapshotViewName = tableName;
+    this.bqSyncClient = new HoodieBigQuerySyncClient(config);
+    // derive the meta client from the sync client here; test cases inject their own via the package-private constructor
+    this.metaClient = bqSyncClient.getMetaClient();
+    this.manifestFileWriter = buildManifestFileWriterFromConfig(metaClient, 
config);
+    this.bqSchemaResolver = BigQuerySchemaResolver.getInstance();
+  }
+
+  @VisibleForTesting // allows us to pass in mocks for the writer and client
+  BigQuerySyncTool(Properties properties, ManifestFileWriter 
manifestFileWriter, HoodieBigQuerySyncClient bigQuerySyncClient, 
HoodieTableMetaClient metaClient,
+                   BigQuerySchemaResolver bigQuerySchemaResolver) {
+    super(properties);
+    this.config = new BigQuerySyncConfig(props);
+    this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
+    this.manifestTableName = tableName + "_manifest";
+    this.versionsTableName = tableName + "_versions";
+    this.snapshotViewName = tableName;
+    this.bqSyncClient = bigQuerySyncClient;
+    this.metaClient = metaClient;
+    this.manifestFileWriter = manifestFileWriter;
+    this.bqSchemaResolver = bigQuerySchemaResolver;
+  }
+
+  private static ManifestFileWriter 
buildManifestFileWriterFromConfig(HoodieTableMetaClient metaClient, 
BigQuerySyncConfig config) {
+    return ManifestFileWriter.builder()
+        .setMetaClient(metaClient)
+        
.setUseFileListingFromMetadata(config.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA))
+        
.setAssumeDatePartitioning(config.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING))
+        .build();
   }
 
   @Override
   public void syncHoodieTable() {
-    try (HoodieBigQuerySyncClient bqSyncClient = new 
HoodieBigQuerySyncClient(config)) {
-      switch (bqSyncClient.getTableType()) {
-        case COPY_ON_WRITE:
-        case MERGE_ON_READ:
-          syncTable(bqSyncClient);
-          break;
-        default:
-          throw new UnsupportedOperationException(bqSyncClient.getTableType() 
+ " table type is not supported yet.");
-      }
-    } catch (Exception e) {
-      throw new HoodieBigQuerySyncException("Failed to sync BigQuery for 
table:" + tableName, e);
+    switch (bqSyncClient.getTableType()) {
+      case COPY_ON_WRITE:
+      case MERGE_ON_READ:
+        syncTable(bqSyncClient);
+        break;
+      default:
+        throw new UnsupportedOperationException(bqSyncClient.getTableType() + 
" table type is not supported yet.");
     }
   }
 
@@ -92,29 +124,26 @@ public class BigQuerySyncTool extends HoodieSyncTool {
   }
 
   private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {
-    ValidationUtils.checkState(bqSyncClient.getTableType() == 
HoodieTableType.COPY_ON_WRITE);
     LOG.info("Sync hoodie table " + snapshotViewName + " at base path " + 
bqSyncClient.getBasePath());
 
     if (!bqSyncClient.datasetExists()) {
       throw new HoodieBigQuerySyncException("Dataset not found: " + 
config.getString(BIGQUERY_SYNC_DATASET_NAME));
     }
 
-    ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder()
-        .setConf(config.getHadoopConf())
-        .setBasePath(config.getString(BIGQUERY_SYNC_SYNC_BASE_PATH))
-        
.setUseFileListingFromMetadata(config.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA))
-        
.setAssumeDatePartitioning(config.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING))
-        .build();
-
+    List<String> partitionFields = 
!StringUtils.isNullOrEmpty(config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX)) ? 
config.getSplitStrings(BIGQUERY_SYNC_PARTITION_FIELDS) : 
Collections.emptyList();
+    Schema latestSchema = bqSchemaResolver.getTableSchema(metaClient, 
partitionFields);
     if (config.getBoolean(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE)) {
       manifestFileWriter.writeManifestFile(true);
-
       if (!tableExists(bqSyncClient, tableName)) {
         bqSyncClient.createTableUsingBqManifestFile(
             tableName,
             manifestFileWriter.getManifestSourceUri(true),
-            config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX));
+            config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
+            latestSchema);
         LOG.info("Completed table " + tableName + " creation using the 
manifest file");
+      } else {
+        bqSyncClient.updateTableSchema(tableName, latestSchema, 
partitionFields);
+        LOG.info("Synced schema for " + tableName);
       }
 
       LOG.info("Sync table complete for " + tableName);
@@ -146,6 +175,12 @@ public class BigQuerySyncTool extends HoodieSyncTool {
     LOG.info("Sync table complete for " + snapshotViewName);
   }
 
+  @Override
+  public void close() throws Exception {
+    super.close();
+    bqSyncClient.close();
+  }
+
   public static void main(String[] args) {
     final BigQuerySyncConfig.BigQuerySyncConfigParams params = new 
BigQuerySyncConfig.BigQuerySyncConfigParams();
     JCommander cmd = JCommander.newBuilder().addObject(params).build();
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index 8c8372a992a..fa32f931049 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.gcp.bigquery;
 
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.sync.common.HoodieSyncClient;
 
 import com.google.cloud.bigquery.BigQuery;
@@ -49,6 +50,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
@@ -71,6 +73,15 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     this.createBigQueryConnection();
   }
 
+  @VisibleForTesting
+  HoodieBigQuerySyncClient(final BigQuerySyncConfig config, final BigQuery 
bigquery) {
+    super(config);
+    this.config = config;
+    this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+    this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
+    this.bigquery = bigquery;
+  }
+
   private void createBigQueryConnection() {
     if (bigquery == null) {
       try {
@@ -84,14 +95,15 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     }
   }
 
-  public void createTableUsingBqManifestFile(String tableName, String 
bqManifestFileUri, String sourceUriPrefix) {
+  public void createTableUsingBqManifestFile(String tableName, String 
bqManifestFileUri, String sourceUriPrefix, Schema schema) {
     try {
-      String withClauses = "";
-      String extraOptions = "";
+      String withClauses = String.format("( %s )", 
BigQuerySchemaResolver.schemaToSqlString(schema));
+      String extraOptions = "enable_list_inference=true,";
       if (!StringUtils.isNullOrEmpty(sourceUriPrefix)) {
-        withClauses = "WITH PARTITION COLUMNS";
-        extraOptions = String.format("hive_partition_uri_prefix=\"%s\",", 
sourceUriPrefix);
+        withClauses += " WITH PARTITION COLUMNS";
+        extraOptions += String.format(" hive_partition_uri_prefix=\"%s\",", 
sourceUriPrefix);
       }
+
       String query =
           String.format(
               "CREATE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
@@ -148,6 +160,33 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     }
   }
 
+  /**
+   * Updates the schema for the given table if the schema has changed. The 
schema passed in will not have the partition columns defined,
+   * so we add them back to the schema with the values read from the existing 
BigQuery table. This allows us to keep the partition
+   * field type in sync with how it is registered in BigQuery.
+   * @param tableName name of the table in BigQuery
+   * @param schema latest schema for the table
+   */
+  public void updateTableSchema(String tableName, Schema schema, List<String> 
partitionFields) {
+    Table existingTable = bigquery.getTable(TableId.of(projectId, datasetName, 
tableName));
+    ExternalTableDefinition definition = existingTable.getDefinition();
+    Schema remoteTableSchema = definition.getSchema();
+    // Add the partition fields into the schema to avoid conflicts while 
updating
+    List<Field> updatedTableFields = remoteTableSchema.getFields().stream()
+        .filter(field -> partitionFields.contains(field.getName()))
+        .collect(Collectors.toList());
+    updatedTableFields.addAll(schema.getFields());
+    Schema finalSchema = Schema.of(updatedTableFields);
+    if (definition.getSchema() != null && 
definition.getSchema().equals(finalSchema)) {
+      return; // No need to update schema.
+    }
+    Table updatedTable = existingTable.toBuilder()
+        
.setDefinition(definition.toBuilder().setSchema(finalSchema).setAutodetect(false).build())
+        .build();
+
+    bigquery.update(updatedTable);
+  }
+
   public void createVersionsTable(String tableName, String sourceUri, String 
sourceUriPrefix, List<String> partitionFields) {
     try {
       ExternalTableDefinition customTable;
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySchemaResolver.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySchemaResolver.java
new file mode 100644
index 00000000000..bb45f0b7d56
--- /dev/null
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySchemaResolver.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySchemaResolver.schemaToSqlString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestBigQuerySchemaResolver {
+  private static final com.google.cloud.bigquery.Schema 
PRIMITIVE_TYPES_BQ_SCHEMA = com.google.cloud.bigquery.Schema.of(
+      Field.newBuilder("requiredBoolean", 
StandardSQLTypeName.BOOL).setMode(Field.Mode.REQUIRED).build(),
+      Field.newBuilder("optionalBoolean", 
StandardSQLTypeName.BOOL).setMode(Field.Mode.NULLABLE).build(),
+      Field.newBuilder("requiredInt", 
StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(),
+      Field.newBuilder("optionalInt", 
StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(),
+      Field.newBuilder("requiredLong", 
StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(),
+      Field.newBuilder("optionalLong", 
StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(),
+      Field.newBuilder("requiredDouble", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(),
+      Field.newBuilder("optionalDouble", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(),
+      Field.newBuilder("requiredFloat", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(),
+      Field.newBuilder("optionalFloat", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(),
+      Field.newBuilder("requiredString", 
StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(),
+      Field.newBuilder("optionalString", 
StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build(),
+      Field.newBuilder("requiredBytes", 
StandardSQLTypeName.BYTES).setMode(Field.Mode.REQUIRED).build(),
+      Field.newBuilder("optionalBytes", 
StandardSQLTypeName.BYTES).setMode(Field.Mode.NULLABLE).build(),
+      Field.newBuilder("requiredEnum", 
StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(),
+      Field.newBuilder("optionalEnum", 
StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build());
+  private static final Schema PRIMITIVE_TYPES = 
SchemaBuilder.record("testRecord")
+      .fields()
+      .requiredBoolean("requiredBoolean")
+      .optionalBoolean("optionalBoolean")
+      .requiredInt("requiredInt")
+      .optionalInt("optionalInt")
+      .requiredLong("requiredLong")
+      .optionalLong("optionalLong")
+      .requiredDouble("requiredDouble")
+      .optionalDouble("optionalDouble")
+      .requiredFloat("requiredFloat")
+      .optionalFloat("optionalFloat")
+      .requiredString("requiredString")
+      .optionalString("optionalString")
+      .requiredBytes("requiredBytes")
+      .optionalBytes("optionalBytes")
+      .name("requiredEnum").type().enumeration("REQUIRED_ENUM").symbols("ONE", 
"TWO").enumDefault("ONE")
+      
.name("optionalEnum").type().optional().enumeration("OPTIONAL_ENUM").symbols("ONE",
 "TWO")
+      .endRecord();
+  private static final Schema NESTED_FIELDS = 
SchemaBuilder.record("testRecord")
+      .fields()
+      .name("nestedOne")
+      .type()
+      .optional()
+      .record("nestedOneType").fields()
+      .optionalInt("nestedOptionalInt")
+      .requiredDouble("nestedRequiredDouble")
+      .name("nestedTwo")
+      .type(SchemaBuilder.record("nestedTwoType").fields()
+          .optionalString("doublyNestedString").endRecord()).noDefault()
+      .endRecord()
+      .endRecord();
+  private static final Schema LISTS = SchemaBuilder.record("testRecord")
+      .fields()
+      .name("intList")
+      .type()
+      .array()
+      .items()
+      .intType().noDefault()
+      .name("recordList")
+      .type()
+      .nullable()
+      .array()
+      
.items(SchemaBuilder.record("randomname").fields().requiredDouble("requiredDouble").optionalString("optionalString").endRecord())
+      .noDefault()
+      .endRecord();
+  private static final BigQuerySchemaResolver SCHEMA_RESOLVER = 
BigQuerySchemaResolver.getInstance();
+
+  @Test
+  void convertSchema_primitiveFields() {
+    Assertions.assertEquals(PRIMITIVE_TYPES_BQ_SCHEMA, 
SCHEMA_RESOLVER.convertSchema(PRIMITIVE_TYPES));
+  }
+
+  @Test
+  void convertSchemaToString_primitiveTypes() {
+    String expectedSqlSchema = "requiredBoolean BOOL NOT NULL, "
+        + "optionalBoolean BOOL, "
+        + "requiredInt INT64 NOT NULL, "
+        + "optionalInt INT64, "
+        + "requiredLong INT64 NOT NULL, "
+        + "optionalLong INT64, "
+        + "requiredDouble FLOAT64 NOT NULL, "
+        + "optionalDouble FLOAT64, "
+        + "requiredFloat FLOAT64 NOT NULL, "
+        + "optionalFloat FLOAT64, "
+        + "requiredString STRING NOT NULL, "
+        + "optionalString STRING, "
+        + "requiredBytes BYTES NOT NULL, "
+        + "optionalBytes BYTES, "
+        + "requiredEnum STRING NOT NULL, "
+        + "optionalEnum STRING";
+    Assertions.assertEquals(expectedSqlSchema, 
schemaToSqlString(SCHEMA_RESOLVER.convertSchema(PRIMITIVE_TYPES)));
+  }
+
+  @Test
+  void convertSchema_nestedFields() {
+    com.google.cloud.bigquery.Schema expected = 
com.google.cloud.bigquery.Schema.of(
+        Field.newBuilder("nestedOne", StandardSQLTypeName.STRUCT,
+                Field.newBuilder("nestedOptionalInt", 
StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(),
+                Field.newBuilder("nestedRequiredDouble", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(),
+                Field.newBuilder("nestedTwo", StandardSQLTypeName.STRUCT,
+                    Field.newBuilder("doublyNestedString", 
StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build()).setMode(Field.Mode.REQUIRED).build())
+            .setMode(Field.Mode.NULLABLE).build());
+
+    Assertions.assertEquals(expected, 
SCHEMA_RESOLVER.convertSchema(NESTED_FIELDS));
+  }
+
+  @Test
+  void convertSchemaToString_nestedFields() {
+    String expectedSqlSchema = "nestedOne STRUCT<"
+        + "nestedOptionalInt INT64, "
+        + "nestedRequiredDouble FLOAT64 NOT NULL, "
+        + "nestedTwo STRUCT<doublyNestedString STRING> NOT NULL>";
+    Assertions.assertEquals(expectedSqlSchema, 
schemaToSqlString(SCHEMA_RESOLVER.convertSchema(NESTED_FIELDS)));
+  }
+
+  @Test
+  void convertSchema_lists() {
+    Field intListField = Field.newBuilder("intList", 
StandardSQLTypeName.INT64).setMode(Field.Mode.REPEATED).build();
+
+    Field requiredDoubleField = Field.newBuilder("requiredDouble", 
StandardSQLTypeName.FLOAT64)
+        .setMode(Field.Mode.REQUIRED)
+        .build();
+    Field optionalStringField = Field.newBuilder("optionalString", 
StandardSQLTypeName.STRING)
+        .setMode(Field.Mode.NULLABLE)
+        .build();
+    Field recordListField = Field.newBuilder("recordList", 
StandardSQLTypeName.STRUCT,
+        requiredDoubleField, 
optionalStringField).setMode(Field.Mode.REPEATED).build();
+
+
+    com.google.cloud.bigquery.Schema expected =
+        com.google.cloud.bigquery.Schema.of(intListField, recordListField);
+    Assertions.assertEquals(expected, SCHEMA_RESOLVER.convertSchema(LISTS));
+  }
+
+  @Test
+  void convertSchemaToString_lists() {
+    String expectedSqlSchema = "intList ARRAY<INT64>, "
+        + "recordList ARRAY<STRUCT<requiredDouble FLOAT64 NOT NULL, 
optionalString STRING>>";
+    Assertions.assertEquals(expectedSqlSchema, 
schemaToSqlString(SCHEMA_RESOLVER.convertSchema(LISTS)));
+  }
+
+  @Test
+  void convertSchemaListOfNullableRecords() {
+    Schema nestedRecordType = 
SchemaBuilder.record("nested_record").fields().optionalString("inner_string_field").endRecord();
+    Schema input = SchemaBuilder.record("top_level_schema")
+        .fields().name("top_level_schema_field")
+        .type()
+        .nullable()
+        .array()
+        
.items(SchemaBuilder.unionOf().nullType().and().type(nestedRecordType).endUnion())
+        .noDefault()
+        .endRecord();
+
+    Field innerStringField = Field.newBuilder("inner_string_field", 
StandardSQLTypeName.STRING)
+        .setMode(Field.Mode.NULLABLE)
+        .build();
+    Field topLevelSchemaField = Field.newBuilder("top_level_schema_field", 
StandardSQLTypeName.STRUCT,
+        innerStringField).setMode(Field.Mode.REPEATED).build();
+
+    com.google.cloud.bigquery.Schema expected = 
com.google.cloud.bigquery.Schema.of(topLevelSchemaField);
+    Assertions.assertEquals(expected, SCHEMA_RESOLVER.convertSchema(input));
+  }
+
+  @Test
+  void convertSchema_logicalTypes() {
+    String schemaString = 
"{\"type\":\"record\",\"name\":\"logicalTypes\",\"fields\":[{\"name\":\"int_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
+        + 
"{\"name\":\"int_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"long_time_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}},"
+        + 
"{\"name\":\"long_timestamp_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
+        + 
"{\"name\":\"long_timestamp_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},"
+        + 
"{\"name\":\"long_timestamp_millis_local\",\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-millis\"}},"
+        + 
"{\"name\":\"long_timestamp_micros_local\",\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}},"
+        + 
"{\"name\":\"bytes_decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",
 \"precision\": 4, \"scale\": 2}}]}";
+    Schema.Parser parser = new Schema.Parser();
+    Schema input = parser.parse(schemaString);
+
+    com.google.cloud.bigquery.Schema expected = 
com.google.cloud.bigquery.Schema.of(
+        Field.newBuilder("int_date", 
StandardSQLTypeName.DATE).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("int_time_millis", 
StandardSQLTypeName.TIME).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("long_time_micros", 
StandardSQLTypeName.TIME).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("long_timestamp_millis", 
StandardSQLTypeName.TIMESTAMP).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("long_timestamp_micros", 
StandardSQLTypeName.TIMESTAMP).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("long_timestamp_millis_local", 
StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("long_timestamp_micros_local", 
StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("bytes_decimal", 
StandardSQLTypeName.NUMERIC).setMode(Field.Mode.REQUIRED).build());
+
+    Assertions.assertEquals(expected, SCHEMA_RESOLVER.convertSchema(input));
+  }
+
+  @Test
+  void convertSchema_maps() {
+    Schema input = SchemaBuilder.record("testRecord")
+        .fields()
+        .name("intMap")
+        .type()
+        .map()
+        .values()
+        .intType().noDefault()
+        .name("recordMap")
+        .type()
+        .nullable()
+        .map()
+        
.values(SchemaBuilder.record("element").fields().requiredDouble("requiredDouble").optionalString("optionalString").endRecord())
+        .noDefault()
+        .endRecord();
+
+
+    com.google.cloud.bigquery.Schema expected = 
com.google.cloud.bigquery.Schema.of(
+        Field.newBuilder("intMap", StandardSQLTypeName.STRUCT,
+                Field.newBuilder("key_value", StandardSQLTypeName.STRUCT,
+                        Field.newBuilder("key", 
StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(),
+                        Field.newBuilder("value", 
StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build())
+                    .setMode(Field.Mode.REPEATED).build())
+            .setMode(Field.Mode.NULLABLE).build(),
+        Field.newBuilder("recordMap", StandardSQLTypeName.STRUCT,
+                Field.newBuilder("key_value", StandardSQLTypeName.STRUCT,
+                    Field.newBuilder("key", 
StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(),
+                    Field.newBuilder("value", StandardSQLTypeName.STRUCT,
+                        Field.newBuilder("requiredDouble", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(),
+                        Field.newBuilder("optionalString", 
StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build()
+                    
).setMode(Field.Mode.REQUIRED).build()).setMode(Field.Mode.REPEATED).build())
+            .setMode(Field.Mode.NULLABLE).build());
+
+    Assertions.assertEquals(expected, SCHEMA_RESOLVER.convertSchema(input));
+  }
+
+  @Test
+  void getTableSchema_withPartitionFields() throws Exception {
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    TableSchemaResolver mockTableSchemaResolver = 
mock(TableSchemaResolver.class);
+    
when(mockTableSchemaResolver.getTableAvroSchema()).thenReturn(PRIMITIVE_TYPES);
+    BigQuerySchemaResolver resolver = new BigQuerySchemaResolver(metaClient -> 
mockTableSchemaResolver);
+
+    com.google.cloud.bigquery.Schema expected = 
com.google.cloud.bigquery.Schema.of(
+        Field.newBuilder("requiredBoolean", 
StandardSQLTypeName.BOOL).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("optionalBoolean", 
StandardSQLTypeName.BOOL).setMode(Field.Mode.NULLABLE).build(),
+        Field.newBuilder("requiredInt", 
StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("optionalInt", 
StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(),
+        Field.newBuilder("requiredLong", 
StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("optionalLong", 
StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(),
+        Field.newBuilder("requiredDouble", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("optionalDouble", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(),
+        Field.newBuilder("requiredFloat", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("optionalFloat", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(),
+        Field.newBuilder("optionalString", 
StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build(),
+        Field.newBuilder("requiredBytes", 
StandardSQLTypeName.BYTES).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("optionalBytes", 
StandardSQLTypeName.BYTES).setMode(Field.Mode.NULLABLE).build(),
+        Field.newBuilder("requiredEnum", 
StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("optionalEnum", 
StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build());
+
+    // expect 'requiredString' field to be removed
+    Assertions.assertEquals(expected, resolver.getTableSchema(mockMetaClient, 
Collections.singletonList("requiredString")));
+  }
+
+  @Test
+  void getTableSchema_withoutPartitionFields() throws Exception {
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    TableSchemaResolver mockTableSchemaResolver = 
mock(TableSchemaResolver.class);
+    
when(mockTableSchemaResolver.getTableAvroSchema()).thenReturn(PRIMITIVE_TYPES);
+    
when(mockTableSchemaResolver.getTableAvroSchema()).thenReturn(PRIMITIVE_TYPES);
+    BigQuerySchemaResolver resolver = new BigQuerySchemaResolver(metaClient -> 
mockTableSchemaResolver);
+    Assertions.assertEquals(PRIMITIVE_TYPES_BQ_SCHEMA, 
resolver.getTableSchema(mockMetaClient, Collections.emptyList()));
+  }
+}
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
new file mode 100644
index 00000000000..5edbdac1c2e
--- /dev/null
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.sync.common.util.ManifestFileWriter;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestBigQuerySyncTool {
+  private static final String TEST_TABLE = "test_table";
+  private final ManifestFileWriter mockManifestFileWriter = 
mock(ManifestFileWriter.class);
+  private final HoodieBigQuerySyncClient mockBqSyncClient = 
mock(HoodieBigQuerySyncClient.class);
+  private final BigQuerySchemaResolver mockBqSchemaResolver = 
mock(BigQuerySchemaResolver.class);
+  private final HoodieTableMetaClient mockMetaClient = 
mock(HoodieTableMetaClient.class);
+  private final Properties properties = new Properties();
+
+  private final Schema schema = Schema.of(Field.of("id", 
StandardSQLTypeName.STRING));
+
+  @BeforeEach
+  void setup() {
+    // add default properties
+    properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME.key(), 
TEST_TABLE);
+  }
+
+  @Test
+  void missingDatasetCausesFailure() {
+    
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
+    when(mockBqSyncClient.datasetExists()).thenReturn(false);
+    BigQuerySyncTool tool = new BigQuerySyncTool(properties, 
mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
+    assertThrows(HoodieBigQuerySyncException.class, tool::syncHoodieTable);
+    verifyNoInteractions(mockManifestFileWriter, mockBqSchemaResolver);
+  }
+
+  @Test
+  void useBQManifestFile_newTablePartitioned() {
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(),
 "true");
+    String prefix = "file:///local/prefix";
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(),
 prefix);
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), 
"datestr,type");
+    
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
+    when(mockBqSyncClient.datasetExists()).thenReturn(true);
+    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
+    Path manifestPath = new Path("file:///local/path");
+    
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
+    when(mockBqSchemaResolver.getTableSchema(any(), 
eq(Arrays.asList("datestr", "type")))).thenReturn(schema);
+    BigQuerySyncTool tool = new BigQuerySyncTool(properties, 
mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
+    tool.syncHoodieTable();
+    verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, 
manifestPath.toUri().getPath(), prefix, schema);
+    verify(mockManifestFileWriter).writeManifestFile(true);
+  }
+
+  @Test
+  void useBQManifestFile_newTableNonPartitioned() {
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(),
 "true");
+    
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
+    when(mockBqSyncClient.datasetExists()).thenReturn(true);
+    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
+    Path manifestPath = new Path("file:///local/path");
+    
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
+    when(mockBqSchemaResolver.getTableSchema(any(), 
eq(Collections.emptyList()))).thenReturn(schema);
+    BigQuerySyncTool tool = new BigQuerySyncTool(properties, 
mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
+    tool.syncHoodieTable();
+    verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, 
manifestPath.toUri().getPath(), null, schema);
+    verify(mockManifestFileWriter).writeManifestFile(true);
+  }
+
+  @Test
+  void useBQManifestFile_existingPartitionedTable() {
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(),
 "true");
+    String prefix = "file:///local/prefix";
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(),
 prefix);
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), 
"datestr,type");
+    
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
+    when(mockBqSyncClient.datasetExists()).thenReturn(true);
+    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
+    Path manifestPath = new Path("file:///local/path");
+    
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
+    List<String> partitionFields = Arrays.asList("datestr", "type");
+    when(mockBqSchemaResolver.getTableSchema(any(), 
eq(partitionFields))).thenReturn(schema);
+    BigQuerySyncTool tool = new BigQuerySyncTool(properties, 
mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
+    tool.syncHoodieTable();
+    verify(mockBqSyncClient).updateTableSchema(TEST_TABLE, schema, 
partitionFields);
+    verify(mockManifestFileWriter).writeManifestFile(true);
+  }
+
+  @Test
+  void useBQManifestFile_existingNonPartitionedTable() {
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(),
 "true");
+    
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
+    when(mockBqSyncClient.datasetExists()).thenReturn(true);
+    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
+    Path manifestPath = new Path("file:///local/path");
+    
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
+    when(mockBqSchemaResolver.getTableSchema(any(), 
eq(Collections.emptyList()))).thenReturn(schema);
+    BigQuerySyncTool tool = new BigQuerySyncTool(properties, 
mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
+    tool.syncHoodieTable();
+    verify(mockBqSyncClient).updateTableSchema(TEST_TABLE, schema, 
Collections.emptyList());
+    verify(mockManifestFileWriter).writeManifestFile(true);
+  }
+}
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
new file mode 100644
index 00000000000..df7e6a9f31e
--- /dev/null
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery;
+
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatus;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
+
+import java.nio.file.Path;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieBigQuerySyncClient {
+  private static final String PROJECT_ID = "test_project";
+  private static final String MANIFEST_FILE_URI = "file:/manifest_file";
+  private static final String SOURCE_PREFIX = "file:/manifest_file/date=*";
+  private static final String TEST_TABLE = "test_table";
+  private static final String TEST_DATASET = "test_dataset";
+
+  static @TempDir Path tempDir;
+
+  private static String basePath;
+  private final BigQuery mockBigQuery = mock(BigQuery.class);
+  private HoodieBigQuerySyncClient client;
+
+  @BeforeAll
+  static void setupOnce() throws Exception {
+    basePath = tempDir.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(TEST_TABLE)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .initTable(new Configuration(), basePath);
+  }
+
+  @BeforeEach
+  void setup() {
+    Properties properties = new Properties();
+    properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(), 
PROJECT_ID);
+    
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), 
TEST_DATASET);
+    properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), 
tempDir.toString());
+    BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
+    client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+  }
+
+  @Test
+  void createTableWithManifestFile_partitioned() throws Exception {
+    Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
+    ArgumentCaptor<JobInfo> jobInfoCaptor = 
ArgumentCaptor.forClass(JobInfo.class);
+    Job mockJob = mock(Job.class);
+    when(mockBigQuery.create(jobInfoCaptor.capture())).thenReturn(mockJob);
+    Job mockJobFinished = mock(Job.class);
+    when(mockJob.waitFor()).thenReturn(mockJobFinished);
+    JobStatus mockJobStatus = mock(JobStatus.class);
+    when(mockJobFinished.getStatus()).thenReturn(mockJobStatus);
+    when(mockJobStatus.getError()).thenReturn(null);
+    client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, 
SOURCE_PREFIX, schema);
+
+    QueryJobConfiguration configuration = 
jobInfoCaptor.getValue().getConfiguration();
+    assertEquals(configuration.getQuery(),
+        String.format("CREATE EXTERNAL TABLE `%s.%s` ( field STRING ) WITH 
PARTITION COLUMNS OPTIONS (enable_list_inference=true, 
hive_partition_uri_prefix=\"%s\", uris=[\"%s\"], format=\"PARQUET\", "
+            + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", 
TEST_DATASET, TEST_TABLE, SOURCE_PREFIX, MANIFEST_FILE_URI));
+  }
+
+  @Test
+  void createTableWithManifestFile_nonPartitioned() throws Exception {
+    Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
+    ArgumentCaptor<JobInfo> jobInfoCaptor = 
ArgumentCaptor.forClass(JobInfo.class);
+    Job mockJob = mock(Job.class);
+    when(mockBigQuery.create(jobInfoCaptor.capture())).thenReturn(mockJob);
+    Job mockJobFinished = mock(Job.class);
+    when(mockJob.waitFor()).thenReturn(mockJobFinished);
+    JobStatus mockJobStatus = mock(JobStatus.class);
+    when(mockJobFinished.getStatus()).thenReturn(mockJobStatus);
+    when(mockJobStatus.getError()).thenReturn(null);
+    client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, "", 
schema);
+
+    QueryJobConfiguration configuration = 
jobInfoCaptor.getValue().getConfiguration();
+    assertEquals(configuration.getQuery(),
+        String.format("CREATE EXTERNAL TABLE `%s.%s` ( field STRING ) OPTIONS 
(enable_list_inference=true, uris=[\"%s\"], format=\"PARQUET\", "
+            + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", 
TEST_DATASET, TEST_TABLE, MANIFEST_FILE_URI));
+  }
+}
diff --git 
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
 
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
index e03388e1dba..442f796fdf6 100644
--- 
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
+++ 
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
@@ -201,7 +201,7 @@ public class AdbSyncConfig extends HiveSyncConfig {
       props.setPropertyIfNonNull(ADB_SYNC_PASS.key(), 
hiveSyncConfigParams.hivePass);
       props.setPropertyIfNonNull(ADB_SYNC_JDBC_URL.key(), 
hiveSyncConfigParams.jdbcUrl);
       props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), 
hiveSyncConfigParams.hoodieSyncConfigParams.basePath);
-      props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), 
String.join(",", hiveSyncConfigParams.hoodieSyncConfigParams.partitionFields));
+      props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), 
StringUtils.join(",", 
hiveSyncConfigParams.hoodieSyncConfigParams.partitionFields));
       props.setPropertyIfNonNull(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), 
hiveSyncConfigParams.hoodieSyncConfigParams.partitionValueExtractorClass);
       props.setPropertyIfNonNull(META_SYNC_ASSUME_DATE_PARTITION.key(), 
String.valueOf(hiveSyncConfigParams.hoodieSyncConfigParams.assumeDatePartitioning));
       props.setPropertyIfNonNull(ADB_SYNC_SKIP_RO_SUFFIX.key(), 
String.valueOf(hiveSyncConfigParams.skipROSuffix));
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 3eeb72f89e0..4c5fb01b9e7 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -83,6 +83,10 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
   }
 
+  public HoodieTableMetaClient getMetaClient() {
+    return metaClient;
+  }
+
   /**
    * Get the set of dropped partitions since the last synced commit.
    * If last sync time is not known then consider only active timeline.
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
index c078884efc8..7090c194104 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
@@ -51,8 +51,8 @@ public class ManifestFileWriter {
   private final boolean useFileListingFromMetadata;
   private final boolean assumeDatePartitioning;
 
-  private ManifestFileWriter(Configuration hadoopConf, String basePath, 
boolean useFileListingFromMetadata, boolean assumeDatePartitioning) {
-    this.metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
+  private ManifestFileWriter(HoodieTableMetaClient metaClient, boolean 
useFileListingFromMetadata, boolean assumeDatePartitioning) {
+    this.metaClient = metaClient;
     this.useFileListingFromMetadata = useFileListingFromMetadata;
     this.assumeDatePartitioning = assumeDatePartitioning;
   }
@@ -122,21 +122,9 @@ public class ManifestFileWriter {
    * Builder for {@link ManifestFileWriter}.
    */
   public static class Builder {
-
-    private Configuration conf;
-    private String basePath;
     private boolean useFileListingFromMetadata;
     private boolean assumeDatePartitioning;
-
-    public Builder setConf(Configuration conf) {
-      this.conf = conf;
-      return this;
-    }
-
-    public Builder setBasePath(String basePath) {
-      this.basePath = basePath;
-      return this;
-    }
+    private HoodieTableMetaClient metaClient;
 
     public Builder setUseFileListingFromMetadata(boolean 
useFileListingFromMetadata) {
       this.useFileListingFromMetadata = useFileListingFromMetadata;
@@ -148,10 +136,14 @@ public class ManifestFileWriter {
       return this;
     }
 
+    public Builder setMetaClient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
     public ManifestFileWriter build() {
-      ValidationUtils.checkArgument(conf != null, "Configuration needs to be 
set to init ManifestFileGenerator");
-      ValidationUtils.checkArgument(basePath != null, "basePath needs to be 
set to init ManifestFileGenerator");
-      return new ManifestFileWriter(conf, basePath, 
useFileListingFromMetadata, assumeDatePartitioning);
+      ValidationUtils.checkArgument(metaClient != null, "MetaClient needs to 
be set to init ManifestFileGenerator");
+      return new ManifestFileWriter(metaClient, useFileListingFromMetadata, 
assumeDatePartitioning);
     }
   }
 }
diff --git 
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java
 
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java
index b01125853cb..85fd1ef4886 100644
--- 
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java
+++ 
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java
@@ -49,7 +49,7 @@ public class TestManifestFileWriter extends 
HoodieCommonTestHarness {
   public void testMultiLevelPartitionedTable() throws Exception {
     // Generate 10 files under each partition
     createTestDataForPartitionedTable(metaClient, 10);
-    ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
+    ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setMetaClient(metaClient).build();
     assertEquals(30, fetchLatestBaseFilesForAllPartitions(metaClient, false, 
false, false).count());
   }
 
@@ -57,7 +57,7 @@ public class TestManifestFileWriter extends 
HoodieCommonTestHarness {
   public void testCreateManifestFile() throws Exception {
     // Generate 10 files under each partition
     createTestDataForPartitionedTable(metaClient, 3);
-    ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
+    ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setMetaClient(metaClient).build();
     manifestFileWriter.writeManifestFile(false);
     Path manifestFilePath = manifestFileWriter.getManifestFilePath(false);
     try (InputStream is = metaClient.getFs().open(manifestFilePath)) {
@@ -71,7 +71,7 @@ public class TestManifestFileWriter extends 
HoodieCommonTestHarness {
   public void testCreateManifestFileWithAbsolutePath() throws Exception {
     // Generate 10 files under each partition
     createTestDataForPartitionedTable(metaClient, 3);
-    ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
+    ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setMetaClient(metaClient).build();
     manifestFileWriter.writeManifestFile(true);
     Path manifestFilePath = manifestFileWriter.getManifestFilePath(true);
     try (InputStream is = metaClient.getFs().open(manifestFilePath)) {
@@ -92,7 +92,7 @@ public class TestManifestFileWriter extends 
HoodieCommonTestHarness {
 
   @Test
   public void getManifestSourceUri() {
-    ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
+    ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setMetaClient(metaClient).build();
     String sourceUri = manifestFileWriter.getManifestSourceUri(false);
     assertEquals(new Path(basePath, ".hoodie/manifest/*").toUri().toString(), 
sourceUri);
 

Reply via email to