This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ffd443b986 Fix inconsistent handling of Firestore Project and 
Database ID (#36895)
2ffd443b986 is described below

commit 2ffd443b98689f4d7aac05a5173f3d24a3b6b9fd
Author: Paco Avila <[email protected]>
AuthorDate: Wed Nov 26 08:38:27 2025 -0800

    Fix inconsistent handling of Firestore Project and Database ID (#36895)
    
    * Fix inconsistent handling of Firestore Project and Database ID in routing 
header (resolves #36894)
    
    * Update CHANGED.md
    
    * Address reviewer comments from yixiaoshen
    
    * Fix unit tests
    
    * revert unnecessary change to read test
    
    * Revert unnecessary change to test helper
---
 CHANGES.md                                         |  2 +-
 .../sdk/io/gcp/firestore/FirestoreOptions.java     |  5 +----
 .../FirestoreStatefulComponentFactory.java         | 23 +++++++++++++++++-----
 .../sdk/io/gcp/firestore/FirestoreV1WriteFn.java   |  4 +++-
 .../gcp/firestore/BaseFirestoreV1WriteFnTest.java  |  8 ++++----
 ...storeV1FnBatchWriteWithDeadLetterQueueTest.java |  2 +-
 .../FirestoreV1FnBatchWriteWithSummaryTest.java    |  2 +-
 .../sdk/io/gcp/firestore/it/BaseFirestoreIT.java   |  3 ++-
 8 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a7612f0a835..50d04b6f0e4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -84,7 +84,7 @@
 
 ## Bugfixes
 
-* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Fixed FirestoreV1 Beam connectors allow configuring inconsistent 
project/database IDs between RPC requests and routing headers #36895 (Java) 
([#36895](https://github.com/apache/beam/issues/36895)).
 
 ## Known Issues
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java
index 5adc9ef38f3..8b90594bb65 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java
@@ -48,10 +48,7 @@ public interface FirestoreOptions extends PipelineOptions {
    */
   void setEmulatorHost(String host);
 
-  /**
-   * The Firestore database ID to connect to. Note: named database is 
currently an internal feature
-   * in Firestore. Do not set this to anything other than "(default)".
-   */
+  /** The Firestore database ID to connect to. */
   @Description("Firestore database ID")
   @Default.String("(default)")
   String getFirestoreDb();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
index 4e8c11f7072..390e102b601 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
@@ -29,6 +29,7 @@ import com.google.cloud.firestore.v1.stub.GrpcFirestoreStub;
 import java.io.Serializable;
 import java.security.SecureRandom;
 import java.util.Map;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.Immutable;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -65,6 +66,13 @@ class FirestoreStatefulComponentFactory implements 
Serializable {
    * @return a new {@link FirestoreStub} pre-configured with values from the 
provided options
    */
   FirestoreStub getFirestoreStub(PipelineOptions options) {
+    return getFirestoreStub(options, null, null);
+  }
+
+  FirestoreStub getFirestoreStub(
+      PipelineOptions options,
+      @Nullable String configuredProjectId,
+      @Nullable String configuredDatabaseId) {
     try {
       FirestoreSettings.Builder builder = FirestoreSettings.newBuilder();
 
@@ -94,12 +102,17 @@ class FirestoreStatefulComponentFactory implements 
Serializable {
         builder
             
.setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential()))
             .setEndpoint(firestoreOptions.getFirestoreHost());
+        String projectId =
+            configuredProjectId != null
+                ? configuredProjectId
+                : firestoreOptions.getFirestoreProject();
+        if (projectId == null) {
+          projectId = gcpOptions.getProject();
+        }
+        String databaseId =
+            configuredDatabaseId != null ? configuredDatabaseId : 
firestoreOptions.getFirestoreDb();
         headers.put(
-            "x-goog-request-params",
-            "project_id="
-                + gcpOptions.getProject()
-                + "&database_id="
-                + firestoreOptions.getFirestoreDb());
+            "x-goog-request-params", "project_id=" + projectId + 
"&database_id=" + databaseId);
       }
 
       builder.setHeaderProvider(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
index 6bbb00e76f2..ab33d8e5c16 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
@@ -244,7 +244,9 @@ final class FirestoreV1WriteFn {
               requireNonNull(
                   databaseId,
                   "firestoreDb must be defined on FirestoreOptions of 
PipelineOptions"));
-      firestoreStub = 
firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions());
+      firestoreStub =
+          firestoreStatefulComponentFactory.getFirestoreStub(
+              c.getPipelineOptions(), project, databaseId);
     }
 
     /**
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
index 623f947c45a..f20181fbc32 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
@@ -111,7 +111,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
     when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2);
 
     when(ff.getRpcQos(any())).thenReturn(rpcQos);
-    when(ff.getFirestoreStub(pipelineOptions)).thenReturn(stub);
+    when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
     when(stub.batchWriteCallable()).thenReturn(callable);
     metricsFixture = new MetricsFixture();
   }
@@ -129,7 +129,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
     Write write = newWrite();
     Element<Write> element1 = new WriteElement(0, write, window);
 
-    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
     when(ff.getRpcQos(any())).thenReturn(rpcQos);
     
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
         .thenReturn(attempt);
@@ -175,7 +175,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
   @Override
   @Test
   public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
-    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
     when(ff.getRpcQos(any())).thenReturn(rpcQos);
     
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
         .thenReturn(attempt);
@@ -369,7 +369,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
     LOG.debug("options = {}", options);
 
     FirestoreStatefulComponentFactory ff = 
mock(FirestoreStatefulComponentFactory.class);
-    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
     Random random = new Random(12345);
     TestClock clock = new TestClock(Instant.EPOCH, 
Duration.standardSeconds(1));
     Sleeper sleeper = millis -> 
clock.setNext(advanceClockBy(Duration.millis(millis)));
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java
index 35d0ea9482d..e7f98ff73c6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java
@@ -67,7 +67,7 @@ public final class 
FirestoreV1FnBatchWriteWithDeadLetterQueueTest
     int maxBytes = 50;
     RpcQosOptions options = 
rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();
 
-    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
     when(ff.getRpcQos(any()))
         
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java
index 3e37e3975bf..e7174537943 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java
@@ -76,7 +76,7 @@ public final class FirestoreV1FnBatchWriteWithSummaryTest
     int maxBytes = 50;
     RpcQosOptions options = 
rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();
 
-    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
     when(ff.getRpcQos(any()))
         
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
index 8695080cb88..14344b105b3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
@@ -42,6 +42,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
 import 
org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.CleanupMode;
 import 
org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.DataLayout;
@@ -97,7 +98,7 @@ abstract class BaseFirestoreIT {
   @Before
   public void setup() {
     project = 
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
-    databaseId = "firestoredb";
+    databaseId = 
TestPipeline.testingPipelineOptions().as(FirestoreOptions.class).getFirestoreDb();
   }
 
   private static Instant toWriteTime(WriteResult result) {

Reply via email to