This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a1afcdd989c [HUDI-7115] Add in new options for the bigquery sync
(#10125)
a1afcdd989c is described below
commit a1afcdd989ce2d634290d1bd9e099a17057e6b4d
Author: Tim Brown <[email protected]>
AuthorDate: Tue Nov 21 14:58:12 2023 -0600
[HUDI-7115] Add in new options for the bigquery sync (#10125)
- Add in new options for the bigquery sync
---
hudi-gcp/pom.xml | 3 +-
.../hudi/gcp/bigquery/BigQuerySyncConfig.java | 20 ++++++++
.../apache/hudi/gcp/bigquery/BigQuerySyncTool.java | 23 +++++----
.../gcp/bigquery/HoodieBigQuerySyncClient.java | 58 +++++++++++++++++++---
.../hudi/gcp/bigquery/TestBigQuerySyncConfig.java | 2 +-
.../hudi/gcp/bigquery/TestBigQuerySyncTool.java | 12 ++---
.../gcp/bigquery/TestBigQuerySyncToolArgs.java | 8 ++-
.../gcp/bigquery/TestHoodieBigQuerySyncClient.java | 26 ++++++----
8 files changed, 114 insertions(+), 38 deletions(-)
diff --git a/hudi-gcp/pom.xml b/hudi-gcp/pom.xml
index b1cfb8076a6..2c308fbf424 100644
--- a/hudi-gcp/pom.xml
+++ b/hudi-gcp/pom.xml
@@ -36,7 +36,7 @@ See
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
- <version>25.1.0</version>
+ <version>26.15.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@@ -70,7 +70,6 @@ See
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
- <version>${google.cloud.pubsub.version}</version>
</dependency>
<dependency>
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index 94510ca8dfa..ed8895ca217 100644
---
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -122,6 +122,20 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
.markAdvanced()
.withDocumentation("Fetch file listing from Hudi's metadata");
+ public static final ConfigProperty<Boolean>
BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER = ConfigProperty
+ .key("hoodie.gcp.bigquery.sync.require_partition_filter")
+ .defaultValue(false)
+ .sinceVersion("0.14.1")
+ .markAdvanced()
+ .withDocumentation("If true, configure table to require a partition
filter to be specified when querying the table");
+
+ public static final ConfigProperty<String>
BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID = ConfigProperty
+ .key("hoodie.gcp.bigquery.sync.big_lake_connection_id")
+ .noDefaultValue()
+ .sinceVersion("0.14.1")
+ .markAdvanced()
+ .withDocumentation("The Big Lake connection ID to use");
+
public BigQuerySyncConfig(Properties props) {
super(props);
setDefaults(BigQuerySyncConfig.class.getName());
@@ -147,6 +161,10 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
public String sourceUri;
@Parameter(names = {"--source-uri-prefix"}, description = "Name of the
source uri gcs path prefix of the table", required = false)
public String sourceUriPrefix;
+ @Parameter(names = {"--big-lake-connection-id"}, description = "The Big
Lake connection ID to use when creating the table if using the manifest file
approach.")
+ public String bigLakeConnectionId;
+ @Parameter(names = {"--require-partition-filter"}, description = "If true,
configure table to require a partition filter to be specified when querying the
table")
+ public Boolean requirePartitionFilter;
public boolean isHelp() {
return hoodieSyncConfigParams.isHelp();
@@ -164,6 +182,8 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
props.setPropertyIfNonNull(BIGQUERY_SYNC_SYNC_BASE_PATH.key(),
hoodieSyncConfigParams.basePath);
props.setPropertyIfNonNull(BIGQUERY_SYNC_PARTITION_FIELDS.key(),
StringUtils.join(",", hoodieSyncConfigParams.partitionFields));
props.setPropertyIfNonNull(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key(),
hoodieSyncConfigParams.useFileListingFromMetadata);
+ props.setPropertyIfNonNull(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(),
bigLakeConnectionId);
+ props.setPropertyIfNonNull(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
requirePartitionFilter);
return props;
}
}
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
index 19c8449f8fa..28c071e5231 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
@@ -122,7 +122,7 @@ public class BigQuerySyncTool extends HoodieSyncTool {
}
private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {
- LOG.info("Sync hoodie table " + snapshotViewName + " at base path " +
bqSyncClient.getBasePath());
+ LOG.info("Sync hoodie table {} at base path {}", snapshotViewName,
bqSyncClient.getBasePath());
if (!bqSyncClient.datasetExists()) {
throw new HoodieBigQuerySyncException("Dataset not found: " +
config.getString(BIGQUERY_SYNC_DATASET_NAME));
@@ -132,19 +132,21 @@ public class BigQuerySyncTool extends HoodieSyncTool {
Schema latestSchema = bqSchemaResolver.getTableSchema(metaClient,
partitionFields);
if (config.getBoolean(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE)) {
manifestFileWriter.writeManifestFile(true);
- if (!tableExists(bqSyncClient, tableName)) {
- bqSyncClient.createTableUsingBqManifestFile(
+ // if table does not exist, create it using the manifest file
+ // if table exists but is not yet using manifest file or needs to be
recreated with the big-lake connection ID, update it to use manifest file
+ if (bqSyncClient.tableNotExistsOrDoesNotMatchSpecification(tableName)) {
+ bqSyncClient.createOrUpdateTableUsingBqManifestFile(
tableName,
manifestFileWriter.getManifestSourceUri(true),
config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
latestSchema);
- LOG.info("Completed table " + tableName + " creation using the
manifest file");
+ LOG.info("Completed table {} creation using the manifest file",
tableName);
} else {
bqSyncClient.updateTableSchema(tableName, latestSchema,
partitionFields);
- LOG.info("Synced schema for " + tableName);
+ LOG.info("Synced schema for {}", tableName);
}
- LOG.info("Sync table complete for " + tableName);
+ LOG.info("Sync table complete for {}", tableName);
return;
}
@@ -152,7 +154,7 @@ public class BigQuerySyncTool extends HoodieSyncTool {
if (!tableExists(bqSyncClient, manifestTableName)) {
bqSyncClient.createManifestTable(manifestTableName,
manifestFileWriter.getManifestSourceUri(false));
- LOG.info("Manifest table creation complete for " + manifestTableName);
+ LOG.info("Manifest table creation complete for {}", manifestTableName);
}
if (!tableExists(bqSyncClient, versionsTableName)) {
@@ -161,16 +163,15 @@ public class BigQuerySyncTool extends HoodieSyncTool {
config.getString(BIGQUERY_SYNC_SOURCE_URI),
config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
config.getSplitStrings(BIGQUERY_SYNC_PARTITION_FIELDS));
- LOG.info("Versions table creation complete for " + versionsTableName);
+ LOG.info("Versions table creation complete for {}", versionsTableName);
}
if (!tableExists(bqSyncClient, snapshotViewName)) {
bqSyncClient.createSnapshotView(snapshotViewName, versionsTableName,
manifestTableName);
- LOG.info("Snapshot view creation complete for " + snapshotViewName);
+ LOG.info("Snapshot view creation complete for {}", snapshotViewName);
}
- // TODO: Implement automatic schema evolution when you add a new column.
- LOG.info("Sync table complete for " + snapshotViewName);
+ LOG.info("Sync table complete for {}", snapshotViewName);
}
@Override
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index a5462b5669e..af56194214d 100644
---
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -22,6 +22,7 @@ package org.apache.hudi.gcp.bigquery;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.sync.common.HoodieSyncClient;
+import org.apache.hudi.sync.common.util.ManifestFileWriter;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
@@ -51,9 +52,11 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;
public class HoodieBigQuerySyncClient extends HoodieSyncClient {
@@ -61,14 +64,18 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
protected final BigQuerySyncConfig config;
private final String projectId;
+ private final String bigLakeConnectionId;
private final String datasetName;
+ private final boolean requirePartitionFilter;
private transient BigQuery bigquery;
public HoodieBigQuerySyncClient(final BigQuerySyncConfig config) {
super(config);
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+ this.bigLakeConnectionId =
config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
+ this.requirePartitionFilter =
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
this.createBigQueryConnection();
}
@@ -78,7 +85,9 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
+ this.requirePartitionFilter =
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
this.bigquery = bigquery;
+ this.bigLakeConnectionId =
config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
}
private void createBigQueryConnection() {
@@ -94,19 +103,22 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
}
}
- public void createTableUsingBqManifestFile(String tableName, String
bqManifestFileUri, String sourceUriPrefix, Schema schema) {
+ public void createOrUpdateTableUsingBqManifestFile(String tableName, String
bqManifestFileUri, String sourceUriPrefix, Schema schema) {
try {
String withClauses = String.format("( %s )",
BigQuerySchemaResolver.schemaToSqlString(schema));
String extraOptions = "enable_list_inference=true,";
if (!StringUtils.isNullOrEmpty(sourceUriPrefix)) {
withClauses += " WITH PARTITION COLUMNS";
- extraOptions += String.format(" hive_partition_uri_prefix=\"%s\",",
sourceUriPrefix);
+ extraOptions += String.format(" hive_partition_uri_prefix=\"%s\",
require_hive_partition_filter=%s,", sourceUriPrefix, requirePartitionFilter);
+ }
+ if (!StringUtils.isNullOrEmpty(bigLakeConnectionId)) {
+ withClauses += String.format(" WITH CONNECTION `%s`",
bigLakeConnectionId);
}
String query =
String.format(
- "CREATE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
- + "uris=[\"%s\"], format=\"PARQUET\",
file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
+ "CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
+ + "uris=[\"%s\"], format=\"PARQUET\",
file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
projectId,
datasetName,
tableName,
@@ -125,7 +137,7 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
if (queryJob == null) {
LOG.error("Job for table creation no longer exists");
} else if (queryJob.getStatus().getError() != null) {
- LOG.error("Job for table creation failed: " +
queryJob.getStatus().getError().toString());
+ LOG.error("Job for table creation failed: {}",
queryJob.getStatus().getError().toString());
} else {
LOG.info("External table created using manifest file.");
}
@@ -176,13 +188,21 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
.collect(Collectors.toList());
updatedTableFields.addAll(schema.getFields());
Schema finalSchema = Schema.of(updatedTableFields);
- if (definition.getSchema() != null &&
definition.getSchema().equals(finalSchema)) {
+ boolean sameSchema = definition.getSchema() != null &&
definition.getSchema().equals(finalSchema);
+ boolean samePartitionFilter = partitionFields.isEmpty()
+ || (requirePartitionFilter ==
(definition.getHivePartitioningOptions().getRequirePartitionFilter() != null &&
definition.getHivePartitioningOptions().getRequirePartitionFilter()));
+ if (sameSchema && samePartitionFilter) {
return; // No need to update schema.
}
+ ExternalTableDefinition.Builder builder = definition.toBuilder();
+ builder.setSchema(finalSchema);
+ builder.setAutodetect(false);
+ if (definition.getHivePartitioningOptions() != null) {
+
builder.setHivePartitioningOptions(definition.getHivePartitioningOptions().toBuilder().setRequirePartitionFilter(requirePartitionFilter).build());
+ }
Table updatedTable = existingTable.toBuilder()
-
.setDefinition(definition.toBuilder().setSchema(finalSchema).setAutodetect(false).build())
+ .setDefinition(builder.build())
.build();
-
bigquery.update(updatedTable);
}
@@ -264,6 +284,28 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
return table != null && table.exists();
}
+ /**
+ * Checks for the existence of a table that uses the manifest file approach
and matches other requirements.
+ * @param tableName name of the table
+ * @return Returns true if the table does not exist, if it exists but does
not use the manifest file, or if a Big Lake connection ID is configured but
the existing table has no connection ID. False otherwise.
+ */
+ public boolean tableNotExistsOrDoesNotMatchSpecification(String tableName) {
+ TableId tableId = TableId.of(projectId, datasetName, tableName);
+ Table table = bigquery.getTable(tableId);
+ if (table == null || !table.exists()) {
+ return true;
+ }
+ ExternalTableDefinition externalTableDefinition = table.getDefinition();
+ boolean manifestDoesNotExist =
+ externalTableDefinition.getSourceUris() == null
+ || externalTableDefinition.getSourceUris().stream().noneMatch(uri
-> uri.contains(ManifestFileWriter.ABSOLUTE_PATH_MANIFEST_FOLDER_NAME));
+ if
(!StringUtils.isNullOrEmpty(config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID)))
{
+ // If bigLakeConnectionId is present and connectionId is not present in
table definition, we need to replace the table.
+ return manifestDoesNotExist || externalTableDefinition.getConnectionId()
== null;
+ }
+ return manifestDoesNotExist;
+ }
+
@Override
public void close() {
bigquery = null;
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
index b5d812cce10..2c17749158f 100644
---
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
@@ -33,11 +33,11 @@ import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATA
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
-import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
index 5edbdac1c2e..ff7abdb6870 100644
---
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
@@ -76,13 +76,13 @@ public class TestBigQuerySyncTool {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(),
"datestr,type");
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
when(mockBqSyncClient.datasetExists()).thenReturn(true);
- when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
+
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(true);
Path manifestPath = new Path("file:///local/path");
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
when(mockBqSchemaResolver.getTableSchema(any(),
eq(Arrays.asList("datestr", "type")))).thenReturn(schema);
BigQuerySyncTool tool = new BigQuerySyncTool(properties,
mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
tool.syncHoodieTable();
- verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE,
manifestPath.toUri().getPath(), prefix, schema);
+
verify(mockBqSyncClient).createOrUpdateTableUsingBqManifestFile(TEST_TABLE,
manifestPath.toUri().getPath(), prefix, schema);
verify(mockManifestFileWriter).writeManifestFile(true);
}
@@ -91,13 +91,13 @@ public class TestBigQuerySyncTool {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(),
"true");
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
when(mockBqSyncClient.datasetExists()).thenReturn(true);
- when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
+
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(true);
Path manifestPath = new Path("file:///local/path");
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
when(mockBqSchemaResolver.getTableSchema(any(),
eq(Collections.emptyList()))).thenReturn(schema);
BigQuerySyncTool tool = new BigQuerySyncTool(properties,
mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
tool.syncHoodieTable();
- verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE,
manifestPath.toUri().getPath(), null, schema);
+
verify(mockBqSyncClient).createOrUpdateTableUsingBqManifestFile(TEST_TABLE,
manifestPath.toUri().getPath(), null, schema);
verify(mockManifestFileWriter).writeManifestFile(true);
}
@@ -109,7 +109,7 @@ public class TestBigQuerySyncTool {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(),
"datestr,type");
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
when(mockBqSyncClient.datasetExists()).thenReturn(true);
- when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
+
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(false);
Path manifestPath = new Path("file:///local/path");
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
List<String> partitionFields = Arrays.asList("datestr", "type");
@@ -125,7 +125,7 @@ public class TestBigQuerySyncTool {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(),
"true");
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
when(mockBqSyncClient.datasetExists()).thenReturn(true);
- when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
+
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(false);
Path manifestPath = new Path("file:///local/path");
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
when(mockBqSchemaResolver.getTableSchema(any(),
eq(Collections.emptyList()))).thenReturn(schema);
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
index 0a4ba6fd61a..3f8ee6b9966 100644
---
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
@@ -23,10 +23,12 @@ import org.junit.jupiter.api.Test;
import java.util.Properties;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
@@ -50,8 +52,10 @@ public class TestBigQuerySyncToolArgs {
"--source-uri-prefix", "gs://foobartable/",
"--base-path", "gs://foobartable",
"--partitioned-by", "year,month,day",
+ "--big-lake-connection-id", "connection-id",
"--use-bq-manifest-file",
- "--use-file-listing-from-metadata"
+ "--use-file-listing-from-metadata",
+ "--require-partition-filter"
};
cmd.parse(args);
@@ -66,5 +70,7 @@ public class TestBigQuerySyncToolArgs {
assertEquals("year,month,day",
props.getProperty(BIGQUERY_SYNC_PARTITION_FIELDS.key()));
assertEquals("true",
props.getProperty(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key()));
assertEquals("true",
props.getProperty(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key()));
+ assertEquals("true",
props.getProperty(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key()));
+ assertEquals("connection-id",
props.getProperty(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key()));
}
}
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
index af2167f0f16..37b2800b563 100644
---
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
@@ -58,6 +58,7 @@ public class TestHoodieBigQuerySyncClient {
private static String basePath;
private final BigQuery mockBigQuery = mock(BigQuery.class);
private HoodieBigQuerySyncClient client;
+ private Properties properties;
@BeforeAll
static void setupOnce() throws Exception {
@@ -71,16 +72,19 @@ public class TestHoodieBigQuerySyncClient {
@BeforeEach
void setup() {
- Properties properties = new Properties();
+ properties = new Properties();
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(),
PROJECT_ID);
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(),
TEST_DATASET);
properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(),
tempDir.toString());
- BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
- client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
"true");
}
@Test
void createTableWithManifestFile_partitioned() throws Exception {
+
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(),
"my-project.us.bl_connection");
+ BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
+ client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+
Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
ArgumentCaptor<JobInfo> jobInfoCaptor =
ArgumentCaptor.forClass(JobInfo.class);
Job mockJob = mock(Job.class);
@@ -90,17 +94,21 @@ public class TestHoodieBigQuerySyncClient {
JobStatus mockJobStatus = mock(JobStatus.class);
when(mockJobFinished.getStatus()).thenReturn(mockJobStatus);
when(mockJobStatus.getError()).thenReturn(null);
- client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI,
SOURCE_PREFIX, schema);
+ client.createOrUpdateTableUsingBqManifestFile(TEST_TABLE,
MANIFEST_FILE_URI, SOURCE_PREFIX, schema);
QueryJobConfiguration configuration =
jobInfoCaptor.getValue().getConfiguration();
assertEquals(configuration.getQuery(),
- String.format("CREATE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING )
WITH PARTITION COLUMNS OPTIONS (enable_list_inference=true, "
- + "hive_partition_uri_prefix=\"%s\", uris=[\"%s\"],
format=\"PARQUET\", "
- + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
PROJECT_ID, TEST_DATASET, TEST_TABLE, SOURCE_PREFIX, MANIFEST_FILE_URI));
+ String.format("CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` ( `field`
STRING ) WITH PARTITION COLUMNS WITH CONNECTION `my-project.us.bl_connection` "
+ + "OPTIONS (enable_list_inference=true,
hive_partition_uri_prefix=\"%s\", "
+ + "require_hive_partition_filter=true, uris=[\"%s\"],
format=\"PARQUET\", file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
+ PROJECT_ID, TEST_DATASET, TEST_TABLE, SOURCE_PREFIX,
MANIFEST_FILE_URI));
}
@Test
void createTableWithManifestFile_nonPartitioned() throws Exception {
+ BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
+ client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+
Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
ArgumentCaptor<JobInfo> jobInfoCaptor =
ArgumentCaptor.forClass(JobInfo.class);
Job mockJob = mock(Job.class);
@@ -110,11 +118,11 @@ public class TestHoodieBigQuerySyncClient {
JobStatus mockJobStatus = mock(JobStatus.class);
when(mockJobFinished.getStatus()).thenReturn(mockJobStatus);
when(mockJobStatus.getError()).thenReturn(null);
- client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, "",
schema);
+ client.createOrUpdateTableUsingBqManifestFile(TEST_TABLE,
MANIFEST_FILE_URI, "", schema);
QueryJobConfiguration configuration =
jobInfoCaptor.getValue().getConfiguration();
assertEquals(configuration.getQuery(),
- String.format("CREATE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING )
OPTIONS (enable_list_inference=true, uris=[\"%s\"], format=\"PARQUET\", "
+ String.format("CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` ( `field`
STRING ) OPTIONS (enable_list_inference=true, uris=[\"%s\"],
format=\"PARQUET\", "
+ "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
PROJECT_ID, TEST_DATASET, TEST_TABLE, MANIFEST_FILE_URI));
}
}