stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1038246186


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps 
Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too 
many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int 
pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   > We might have a big file where the read takes a while (or short 
checkpointing period), and we face an error at the end of the file
   
   This is orthogonal to the split discovery throttling feature in this PR. It 
is about processing semantics / duplicates upon failure recovery. Flink can 
achieve exactly-once processing semantics regarding state consistency. Whether 
it is e2e exactly-once semantics depends on sink (transactional or not). For 
Iceberg sink (with transactional commit), recovery won't cause duplicates in 
the sink Iceberg table.
   
   > here is a pathological scenario. Job was in a restart loop with successful 
checkpoints (like 1 or 2) btw restart. This would essentially bypass the 
throttling check. Potentially the tracked splits can keep growing. That is the 
small concern I has regarding not checkpointing. The concern is small, because 
this scenario is very unusual. Hence, I didn't implement the checkpointing of 
enumeration history (for throttling purpose).
   
   Whether we checkpoint the enumeration history or not really only affects the 
throttling feature in this PR. I can only think of the above pathological 
scenario where not checkpointing can be a potential problem. But again, it is a 
unusual scenario but it could happen in theory.
   
   We can add the checkpointing here without breaking the compatibility as the 
`IcebergEnumeratorStateSerializer` is versioned. We can just bump up the 
version and handle both the old and new versions.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to