This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 0fb3e940a89 [HUDI-8212] Add extra config of billing project ID for 
BigQuery sync (#11988)
0fb3e940a89 is described below

commit 0fb3e940a89d83352a41a7ad42b57f687cb4f4a2
Author: Aditya Goenka <[email protected]>
AuthorDate: Sun Oct 6 05:32:27 2024 +0530

    [HUDI-8212] Add extra config of billing project ID for BigQuery sync 
(#11988)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/gcp/bigquery/BigQuerySyncConfig.java      | 12 +++++++
 .../gcp/bigquery/HoodieBigQuerySyncClient.java     |  7 +++-
 .../hudi/gcp/bigquery/TestBigQuerySyncConfig.java  |  3 ++
 .../gcp/bigquery/TestHoodieBigQuerySyncClient.java | 37 ++++++++++++++++++++--
 4 files changed, 55 insertions(+), 4 deletions(-)

diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index ec035435579..b15b4f2842c 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -58,6 +58,15 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
       .markAdvanced()
       .withDocumentation("Name of the target project in BigQuery");
 
+  public static final ConfigProperty<String> BIGQUERY_SYNC_BILLING_PROJECT_ID 
= ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.billing.project.id")
+      .noDefaultValue()
+      .sinceVersion("0.15.1")
+      .markAdvanced()
+      .withDocumentation("Name of the billing project id in BigQuery. By 
default it uses the "
+          + "configuration from `hoodie.gcp.bigquery.sync.project_id` if this 
configuration is "
+          + "not set. This can only be used with manifest file based 
approach");
+  
   public static final ConfigProperty<String> BIGQUERY_SYNC_DATASET_NAME = 
ConfigProperty
       .key("hoodie.gcp.bigquery.sync.dataset_name")
       .noDefaultValue()
@@ -156,6 +165,8 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
 
     @Parameter(names = {"--project-id"}, description = "Name of the target 
project in BigQuery", required = true)
     public String projectId;
+    @Parameter(names = {"--billing-project-id"}, description = "Name of the 
billing project in BigQuery. This can only be used with 
--use-bq-manifest-file", required = false)
+    public String billingProjectId;
     @Parameter(names = {"--dataset-name"}, description = "Name of the target 
dataset in BigQuery", required = true)
     public String datasetName;
     @Parameter(names = {"--dataset-location"}, description = "Location of the 
target dataset in BigQuery", required = true)
@@ -181,6 +192,7 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
     public TypedProperties toProps() {
       final TypedProperties props = hoodieSyncConfigParams.toProps();
       props.setPropertyIfNonNull(BIGQUERY_SYNC_PROJECT_ID.key(), projectId);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), 
billingProjectId);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_NAME.key(), 
datasetName);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_LOCATION.key(), 
datasetLocation);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_TABLE_NAME.key(), 
hoodieSyncConfigParams.tableName);
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index 32430b53329..725fcc62969 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -58,6 +58,8 @@ import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
+
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;
 
 public class HoodieBigQuerySyncClient extends HoodieSyncClient {
@@ -66,6 +68,7 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
 
   protected final BigQuerySyncConfig config;
   private final String projectId;
+  private final String billingProjectId;
   private final String bigLakeConnectionId;
   private final String datasetName;
   private final boolean requirePartitionFilter;
@@ -75,6 +78,7 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     super(config);
     this.config = config;
     this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+    this.billingProjectId = 
config.getStringOrDefault(BIGQUERY_SYNC_BILLING_PROJECT_ID, this.projectId);
     this.bigLakeConnectionId = 
config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
     this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
     this.requirePartitionFilter = 
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
@@ -86,6 +90,7 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     super(config);
     this.config = config;
     this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+    this.billingProjectId = 
config.getStringOrDefault(BIGQUERY_SYNC_BILLING_PROJECT_ID, this.projectId);
     this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
     this.requirePartitionFilter = 
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
     this.bigquery = bigquery;
@@ -131,7 +136,7 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
       QueryJobConfiguration queryConfig = 
QueryJobConfiguration.newBuilder(query)
           .setUseLegacySql(false)
           .build();
-      JobId jobId = 
JobId.newBuilder().setProject(projectId).setRandomJob().build();
+      JobId jobId = 
JobId.newBuilder().setProject(billingProjectId).setRandomJob().build();
       Job queryJob = 
bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
 
       queryJob = queryJob.waitFor();
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
index d31566df131..910027b4530 100644
--- 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
@@ -34,6 +34,7 @@ import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATA
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
@@ -49,6 +50,7 @@ public class TestBigQuerySyncConfig {
   public void testGetConfigs() {
     Properties props = new Properties();
     props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), "fooproject");
+    props.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), 
"foobillingproject");
     props.setProperty(BIGQUERY_SYNC_DATASET_NAME.key(), "foodataset");
     props.setProperty(BIGQUERY_SYNC_DATASET_LOCATION.key(), "US");
     props.setProperty(BIGQUERY_SYNC_TABLE_NAME.key(), "footable");
@@ -61,6 +63,7 @@ public class TestBigQuerySyncConfig {
     props.setProperty(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key(), "true");
     BigQuerySyncConfig syncConfig = new BigQuerySyncConfig(props);
     assertEquals("fooproject", syncConfig.getString(BIGQUERY_SYNC_PROJECT_ID));
+    assertEquals("foobillingproject", 
syncConfig.getString(BIGQUERY_SYNC_BILLING_PROJECT_ID));
     assertEquals("foodataset", 
syncConfig.getString(BIGQUERY_SYNC_DATASET_NAME));
     assertEquals("US", syncConfig.getString(BIGQUERY_SYNC_DATASET_LOCATION));
     assertEquals("footable", syncConfig.getString(BIGQUERY_SYNC_TABLE_NAME));
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
index c78b293de63..42c438533fd 100644
--- 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
@@ -41,22 +41,27 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 
-import java.util.ArrayList;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestHoodieBigQuerySyncClient {
   private static final String PROJECT_ID = "test_project";
+  private static final String BILLING_PROJECT_ID = "test_billing_project";
   private static final String MANIFEST_FILE_URI = "file:/manifest_file";
   private static final String SOURCE_PREFIX = "file:/manifest_file/date=*";
   private static final String TEST_TABLE = "test_table";
@@ -82,12 +87,38 @@ public class TestHoodieBigQuerySyncClient {
   @BeforeEach
   void setup() {
     properties = new Properties();
-    properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(), 
PROJECT_ID);
+    properties.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
+    properties.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), 
BILLING_PROJECT_ID);
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), 
TEST_DATASET);
     properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), 
tempDir.toString());
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
 "true");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCreateOrUpdateTableUsingManifestWithBillingProjectId(boolean 
setBillingProjectId) {
+    Properties props = new Properties();
+    props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
+    if (setBillingProjectId) {
+      props.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), 
BILLING_PROJECT_ID);
+    }
+    props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), 
TEST_DATASET);
+    props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), 
tempDir.toString());
+    
props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
 "true");
+    BigQuerySyncConfig syncConfig = new BigQuerySyncConfig(props);
+    Job mockJob = mock(Job.class);
+    ArgumentCaptor<JobInfo> jobInfoCaptor = 
ArgumentCaptor.forClass(JobInfo.class);
+    when(mockBigQuery.create(jobInfoCaptor.capture())).thenReturn(mockJob);
+
+    HoodieBigQuerySyncClient syncClient = new 
HoodieBigQuerySyncClient(syncConfig, mockBigQuery);
+    Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
+    syncClient.createOrUpdateTableUsingBqManifestFile(TEST_TABLE, 
MANIFEST_FILE_URI, SOURCE_PREFIX, schema);
+
+    assertEquals(
+        setBillingProjectId ? BILLING_PROJECT_ID : PROJECT_ID,
+        jobInfoCaptor.getValue().getJobId().getProject());
+  }
+
   @Test
   void createTableWithManifestFile_partitioned() throws Exception {
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(),
 "my-project.us.bl_connection");

Reply via email to