Reo-LEI commented on a change in pull request #3120:
URL: https://github.com/apache/iceberg/pull/3120#discussion_r719302947



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -123,6 +123,9 @@ private TableProperties() {
   public static final String SPLIT_OPEN_FILE_COST = "read.split.open-file-cost";
   public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB
 
+  public static final String READ_DELETE_FILES_WORKER_POOL_SIZE = "read.deletes.num-threads";
+  public static final int READ_DELETE_FILES_WORKER_POOL_SIZE_DEFAULT = 1;  // read delete files in serial.

Review comment:
       I admit that this configuration may not be appropriate as a table property. But I think this optimization should apply to all engines, not only Flink. For example, I might sync MySQL CDC data into Iceberg with Flink and then rewrite and merge the delete files into data files with Spark. Maybe we could configure this through `SystemProperties`, like we configure `iceberg.worker.num-threads`:
   
https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/SystemProperties.java#L35
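
Something like this, as a rough sketch (the property name and the accessor are hypothetical, only mirroring the existing `iceberg.worker.num-threads` entry):

```java
// Hypothetical property, added next to WORKER_THREAD_POOL_SIZE_PROP in
// SystemProperties; the name "iceberg.delete-worker.num-threads" is illustrative.
public static final String DELETE_WORKER_THREAD_POOL_SIZE_PROP = "iceberg.delete-worker.num-threads";

// Resolve the pool size from the JVM system property, defaulting to 1
// (read delete files serially) when the property is unset.
static int deleteWorkerPoolSize() {
  String value = System.getProperty(DELETE_WORKER_THREAD_POOL_SIZE_PROP);
  return value != null ? Integer.parseInt(value) : 1;
}
```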

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -108,6 +123,15 @@ protected long pos(T record) {
     return (Long) posAccessor.get(asStructLike(record));
   }
 
+  private ExecutorService readDeletesService(int workerPoolSize) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            workerPoolSize,
+            new ThreadFactoryBuilder()
+                .setNameFormat("Read-delete-Service-%d")
+                .build()));
+  }

Review comment:
       I think we could create a custom executor with `corePoolSize = 0`, `maximumPoolSize = poolSize`, and `workQueue = new LinkedBlockingQueue<Runnable>()`, and let the executor shut down automatically. Otherwise, we need to call `shutdown` in `filter`, or add a `close` method and have the `DeleteFilter` caller invoke it.
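
As a rough sketch of that suggestion (the 60-second keep-alive is an illustrative value):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// With corePoolSize = 0, idle worker threads exit after the keep-alive
// timeout, so the pool winds itself down on its own and no explicit
// shutdown() in filter() (or close() on DeleteFilter) is needed.
private ExecutorService readDeletesService(int workerPoolSize) {
  return new ThreadPoolExecutor(
      0,                       // corePoolSize: keep no idle threads alive
      workerPoolSize,          // maximumPoolSize
      60L, TimeUnit.SECONDS,   // keepAliveTime before an idle thread exits
      new LinkedBlockingQueue<>());
}
```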

##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -77,8 +77,9 @@ private Deletes() {
     return filter.filter(rows);
   }
 
-  public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
-    try (CloseableIterable<StructLike> deletes = eqDeletes) {
+  public static <T extends StructLike> StructLikeSet toEqualitySet(CloseableIterable<T> eqDeletes,
+                                                                   Types.StructType eqType) {
+    try (CloseableIterable<T> deletes = eqDeletes) {

Review comment:
       This is to avoid transforming `Record` to `StructLike`, as you pointed out in https://github.com/apache/iceberg/pull/3120#discussion_r711824192.
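
For illustration (variable names are mine; `openDeletes` stands in for whatever reader produces the delete records):

```java
// With the generic bound <T extends StructLike>, a CloseableIterable<Record>
// can be handed to toEqualitySet directly, since Record already implements
// StructLike; no per-row transform to StructLike is needed.
CloseableIterable<Record> eqDeleteRecords = openDeletes(deleteFile, deleteSchema);
StructLikeSet deleteSet = Deletes.toEqualitySet(eqDeleteRecords, deleteSchema.asStruct());
```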

##########
File path: core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
##########
@@ -128,9 +127,9 @@ public long targetSplitSize() {
   }
 
   private static CloseableIterable<ManifestFile> allDataManifestFiles(List<Snapshot> snapshots) {
-    try (CloseableIterable<ManifestFile> iterable = new ParallelIterable<>(
-        Iterables.transform(snapshots, snapshot -> (Iterable<ManifestFile>) () -> snapshot.dataManifests().iterator()),
-        ThreadPools.getWorkerPool())) {
+    try (CloseableIterable<ManifestFile> iterable = CloseableIterable.combine(
+        Iterables.transform(snapshots, snapshot -> CloseableIterable.withNoopClose(snapshot.dataManifests())),

Review comment:
       I think I can't avoid adding this, because `combine` will call `concat` when `workerPool` is null, and `concat` receives `Iterable<CloseableIterable<E>>`, not `Iterable<Iterable<E>>`. So I need to wrap each iterable with a noop close before passing it to `combine`.
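
Roughly, the dispatch in `combine` looks like this (a sketch; the exact signature in the PR may differ):

```java
// When no worker pool is supplied, fall back to the serial concat path,
// which requires CloseableIterable elements; hence the withNoopClose
// wrapping at the call site above.
static <E> CloseableIterable<E> combine(Iterable<CloseableIterable<E>> iterables, ExecutorService workerPool) {
  if (workerPool == null) {
    return CloseableIterable.concat(iterables);
  }
  return new ParallelIterable<>(iterables, workerPool);
}
```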



