This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3812847f6 [core] Optimize partitions table to avoid OOM (#3258)
3812847f6 is described below
commit 3812847f674b4a424a547f3f1c2bd7c7db26eb62
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 25 17:26:01 2024 +0800
[core] Optimize partitions table to avoid OOM (#3258)
---
.../org/apache/paimon/manifest/PartitionEntry.java | 136 ++++++++++++++
.../paimon/operation/AbstractFileStoreScan.java | 29 +++
.../org/apache/paimon/operation/FileStoreScan.java | 3 +
.../table/source/snapshot/SnapshotReader.java | 3 +
.../table/source/snapshot/SnapshotReaderImpl.java | 12 +-
.../apache/paimon/table/system/AuditLogTable.java | 6 +
.../paimon/table/system/PartitionsTable.java | 206 +++------------------
.../apache/paimon/utils/ScanParallelExecutor.java | 14 +-
.../apache/paimon/flink/CatalogTableITCase.java | 119 ++++++++----
9 files changed, 304 insertions(+), 224 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
new file mode 100644
index 000000000..191621e46
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.manifest.FileKind.DELETE;
+
+/**
+ * Statistics of a single partition: record count, total file size, file count and the latest file
+ * creation time.
+ *
+ * <p>An entry built from a {@link ManifestEntry} of kind {@code DELETE} carries a negated record
+ * count, file size and file count, so merging all ADD and DELETE entries of a partition yields the
+ * statistics of its remaining files.
+ */
+public class PartitionEntry {
+
+    private final BinaryRow partition;
+    private final long recordCount;
+    private final long fileSizeInBytes;
+    private final long fileCount;
+    private final long lastFileCreationTime;
+
+    public PartitionEntry(
+            BinaryRow partition,
+            long recordCount,
+            long fileSizeInBytes,
+            long fileCount,
+            long lastFileCreationTime) {
+        this.partition = partition;
+        this.recordCount = recordCount;
+        this.fileSizeInBytes = fileSizeInBytes;
+        this.fileCount = fileCount;
+        this.lastFileCreationTime = lastFileCreationTime;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public long recordCount() {
+        return recordCount;
+    }
+
+    public long fileSizeInBytes() {
+        return fileSizeInBytes;
+    }
+
+    public long fileCount() {
+        return fileCount;
+    }
+
+    /** Latest creation time (epoch millis, as taken from the files) among the merged files. */
+    public long lastFileCreationTime() {
+        return lastFileCreationTime;
+    }
+
+    /**
+     * Combines this entry with another entry of the same partition. Counts and sizes are summed,
+     * and the later creation time is kept. The result uses the partition of {@code this}; the
+     * partition of {@code entry} is not checked.
+     */
+    public PartitionEntry merge(PartitionEntry entry) {
+        return new PartitionEntry(
+                partition,
+                recordCount + entry.recordCount,
+                fileSizeInBytes + entry.fileSizeInBytes,
+                fileCount + entry.fileCount,
+                Math.max(lastFileCreationTime, entry.lastFileCreationTime));
+    }
+
+    /**
+     * Converts one manifest entry into a one-file partition entry. For {@code DELETE} entries the
+     * record count, file size and file count are negated.
+     *
+     * <p>The creation time of a deleted file is kept as is, and {@link #merge(PartitionEntry)}
+     * takes the maximum. {@link #lastFileCreationTime()} may therefore still reflect a file that
+     * has since been deleted.
+     */
+    public static PartitionEntry fromManifestEntry(ManifestEntry entry) {
+        long recordCount = entry.file().rowCount();
+        long fileSizeInBytes = entry.file().fileSize();
+        long fileCount = 1;
+        if (entry.kind() == DELETE) {
+            recordCount = -recordCount;
+            fileSizeInBytes = -fileSizeInBytes;
+            fileCount = -fileCount;
+        }
+        return new PartitionEntry(
+                entry.partition(),
+                recordCount,
+                fileSizeInBytes,
+                fileCount,
+                entry.file().creationTimeEpochMillis());
+    }
+
+    /**
+     * Groups manifest entries by partition and merges each group into one entry.
+     *
+     * <p>No filtering is done here. A partition whose files were all deleted is returned with a
+     * file count of zero or less; callers are expected to filter such entries.
+     */
+    public static Collection<PartitionEntry> merge(Collection<ManifestEntry> fileEntries) {
+        Map<BinaryRow, PartitionEntry> partitions = new HashMap<>();
+        for (ManifestEntry entry : fileEntries) {
+            PartitionEntry partitionEntry = fromManifestEntry(entry);
+            partitions.compute(
+                    entry.partition(),
+                    (part, old) -> old == null ? partitionEntry : old.merge(partitionEntry));
+        }
+        return partitions.values();
+    }
+
+    /**
+     * Merges {@code from} into {@code to}, keyed by partition. Each key is updated through
+     * {@link Map#compute}, which is atomic when {@code to} is a {@link
+     * java.util.concurrent.ConcurrentHashMap}.
+     */
+    public static void merge(Collection<PartitionEntry> from, Map<BinaryRow, PartitionEntry> to) {
+        for (PartitionEntry entry : from) {
+            to.compute(entry.partition(), (part, old) -> old == null ? entry : old.merge(entry));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PartitionEntry that = (PartitionEntry) o;
+        return recordCount == that.recordCount
+                && fileSizeInBytes == that.fileSizeInBytes
+                && fileCount == that.fileCount
+                && lastFileCreationTime == that.lastFileCreationTime
+                && Objects.equals(partition, that.partition);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                partition, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime);
+    }
+
+    // Overridden together with equals/hashCode so assertion failures and logs show the stats.
+    @Override
+    public String toString() {
+        return "PartitionEntry{"
+                + "partition="
+                + partition
+                + ", recordCount="
+                + recordCount
+                + ", fileSizeInBytes="
+                + fileSizeInBytes
+                + ", fileCount="
+                + fileCount
+                + ", lastFileCreationTime="
+                + lastFileCreationTime
+                + '}';
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index a911b0847..f4b9338ff 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -27,6 +27,7 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.operation.metrics.ScanStats;
@@ -50,8 +51,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -251,6 +255,31 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return new ArrayList<>(mergedEntries);
}
+    @Override
+    public List<PartitionEntry> readPartitionEntries() {
+        List<ManifestFileMeta> manifestMetas = readManifests().getRight();
+        Map<BinaryRow, PartitionEntry> statsByPartition = new ConcurrentHashMap<>();
+        // Merging partition statistics does not depend on the order in which manifests are
+        // processed, so plain fork-join tasks suffice and parallelismBatchIterable is not needed.
+        ForkJoinPool pool = ScanParallelExecutor.getExecutePool(scanManifestParallelism);
+        List<ForkJoinTask<?>> pending = new ArrayList<>(manifestMetas.size());
+        for (ManifestFileMeta meta : manifestMetas) {
+            Runnable mergeManifest =
+                    () -> {
+                        Collection<PartitionEntry> perManifest =
+                                PartitionEntry.merge(readManifestFileMeta(meta));
+                        PartitionEntry.merge(perManifest, statsByPartition);
+                    };
+            pending.add(pool.submit(mergeManifest));
+        }
+        // Wait for every manifest; join() rethrows the failure of a task that completed abnormally.
+        pending.forEach(ForkJoinTask::join);
+        // Partitions whose files were all deleted are left with a non-positive file count.
+        return statsByPartition.values().stream()
+                .filter(entry -> entry.fileCount() > 0)
+                .collect(Collectors.toList());
+    }
+
private Pair<Snapshot, List<ManifestEntry>> doPlan() {
long started = System.nanoTime();
Pair<Snapshot, List<ManifestFileMeta>> snapshotListPair =
readManifests();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index dbed15bd3..a963e8d42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.partition.PartitionPredicate;
@@ -80,6 +81,8 @@ public interface FileStoreScan {
*/
List<SimpleFileEntry> readSimpleEntries();
+ /**
+ * Read the statistics (record count, file size, file count and last file creation time) of
+ * each partition in this scan. The implementation in AbstractFileStoreScan only returns
+ * partitions that still have at least one file.
+ */
+ List<PartitionEntry> readPartitionEntries();
+
/** Result plan of this scan. */
interface Plan {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 9ccb4c49f..bf59a01e7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
@@ -76,6 +77,8 @@ public interface SnapshotReader {
/** Get partitions from a snapshot. */
List<BinaryRow> partitions();
+ /** Get partition entries, with aggregated file statistics, from a snapshot. */
+ List<PartitionEntry> partitionEntries();
+
/** Result plan of this scan. */
interface Plan extends TableScan.Plan {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index b21010469..205dfb3a1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -29,7 +29,7 @@ import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
@@ -309,12 +309,16 @@ public class SnapshotReaderImpl implements SnapshotReader
{
@Override
public List<BinaryRow> partitions() {
- return scan.readSimpleEntries().stream()
- .map(SimpleFileEntry::partition)
- .distinct()
+ // readPartitionEntries() already merges entries by partition (one entry per partition in
+ // AbstractFileStoreScan), so the former distinct() step is not needed.
+ return scan.readPartitionEntries().stream()
+ .map(PartitionEntry::partition)
.collect(Collectors.toList());
}
+ @Override
+ public List<PartitionEntry> partitionEntries() {
+ return scan.readPartitionEntries();
+ }
+
@Override
public Plan readChanges() {
withMode(ScanMode.DELTA);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 59ceed137..ef47e1209 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
@@ -298,6 +299,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
public List<BinaryRow> partitions() {
return snapshotReader.partitions();
}
+
+ // Same as partitions(): delegate to the wrapped snapshot reader.
+ @Override
+ public List<PartitionEntry> partitionEntries() {
+ return snapshotReader.partitionEntries();
+ }
}
private class AuditLogBatchScan implements InnerTableScan {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index 4f1899394..c08cdfaa3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -21,16 +21,14 @@ package org.apache.paimon.table.system;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.LazyGenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
@@ -54,14 +52,9 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
@@ -104,7 +97,7 @@ public class PartitionsTable implements ReadonlyTable {
@Override
public InnerTableScan newScan() {
- return new PartitionsScan(storeTable);
+ return new PartitionsScan();
}
@Override
@@ -119,12 +112,6 @@ public class PartitionsTable implements ReadonlyTable {
private static class PartitionsScan extends ReadOnceTableScan {
- private final FileStoreTable storeTable;
-
- private PartitionsScan(FileStoreTable storeTable) {
- this.storeTable = storeTable;
- }
-
@Override
public InnerTableScan withFilter(Predicate predicate) {
// TODO
@@ -133,9 +120,7 @@ public class PartitionsTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- return () ->
- Collections.singletonList(
- new
PartitionsSplit(storeTable.newScan().plan().splits()));
+ // A single stateless split; the partitions are computed when this split is read.
+ return () -> Collections.singletonList(new PartitionsSplit());
}
}
@@ -143,22 +128,9 @@ public class PartitionsTable implements ReadonlyTable {
private static final long serialVersionUID = 1L;
- private final List<Split> splits;
-
- private PartitionsSplit(List<Split> splits) {
- this.splits = splits;
- }
-
@Override
public long rowCount() {
- return splits.stream()
- .map(s -> ((DataSplit) s).partition())
- .collect(Collectors.toSet())
- .size();
- }
-
- private List<Split> splits() {
- return splits;
+ // The split no longer carries data splits; partitions are only computed when it is read, so
+ // the real row count is unknown here. NOTE(review): 1 is a placeholder - confirm callers use
+ // rowCount() only as an estimate.
+ return 1;
}
@Override
@@ -166,16 +138,12 @@ public class PartitionsTable implements ReadonlyTable {
if (this == o) {
return true;
}
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PartitionsSplit that = (PartitionsSplit) o;
- return Objects.equals(splits, that.splits);
+ return o != null && getClass() == o.getClass();
}
@Override
public int hashCode() {
- return Objects.hash(splits);
+ return 1;
}
}
@@ -211,155 +179,41 @@ public class PartitionsTable implements ReadonlyTable {
if (!(split instanceof PartitionsSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
- PartitionsSplit filesSplit = (PartitionsSplit) split;
- if (filesSplit.splits().isEmpty()) {
- return new IteratorRecordReader<>(Collections.emptyIterator());
- }
- List<Iterator<InternalRow>> iteratorList = new ArrayList<>();
- RowDataToObjectArrayConverter partitionConverter =
+
+ List<PartitionEntry> partitions =
fileStoreTable.newSnapshotReader().partitionEntries();
+
+ RowDataToObjectArrayConverter converter =
new RowDataToObjectArrayConverter(
fileStoreTable.schema().logicalPartitionType());
- for (Split dataSplit : filesSplit.splits()) {
- iteratorList.add(
- Iterators.transform(
- ((DataSplit) dataSplit).dataFiles().iterator(),
- file -> toRow((DataSplit) dataSplit,
partitionConverter, file)));
+ List<InternalRow> results = new ArrayList<>(partitions.size());
+ for (PartitionEntry entry : partitions) {
+ results.add(toRow(entry, converter));
}
- Iterator<InternalRow> rows =
Iterators.concat(iteratorList.iterator());
- // Group by partition and sum the others
- Iterator<InternalRow> resultRows = groupAndSum(rows);
+ Iterator<InternalRow> iterator = results.iterator();
if (projection != null) {
- resultRows =
+ iterator =
Iterators.transform(
- resultRows, row ->
ProjectedRow.from(projection).replaceRow(row));
+ iterator, row ->
ProjectedRow.from(projection).replaceRow(row));
}
-
- return new IteratorRecordReader<>(resultRows);
+ return new IteratorRecordReader<>(iterator);
}
- private LazyGenericRow toRow(
- DataSplit dataSplit,
- RowDataToObjectArrayConverter partitionConverter,
- DataFileMeta dataFileMeta) {
-
+ private GenericRow toRow(
+ PartitionEntry entry, RowDataToObjectArrayConverter
partitionConverter) {
BinaryString partitionId =
- dataSplit.partition() == null
- ? null
- : BinaryString.fromString(
- Arrays.toString(
-
partitionConverter.convert(dataSplit.partition())));
- @SuppressWarnings("unchecked")
- Supplier<Object>[] fields =
- new Supplier[] {
- () -> partitionId,
- dataFileMeta::rowCount,
- dataFileMeta::fileSize,
- dataFileMeta::creationTimeEpochMillis
- };
-
- return new LazyGenericRow(fields);
- }
- }
-
- public static Iterator<InternalRow> groupAndSum(Iterator<InternalRow>
rows) {
- return new GroupedIterator(rows);
- }
-
- /** group by partition and sum the recordCount and fileBytes . */
- static class GroupedIterator implements Iterator<InternalRow> {
- private final Iterator<InternalRow> rows;
- private final Map<BinaryString, Partition> groupedData;
- private Iterator<Partition> resultIterator;
-
- public GroupedIterator(Iterator<InternalRow> rows) {
- this.rows = rows;
- this.groupedData = new HashMap<>();
- groupAndSum();
- }
-
- private void groupAndSum() {
- while (rows.hasNext()) {
- InternalRow row = rows.next();
- BinaryString partitionId = row.getString(0);
- long recordCount = row.getLong(1);
- long fileSizeInBytes = row.getLong(2);
- long lastFileCreationTime = row.getLong(3);
-
- // Grouping and summing
- Partition rowData =
- groupedData.computeIfAbsent(
- partitionId, key -> new Partition(partitionId,
0, 0, 0, -1));
- rowData.recordCount += recordCount;
- rowData.fileSizeInBytes += fileSizeInBytes;
- rowData.fileCount++;
- rowData.lastFileCreationTime =
- Math.max(rowData.lastFileCreationTime,
lastFileCreationTime);
- }
- resultIterator = groupedData.values().iterator();
- }
-
- @Override
- public boolean hasNext() {
- return resultIterator.hasNext();
- }
-
- @Override
- public InternalRow next() {
- if (hasNext()) {
- Partition partition = resultIterator.next();
- return GenericRow.of(
- partition.partition,
- partition.recordCount,
- partition.fileSizeInBytes,
- partition.fileCount,
- Timestamp.fromLocalDateTime(
- LocalDateTime.ofInstant(
-
Instant.ofEpochMilli(partition.lastFileCreationTime),
- ZoneId.systemDefault())));
- } else {
- throw new NoSuchElementException("No more elements in the
iterator.");
- }
- }
- }
-
- static class Partition {
- private final BinaryString partition;
- private long recordCount;
- private long fileSizeInBytes;
-
- private long fileCount;
-
- private long lastFileCreationTime;
-
- Partition(
- BinaryString partition,
- long recordCount,
- long fileSizeInBytes,
- long fileCount,
- long lastFileCreationTime) {
- this.partition = partition;
- this.recordCount = recordCount;
- this.fileSizeInBytes = fileSizeInBytes;
- this.lastFileCreationTime = lastFileCreationTime;
- this.fileCount = fileCount;
- }
-
- public long recordCount() {
- return recordCount;
- }
-
- public long fileSize() {
- return fileSizeInBytes;
- }
-
- public long getFileCount() {
- return fileCount;
- }
-
- public long getLastFileCreationTime() {
- return lastFileCreationTime;
+ BinaryString.fromString(
+
Arrays.toString(partitionConverter.convert(entry.partition())));
+ return GenericRow.of(
+ partitionId,
+ entry.recordCount(),
+ entry.fileSizeInBytes(),
+ entry.fileCount(),
+ Timestamp.fromLocalDateTime(
+ LocalDateTime.ofInstant(
+
Instant.ofEpochMilli(entry.lastFileCreationTime()),
+ ZoneId.systemDefault())));
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
index 1299b618b..a29be0540 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
+import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
+
/**
* This class is a parallel execution util class, which mainly aim to process
tasks parallelly with
* memory control.
@@ -40,7 +42,7 @@ public class ScanParallelExecutor {
// reduce memory usage by batch iterable process, the cached result in
memory will be queueSize
public static <T, U> Iterable<T> parallelismBatchIterable(
Function<List<U>, List<T>> processor, List<U> input, @Nullable
Integer queueSize) {
- ForkJoinPool poolCandidate = FileUtils.COMMON_IO_FORK_JOIN_POOL;
+ ForkJoinPool poolCandidate = COMMON_IO_FORK_JOIN_POOL;
if (queueSize == null) {
queueSize = poolCandidate.getParallelism();
} else if (queueSize <= 0) {
@@ -88,9 +90,13 @@ public class ScanParallelExecutor {
};
}
- private static ForkJoinPool getExecutePool(int queueSize) {
- return queueSize > FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism()
+ /**
+ * Choose the pool for scan tasks: the shared COMMON_IO_FORK_JOIN_POOL when {@code queueSize}
+ * is null or not larger than that pool's parallelism, otherwise the pool returned by {@code
+ * FileUtils.getScanIoForkJoinPool(queueSize)}.
+ */
+ public static ForkJoinPool getExecutePool(@Nullable Integer queueSize) {
+ if (queueSize == null) {
+ return COMMON_IO_FORK_JOIN_POOL;
+ }
+
+ return queueSize > COMMON_IO_FORK_JOIN_POOL.getParallelism()
? FileUtils.getScanIoForkJoinPool(queueSize)
- : FileUtils.COMMON_IO_FORK_JOIN_POOL;
+ : COMMON_IO_FORK_JOIN_POOL;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 5fd42cd3a..6b167c4e7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
import javax.annotation.Nonnull;
import java.util.List;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
@@ -438,16 +439,19 @@ public class CatalogTableITCase extends CatalogITCaseBase
{
sql("INSERT INTO PartitionTable select 2,2,'b','2020-01-02','11'");
sql("INSERT INTO PartitionTable select 3,3,'c','2020-01-03','11'");
List<Row> result = sql("SHOW PARTITIONS PartitionTable");
- assertThat(result.toString())
- .isEqualTo(
- "[+I[dt=2020-01-01/hh=10], +I[dt=2020-01-02/hh=11],
+I[dt=2020-01-03/hh=11]]");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("dt=2020-01-01/hh=10"),
+ Row.of("dt=2020-01-02/hh=11"),
+ Row.of("dt=2020-01-03/hh=11"));
result = sql("SHOW PARTITIONS PartitionTable partition (hh='11')");
- assertThat(result.toString())
- .isEqualTo("[+I[dt=2020-01-02/hh=11],
+I[dt=2020-01-03/hh=11]]");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("dt=2020-01-02/hh=11"),
Row.of("dt=2020-01-03/hh=11"));
result = sql("SHOW PARTITIONS PartitionTable partition
(dt='2020-01-02', hh='11')");
- assertThat(result.toString()).isEqualTo("[+I[dt=2020-01-02/hh=11]]");
+
assertThat(result).containsExactlyInAnyOrder(Row.of("dt=2020-01-02/hh=11"));
}
@Test
@@ -482,21 +486,28 @@ public class CatalogTableITCase extends CatalogITCaseBase
{
.isEqualTo("[+I[OK]]");
List<Row> result = sql("SHOW PARTITIONS PartitionTable");
- assertThat(result.toString())
- .isEqualTo(
- "[+I[dt=2020-01-01/hh=10], +I[dt=2020-01-02/hh=11],
+I[dt=2020-01-03/hh=11], +I[dt=2020-01-04/hh=14], +I[dt=2020-01-05/hh=15]]");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("dt=2020-01-01/hh=10"),
+ Row.of("dt=2020-01-02/hh=11"),
+ Row.of("dt=2020-01-03/hh=11"),
+ Row.of("dt=2020-01-04/hh=14"),
+ Row.of("dt=2020-01-05/hh=15"));
// drop a partition
sql("ALTER TABLE PartitionTable DROP PARTITION (`dt` = '2020-01-01',
`hh` = '10')");
result = sql("SHOW PARTITIONS PartitionTable");
- assertThat(result.toString())
- .isEqualTo(
- "[+I[dt=2020-01-02/hh=11], +I[dt=2020-01-03/hh=11],
+I[dt=2020-01-04/hh=14], +I[dt=2020-01-05/hh=15]]");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("dt=2020-01-02/hh=11"),
+ Row.of("dt=2020-01-03/hh=11"),
+ Row.of("dt=2020-01-04/hh=14"),
+ Row.of("dt=2020-01-05/hh=15"));
// drop two partitions
sql("ALTER TABLE PartitionTable DROP PARTITION (dt ='2020-01-04'),
PARTITION (hh='11')");
result = sql("SHOW PARTITIONS PartitionTable");
- assertThat(result.toString()).isEqualTo("[+I[dt=2020-01-05/hh=15]]");
+ assertThat(result).containsExactly(Row.of("dt=2020-01-05/hh=15"));
}
@Test
@@ -716,34 +727,62 @@ public class CatalogTableITCase extends CatalogITCaseBase
{
@Test
public void testPartitionsTable() {
-        sql(
-                "CREATE TABLE T_WITH_KEY (a INT, p INT, b BIGINT, c STRING, PRIMARY KEY (a, p) NOT ENFORCED) "
-                        + "PARTITIONED BY (p)");
-        assertFilesTable("T_WITH_KEY");
-
-        sql(
-                "CREATE TABLE T_APPEND_ONLY (a INT, p INT, b BIGINT, c STRING) "
-                        + "PARTITIONED BY (p)");
-        assertPartitionsTable("T_APPEND_ONLY");
-    }
-
-    private void assertPartitionsTable(String tableName) {
-        assertThat(sql(String.format("SELECT * FROM %s$partitions", tableName))).isEmpty();
-        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 2, 2, 'S1')", tableName));
-        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", tableName));
-        List<Row> rows1 = sql(String.format("SELECT * FROM %s$partitions", tableName));
-        for (Row row : rows1) {
-            assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]");
-            assertThat((long) row.getField(2)).isGreaterThan(0L); // check file size
-        }
-
-        sql(String.format("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2, 'S4')", tableName));
-        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", tableName));
+        String table = "PARTITIONS_TABLE";
+        sql("CREATE TABLE %s (a INT, p INT, b BIGINT, c STRING) PARTITIONED BY (p)", table);
+
+        // assert empty
+        assertThat(sql("SELECT * FROM %s$partitions", table)).isEmpty();
+
+        // Keep only the first four columns (partition, record count, file size, file count): the
+        // last column is the last file creation time, which differs on every run.
+        Function<List<Row>, List<Row>> convert =
+                rows ->
+                        rows.stream()
+                                .map(
+                                        r ->
+                                                Row.of(
+                                                        r.getField(0),
+                                                        r.getField(1),
+                                                        r.getField(2),
+                                                        r.getField(3)))
+                                .collect(Collectors.toList());
+
+        // assert new partitions
+        // NOTE: expected file sizes are exact byte counts of the written data files; they
+        // presumably need updating whenever the default file format or its encoding changes.
+        sql("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 2, 2, 'S1'), (1, 2, 2, 'S1')", table);
+        sql("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", table);
+        List<Row> result = sql("SELECT * FROM %s$partitions", table);
+        assertThat(convert.apply(result))
+                .containsExactlyInAnyOrder(
+                        Row.of("[1]", 2L, 910L, 2L), Row.of("[2]", 3L, 879L, 2L));
-        List<Row> rows2 = sql(String.format("SELECT * FROM %s$partitions", tableName));
-        for (Row row : rows2) {
-            assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]", "[3]", "[4]");
-        }
+        // assert new files in existing partitions and new partitions
+        sql("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2, 'S4')", table);
+        sql("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", table);
+        result = sql("SELECT * FROM %s$partitions", table);
+        assertThat(convert.apply(result))
+                .containsExactlyInAnyOrder(
+                        Row.of("[1]", 3L, 1365L, 3L),
+                        Row.of("[2]", 4L, 1317L, 3L),
+                        Row.of("[3]", 1L, 453L, 1L),
+                        Row.of("[4]", 1L, 440L, 1L));
+
+        // assert a dropped partition disappears
+        sql("ALTER TABLE %s DROP PARTITION (p = 2)", table);
+        result = sql("SELECT * FROM %s$partitions", table);
+        assertThat(convert.apply(result))
+                .containsExactlyInAnyOrder(
+                        Row.of("[1]", 3L, 1365L, 3L),
+                        Row.of("[3]", 1L, 453L, 1L),
+                        Row.of("[4]", 1L, 440L, 1L));
+
+        // add a new file to the dropped partition p = 2; only the new file must be counted
+        sql("INSERT INTO %s VALUES (1, 2, 2, 'S1')", table);
+        result = sql("SELECT * FROM %s$partitions", table);
+        assertThat(convert.apply(result))
+                .containsExactlyInAnyOrder(
+                        Row.of("[1]", 3L, 1365L, 3L),
+                        Row.of("[2]", 1L, 438L, 1L),
+                        Row.of("[3]", 1L, 453L, 1L),
+                        Row.of("[4]", 1L, 440L, 1L));
}
@Test