This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 45911229938 Iceberg direct write (#36720)
45911229938 is described below
commit 45911229938b34aa2918620d075dd16309b2e245
Author: Tom Stepp <[email protected]>
AuthorDate: Tue Nov 25 10:03:15 2025 -0800
Iceberg direct write (#36720)
* Iceberg direct write
* Make RowSizer compatible with Java 11.
* Fix some build issues
* Minor updates
* More efficient encoded string size calculation
* Rm extra parenthesis
* Write direct rows to files
* Address PR feedback
* Remove commented import
* Make new Iceberg util methods package private.
* Add unit test
---
.../apache/beam/sdk/io/iceberg/BundleLifter.java | 170 ++++++++++++++++++++
.../org/apache/beam/sdk/io/iceberg/IcebergIO.java | 19 ++-
.../apache/beam/sdk/io/iceberg/IcebergUtils.java | 9 ++
.../IcebergWriteSchemaTransformProvider.java | 11 ++
...owsToFiles.java => WriteDirectRowsToFiles.java} | 86 ++++++----
.../sdk/io/iceberg/WriteGroupedRowsToFiles.java | 9 +-
.../beam/sdk/io/iceberg/WriteToDestinations.java | 173 +++++++++++++++++----
.../sdk/io/iceberg/WriteUngroupedRowsToFiles.java | 9 +-
.../beam/sdk/io/iceberg/BundleLifterTest.java | 99 ++++++++++++
9 files changed, 512 insertions(+), 73 deletions(-)
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java
new file mode 100644
index 00000000000..639e247357f
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * @param <T> The type of elements in the input PCollection.
+ */
+public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> {
+
+ final TupleTag<T> smallBatchTag;
+ final TupleTag<T> largeBatchTag;
+ final int threshold;
+ final SerializableFunction<T, Integer> elementSizer;
+
+ /**
+ * A DoFn that buffers elements within a bundle and outputs them to different tags in
+ * finish_bundle based on the total bundle size.
+ *
+ * @param <T> The type of elements being processed.
+ */
+ static class BundleLiftDoFn<T> extends DoFn<T, Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class);
+
+ final TupleTag<T> smallBatchTag;
+ final TupleTag<T> largeBatchTag;
+ final int threshold;
+ final SerializableFunction<T, Integer> elementSizer;
+
+ private transient @MonotonicNonNull List<T> buffer;
+ private transient long bundleSizeBytes;
+ private transient @Nullable MultiOutputReceiver receiver;
+
+ BundleLiftDoFn(
+ TupleTag<T> smallBatchTag,
+ TupleTag<T> largeBatchTag,
+ int threshold,
+ SerializableFunction<T, Integer> elementSizer) {
+ this.smallBatchTag = smallBatchTag;
+ this.largeBatchTag = largeBatchTag;
+ this.threshold = threshold;
+ this.elementSizer = elementSizer;
+ }
+
+ @StartBundle
+ public void startBundle() {
+ buffer = new ArrayList<>();
+ receiver = null;
+ bundleSizeBytes = 0L;
+ }
+
+ @ProcessElement
+ public void processElement(@Element T element, MultiOutputReceiver mor) {
+ if (receiver == null) {
+ receiver = mor;
+ }
+ checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+ buffer.add(element);
+ bundleSizeBytes += elementSizer.apply(element);
+ }
+
+ @FinishBundle
+ public void finishBundle() {
+ checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+ if (buffer.isEmpty()) {
+ return;
+ }
+
+ // Select the target tag based on the bundle size
+ TupleTag<T> targetTag;
+ targetTag = (bundleSizeBytes < threshold) ? smallBatchTag : largeBatchTag;
+ LOG.debug(
+ "Emitting {} elements of {} estimated bytes to tag: '{}'",
+ buffer.size(),
+ bundleSizeBytes,
+ targetTag.getId());
+
+ checkArgumentNotNull(receiver, "Receiver should be set by startBundle.");
+ OutputReceiver<T> taggedOutput = receiver.get(targetTag);
+
+ for (T element : buffer) {
+ taggedOutput.output(element);
+ }
+ }
+ }
+
+ private BundleLifter(TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
+ this(smallBatchTag, largeBatchTag, threshold, x -> 1);
+ }
+
+ private BundleLifter(
+ TupleTag<T> smallBatchTag,
+ TupleTag<T> largeBatchTag,
+ int threshold,
+ SerializableFunction<T, Integer> elementSizer) {
+ if (smallBatchTag == null || largeBatchTag == null) {
+ throw new IllegalArgumentException("smallBatchTag and largeBatchTag must not be null");
+ }
+ if (smallBatchTag.getId().equals(largeBatchTag.getId())) {
+ throw new IllegalArgumentException("smallBatchTag and largeBatchTag must be different");
+ }
+ if (threshold <= 0) {
+ throw new IllegalArgumentException("Threshold must be a positive integer");
+ }
+
+ this.smallBatchTag = smallBatchTag;
+ this.largeBatchTag = largeBatchTag;
+ this.threshold = threshold;
+ this.elementSizer = elementSizer;
+ }
+
+ public static <T> BundleLifter<T> of(
+ TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
+ return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold);
+ }
+
+ public static <T> BundleLifter<T> of(
+ TupleTag<T> smallBatchTag,
+ TupleTag<T> largeBatchTag,
+ int threshold,
+ SerializableFunction<T, Integer> elementSizer) {
+ return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold, elementSizer);
+ }
+
+ @Override
+ public PCollectionTuple expand(PCollection<T> input) {
+ final TupleTag<Void> mainOutputTag = new TupleTag<Void>() {};
+
+ return input.apply(
+ "BundleLiftDoFn",
+ ParDo.of(new BundleLiftDoFn<>(smallBatchTag, largeBatchTag, threshold, elementSizer))
+ .withOutputTags(mainOutputTag, TupleTagList.of(smallBatchTag).and(largeBatchTag)));
+ }
+}
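
For context, BundleLifter is applied like any multi-output ParDo: callers supply the two tags, a byte threshold, and a sizing function, then read both outputs from the resulting PCollectionTuple. A minimal usage sketch (the input collection, element type, and step names are illustrative, not part of this commit):

    // Route each bundle of strings wholesale to one of two outputs, depending
    // on whether the bundle's total estimated size reaches 1 KiB.
    TupleTag<String> smallTag = new TupleTag<String>("small") {};
    TupleTag<String> largeTag = new TupleTag<String>("large") {};

    PCollectionTuple routed =
        input.apply(
            "LiftLargeBundles",
            BundleLifter.of(
                smallTag, largeTag, 1024, s -> s.getBytes(StandardCharsets.UTF_8).length));

    // Each bundle lands on exactly one of the two tags in @FinishBundle.
    PCollection<String> smallBundles = routed.get(smallTag).setCoder(StringUtf8Coder.of());
    PCollection<String> largeBundles = routed.get(largeTag).setCoder(StringUtf8Coder.of());
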
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 956e45651df..1d71ad54909 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -395,6 +395,8 @@ public class IcebergIO {
abstract @Nullable Duration getTriggeringFrequency();
+ abstract @Nullable Integer getDirectWriteByteLimit();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -407,6 +409,8 @@ public class IcebergIO {
abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
+ abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);
+
abstract WriteRows build();
}
@@ -435,6 +439,10 @@ public class IcebergIO {
return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
}
+ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) {
+ return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build();
+ }
+
@Override
public IcebergWriteResult expand(PCollection<Row> input) {
List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
@@ -451,11 +459,20 @@ public class IcebergIO {
// Assign destinations before re-windowing to global in WriteToDestinations because
// user's dynamic destination may depend on windowing properties
+ if (IcebergUtils.validDirectWriteLimit(getDirectWriteByteLimit())) {
+ Preconditions.checkArgument(
+ IcebergUtils.isUnbounded(input),
+ "Must only provide direct write limit for unbounded pipelines.");
+ }
return input
.apply("Assign Table Destinations", new
AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
- new WriteToDestinations(getCatalogConfig(), destinations,
getTriggeringFrequency()));
+ new WriteToDestinations(
+ getCatalogConfig(),
+ destinations,
+ getTriggeringFrequency(),
+ getDirectWriteByteLimit()));
}
}
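
A sketch of how a streaming pipeline would opt into the new direct write path through this API (the catalog config, table name, and 256 MiB limit are placeholder values):

    // Hypothetical wiring: lift bundles larger than 256 MiB onto the direct
    // write path of an unbounded write; expand() rejects the limit for
    // bounded inputs.
    IcebergWriteResult result =
        unboundedRows.apply(
            IcebergIO.writeRows(catalogConfig)
                .to(TableIdentifier.of("db", "table"))
                .withTriggeringFrequency(Duration.standardMinutes(1))
                .withDirectWriteByteLimit(256 * 1024 * 1024));
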
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index 4b448a2e08c..f76d000628f 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -608,4 +609,12 @@ public class IcebergUtils {
// LocalDateTime, LocalDate, LocalTime
return icebergValue;
}
+
+ static <T> boolean isUnbounded(PCollection<T> input) {
+ return input.isBounded().equals(PCollection.IsBounded.UNBOUNDED);
+ }
+
+ static boolean validDirectWriteLimit(@Nullable Integer directWriteByteLimit) {
+ return directWriteByteLimit != null && directWriteByteLimit >= 0;
+ }
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index 71c898b0044..428ef71f23e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -95,6 +95,10 @@ public class IcebergWriteSchemaTransformProvider
"For a streaming pipeline, sets the frequency at which snapshots are
produced.")
public abstract @Nullable Integer getTriggeringFrequencySeconds();
+ @SchemaFieldDescription(
+ "For a streaming pipeline, sets the limit for lifting bundles into the
direct write path.")
+ public abstract @Nullable Integer getDirectWriteByteLimit();
+
@SchemaFieldDescription(
"A list of field names to keep in the input record. All other fields
are dropped before writing. "
+ "Is mutually exclusive with 'drop' and 'only'.")
@@ -142,6 +146,8 @@ public class IcebergWriteSchemaTransformProvider
public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds);
+ public abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);
+
public abstract Builder setKeep(List<String> keep);
public abstract Builder setDrop(List<String> drop);
@@ -227,6 +233,11 @@ public class IcebergWriteSchemaTransformProvider
writeTransform =
writeTransform.withTriggeringFrequency(Duration.standardSeconds(trigFreq));
}
+ Integer directWriteByteLimit = configuration.getDirectWriteByteLimit();
+ if (directWriteByteLimit != null) {
+ writeTransform = writeTransform.withDirectWriteByteLimit(directWriteByteLimit);
+ }
+
// TODO: support dynamic destinations
IcebergWriteResult result = rows.apply(writeTransform);
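
Through the schema transform, the new field should surface as a configuration key next to triggering_frequency_seconds. A sketch using Beam's Managed API (the snake_case key spelling and the "input" tag follow Beam's usual conventions but are assumptions here, as are the table name and catalog properties):

    // Assumed Managed-API configuration; "direct_write_byte_limit" is the
    // snake_case form of getDirectWriteByteLimit().
    Map<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("table", "db.table")
            .put("catalog_properties", catalogProps)
            .put("triggering_frequency_seconds", 60)
            .put("direct_write_byte_limit", 256 * 1024 * 1024)
            .build();
    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
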
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java
similarity index 58%
copy from sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
copy to sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java
index 7db1ac42659..8835e2ff628 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java
@@ -18,59 +18,60 @@
package org.apache.beam.sdk.io.iceberg;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.iceberg.catalog.Catalog;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
-class WriteGroupedRowsToFiles
- extends PTransform<
- PCollection<KV<ShardedKey<String>, Iterable<Row>>>, PCollection<FileWriteResult>> {
-
- private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
+class WriteDirectRowsToFiles
+ extends PTransform<PCollection<KV<String, Row>>, PCollection<FileWriteResult>> {
private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
private final String filePrefix;
+ private final long maxBytesPerFile;
- WriteGroupedRowsToFiles(
+ WriteDirectRowsToFiles(
IcebergCatalogConfig catalogConfig,
DynamicDestinations dynamicDestinations,
- String filePrefix) {
+ String filePrefix,
+ long maxBytesPerFile) {
this.catalogConfig = catalogConfig;
this.dynamicDestinations = dynamicDestinations;
this.filePrefix = filePrefix;
+ this.maxBytesPerFile = maxBytesPerFile;
}
@Override
- public PCollection<FileWriteResult> expand(
- PCollection<KV<ShardedKey<String>, Iterable<Row>>> input) {
+ public PCollection<FileWriteResult> expand(PCollection<KV<String, Row>> input) {
return input.apply(
ParDo.of(
- new WriteGroupedRowsToFilesDoFn(
- catalogConfig, dynamicDestinations, DEFAULT_MAX_BYTES_PER_FILE, filePrefix)));
+ new WriteDirectRowsToFilesDoFn(
+ catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix)));
}
- private static class WriteGroupedRowsToFilesDoFn
- extends DoFn<KV<ShardedKey<String>, Iterable<Row>>, FileWriteResult> {
+ private static class WriteDirectRowsToFilesDoFn extends DoFn<KV<String, Row>, FileWriteResult> {
private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
private transient @MonotonicNonNull Catalog catalog;
private final String filePrefix;
private final long maxFileSize;
+ private transient @Nullable RecordWriterManager recordWriterManager;
- WriteGroupedRowsToFilesDoFn(
+ WriteDirectRowsToFilesDoFn(
IcebergCatalogConfig catalogConfig,
DynamicDestinations dynamicDestinations,
long maxFileSize,
@@ -79,6 +80,7 @@ class WriteGroupedRowsToFiles
this.dynamicDestinations = dynamicDestinations;
this.filePrefix = filePrefix;
this.maxFileSize = maxFileSize;
+ this.recordWriterManager = null;
}
private org.apache.iceberg.catalog.Catalog getCatalog() {
@@ -88,36 +90,52 @@ class WriteGroupedRowsToFiles
return catalog;
}
+ @StartBundle
+ public void startBundle() {
+ recordWriterManager =
+ new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE);
+ }
+
@ProcessElement
public void processElement(
- ProcessContext c,
- @Element KV<ShardedKey<String>, Iterable<Row>> element,
+ ProcessContext context,
+ @Element KV<String, Row> element,
BoundedWindow window,
PaneInfo paneInfo)
throws Exception {
-
- String tableIdentifier = element.getKey().getKey();
+ String tableIdentifier = element.getKey();
IcebergDestination destination =
dynamicDestinations.instantiateDestination(tableIdentifier);
WindowedValue<IcebergDestination> windowedDestination =
WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo);
- RecordWriterManager writer;
- try (RecordWriterManager openWriter =
- new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE)) {
- writer = openWriter;
- for (Row e : element.getValue()) {
- writer.write(windowedDestination, e);
- }
+ Preconditions.checkNotNull(recordWriterManager)
+ .write(windowedDestination, element.getValue());
+ }
+
+ @FinishBundle
+ public void finishBundle(FinishBundleContext context) throws Exception {
+ if (recordWriterManager == null) {
+ return;
}
+ recordWriterManager.close();
- List<SerializableDataFile> serializableDataFiles =
- Preconditions.checkNotNull(writer.getSerializableDataFiles().get(windowedDestination));
- for (SerializableDataFile dataFile : serializableDataFiles) {
- c.output(
- FileWriteResult.builder()
- .setTableIdentifier(destination.getTableIdentifier())
- .setSerializableDataFile(dataFile)
- .build());
+ for (Map.Entry<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
+ destinationAndFiles :
+ Preconditions.checkNotNull(recordWriterManager)
+ .getSerializableDataFiles()
+ .entrySet()) {
+ WindowedValue<IcebergDestination> windowedDestination = destinationAndFiles.getKey();
+
+ for (SerializableDataFile dataFile : destinationAndFiles.getValue()) {
+ context.output(
+ FileWriteResult.builder()
+ .setSerializableDataFile(dataFile)
+ .setTableIdentifier(windowedDestination.getValue().getTableIdentifier())
+ .build(),
+ windowedDestination.getTimestamp(),
+ Iterables.getFirst(windowedDestination.getWindows(), null));
+ }
}
+ recordWriterManager = null;
}
}
}
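
The DoFn above follows the standard per-bundle writer lifecycle: open state in @StartBundle, accumulate in @ProcessElement, and flush exactly once in @FinishBundle, emitting each result with an explicit timestamp and window. A toy, self-contained sketch of the same pattern (illustrative only, not part of this commit):

    // Illustrative per-bundle buffering DoFn mirroring the lifecycle above.
    class ConcatPerBundleFn extends DoFn<String, String> {
      private transient @Nullable StringBuilder buffer;

      @StartBundle
      public void startBundle() {
        buffer = new StringBuilder(); // fresh state per bundle
      }

      @ProcessElement
      public void processElement(@Element String element) {
        java.util.Objects.requireNonNull(buffer).append(element).append('\n');
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext context) {
        if (buffer == null || buffer.length() == 0) {
          return;
        }
        // Analogous to emitting one FileWriteResult per closed data file.
        context.output(
            buffer.toString(), GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
        buffer = null;
      }
    }
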
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
index 7db1ac42659..12d9570d4a3 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
@@ -36,8 +36,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
class WriteGroupedRowsToFiles
extends PTransform<
PCollection<KV<ShardedKey<String>, Iterable<Row>>>, PCollection<FileWriteResult>> {
-
- private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
+ private final long maxBytesPerFile;
private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
@@ -46,10 +45,12 @@ class WriteGroupedRowsToFiles
WriteGroupedRowsToFiles(
IcebergCatalogConfig catalogConfig,
DynamicDestinations dynamicDestinations,
- String filePrefix) {
+ String filePrefix,
+ long maxBytesPerFile) {
this.catalogConfig = catalogConfig;
this.dynamicDestinations = dynamicDestinations;
this.filePrefix = filePrefix;
+ this.maxBytesPerFile = maxBytesPerFile;
}
@Override
@@ -58,7 +59,7 @@ class WriteGroupedRowsToFiles
return input.apply(
ParDo.of(
new WriteGroupedRowsToFilesDoFn(
- catalogConfig, dynamicDestinations, DEFAULT_MAX_BYTES_PER_FILE, filePrefix)));
+ catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix)));
}
private static class WriteGroupedRowsToFilesDoFn
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
index fb3bf43f351..bea84fc826b 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
@@ -17,8 +17,11 @@
*/
package org.apache.beam.sdk.io.iceberg;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -28,6 +31,7 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
@@ -36,7 +40,9 @@ import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -47,19 +53,22 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000;
// Used for auto-sharding in streaming. Limits total byte size per batch/file
public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB
- static final int DEFAULT_NUM_FILE_SHARDS = 0;
+ private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
private final IcebergCatalogConfig catalogConfig;
private final DynamicDestinations dynamicDestinations;
private final @Nullable Duration triggeringFrequency;
private final String filePrefix;
+ private final @Nullable Integer directWriteByteLimit;
WriteToDestinations(
IcebergCatalogConfig catalogConfig,
DynamicDestinations dynamicDestinations,
- @Nullable Duration triggeringFrequency) {
+ @Nullable Duration triggeringFrequency,
+ @Nullable Integer directWriteByteLimit) {
this.dynamicDestinations = dynamicDestinations;
this.catalogConfig = catalogConfig;
this.triggeringFrequency = triggeringFrequency;
+ this.directWriteByteLimit = directWriteByteLimit;
// single unique prefix per write transform
this.filePrefix = UUID.randomUUID().toString();
}
@@ -67,10 +76,15 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
@Override
public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
// Write records to files
- PCollection<FileWriteResult> writtenFiles =
- input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
- ? writeTriggered(input)
- : writeUntriggered(input);
+ PCollection<FileWriteResult> writtenFiles;
+ if (IcebergUtils.isUnbounded(input)) {
+ writtenFiles =
+ IcebergUtils.validDirectWriteLimit(directWriteByteLimit)
+ ? writeTriggeredWithBundleLifting(input)
+ : writeTriggered(input);
+ } else {
+ writtenFiles = writeUntriggered(input);
+ }
// Commit files to tables
PCollection<KV<String, SnapshotInfo>> snapshots =
@@ -79,17 +93,12 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
return new IcebergWriteResult(input.getPipeline(), snapshots);
}
- private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>> input) {
- checkArgumentNotNull(
- triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
-
- // Group records into batches to avoid writing thousands of small files
+ private PCollection<FileWriteResult> groupAndWriteRecords(PCollection<KV<String, Row>> input) {
+ // We rely on GroupIntoBatches to group and parallelize records properly,
+ // respecting our thresholds for number of records and bytes per batch.
+ // Each output batch will be written to a file.
PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
input
- .apply("WindowIntoGlobal", Window.into(new GlobalWindows()))
- // We rely on GroupIntoBatches to group and parallelize records
properly,
- // respecting our thresholds for number of records and bytes per
batch.
- // Each output batch will be written to a file.
.apply(
GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
.withByteSize(FILE_TRIGGERING_BYTE_COUNT)
@@ -100,19 +109,72 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()),
IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));
- return groupedRecords
- .apply(
- "WriteGroupedRows",
- new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix))
- // Respect user's triggering frequency before committing snapshots
- .apply(
- "ApplyUserTrigger",
- Window.<FileWriteResult>into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
- .discardingFiredPanes());
+ return groupedRecords.apply(
+ "WriteGroupedRows",
+ new WriteGroupedRowsToFiles(
+ catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
+ }
+
+ private PCollection<FileWriteResult> applyUserTriggering(PCollection<FileWriteResult> input) {
+ return input.apply(
+ "ApplyUserTrigger",
+ Window.<FileWriteResult>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
+ .discardingFiredPanes());
+ }
+
+ private PCollection<FileWriteResult> writeTriggeredWithBundleLifting(
+ PCollection<KV<String, Row>> input) {
+ checkArgumentNotNull(
+ triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
+ checkArgumentNotNull(
+ directWriteByteLimit, "Must set non-null directWriteByteLimit for bundle lifting.");
+
+ final TupleTag<KV<String, Row>> groupedRecordsTag = new TupleTag<>("small_batches");
+ final TupleTag<KV<String, Row>> directRecordsTag = new TupleTag<>("large_batches");
+
+ input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows()));
+ PCollectionTuple bundleOutputs =
+ input.apply(
+ BundleLifter.of(
+ groupedRecordsTag, directRecordsTag, directWriteByteLimit, new RowSizer()));
+
+ PCollection<KV<String, Row>> smallBatches =
+ bundleOutputs
+ .get(groupedRecordsTag)
+ .setCoder(
+ KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
+ PCollection<KV<String, Row>> largeBatches =
+ bundleOutputs
+ .get(directRecordsTag)
+ .setCoder(
+ KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
+
+ PCollection<FileWriteResult> directFileWrites =
+ largeBatches.apply(
+ "WriteDirectRowsToFiles",
+ new WriteDirectRowsToFiles(
+ catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
+
+ PCollection<FileWriteResult> groupedFileWrites = groupAndWriteRecords(smallBatches);
+
+ PCollection<FileWriteResult> allFileWrites =
+ PCollectionList.of(groupedFileWrites)
+ .and(directFileWrites)
+ .apply(Flatten.<FileWriteResult>pCollections());
+
+ return applyUserTriggering(allFileWrites);
+ }
+
+ private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>> input) {
+ checkArgumentNotNull(
+ triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
+ input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows()));
+ PCollection<FileWriteResult> files = groupAndWriteRecords(input);
+ return applyUserTriggering(files);
}
private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row>> input) {
@@ -126,7 +188,8 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
WriteUngroupedRowsToFiles.Result writeUngroupedResult =
input.apply(
"Fast-path write rows",
- new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix));
+ new WriteUngroupedRowsToFiles(
+ catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
// Then write the rest by shuffling on the destination
PCollection<FileWriteResult> writeGroupedResult =
@@ -135,10 +198,60 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
.apply("Group spilled rows by destination shard",
GroupByKey.create())
.apply(
"Write remaining rows to files",
- new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix));
+ new WriteGroupedRowsToFiles(
+ catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
return PCollectionList.of(writeUngroupedResult.getWrittenFiles())
.and(writeGroupedResult)
.apply("Flatten Written Files", Flatten.pCollections());
}
+
+ /**
+ * A SerializableFunction to estimate the byte size of a Row for bundling purposes. This is a
+ * heuristic that avoids the high cost of encoding each row with a Coder.
+ */
+ private static class RowSizer implements SerializableFunction<KV<String, Row>, Integer> {
+ @Override
+ public Integer apply(KV<String, Row> element) {
+ return estimateRowSize(element.getValue());
+ }
+
+ private int estimateRowSize(Row row) {
+ if (row == null) {
+ return 0;
+ }
+ int size = 0;
+ for (Object value : row.getValues()) {
+ size += estimateObjectSize(value);
+ }
+ return size;
+ }
+
+ private int estimateObjectSize(@Nullable Object value) {
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof String) {
+ return ((String) value).getBytes(UTF_8).length;
+ } else if (value instanceof byte[]) {
+ return ((byte[]) value).length;
+ } else if (value instanceof Row) {
+ return estimateRowSize((Row) value);
+ } else if (value instanceof List) {
+ int listSize = 0;
+ for (Object item : (List) value) {
+ listSize += estimateObjectSize(item);
+ }
+ return listSize;
+ } else if (value instanceof Map) {
+ int mapSize = 0;
+ for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+ mapSize += estimateObjectSize(entry.getKey()) + estimateObjectSize(entry.getValue());
+ }
+ return mapSize;
+ } else {
+ return 8; // Approximation for other fields
+ }
+ }
+ }
}
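
To make the RowSizer heuristic concrete, a worked example (the schema and values are invented for illustration; RowSizer itself is private to WriteToDestinations):

    // name="beam" counts its 4 UTF-8 bytes, id=42L falls through to the flat
    // 8-byte approximation, and tags=["a", "bc"] sums to 1 + 2 bytes,
    // so the estimate is 4 + 8 + 3 = 15 bytes.
    Schema schema =
        Schema.builder()
            .addStringField("name")
            .addInt64Field("id")
            .addArrayField("tags", Schema.FieldType.STRING)
            .build();
    Row row =
        Row.withSchema(schema)
            .withFieldValue("name", "beam")
            .withFieldValue("id", 42L)
            .withFieldValue("tags", ImmutableList.of("a", "bc"))
            .build();
    // new RowSizer().apply(KV.of("db.table", row)) would return 15.
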
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
index bf2a5a3535f..1db6ede3016 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
@@ -65,8 +65,6 @@ class WriteUngroupedRowsToFiles
*/
@VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20;
- private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
-
private static final TupleTag<FileWriteResult> WRITTEN_FILES_TAG = new TupleTag<>("writtenFiles");
private static final TupleTag<Row> WRITTEN_ROWS_TAG = new TupleTag<Row>("writtenRows") {};
private static final TupleTag<KV<ShardedKey<String>, Row>> SPILLED_ROWS_TAG =
@@ -75,14 +73,17 @@ class WriteUngroupedRowsToFiles
private final String filePrefix;
private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
+ private final long maxBytesPerFile;
WriteUngroupedRowsToFiles(
IcebergCatalogConfig catalogConfig,
DynamicDestinations dynamicDestinations,
- String filePrefix) {
+ String filePrefix,
+ long maxBytesPerFile) {
this.catalogConfig = catalogConfig;
this.dynamicDestinations = dynamicDestinations;
this.filePrefix = filePrefix;
+ this.maxBytesPerFile = maxBytesPerFile;
}
@Override
@@ -96,7 +97,7 @@ class WriteUngroupedRowsToFiles
dynamicDestinations,
filePrefix,
DEFAULT_MAX_WRITERS_PER_BUNDLE,
- DEFAULT_MAX_BYTES_PER_FILE))
+ maxBytesPerFile))
.withOutputTags(
WRITTEN_FILES_TAG,
TupleTagList.of(ImmutableList.of(WRITTEN_ROWS_TAG, SPILLED_ROWS_TAG))));
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java
new file mode 100644
index 00000000000..1eaa0920e6c
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+
+import org.apache.beam.sdk.io.iceberg.BundleLifter.BundleLiftDoFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BundleLifterTest {
+
+ private static final TupleTag<Integer> INTEGER_SMALL = new TupleTag<Integer>() {};
+ private static final TupleTag<Integer> INTEGER_LARGE = new TupleTag<Integer>() {};
+ private static final TupleTag<String> STRING_SMALL = new TupleTag<String>() {};
+ private static final TupleTag<String> STRING_LARGE = new TupleTag<String>() {};
+
+ @Test
+ public void testSmallBundle() throws Exception {
+ DoFnTester<Integer, Void> tester =
+ DoFnTester.of(new BundleLiftDoFn<>(INTEGER_SMALL, INTEGER_LARGE, 3, x -> 1));
+
+ tester.startBundle();
+ tester.processElement(1);
+ tester.processElement(2);
+ tester.finishBundle();
+
+ assertThat(tester.peekOutputElements(INTEGER_SMALL), containsInAnyOrder(1, 2));
+ assertThat(tester.peekOutputElements(INTEGER_LARGE), empty());
+ }
+
+ @Test
+ public void testLargeBundle() throws Exception {
+ DoFnTester<Integer, Void> tester =
+ DoFnTester.of(new BundleLiftDoFn<>(INTEGER_SMALL, INTEGER_LARGE, 3, x -> 1));
+
+ tester.startBundle();
+ tester.processElement(1);
+ tester.processElement(2);
+ tester.processElement(3);
+ tester.finishBundle();
+
+ assertThat(tester.peekOutputElements(INTEGER_SMALL), empty());
+ assertThat(tester.peekOutputElements(INTEGER_LARGE), containsInAnyOrder(1, 2, 3));
+ }
+
+ @Test
+ public void testSmallBundleWithSizer() throws Exception {
+ DoFnTester<String, Void> tester =
+ DoFnTester.of(new BundleLiftDoFn<>(STRING_SMALL, STRING_LARGE, 10, e -> e.length()));
+
+ tester.startBundle();
+ tester.processElement("123");
+ tester.processElement("456");
+ tester.processElement("789");
+ tester.finishBundle();
+
+ assertThat(tester.peekOutputElements(STRING_SMALL), containsInAnyOrder("123", "456", "789"));
+ assertThat(tester.peekOutputElements(STRING_LARGE), empty());
+ }
+
+ @Test
+ public void testLargeBundleWithSizer() throws Exception {
+ DoFnTester<String, Void> tester =
+ DoFnTester.of(new BundleLiftDoFn<>(STRING_SMALL, STRING_LARGE, 10, e -> e.length()));
+
+ tester.startBundle();
+ tester.processElement("123");
+ tester.processElement("456");
+ tester.processElement("789");
+ tester.processElement("0");
+ tester.finishBundle();
+
+ assertThat(tester.peekOutputElements(STRING_SMALL), empty());
+ assertThat(
+ tester.peekOutputElements(STRING_LARGE), containsInAnyOrder("123", "456", "789", "0"));
+ }
+}