This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2ffd443b986 Fix inconsistent handling of Firestore Project and
Database ID (#36895)
2ffd443b986 is described below
commit 2ffd443b98689f4d7aac05a5173f3d24a3b6b9fd
Author: Paco Avila <[email protected]>
AuthorDate: Wed Nov 26 08:38:27 2025 -0800
Fix inconsistent handling of Firestore Project and Database ID (#36895)
* Fix inconsistent handling of Firestore Project and Database ID in routing
header (resolves #36894)
* Update CHANGED.md
* Address reviewer comments from yixiaoshen
* Fix unit tests
* revert unnecessary change to read test
* Revert unnecessary change to test helper
---
CHANGES.md | 2 +-
.../sdk/io/gcp/firestore/FirestoreOptions.java | 5 +----
.../FirestoreStatefulComponentFactory.java | 23 +++++++++++++++++-----
.../sdk/io/gcp/firestore/FirestoreV1WriteFn.java | 4 +++-
.../gcp/firestore/BaseFirestoreV1WriteFnTest.java | 8 ++++----
...storeV1FnBatchWriteWithDeadLetterQueueTest.java | 2 +-
.../FirestoreV1FnBatchWriteWithSummaryTest.java | 2 +-
.../sdk/io/gcp/firestore/it/BaseFirestoreIT.java | 3 ++-
8 files changed, 31 insertions(+), 18 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a7612f0a835..50d04b6f0e4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -84,7 +84,7 @@
## Bugfixes
-* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Fixed FirestoreV1 Beam connectors allow configuring inconsistent
project/database IDs between RPC requests and routing headers #36895 (Java)
([#36895](https://github.com/apache/beam/issues/36895)).
## Known Issues
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java
index 5adc9ef38f3..8b90594bb65 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java
@@ -48,10 +48,7 @@ public interface FirestoreOptions extends PipelineOptions {
*/
void setEmulatorHost(String host);
- /**
- * The Firestore database ID to connect to. Note: named database is
currently an internal feature
- * in Firestore. Do not set this to anything other than "(default)".
- */
+ /** The Firestore database ID to connect to. */
@Description("Firestore database ID")
@Default.String("(default)")
String getFirestoreDb();
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
index 4e8c11f7072..390e102b601 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
@@ -29,6 +29,7 @@ import com.google.cloud.firestore.v1.stub.GrpcFirestoreStub;
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Map;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -65,6 +66,13 @@ class FirestoreStatefulComponentFactory implements
Serializable {
* @return a new {@link FirestoreStub} pre-configured with values from the
provided options
*/
FirestoreStub getFirestoreStub(PipelineOptions options) {
+ return getFirestoreStub(options, null, null);
+ }
+
+ FirestoreStub getFirestoreStub(
+ PipelineOptions options,
+ @Nullable String configuredProjectId,
+ @Nullable String configuredDatabaseId) {
try {
FirestoreSettings.Builder builder = FirestoreSettings.newBuilder();
@@ -94,12 +102,17 @@ class FirestoreStatefulComponentFactory implements
Serializable {
builder
.setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential()))
.setEndpoint(firestoreOptions.getFirestoreHost());
+ String projectId =
+ configuredProjectId != null
+ ? configuredProjectId
+ : firestoreOptions.getFirestoreProject();
+ if (projectId == null) {
+ projectId = gcpOptions.getProject();
+ }
+ String databaseId =
+ configuredDatabaseId != null ? configuredDatabaseId :
firestoreOptions.getFirestoreDb();
headers.put(
- "x-goog-request-params",
- "project_id="
- + gcpOptions.getProject()
- + "&database_id="
- + firestoreOptions.getFirestoreDb());
+ "x-goog-request-params", "project_id=" + projectId +
"&database_id=" + databaseId);
}
builder.setHeaderProvider(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
index 6bbb00e76f2..ab33d8e5c16 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
@@ -244,7 +244,9 @@ final class FirestoreV1WriteFn {
requireNonNull(
databaseId,
"firestoreDb must be defined on FirestoreOptions of
PipelineOptions"));
- firestoreStub =
firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions());
+ firestoreStub =
+ firestoreStatefulComponentFactory.getFirestoreStub(
+ c.getPipelineOptions(), project, databaseId);
}
/**
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
index 623f947c45a..f20181fbc32 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
@@ -111,7 +111,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2);
when(ff.getRpcQos(any())).thenReturn(rpcQos);
- when(ff.getFirestoreStub(pipelineOptions)).thenReturn(stub);
+ when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(stub.batchWriteCallable()).thenReturn(callable);
metricsFixture = new MetricsFixture();
}
@@ -129,7 +129,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
Write write = newWrite();
Element<Write> element1 = new WriteElement(0, write, window);
- when(ff.getFirestoreStub(any())).thenReturn(stub);
+ when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(ff.getRpcQos(any())).thenReturn(rpcQos);
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
.thenReturn(attempt);
@@ -175,7 +175,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
@Override
@Test
public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
- when(ff.getFirestoreStub(any())).thenReturn(stub);
+ when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(ff.getRpcQos(any())).thenReturn(rpcQos);
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
.thenReturn(attempt);
@@ -369,7 +369,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
LOG.debug("options = {}", options);
FirestoreStatefulComponentFactory ff =
mock(FirestoreStatefulComponentFactory.class);
- when(ff.getFirestoreStub(any())).thenReturn(stub);
+ when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
Random random = new Random(12345);
TestClock clock = new TestClock(Instant.EPOCH,
Duration.standardSeconds(1));
Sleeper sleeper = millis ->
clock.setNext(advanceClockBy(Duration.millis(millis)));
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java
index 35d0ea9482d..e7f98ff73c6 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java
@@ -67,7 +67,7 @@ public final class
FirestoreV1FnBatchWriteWithDeadLetterQueueTest
int maxBytes = 50;
RpcQosOptions options =
rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();
- when(ff.getFirestoreStub(any())).thenReturn(stub);
+ when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(ff.getRpcQos(any()))
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java
index 3e37e3975bf..e7174537943 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java
@@ -76,7 +76,7 @@ public final class FirestoreV1FnBatchWriteWithSummaryTest
int maxBytes = 50;
RpcQosOptions options =
rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();
- when(ff.getFirestoreStub(any())).thenReturn(stub);
+ when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
when(ff.getRpcQos(any()))
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
index 8695080cb88..14344b105b3 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
@@ -42,6 +42,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import
org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.CleanupMode;
import
org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.DataLayout;
@@ -97,7 +98,7 @@ abstract class BaseFirestoreIT {
@Before
public void setup() {
project =
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
- databaseId = "firestoredb";
+ databaseId =
TestPipeline.testingPipelineOptions().as(FirestoreOptions.class).getFirestoreDb();
}
private static Instant toWriteTime(WriteResult result) {