ahmedabu98 commented on code in PR #36720: URL: https://github.com/apache/beam/pull/36720#discussion_r2546580824
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java: ##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * <p>This is the Java equivalent of the BundleLifter PTransform in Python.
+ *
+ * @param <T> The type of elements in the input PCollection.
+ */
+public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> {
+
+  final TupleTag<T> smallBatchTag;
+  final TupleTag<T> largeBatchTag;
+  final int threshold;
+  final SerializableFunction<T, Integer> elementSizer;
+
+  /**
+   * A private, static DoFn that buffers elements within a bundle and outputs them to different tags
+   * in finish_bundle based on the total bundle size.
+   *
+   * <p>This is the Java equivalent of the _BundleLiftDoFn in Python, now merged inside the
+   * PTransform.
+   *
+   * @param <T> The type of elements being processed.
+   */
+  private static class BundleLiftDoFn<T> extends DoFn<T, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class);
+
+    final TupleTag<T> smallBatchTag;
+    final TupleTag<T> largeBatchTag;
+    final int threshold;
+    final SerializableFunction<T, Integer> elementSizer;
+
+    private transient @MonotonicNonNull List<T> buffer;
+    private transient long bundleSize;
+    private transient @Nullable MultiOutputReceiver receiver;
+
+    BundleLiftDoFn(
+        TupleTag<T> smallBatchTag,
+        TupleTag<T> largeBatchTag,
+        int threshold,
+        SerializableFunction<T, Integer> elementSizer) {
+      this.smallBatchTag = smallBatchTag;
+      this.largeBatchTag = largeBatchTag;
+      this.threshold = threshold;
+      this.elementSizer = elementSizer;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      buffer = new ArrayList<>();
+      receiver = null;
+      bundleSize = 0L;
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, MultiOutputReceiver mor) {
+      if (receiver == null) {
+        receiver = mor;
+      }
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      buffer.add(element);
+      bundleSize += elementSizer.apply(element);
+    }
+
+    @FinishBundle
+    public void finishBundle() {
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      if (buffer.isEmpty()) {
+        return;
+      }
+
+      TupleTag<T> targetTag;
+
+      // Select the target tag based on the bundle size
+      if (bundleSize < threshold) {
+        targetTag = smallBatchTag;
+        LOG.debug("Emitting {} elements to small tag: '{}'", bundleSize, targetTag.getId());
+      } else {
+        targetTag = largeBatchTag;
+        LOG.debug("Emitting {} elements to large tag: '{}'", bundleSize, targetTag.getId());
+      }
+
+      checkArgumentNotNull(receiver, "Receiver should be set by startBundle.");
+      OutputReceiver<T> taggedOutput = receiver.get(targetTag);
+
+      for (T element : buffer) {
+        taggedOutput.output(element);
+      }
+    }
+  }
+
+  public BundleLifter(TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
+    this(smallBatchTag, largeBatchTag, threshold, x -> 1);
+  }
+
+  public BundleLifter(

Review Comment:
   nit: Make these constructors private if the expected usage will be `BundleLifter.of(...)`.
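   For illustration, a rough sketch of that shape (based on the fields and the `of(...)` call already in this PR; not a drop-in patch):
   ```java
   public static <T> BundleLifter<T> of(
       TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
     // Default sizer counts every element as 1, mirroring the existing constructor.
     return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold, x -> 1);
   }

   public static <T> BundleLifter<T> of(
       TupleTag<T> smallBatchTag,
       TupleTag<T> largeBatchTag,
       int threshold,
       SerializableFunction<T, Integer> elementSizer) {
     return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold, elementSizer);
   }

   // The constructors become an implementation detail.
   private BundleLifter(
       TupleTag<T> smallBatchTag,
       TupleTag<T> largeBatchTag,
       int threshold,
       SerializableFunction<T, Integer> elementSizer) {
     this.smallBatchTag = smallBatchTag;
     this.largeBatchTag = largeBatchTag;
     this.threshold = threshold;
     this.elementSizer = elementSizer;
   }
   ```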
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java: ##########

+      // Select the target tag based on the bundle size
+      if (bundleSize < threshold) {
+        targetTag = smallBatchTag;
+        LOG.debug("Emitting {} elements to small tag: '{}'", bundleSize, targetTag.getId());

Review Comment:
   nit: It seems like `bundleSize` could be open to interpretation, depending on what the `elementSizer` function does? Maybe the log message could be "Emitting batch of elements of size {} to small/large tag", to avoid assuming that `bundleSize` will always mean a count of elements.
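   i.e. something like (sketch):
   ```java
   LOG.debug(
       "Emitting batch of elements of total size {} to small tag: '{}'",
       bundleSize,
       targetTag.getId());
   ```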
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java: ##########

+    @ProcessElement
+    public void processElement(@Element T element, MultiOutputReceiver mor) {
+      if (receiver == null) {
+        receiver = mor;
+      }

Review Comment:
   Btw finish bundle has access to a `FinishBundleContext` that can output elements to different TupleTags. Consider using that for a simpler code design. There may also be implications for using a ProcessElement's output receiver outside of that scope. But lmk if there is a reason you're doing it this way
   ```java
   @FinishBundle
   public void finishBundle(FinishBundleContext c) {
     c.output(targetTag, element, timestamp, window);
   }
   ```
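   Fleshed out, a variant of `BundleLiftDoFn` along those lines could look like this (a sketch reusing the PR's tag/threshold/`elementSizer` fields; assumes imports for `BoundedWindow`, `KV`, and joda `Instant`, and drops the cached `MultiOutputReceiver` entirely):
   ```java
   // Buffer each element with the timestamp and window that
   // FinishBundleContext.output(...) needs.
   private transient @MonotonicNonNull List<KV<T, KV<Instant, BoundedWindow>>> buffer;
   private transient long bundleSize;

   @StartBundle
   public void startBundle() {
     buffer = new ArrayList<>();
     bundleSize = 0L;
   }

   @ProcessElement
   public void processElement(
       @Element T element, @Timestamp Instant timestamp, BoundedWindow window) {
     checkArgumentNotNull(buffer, "Buffer should be set by startBundle.")
         .add(KV.of(element, KV.of(timestamp, window)));
     bundleSize += elementSizer.apply(element);
   }

   @FinishBundle
   public void finishBundle(FinishBundleContext c) {
     List<KV<T, KV<Instant, BoundedWindow>>> elements =
         checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
     TupleTag<T> targetTag = bundleSize < threshold ? smallBatchTag : largeBatchTag;
     for (KV<T, KV<Instant, BoundedWindow>> e : elements) {
       c.output(targetTag, e.getKey(), e.getValue().getKey(), e.getValue().getValue());
     }
   }
   ```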
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java: ##########
@@ -455,7 +463,11 @@ public IcebergWriteResult expand(PCollection<Row> input) {
         .apply("Assign Table Destinations", new AssignDestinations(destinations))
         .apply(
             "Write Rows to Destinations",
-            new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
+            new WriteToDestinations(
+                getCatalogConfig(),
+                destinations,
+                getTriggeringFrequency(),
+                getDirectWriteByteLimit()));

Review Comment:
   Let's validate that the input is Unbounded if `directWriteByteLimit` is set. Should throw an error if that's not the case.
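   For example, something like this near the top of `expand()` (sketch; the getter name follows this PR's diff):
   ```java
   if (getDirectWriteByteLimit() != null
       && input.isBounded() != PCollection.IsBounded.UNBOUNDED) {
     throw new IllegalArgumentException(
         "directWriteByteLimit is only supported for unbounded (streaming) input.");
   }
   ```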
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java: ##########
@@ -135,10 +216,33 @@ private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row
         .apply("Group spilled rows by destination shard", GroupByKey.create())
         .apply(
             "Write remaining rows to files",
-            new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix));
+            new WriteGroupedRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));

     return PCollectionList.of(writeUngroupedResult.getWrittenFiles())
         .and(writeGroupedResult)
         .apply("Flatten Written Files", Flatten.pCollections());
   }
+
+  /**
+   * A SerializableFunction to estimate the byte size of a Row for bundling purposes. This is a
+   * heuristic that avoids the high cost of encoding each row with a Coder.
+   */
+  private static class RowSizer implements SerializableFunction<KV<String, Row>, Integer> {
+    @Override
+    public Integer apply(KV<String, Row> element) {
+      Row row = element.getValue();
+      int size = 0;
+      for (Object value : row.getValues()) {
+        if (value instanceof String) {
+          size += Utf8.encodedLength((String) value);
+        } else if (value instanceof byte[]) {
+          size += ((byte[]) value).length;
+        } else {
+          size += 8; // Approximation for non-string/byte fields
+        }
+      }

Review Comment:
   Also, I think we should skip null values. Currently a null `value` will return false for `value instanceof ___`, so it falls to the else block's `size += 8`.
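   i.e. a minimal fix (sketch):
   ```java
   for (Object value : row.getValues()) {
     if (value == null) {
       continue; // null fields contribute no payload bytes
     } else if (value instanceof String) {
       size += Utf8.encodedLength((String) value);
     } else if (value instanceof byte[]) {
       size += ((byte[]) value).length;
     } else {
       size += 8; // Approximation for non-string/byte fields
     }
   }
   ```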
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java: ##########
@@ -47,30 +52,38 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
   private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000;
   // Used for auto-sharding in streaming. Limits total byte size per batch/file
   public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB
-  static final int DEFAULT_NUM_FILE_SHARDS = 0;
+  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb

   private final IcebergCatalogConfig catalogConfig;
   private final DynamicDestinations dynamicDestinations;
   private final @Nullable Duration triggeringFrequency;
   private final String filePrefix;
+  private final @Nullable Integer directWriteByteLimit;

   WriteToDestinations(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      @Nullable Duration triggeringFrequency) {
+      @Nullable Duration triggeringFrequency,
+      @Nullable Integer directWriteByteLimit) {
     this.dynamicDestinations = dynamicDestinations;
     this.catalogConfig = catalogConfig;
     this.triggeringFrequency = triggeringFrequency;
+    this.directWriteByteLimit = directWriteByteLimit;
     // single unique prefix per write transform
     this.filePrefix = UUID.randomUUID().toString();
   }

   @Override
   public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
     // Write records to files
-    PCollection<FileWriteResult> writtenFiles =
-        input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
-            ? writeTriggered(input)
-            : writeUntriggered(input);
+    PCollection<FileWriteResult> writtenFiles;
+    if (directWriteByteLimit != null && directWriteByteLimit >= 0) {
+      writtenFiles = writeTriggeredWithBundleLifting(input);

Review Comment:
   I'm guessing this should also check for UNBOUNDED input. We can make that check in the higher-level IcebergIO class (see previous comment).

########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java: ##########

+ * <p>This is the Java equivalent of the BundleLifter PTransform in Python.

Review Comment:
   Does Python actually have a transform called "BundleLifter"? Can't seem to find it.

########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java: ##########

+      for (Object value : row.getValues()) {
+        if (value instanceof String) {
+          size += Utf8.encodedLength((String) value);
+        } else if (value instanceof byte[]) {
+          size += ((byte[]) value).length;
+        } else {
+          size += 8; // Approximation for non-string/byte fields
+        }
+      }

Review Comment:
   Let's handle collections as well.
   This heuristic should be applied for:
   - each element of a list
   - key-value pairs of a map
   - values in a nested Row

   We wouldn't want to assume a 1,000-item list is 8 bytes.
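   A sketch of what a recursive estimator could look like (hypothetical helper; it also folds in the null handling from the earlier comment and assumes `java.util.Map` is imported):
   ```java
   private static class RowSizer implements SerializableFunction<KV<String, Row>, Integer> {
     @Override
     public Integer apply(KV<String, Row> element) {
       return estimateSize(element.getValue());
     }

     private static int estimateSize(@Nullable Object value) {
       if (value == null) {
         return 0; // skip nulls entirely
       } else if (value instanceof String) {
         return Utf8.encodedLength((String) value);
       } else if (value instanceof byte[]) {
         return ((byte[]) value).length;
       } else if (value instanceof Row) {
         // Recurse into nested Rows.
         int size = 0;
         for (Object field : ((Row) value).getValues()) {
           size += estimateSize(field);
         }
         return size;
       } else if (value instanceof Map) {
         // Count both keys and values of a map.
         int size = 0;
         for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
           size += estimateSize(entry.getKey()) + estimateSize(entry.getValue());
         }
         return size;
       } else if (value instanceof Iterable) {
         // Count each element of a list/collection.
         int size = 0;
         for (Object item : (Iterable<?>) value) {
           size += estimateSize(item);
         }
         return size;
       } else {
         return 8; // Approximation for fixed-width scalar fields
       }
     }
   }
   ```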
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java: ##########
@@ -79,6 +92,72 @@ public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
     return new IcebergWriteResult(input.getPipeline(), snapshots);
   }

+  private PCollection<FileWriteResult> writeTriggeredWithBundleLifting(
+      PCollection<KV<String, Row>> input) {
+    checkArgumentNotNull(
+        triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
+    checkArgumentNotNull(
+        directWriteByteLimit, "Must set non-null directWriteByteLimit for bundle lifting.");
+
+    final TupleTag<KV<String, Row>> groupedRecordsTag = new TupleTag<>("small_batches");
+    final TupleTag<KV<String, Row>> directRecordsTag = new TupleTag<>("large_batches");
+
+    input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows()));
+    PCollectionTuple bundleOutputs =
+        input.apply(
+            BundleLifter.of(
+                groupedRecordsTag, directRecordsTag, directWriteByteLimit, new RowSizer()));
+
+    PCollection<KV<String, Row>> smallBatches =
+        bundleOutputs
+            .get(groupedRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
+    PCollection<KV<String, Row>> largeBatches =
+        bundleOutputs
+            .get(directRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
+
+    PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
+        smallBatches
+            .apply(
+                GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                    .withByteSize(FILE_TRIGGERING_BYTE_COUNT)
+                    .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency))
+                    .withShardedKey())
+            .setCoder(
+                KvCoder.of(
+                    Coder.of(StringUtf8Coder.of()),
+                    IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));
+
+    PCollection<FileWriteResult> directFileWrites =
+        largeBatches.apply(
+            "WriteDirectRowsToFiles",
+            new WriteDirectRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
+
+    PCollection<FileWriteResult> groupedFileWrites =
+        groupedRecords.apply(
+            "WriteGroupedRows",
+            new WriteGroupedRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));

Review Comment:
   nit: There's a little overlap between this section and the writeTriggered path: https://github.com/apache/beam/blob/e336419205c3d6580c45ed33b2f08d5194178499/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java#L93-L106

   Consider extracting into a "groupAndWriteRecords" method to handle both.
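   For instance, both paths could go through a shared helper (sketch; `groupAndWriteRecords` is the name proposed above, and the `ShardedKey.Coder` spelling is an assumption based on `GroupIntoBatches.withShardedKey()`):
   ```java
   private PCollection<FileWriteResult> groupAndWriteRecords(PCollection<KV<String, Row>> rows) {
     return rows
         .apply(
             "Group into batches",
             GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
                 .withByteSize(FILE_TRIGGERING_BYTE_COUNT)
                 .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency))
                 .withShardedKey())
         .setCoder(
             KvCoder.of(
                 ShardedKey.Coder.of(StringUtf8Coder.of()),
                 IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))))
         .apply(
             "Write grouped rows to files",
             new WriteGroupedRowsToFiles(
                 catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
   }
   ```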
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java: ##########

+      for (T element : buffer) {
+        taggedOutput.output(element);
+      }

Review Comment:
   Are bundles guaranteed to stay the same between transforms? E.g. say we detect a large bundle and flush the rows down to WriteDirectRowsToFiles. Will the bundle of rows stay the same when it reaches that transform?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
