This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 881bbaa0aac
DynamicDestinationsHelper.ConstantTimePartitioningClusteringDestinations is
parsing per element json configuration for partitioning and clustering which is
expensive. Cache the outcome of evaluation so it's done once. (#37014)
881bbaa0aac is described below
commit 881bbaa0aac1a030fdea3dac9123231ec62920cd
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Wed Jan 7 20:14:59 2026 +0100
DynamicDestinationsHelper.ConstantTimePartitioningClusteringDestinations
is parsing per element json configuration for partitioning and clustering which
is expensive. Cache the outcome of evaluation so it's done once. (#37014)
---
.../gcp/bigquery/DynamicDestinationsHelpers.java | 47 +++++++++++++++++-----
1 file changed, 37 insertions(+), 10 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index eed4314e391..52b5b954a09 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -30,7 +30,7 @@ import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -279,6 +279,11 @@ class DynamicDestinationsHelpers {
private final @Nullable ValueProvider<String> jsonTimePartitioning;
private final @Nullable ValueProvider<String> jsonClustering;
+ // Lazily initialized and cached values.
+ private @Nullable String evaluatedPartitioning = null;
+ private @Nullable String evaluatedClustering = null;
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+
ConstantTimePartitioningClusteringDestinations(
DynamicDestinations<T, TableDestination> inner,
ValueProvider<String> jsonTimePartitioning,
@@ -299,19 +304,41 @@ class DynamicDestinationsHelpers {
this.jsonClustering = jsonClustering;
}
+ static boolean isJsonConfigPresent(ValueProvider<String> json) {
+ String jsonValue = json.get();
+ return jsonValue != null &&
!JsonParser.parseString(jsonValue).getAsJsonObject().isEmpty();
+ }
+
+ private synchronized void evaluateOncePartitioningAndClustering() {
+ if (initialized.get()) {
+ return;
+ }
+ if (jsonTimePartitioning != null) {
+ if (isJsonConfigPresent(jsonTimePartitioning)) {
+ this.evaluatedPartitioning = jsonTimePartitioning.get();
+ }
+ }
+ if (jsonClustering != null) {
+ if (isJsonConfigPresent(jsonClustering)) {
+ this.evaluatedClustering = jsonClustering.get();
+ }
+ }
+ initialized.set(true);
+ }
+
@Override
public TableDestination getDestination(@Nullable ValueInSingleWindow<T>
element) {
+ if (!initialized.get()) {
+ evaluateOncePartitioningAndClustering();
+ }
TableDestination destination = super.getDestination(element);
+
String partitioning =
-
Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null);
- if (partitioning == null
- || JsonParser.parseString(partitioning).getAsJsonObject().isEmpty())
{
- partitioning = destination.getJsonTimePartitioning();
- }
- String clustering =
Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null);
- if (clustering == null ||
JsonParser.parseString(clustering).getAsJsonObject().isEmpty()) {
- clustering = destination.getJsonClustering();
- }
+ evaluatedPartitioning != null
+ ? evaluatedPartitioning
+ : destination.getJsonTimePartitioning();
+ String clustering =
+ evaluatedClustering != null ? evaluatedClustering :
destination.getJsonClustering();
return new TableDestination(
destination.getTableSpec(), destination.getTableDescription(),
partitioning, clustering);