tomstepp commented on code in PR #36720: URL: https://github.com/apache/beam/pull/36720#discussion_r2548171993
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total + * size of the bundle in finish_bundle. + * + * <p>This is the Java equivalent of the BundleLifter PTransform in Python. + * + * @param <T> The type of elements in the input PCollection. + */ +public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> { + + final TupleTag<T> smallBatchTag; + final TupleTag<T> largeBatchTag; + final int threshold; + final SerializableFunction<T, Integer> elementSizer; + + /** + * A private, static DoFn that buffers elements within a bundle and outputs them to different tags + * in finish_bundle based on the total bundle size. + * + * <p>This is the Java equivalent of the _BundleLiftDoFn in Python, now merged inside the + * PTransform. + * + * @param <T> The type of elements being processed. + */ + private static class BundleLiftDoFn<T> extends DoFn<T, Void> { + private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class); + + final TupleTag<T> smallBatchTag; + final TupleTag<T> largeBatchTag; + final int threshold; + final SerializableFunction<T, Integer> elementSizer; + + private transient @MonotonicNonNull List<T> buffer; + private transient long bundleSize; + private transient @Nullable MultiOutputReceiver receiver; + + BundleLiftDoFn( + TupleTag<T> smallBatchTag, + TupleTag<T> largeBatchTag, + int threshold, + SerializableFunction<T, Integer> elementSizer) { + this.smallBatchTag = smallBatchTag; + this.largeBatchTag = largeBatchTag; + this.threshold = threshold; + this.elementSizer = elementSizer; + } + + @StartBundle + public void startBundle() { + buffer = new ArrayList<>(); + receiver = null; + bundleSize = 0L; + } + + @ProcessElement + public void processElement(@Element T element, MultiOutputReceiver mor) { + if (receiver == null) { + receiver = mor; + } + checkArgumentNotNull(buffer, "Buffer should be set by startBundle."); + buffer.add(element); + bundleSize += elementSizer.apply(element); + } + + @FinishBundle + public void finishBundle() { + checkArgumentNotNull(buffer, "Buffer should be set by startBundle."); + if (buffer.isEmpty()) { + return; + } + + TupleTag<T> targetTag; + + // Select the target tag based on the bundle size + if (bundleSize < threshold) { + targetTag = smallBatchTag; + LOG.debug("Emitting {} elements to small tag: '{}'", bundleSize, targetTag.getId()); Review Comment: I clarified this by logging both element count and byte size estimate -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
