This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 0fb3e940a89 [HUDI-8212] Add extra config of billing project ID for BigQuery sync (#11988)
0fb3e940a89 is described below
commit 0fb3e940a89d83352a41a7ad42b57f687cb4f4a2
Author: Aditya Goenka <[email protected]>
AuthorDate: Sun Oct 6 05:32:27 2024 +0530
[HUDI-8212] Add extra config of billing project ID for BigQuery sync (#11988)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/gcp/bigquery/BigQuerySyncConfig.java | 12 +++++++
.../gcp/bigquery/HoodieBigQuerySyncClient.java | 7 +++-
.../hudi/gcp/bigquery/TestBigQuerySyncConfig.java | 3 ++
.../gcp/bigquery/TestHoodieBigQuerySyncClient.java | 37 ++++++++++++++++++++--
4 files changed, 55 insertions(+), 4 deletions(-)
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index ec035435579..b15b4f2842c 100644
---
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -58,6 +58,15 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
.markAdvanced()
.withDocumentation("Name of the target project in BigQuery");
+ public static final ConfigProperty<String> BIGQUERY_SYNC_BILLING_PROJECT_ID
= ConfigProperty
+ .key("hoodie.gcp.bigquery.sync.billing.project.id")
+ .noDefaultValue()
+ .sinceVersion("0.15.1")
+ .markAdvanced()
+ .withDocumentation("Name of the billing project id in BigQuery. By
default it uses the "
+ + "configuration from `hoodie.gcp.bigquery.sync.project_id` if this
configuration is "
+ + "not set. This can only be used with manifest file based
approach");
+
public static final ConfigProperty<String> BIGQUERY_SYNC_DATASET_NAME =
ConfigProperty
.key("hoodie.gcp.bigquery.sync.dataset_name")
.noDefaultValue()
@@ -156,6 +165,8 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
@Parameter(names = {"--project-id"}, description = "Name of the target
project in BigQuery", required = true)
public String projectId;
+ @Parameter(names = {"--billing-project-id"}, description = "Name of the
billing project in BigQuery. This can only be used with
--use-bq-manifest-file", required = false)
+ public String billingProjectId;
@Parameter(names = {"--dataset-name"}, description = "Name of the target
dataset in BigQuery", required = true)
public String datasetName;
@Parameter(names = {"--dataset-location"}, description = "Location of the
target dataset in BigQuery", required = true)
@@ -181,6 +192,7 @@ public class BigQuerySyncConfig extends HoodieSyncConfig
implements Serializable
public TypedProperties toProps() {
final TypedProperties props = hoodieSyncConfigParams.toProps();
props.setPropertyIfNonNull(BIGQUERY_SYNC_PROJECT_ID.key(), projectId);
+ props.setPropertyIfNonNull(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(),
billingProjectId);
props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_NAME.key(),
datasetName);
props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_LOCATION.key(),
datasetLocation);
props.setPropertyIfNonNull(BIGQUERY_SYNC_TABLE_NAME.key(),
hoodieSyncConfigParams.tableName);
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index 32430b53329..725fcc62969 100644
---
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -58,6 +58,8 @@ import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
+
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;
public class HoodieBigQuerySyncClient extends HoodieSyncClient {
@@ -66,6 +68,7 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
protected final BigQuerySyncConfig config;
private final String projectId;
+ private final String billingProjectId;
private final String bigLakeConnectionId;
private final String datasetName;
private final boolean requirePartitionFilter;
@@ -75,6 +78,7 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
super(config);
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+ this.billingProjectId = config.getString(BIGQUERY_SYNC_BILLING_PROJECT_ID);
this.bigLakeConnectionId =
config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
this.requirePartitionFilter =
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
@@ -86,6 +90,7 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
super(config);
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+ this.billingProjectId =
config.getStringOrDefault(BIGQUERY_SYNC_BILLING_PROJECT_ID, this.projectId);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
this.requirePartitionFilter =
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
this.bigquery = bigquery;
@@ -131,7 +136,7 @@ public class HoodieBigQuerySyncClient extends
HoodieSyncClient {
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.setUseLegacySql(false)
.build();
- JobId jobId =
JobId.newBuilder().setProject(projectId).setRandomJob().build();
+ JobId jobId =
JobId.newBuilder().setProject(billingProjectId).setRandomJob().build();
Job queryJob =
bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
queryJob = queryJob.waitFor();
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
index d31566df131..910027b4530 100644
---
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
@@ -34,6 +34,7 @@ import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATA
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
@@ -49,6 +50,7 @@ public class TestBigQuerySyncConfig {
public void testGetConfigs() {
Properties props = new Properties();
props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), "fooproject");
+ props.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(),
"foobillingproject");
props.setProperty(BIGQUERY_SYNC_DATASET_NAME.key(), "foodataset");
props.setProperty(BIGQUERY_SYNC_DATASET_LOCATION.key(), "US");
props.setProperty(BIGQUERY_SYNC_TABLE_NAME.key(), "footable");
@@ -61,6 +63,7 @@ public class TestBigQuerySyncConfig {
props.setProperty(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key(), "true");
BigQuerySyncConfig syncConfig = new BigQuerySyncConfig(props);
assertEquals("fooproject", syncConfig.getString(BIGQUERY_SYNC_PROJECT_ID));
+ assertEquals("foobillingproject",
syncConfig.getString(BIGQUERY_SYNC_BILLING_PROJECT_ID));
assertEquals("foodataset",
syncConfig.getString(BIGQUERY_SYNC_DATASET_NAME));
assertEquals("US", syncConfig.getString(BIGQUERY_SYNC_DATASET_LOCATION));
assertEquals("footable", syncConfig.getString(BIGQUERY_SYNC_TABLE_NAME));
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
index c78b293de63..42c438533fd 100644
---
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
@@ -41,22 +41,27 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
-import java.util.ArrayList;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
+import static
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestHoodieBigQuerySyncClient {
private static final String PROJECT_ID = "test_project";
+ private static final String BILLING_PROJECT_ID = "test_billing_project";
private static final String MANIFEST_FILE_URI = "file:/manifest_file";
private static final String SOURCE_PREFIX = "file:/manifest_file/date=*";
private static final String TEST_TABLE = "test_table";
@@ -82,12 +87,38 @@ public class TestHoodieBigQuerySyncClient {
@BeforeEach
void setup() {
properties = new Properties();
- properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(),
PROJECT_ID);
+ properties.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
+ properties.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(),
BILLING_PROJECT_ID);
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(),
TEST_DATASET);
properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(),
tempDir.toString());
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
"true");
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCreateOrUpdateTableUsingManifestWithBillingProjectId(boolean
setBillingProjectId) {
+ Properties props = new Properties();
+ props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
+ if (setBillingProjectId) {
+ props.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(),
BILLING_PROJECT_ID);
+ }
+ props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(),
TEST_DATASET);
+ props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(),
tempDir.toString());
+
props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
"true");
+ BigQuerySyncConfig syncConfig = new BigQuerySyncConfig(props);
+ Job mockJob = mock(Job.class);
+ ArgumentCaptor<JobInfo> jobInfoCaptor =
ArgumentCaptor.forClass(JobInfo.class);
+ when(mockBigQuery.create(jobInfoCaptor.capture())).thenReturn(mockJob);
+
+ HoodieBigQuerySyncClient syncClient = new
HoodieBigQuerySyncClient(syncConfig, mockBigQuery);
+ Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
+ syncClient.createOrUpdateTableUsingBqManifestFile(TEST_TABLE,
MANIFEST_FILE_URI, SOURCE_PREFIX, schema);
+
+ assertEquals(
+ setBillingProjectId ? BILLING_PROJECT_ID : PROJECT_ID,
+ jobInfoCaptor.getValue().getJobId().getProject());
+ }
+
@Test
void createTableWithManifestFile_partitioned() throws Exception {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(),
"my-project.us.bl_connection");