This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3812847f6 [core] Optimize partitions table to avoid OOM (#3258)
3812847f6 is described below

commit 3812847f674b4a424a547f3f1c2bd7c7db26eb62
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 25 17:26:01 2024 +0800

    [core] Optimize partitions table to avoid OOM (#3258)
---
 .../org/apache/paimon/manifest/PartitionEntry.java | 136 ++++++++++++++
 .../paimon/operation/AbstractFileStoreScan.java    |  29 +++
 .../org/apache/paimon/operation/FileStoreScan.java |   3 +
 .../table/source/snapshot/SnapshotReader.java      |   3 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |  12 +-
 .../apache/paimon/table/system/AuditLogTable.java  |   6 +
 .../paimon/table/system/PartitionsTable.java       | 206 +++------------------
 .../apache/paimon/utils/ScanParallelExecutor.java  |  14 +-
 .../apache/paimon/flink/CatalogTableITCase.java    | 119 ++++++++----
 9 files changed, 304 insertions(+), 224 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
new file mode 100644
index 000000000..191621e46
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.manifest.FileKind.DELETE;
+
+/** Entry representing a partition. */
+public class PartitionEntry {
+
+    private final BinaryRow partition;
+    private final long recordCount;
+    private final long fileSizeInBytes;
+    private final long fileCount;
+    private final long lastFileCreationTime;
+
+    public PartitionEntry(
+            BinaryRow partition,
+            long recordCount,
+            long fileSizeInBytes,
+            long fileCount,
+            long lastFileCreationTime) {
+        this.partition = partition;
+        this.recordCount = recordCount;
+        this.fileSizeInBytes = fileSizeInBytes;
+        this.fileCount = fileCount;
+        this.lastFileCreationTime = lastFileCreationTime;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public long recordCount() {
+        return recordCount;
+    }
+
+    public long fileSizeInBytes() {
+        return fileSizeInBytes;
+    }
+
+    public long fileCount() {
+        return fileCount;
+    }
+
+    public long lastFileCreationTime() {
+        return lastFileCreationTime;
+    }
+
+    public PartitionEntry merge(PartitionEntry entry) {
+        return new PartitionEntry(
+                partition,
+                recordCount + entry.recordCount,
+                fileSizeInBytes + entry.fileSizeInBytes,
+                fileCount + entry.fileCount,
+                Math.max(lastFileCreationTime, entry.lastFileCreationTime));
+    }
+
+    public static PartitionEntry fromManifestEntry(ManifestEntry entry) {
+        long recordCount = entry.file().rowCount();
+        long fileSizeInBytes = entry.file().fileSize();
+        long fileCount = 1;
+        if (entry.kind() == DELETE) {
+            recordCount = -recordCount;
+            fileSizeInBytes = -fileSizeInBytes;
+            fileCount = -fileCount;
+        }
+        return new PartitionEntry(
+                entry.partition(),
+                recordCount,
+                fileSizeInBytes,
+                fileCount,
+                entry.file().creationTimeEpochMillis());
+    }
+
+    public static Collection<PartitionEntry> merge(Collection<ManifestEntry> fileEntries) {
+        Map<BinaryRow, PartitionEntry> partitions = new HashMap<>();
+        for (ManifestEntry entry : fileEntries) {
+            PartitionEntry partitionEntry = fromManifestEntry(entry);
+            partitions.compute(
+                    entry.partition(),
+                    (part, old) -> old == null ? partitionEntry : old.merge(partitionEntry));
+        }
+        return partitions.values();
+    }
+
+    public static void merge(Collection<PartitionEntry> from, Map<BinaryRow, PartitionEntry> to) {
+        for (PartitionEntry entry : from) {
+            to.compute(entry.partition(), (part, old) -> old == null ? entry : old.merge(entry));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PartitionEntry that = (PartitionEntry) o;
+        return recordCount == that.recordCount
+                && fileSizeInBytes == that.fileSizeInBytes
+                && fileCount == that.fileCount
+                && lastFileCreationTime == that.lastFileCreationTime
+                && Objects.equals(partition, that.partition);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                partition, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index a911b0847..f4b9338ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -27,6 +27,7 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.operation.metrics.ScanStats;
@@ -50,8 +51,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -251,6 +255,31 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         return new ArrayList<>(mergedEntries);
     }
 
+    @Override
+    public List<PartitionEntry> readPartitionEntries() {
+        List<ManifestFileMeta> manifests = readManifests().getRight();
+        Map<BinaryRow, PartitionEntry> partitions = new ConcurrentHashMap<>();
+        // Don't need to use parallelismBatchIterable here
+        // Can be executed in disorder
+        ForkJoinPool executePool = ScanParallelExecutor.getExecutePool(scanManifestParallelism);
+        List<ForkJoinTask<?>> tasks = new ArrayList<>();
+        for (ManifestFileMeta manifest : manifests) {
+            ForkJoinTask<?> task =
+                    executePool.submit(
+                            () ->
+                                    PartitionEntry.merge(
+                                            PartitionEntry.merge(readManifestFileMeta(manifest)),
+                                            partitions));
+            tasks.add(task);
+        }
+        for (ForkJoinTask<?> task : tasks) {
+            task.join();
+        }
+        return partitions.values().stream()
+                .filter(p -> p.fileCount() > 0)
+                .collect(Collectors.toList());
+    }
+
     private Pair<Snapshot, List<ManifestEntry>> doPlan() {
         long started = System.nanoTime();
         Pair<Snapshot, List<ManifestFileMeta>> snapshotListPair = readManifests();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index dbed15bd3..a963e8d42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -80,6 +81,8 @@ public interface FileStoreScan {
      */
     List<SimpleFileEntry> readSimpleEntries();
 
+    List<PartitionEntry> readPartitionEntries();
+
     /** Result plan of this scan. */
     interface Plan {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 9ccb4c49f..bf59a01e7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
@@ -76,6 +77,8 @@ public interface SnapshotReader {
     /** Get partitions from a snapshot. */
     List<BinaryRow> partitions();
 
+    List<PartitionEntry> partitionEntries();
+
     /** Result plan of this scan. */
     interface Plan extends TableScan.Plan {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index b21010469..205dfb3a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -29,7 +29,7 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
@@ -309,12 +309,16 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
     @Override
     public List<BinaryRow> partitions() {
-        return scan.readSimpleEntries().stream()
-                .map(SimpleFileEntry::partition)
-                .distinct()
+        return scan.readPartitionEntries().stream()
+                .map(PartitionEntry::partition)
                 .collect(Collectors.toList());
     }
 
+    @Override
+    public List<PartitionEntry> partitionEntries() {
+        return scan.readPartitionEntries();
+    }
+
     @Override
     public Plan readChanges() {
         withMode(ScanMode.DELTA);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 59ceed137..ef47e1209 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
@@ -298,6 +299,11 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         public List<BinaryRow> partitions() {
             return snapshotReader.partitions();
         }
+
+        @Override
+        public List<PartitionEntry> partitionEntries() {
+            return snapshotReader.partitionEntries();
+        }
     }
 
     private class AuditLogBatchScan implements InnerTableScan {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index 4f1899394..c08cdfaa3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -21,16 +21,14 @@ package org.apache.paimon.table.system;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.LazyGenericRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
@@ -54,14 +52,9 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 
@@ -104,7 +97,7 @@ public class PartitionsTable implements ReadonlyTable {
 
     @Override
     public InnerTableScan newScan() {
-        return new PartitionsScan(storeTable);
+        return new PartitionsScan();
     }
 
     @Override
@@ -119,12 +112,6 @@ public class PartitionsTable implements ReadonlyTable {
 
     private static class PartitionsScan extends ReadOnceTableScan {
 
-        private final FileStoreTable storeTable;
-
-        private PartitionsScan(FileStoreTable storeTable) {
-            this.storeTable = storeTable;
-        }
-
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
             // TODO
@@ -133,9 +120,7 @@ public class PartitionsTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            return () ->
-                    Collections.singletonList(
-                            new 
PartitionsSplit(storeTable.newScan().plan().splits()));
+            return () -> Collections.singletonList(new PartitionsSplit());
         }
     }
 
@@ -143,22 +128,9 @@ public class PartitionsTable implements ReadonlyTable {
 
         private static final long serialVersionUID = 1L;
 
-        private final List<Split> splits;
-
-        private PartitionsSplit(List<Split> splits) {
-            this.splits = splits;
-        }
-
         @Override
         public long rowCount() {
-            return splits.stream()
-                    .map(s -> ((DataSplit) s).partition())
-                    .collect(Collectors.toSet())
-                    .size();
-        }
-
-        private List<Split> splits() {
-            return splits;
+            return 1;
         }
 
         @Override
@@ -166,16 +138,12 @@ public class PartitionsTable implements ReadonlyTable {
             if (this == o) {
                 return true;
             }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            PartitionsSplit that = (PartitionsSplit) o;
-            return Objects.equals(splits, that.splits);
+            return o != null && getClass() == o.getClass();
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(splits);
+            return 1;
         }
     }
 
@@ -211,155 +179,41 @@ public class PartitionsTable implements ReadonlyTable {
             if (!(split instanceof PartitionsSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
-            PartitionsSplit filesSplit = (PartitionsSplit) split;
-            if (filesSplit.splits().isEmpty()) {
-                return new IteratorRecordReader<>(Collections.emptyIterator());
-            }
-            List<Iterator<InternalRow>> iteratorList = new ArrayList<>();
-            RowDataToObjectArrayConverter partitionConverter =
+
+            List<PartitionEntry> partitions = fileStoreTable.newSnapshotReader().partitionEntries();
+
+            RowDataToObjectArrayConverter converter =
                     new RowDataToObjectArrayConverter(
                             fileStoreTable.schema().logicalPartitionType());
 
-            for (Split dataSplit : filesSplit.splits()) {
-                iteratorList.add(
-                        Iterators.transform(
-                                ((DataSplit) dataSplit).dataFiles().iterator(),
-                                file -> toRow((DataSplit) dataSplit, 
partitionConverter, file)));
+            List<InternalRow> results = new ArrayList<>(partitions.size());
+            for (PartitionEntry entry : partitions) {
+                results.add(toRow(entry, converter));
             }
-            Iterator<InternalRow> rows = 
Iterators.concat(iteratorList.iterator());
-            // Group by partition and sum the others
-            Iterator<InternalRow> resultRows = groupAndSum(rows);
 
+            Iterator<InternalRow> iterator = results.iterator();
             if (projection != null) {
-                resultRows =
+                iterator =
                         Iterators.transform(
-                                resultRows, row -> 
ProjectedRow.from(projection).replaceRow(row));
+                                iterator, row -> ProjectedRow.from(projection).replaceRow(row));
             }
-
-            return new IteratorRecordReader<>(resultRows);
+            return new IteratorRecordReader<>(iterator);
         }
 
-        private LazyGenericRow toRow(
-                DataSplit dataSplit,
-                RowDataToObjectArrayConverter partitionConverter,
-                DataFileMeta dataFileMeta) {
-
+        private GenericRow toRow(
+                PartitionEntry entry, RowDataToObjectArrayConverter partitionConverter) {
             BinaryString partitionId =
-                    dataSplit.partition() == null
-                            ? null
-                            : BinaryString.fromString(
-                                    Arrays.toString(
-                                            
partitionConverter.convert(dataSplit.partition())));
-            @SuppressWarnings("unchecked")
-            Supplier<Object>[] fields =
-                    new Supplier[] {
-                        () -> partitionId,
-                        dataFileMeta::rowCount,
-                        dataFileMeta::fileSize,
-                        dataFileMeta::creationTimeEpochMillis
-                    };
-
-            return new LazyGenericRow(fields);
-        }
-    }
-
-    public static Iterator<InternalRow> groupAndSum(Iterator<InternalRow> 
rows) {
-        return new GroupedIterator(rows);
-    }
-
-    /** group by partition and sum the recordCount and fileBytes . */
-    static class GroupedIterator implements Iterator<InternalRow> {
-        private final Iterator<InternalRow> rows;
-        private final Map<BinaryString, Partition> groupedData;
-        private Iterator<Partition> resultIterator;
-
-        public GroupedIterator(Iterator<InternalRow> rows) {
-            this.rows = rows;
-            this.groupedData = new HashMap<>();
-            groupAndSum();
-        }
-
-        private void groupAndSum() {
-            while (rows.hasNext()) {
-                InternalRow row = rows.next();
-                BinaryString partitionId = row.getString(0);
-                long recordCount = row.getLong(1);
-                long fileSizeInBytes = row.getLong(2);
-                long lastFileCreationTime = row.getLong(3);
-
-                // Grouping and summing
-                Partition rowData =
-                        groupedData.computeIfAbsent(
-                                partitionId, key -> new Partition(partitionId, 
0, 0, 0, -1));
-                rowData.recordCount += recordCount;
-                rowData.fileSizeInBytes += fileSizeInBytes;
-                rowData.fileCount++;
-                rowData.lastFileCreationTime =
-                        Math.max(rowData.lastFileCreationTime, 
lastFileCreationTime);
-            }
-            resultIterator = groupedData.values().iterator();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return resultIterator.hasNext();
-        }
-
-        @Override
-        public InternalRow next() {
-            if (hasNext()) {
-                Partition partition = resultIterator.next();
-                return GenericRow.of(
-                        partition.partition,
-                        partition.recordCount,
-                        partition.fileSizeInBytes,
-                        partition.fileCount,
-                        Timestamp.fromLocalDateTime(
-                                LocalDateTime.ofInstant(
-                                        
Instant.ofEpochMilli(partition.lastFileCreationTime),
-                                        ZoneId.systemDefault())));
-            } else {
-                throw new NoSuchElementException("No more elements in the 
iterator.");
-            }
-        }
-    }
-
-    static class Partition {
-        private final BinaryString partition;
-        private long recordCount;
-        private long fileSizeInBytes;
-
-        private long fileCount;
-
-        private long lastFileCreationTime;
-
-        Partition(
-                BinaryString partition,
-                long recordCount,
-                long fileSizeInBytes,
-                long fileCount,
-                long lastFileCreationTime) {
-            this.partition = partition;
-            this.recordCount = recordCount;
-            this.fileSizeInBytes = fileSizeInBytes;
-            this.lastFileCreationTime = lastFileCreationTime;
-            this.fileCount = fileCount;
-        }
-
-        public long recordCount() {
-            return recordCount;
-        }
-
-        public long fileSize() {
-            return fileSizeInBytes;
-        }
-
-        public long getFileCount() {
-            return fileCount;
-        }
-
-        public long getLastFileCreationTime() {
-            return lastFileCreationTime;
+                    BinaryString.fromString(
+                            Arrays.toString(partitionConverter.convert(entry.partition())));
+            return GenericRow.of(
+                    partitionId,
+                    entry.recordCount(),
+                    entry.fileSizeInBytes(),
+                    entry.fileCount(),
+                    Timestamp.fromLocalDateTime(
+                            LocalDateTime.ofInstant(
+                                    Instant.ofEpochMilli(entry.lastFileCreationTime()),
+                                    ZoneId.systemDefault())));
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
index 1299b618b..a29be0540 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
 import java.util.function.Function;
 
+import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
+
 /**
  * This class is a parallel execution util class, which mainly aim to process 
tasks parallelly with
  * memory control.
@@ -40,7 +42,7 @@ public class ScanParallelExecutor {
     // reduce memory usage by batch iterable process, the cached result in 
memory will be queueSize
     public static <T, U> Iterable<T> parallelismBatchIterable(
             Function<List<U>, List<T>> processor, List<U> input, @Nullable 
Integer queueSize) {
-        ForkJoinPool poolCandidate = FileUtils.COMMON_IO_FORK_JOIN_POOL;
+        ForkJoinPool poolCandidate = COMMON_IO_FORK_JOIN_POOL;
         if (queueSize == null) {
             queueSize = poolCandidate.getParallelism();
         } else if (queueSize <= 0) {
@@ -88,9 +90,13 @@ public class ScanParallelExecutor {
                 };
     }
 
-    private static ForkJoinPool getExecutePool(int queueSize) {
-        return queueSize > FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism()
+    public static ForkJoinPool getExecutePool(@Nullable Integer queueSize) {
+        if (queueSize == null) {
+            return COMMON_IO_FORK_JOIN_POOL;
+        }
+
+        return queueSize > COMMON_IO_FORK_JOIN_POOL.getParallelism()
                 ? FileUtils.getScanIoForkJoinPool(queueSize)
-                : FileUtils.COMMON_IO_FORK_JOIN_POOL;
+                : COMMON_IO_FORK_JOIN_POOL;
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 5fd42cd3a..6b167c4e7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
 import javax.annotation.Nonnull;
 
 import java.util.List;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
@@ -438,16 +439,19 @@ public class CatalogTableITCase extends CatalogITCaseBase 
{
         sql("INSERT INTO PartitionTable select 2,2,'b','2020-01-02','11'");
         sql("INSERT INTO PartitionTable select 3,3,'c','2020-01-03','11'");
         List<Row> result = sql("SHOW PARTITIONS PartitionTable");
-        assertThat(result.toString())
-                .isEqualTo(
-                        "[+I[dt=2020-01-01/hh=10], +I[dt=2020-01-02/hh=11], 
+I[dt=2020-01-03/hh=11]]");
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("dt=2020-01-01/hh=10"),
+                        Row.of("dt=2020-01-02/hh=11"),
+                        Row.of("dt=2020-01-03/hh=11"));
 
         result = sql("SHOW PARTITIONS PartitionTable partition (hh='11')");
-        assertThat(result.toString())
-                .isEqualTo("[+I[dt=2020-01-02/hh=11], 
+I[dt=2020-01-03/hh=11]]");
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("dt=2020-01-02/hh=11"), 
Row.of("dt=2020-01-03/hh=11"));
 
         result = sql("SHOW PARTITIONS PartitionTable partition 
(dt='2020-01-02', hh='11')");
-        assertThat(result.toString()).isEqualTo("[+I[dt=2020-01-02/hh=11]]");
+        
assertThat(result).containsExactlyInAnyOrder(Row.of("dt=2020-01-02/hh=11"));
     }
 
     @Test
@@ -482,21 +486,28 @@ public class CatalogTableITCase extends CatalogITCaseBase 
{
                 .isEqualTo("[+I[OK]]");
 
         List<Row> result = sql("SHOW PARTITIONS PartitionTable");
-        assertThat(result.toString())
-                .isEqualTo(
-                        "[+I[dt=2020-01-01/hh=10], +I[dt=2020-01-02/hh=11], 
+I[dt=2020-01-03/hh=11], +I[dt=2020-01-04/hh=14], +I[dt=2020-01-05/hh=15]]");
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("dt=2020-01-01/hh=10"),
+                        Row.of("dt=2020-01-02/hh=11"),
+                        Row.of("dt=2020-01-03/hh=11"),
+                        Row.of("dt=2020-01-04/hh=14"),
+                        Row.of("dt=2020-01-05/hh=15"));
 
         // drop a partition
         sql("ALTER TABLE PartitionTable DROP PARTITION (`dt` = '2020-01-01', 
`hh` = '10')");
         result = sql("SHOW PARTITIONS PartitionTable");
-        assertThat(result.toString())
-                .isEqualTo(
-                        "[+I[dt=2020-01-02/hh=11], +I[dt=2020-01-03/hh=11], 
+I[dt=2020-01-04/hh=14], +I[dt=2020-01-05/hh=15]]");
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("dt=2020-01-02/hh=11"),
+                        Row.of("dt=2020-01-03/hh=11"),
+                        Row.of("dt=2020-01-04/hh=14"),
+                        Row.of("dt=2020-01-05/hh=15"));
 
         // drop two partitions
         sql("ALTER TABLE PartitionTable DROP PARTITION (dt ='2020-01-04'), 
PARTITION (hh='11')");
         result = sql("SHOW PARTITIONS PartitionTable");
-        assertThat(result.toString()).isEqualTo("[+I[dt=2020-01-05/hh=15]]");
+        assertThat(result).containsExactly(Row.of("dt=2020-01-05/hh=15"));
     }
 
     @Test
@@ -716,34 +727,62 @@ public class CatalogTableITCase extends CatalogITCaseBase 
{
 
     @Test
     public void testPartitionsTable() {
-        sql(
-                "CREATE TABLE T_WITH_KEY (a INT, p INT, b BIGINT, c STRING, 
PRIMARY KEY (a, p) NOT ENFORCED) "
-                        + "PARTITIONED BY (p)");
-        assertFilesTable("T_WITH_KEY");
-
-        sql(
-                "CREATE TABLE T_APPEND_ONLY (a INT, p INT, b BIGINT, c STRING) 
"
-                        + "PARTITIONED BY (p)");
-        assertPartitionsTable("T_APPEND_ONLY");
-    }
-
-    private void assertPartitionsTable(String tableName) {
-        assertThat(sql(String.format("SELECT * FROM %s$partitions", 
tableName))).isEmpty();
-        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 2, 2, 
'S1')", tableName));
-        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 
'S4')", tableName));
-        List<Row> rows1 = sql(String.format("SELECT * FROM %s$partitions", 
tableName));
-        for (Row row : rows1) {
-            assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]");
-            assertThat((long) row.getField(2)).isGreaterThan(0L); // check 
file size
-        }
-
-        sql(String.format("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2, 
'S4')", tableName));
-        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 
'S4')", tableName));
+        String table = "PARTITIONS_TABLE";
+        sql("CREATE TABLE %s (a INT, p INT, b BIGINT, c STRING) " + 
"PARTITIONED BY (p)", table);
+
+        // assert empty
+        assertThat(sql("SELECT * FROM %s$partitions", table)).isEmpty();
+
+        // Convert to another Row to avoid timestamp diff
+        Function<List<Row>, List<Row>> convert =
+                rows ->
+                        rows.stream()
+                                .map(
+                                        r ->
+                                                Row.of(
+                                                        r.getField(0),
+                                                        r.getField(1),
+                                                        r.getField(2),
+                                                        r.getField(3)))
+                                .collect(Collectors.toList());
+
+        // assert new partitions
+        sql("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 2, 2, 'S1'), (1, 2, 2, 
'S1')", table);
+        sql("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", table);
+        List<Row> result = sql("SELECT * FROM %s$partitions", table);
+        assertThat(convert.apply(result))
+                .containsExactlyInAnyOrder(
+                        Row.of("[1]", 2L, 910L, 2L), Row.of("[2]", 3L, 879L, 
2L));
 
-        List<Row> rows2 = sql(String.format("SELECT * FROM %s$partitions", 
tableName));
-        for (Row row : rows2) {
-            assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]", 
"[3]", "[4]");
-        }
+        // assert new files in partition
+        sql("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2, 'S4')", table);
+        sql("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", table);
+        result = sql(String.format("SELECT * FROM %s$partitions", table));
+        assertThat(convert.apply(result))
+                .containsExactlyInAnyOrder(
+                        Row.of("[1]", 3L, 1365L, 3L),
+                        Row.of("[2]", 4L, 1317L, 3L),
+                        Row.of("[3]", 1L, 453L, 1L),
+                        Row.of("[4]", 1L, 440L, 1L));
+
+        // assert delete partitions
+        sql("ALTER TABLE %s DROP PARTITION (p = 2)", table);
+        result = sql(String.format("SELECT * FROM %s$partitions", table));
+        assertThat(convert.apply(result))
+                .containsExactlyInAnyOrder(
+                        Row.of("[1]", 3L, 1365L, 3L),
+                        Row.of("[3]", 1L, 453L, 1L),
+                        Row.of("[4]", 1L, 440L, 1L));
+
+        // add new file to p 2
+        sql("INSERT INTO %s VALUES (1, 2, 2, 'S1')", table);
+        result = sql(String.format("SELECT * FROM %s$partitions", table));
+        assertThat(convert.apply(result))
+                .containsExactlyInAnyOrder(
+                        Row.of("[1]", 3L, 1365L, 3L),
+                        Row.of("[2]", 1L, 438L, 1L),
+                        Row.of("[3]", 1L, 453L, 1L),
+                        Row.of("[4]", 1L, 440L, 1L));
     }
 
     @Test


Reply via email to