tomstepp commented on code in PR #36720:
URL: https://github.com/apache/beam/pull/36720#discussion_r2550380629


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags 
based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * @param <T> The type of elements in the input PCollection.
+ */
+public class BundleLifter<T> extends PTransform<PCollection<T>, 
PCollectionTuple> {
+
+  final TupleTag<T> smallBatchTag;
+  final TupleTag<T> largeBatchTag;
+  final int threshold;
+  final SerializableFunction<T, Integer> elementSizer;
+
+  /**
+   * A private, static DoFn that buffers elements within a bundle and outputs 
them to different tags
+   * in finish_bundle based on the total bundle size.
+   *
+   * @param <T> The type of elements being processed.
+   */
+  private static class BundleLiftDoFn<T> extends DoFn<T, Void> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BundleLiftDoFn.class);
+
+    final TupleTag<T> smallBatchTag;
+    final TupleTag<T> largeBatchTag;
+    final int threshold;
+    final SerializableFunction<T, Integer> elementSizer;
+
+    private transient @MonotonicNonNull List<T> buffer;
+    private transient long bundleSizeBytes;
+    private transient @Nullable MultiOutputReceiver receiver;
+
+    BundleLiftDoFn(
+        TupleTag<T> smallBatchTag,
+        TupleTag<T> largeBatchTag,
+        int threshold,
+        SerializableFunction<T, Integer> elementSizer) {
+      this.smallBatchTag = smallBatchTag;
+      this.largeBatchTag = largeBatchTag;
+      this.threshold = threshold;
+      this.elementSizer = elementSizer;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      buffer = new ArrayList<>();
+      receiver = null;
+      bundleSizeBytes = 0L;
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, MultiOutputReceiver mor) {
+      if (receiver == null) {
+        receiver = mor;
+      }
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      buffer.add(element);
+      bundleSizeBytes += elementSizer.apply(element);

Review Comment:
   I see what you mean. I think this requires:
   1. Flush current buffer to large tag
   2. Track that we already did a partial output for the bundle, so we also 
output future elements to the same large tag.
   
   This doesn't seem to optimize memory though since the elements will just be 
occupying the same amount of memory buffering to call next transform after the 
current bundle finishes. In that case I think leaving as is would be simpler 
without missing out on optimizations. Wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to