This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 241ac76169 [flink] Copy bytes with multiple threads when performing precommit compact for changelogs (#4907)
241ac76169 is described below
commit 241ac76169060fc80fa959c52ed6bd7f750d7fff
Author: tsreaper <[email protected]>
AuthorDate: Sun Apr 6 19:27:08 2025 +0800
[flink] Copy bytes with multiple threads when performing precommit compact for changelogs (#4907)
---
.../generated/flink_connector_configuration.html | 8 +-
.../org/apache/paimon/utils/ThreadPoolUtils.java | 29 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 19 +-
.../ChangelogCompactCoordinateOperator.java | 101 +++++-
.../compact/changelog/ChangelogCompactTask.java | 204 +++++++----
.../changelog/ChangelogCompactWorkerOperator.java | 29 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 2 +-
.../flink/PrimaryKeyFileStoreTableITCase.java | 6 +-
.../ChangelogCompactCoordinateOperatorTest.java | 371 +++++++++++++++++++++
.../ChangelogCompactTaskSerializerTest.java | 13 +-
10 files changed, 684 insertions(+), 98 deletions(-)
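For readers trying this out: the feature is driven entirely by table options. Below is a
minimal sketch of the knobs touched by this commit; only the option keys come from the
diff that follows, while the values and the helper itself are illustrative.

    import java.util.HashMap;
    import java.util.Map;

    class ChangelogCompactOptionsExample {
        // Hypothetical helper collecting the relevant table options.
        static Map<String, String> changelogCompactOptions() {
            Map<String, String> options = new HashMap<>();
            // add the compact coordinator/worker pair after the writer
            options.put("precommit-compact", "true");
            // cap copy threads; unset means "available processors"
            options.put("changelog.precommit-compact.thread-num", "4");
            // bytes buffered while copying; defaults to 128 mb and is
            // excluded from the docs since most users need not tune it
            options.put("changelog.precommit-compact.buffer-size", "64 mb");
            return options;
        }
    }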
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index feb8715701..f348c35e8e 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,6 +26,12 @@ under the License.
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>changelog.precommit-compact.thread-num</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Maximum number of threads to copy bytes from small changelog files. By default, it is the number of processors available to the Java virtual machine.</td>
+ </tr>
<tr>
<td><h5>end-input.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -108,7 +114,7 @@ under the License.
<td><h5>precommit-compact</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>If true, it will add a compact coordinator and worker operator after the writer operator,in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files. </td>
+ <td>If true, it will add a compact coordinator and worker operator after the writer operator,in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files.</td>
</tr>
<tr>
<td><h5>scan.bounded</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
index e4b3da8ca8..a4790583c5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -138,17 +138,7 @@ public class ThreadPoolUtils {
public static <U> void randomlyOnlyExecute(
ExecutorService executor, Consumer<U> processor, Collection<U> input) {
- List<Future<?>> futures = new ArrayList<>(input.size());
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- for (U u : input) {
- futures.add(
- executor.submit(
- () -> {
- Thread.currentThread().setContextClassLoader(cl);
- processor.accept(u);
- }));
- }
- awaitAllFutures(futures);
+ awaitAllFutures(submitAllTasks(executor, processor, input));
}
public static <U, T> Iterator<T> randomlyExecuteSequentialReturn(
@@ -189,7 +179,22 @@ public class ThreadPoolUtils {
});
}
- private static void awaitAllFutures(List<Future<?>> futures) {
+ public static <U> List<Future<?>> submitAllTasks(
+ ExecutorService executor, Consumer<U> processor, Collection<U> input) {
+ List<Future<?>> futures = new ArrayList<>(input.size());
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ for (U u : input) {
+ futures.add(
+ executor.submit(
+ () -> {
+ Thread.currentThread().setContextClassLoader(cl);
+ processor.accept(u);
+ }));
+ }
+ return futures;
+ }
+
+ public static void awaitAllFutures(List<Future<?>> futures) {
for (Future<?> future : futures) {
try {
future.get();
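The refactor above splits the old private helper in two: submitAllTasks (which still
propagates the caller's context classloader into every task) and a now-public
awaitAllFutures, so callers can do other work between submitting and waiting. A minimal
usage sketch; the inputs and the printing consumer are illustrative:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    import org.apache.paimon.utils.ThreadPoolUtils;

    class SubmitThenAwaitExample {
        static void run() {
            ExecutorService executor =
                    ThreadPoolUtils.createCachedThreadPool(4, "example-pool");
            List<String> inputs = Arrays.asList("a", "b", "c");
            // submit everything first...
            List<Future<?>> futures =
                    ThreadPoolUtils.submitAllTasks(executor, s -> System.out.println(s), inputs);
            // ...do other work on the calling thread here...
            // ...then block until every task has finished.
            ThreadPoolUtils.awaitAllFutures(futures);
        }
    }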
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index adf7c9624c..a9e7f0f7d1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -430,7 +430,24 @@ public class FlinkConnectorOptions {
+ "in order to compact several changelog
files (for primary key tables) "
+ "or newly created data files (for
unaware bucket tables) "
+ "from the same partition into large
ones, "
- + "which can decrease the number of small
files. ");
+ + "which can decrease the number of small
files.");
+
+ public static final ConfigOption<Integer> CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM =
+ key("changelog.precommit-compact.thread-num")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Maximum number of threads to copy bytes from small changelog files. "
+ + "By default, it is the number of processors available to the Java virtual machine.");
+
+ @ExcludeFromDocumentation("Most users won't need to adjust this config")
+ public static final ConfigOption<MemorySize> CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE =
+ key("changelog.precommit-compact.buffer-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(128))
+ .withDescription(
+ "The buffer size for copying bytes from small changelog files. "
+ + "The default value is 128 MB.");
public static final ConfigOption<String> SOURCE_OPERATOR_UID_SUFFIX =
key("source.operator-uid.suffix")
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
index 9f6bd4431d..34553ebe5f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -18,12 +18,13 @@
package org.apache.paimon.flink.compact.changelog;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -35,6 +36,7 @@ import org.apache.flink.types.Either;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -49,13 +51,14 @@ public class ChangelogCompactCoordinateOperator
extends AbstractStreamOperator<Either<Committable, ChangelogCompactTask>>
implements OneInputStreamOperator<Committable, Either<Committable, ChangelogCompactTask>>,
BoundedOneInput {
- private final FileStoreTable table;
+
+ private final CoreOptions options;
private transient long checkpointId;
private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;
- public ChangelogCompactCoordinateOperator(FileStoreTable table) {
- this.table = table;
+ public ChangelogCompactCoordinateOperator(CoreOptions options) {
+ this.options = options;
}
@Override
@@ -63,7 +66,7 @@ public class ChangelogCompactCoordinateOperator
super.open();
checkpointId = Long.MIN_VALUE;
- partitionChangelogs = new HashMap<>();
+ partitionChangelogs = new LinkedHashMap<>();
}
public void processElement(StreamRecord<Committable> record) {
@@ -81,10 +84,26 @@ public class ChangelogCompactCoordinateOperator
return;
}
+ // Changelog files are not stored in an LSM tree,
+ // so we can regard them as files without primary keys.
+ long targetFileSize = options.targetFileSize(false);
+ long compactionFileSize =
+ Math.min(
+ options.compactionFileSize(false),
+ options.toConfiguration()
+ .get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE)
+ .getBytes());
+
BinaryRow partition = message.partition();
Integer bucket = message.bucket();
- long targetFileSize = table.coreOptions().targetFileSize(false);
+ List<DataFileMeta> skippedNewChangelogs = new ArrayList<>();
+ List<DataFileMeta> skippedCompactChangelogs = new ArrayList<>();
+
for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
+ if (meta.fileSize() >= compactionFileSize) {
+ skippedNewChangelogs.add(meta);
+ continue;
+ }
partitionChangelogs
.computeIfAbsent(partition, k -> new PartitionChangelog())
.addNewChangelogFile(bucket, meta);
@@ -94,6 +113,10 @@ public class ChangelogCompactCoordinateOperator
}
}
for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
+ if (meta.fileSize() >= compactionFileSize) {
+ skippedCompactChangelogs.add(meta);
+ continue;
+ }
partitionChangelogs
.computeIfAbsent(partition, k -> new PartitionChangelog())
.addCompactChangelogFile(bucket, meta);
@@ -111,11 +134,11 @@ public class ChangelogCompactCoordinateOperator
new DataIncrement(
message.newFilesIncrement().newFiles(),
message.newFilesIncrement().deletedFiles(),
- Collections.emptyList()),
+ skippedNewChangelogs),
new CompactIncrement(
message.compactIncrement().compactBefore(),
message.compactIncrement().compactAfter(),
- Collections.emptyList()),
+ skippedCompactChangelogs),
message.indexIncrement());
Committable newCommittable =
new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
@@ -132,15 +155,59 @@ public class ChangelogCompactCoordinateOperator
private void emitPartitionChangelogCompactTask(BinaryRow partition) {
PartitionChangelog partitionChangelog =
partitionChangelogs.get(partition);
- output.collect(
- new StreamRecord<>(
- Either.Right(
- new ChangelogCompactTask(
- checkpointId,
- partition,
- table.coreOptions().bucket(),
- partitionChangelog.newFileChangelogFiles,
- partitionChangelog.compactChangelogFiles))));
+ int numNewChangelogFiles =
+ partitionChangelog.newFileChangelogFiles.values().stream()
+ .mapToInt(List::size)
+ .sum();
+ int numCompactChangelogFiles =
+ partitionChangelog.compactChangelogFiles.values().stream()
+ .mapToInt(List::size)
+ .sum();
+ if (numNewChangelogFiles + numCompactChangelogFiles == 1) {
+ // there is only one changelog file in this partition,
+ // so we don't wrap it as a compaction task
+ CommitMessageImpl message;
+ if (numNewChangelogFiles == 1) {
+ Map.Entry<Integer, List<DataFileMeta>> entry =
+ partitionChangelog.newFileChangelogFiles.entrySet().iterator().next();
+ message =
+ new CommitMessageImpl(
+ partition,
+ entry.getKey(),
+ options.bucket(),
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ entry.getValue()),
+ CompactIncrement.emptyIncrement());
+ } else {
+ Map.Entry<Integer, List<DataFileMeta>> entry =
+ partitionChangelog.compactChangelogFiles.entrySet().iterator().next();
+ message =
+ new CommitMessageImpl(
+ partition,
+ entry.getKey(),
+ options.bucket(),
+ DataIncrement.emptyIncrement(),
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ entry.getValue()));
+ }
+ Committable newCommittable =
+ new Committable(checkpointId, Committable.Kind.FILE, message);
+ output.collect(new StreamRecord<>(Either.Left(newCommittable)));
+ } else {
+ output.collect(
+ new StreamRecord<>(
+ Either.Right(
+ new ChangelogCompactTask(
+ checkpointId,
+ partition,
+ options.bucket(),
+ partitionChangelog.newFileChangelogFiles,
+ partitionChangelog.compactChangelogFiles))));
+ }
partitionChangelogs.remove(partition);
}
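Two coordinator behaviors above are worth spelling out: a changelog file whose size
already reaches min(compaction file size, copy buffer size) is passed through to the
committer untouched, and a partition left with exactly one small file is emitted as a
plain Committable instead of a one-file compaction task. A condensed sketch of the skip
rule; the helper name is hypothetical, while the calls mirror the hunk above:

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.flink.FlinkConnectorOptions;
    import org.apache.paimon.io.DataFileMeta;

    class SkipRuleExample {
        static boolean skipCompaction(CoreOptions options, DataFileMeta meta) {
            long threshold =
                    Math.min(
                            // changelog files are not in an LSM tree, so treat them
                            // as files without primary keys
                            options.compactionFileSize(false),
                            options.toConfiguration()
                                    .get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE)
                                    .getBytes());
            return meta.fileSize() >= threshold;
        }
    }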
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 96fe15c344..7605af61c4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -23,19 +23,24 @@ import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOn
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ThreadPoolUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -43,6 +48,12 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.function.BiConsumer;
/**
 * {@link ChangelogCompactTask} to compact several changelog files from the same partition into one
@@ -50,6 +61,8 @@ import java.util.UUID;
*/
public class ChangelogCompactTask implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(ChangelogCompactTask.class);
+
private final long checkpointId;
private final BinaryRow partition;
private final int totalBuckets;
@@ -67,6 +80,11 @@ public class ChangelogCompactTask implements Serializable {
this.totalBuckets = totalBuckets;
this.newFileChangelogFiles = newFileChangelogFiles;
this.compactChangelogFiles = compactChangelogFiles;
+ Preconditions.checkArgument(
+ newFileChangelogFiles.isEmpty() || compactChangelogFiles.isEmpty(),
+ "Both newFileChangelogFiles and compactChangelogFiles are not empty. "
+ + "There is no such table where changelog is produced both from new files and from compaction. "
+ + "This is unexpected.");
}
public long checkpointId() {
@@ -89,69 +107,102 @@ public class ChangelogCompactTask implements Serializable {
return compactChangelogFiles;
}
- public List<Committable> doCompact(FileStoreTable table) throws Exception {
+ public List<Committable> doCompact(
+ FileStoreTable table, ExecutorService executor, MemorySize bufferSize)
+ throws Exception {
+ Preconditions.checkArgument(
+ bufferSize.getBytes() <= Integer.MAX_VALUE,
+ "Changelog pre-commit compaction buffer size ({} bytes) too
large! "
+ + "The maximum possible value is {} bytes.",
+ bufferSize.getBytes(),
+ Integer.MAX_VALUE);
+
FileStorePathFactory pathFactory = table.store().pathFactory();
+ List<ReadTask> tasks = new ArrayList<>();
+ BiConsumer<Map<Integer, List<DataFileMeta>>, Boolean> addTasks =
+ (files, isCompactResult) -> {
+ for (Map.Entry<Integer, List<DataFileMeta>> entry : files.entrySet()) {
+ int bucket = entry.getKey();
+ DataFilePathFactory dataFilePathFactory =
+ pathFactory.createDataFilePathFactory(partition, bucket);
+ for (DataFileMeta meta : entry.getValue()) {
+ ReadTask task =
+ new ReadTask(
+ table,
+ dataFilePathFactory.toPath(meta),
+ bucket,
+ isCompactResult,
+ meta);
+ Preconditions.checkArgument(
+ meta.fileSize() <= bufferSize.getBytes(),
+ "Trying to compact changelog file with
size {} bytes, "
+ + "while the buffer size is only
{} bytes. This is unexpected.",
+ meta.fileSize(),
+ bufferSize.getBytes());
+ tasks.add(task);
+ }
+ }
+ };
+ addTasks.accept(newFileChangelogFiles, false);
+ addTasks.accept(compactChangelogFiles, true);
+
+ Semaphore semaphore = new Semaphore((int) bufferSize.getBytes());
+ BlockingQueue<ReadTask> finishedTasks = new LinkedBlockingQueue<>();
+ List<Future<?>> futures =
+ ThreadPoolUtils.submitAllTasks(
+ executor,
+ t -> {
+ // Why not create `finishedTasks` as a blocking queue and use it to
+ // limit the total size of bytes waiting to be copied? Because finished
+ // tasks are added after their contents are read, so even if
+ // `finishedTasks` is full, each thread can still read one more file,
+ // and the limit will become `bytesInThreads + bufferSize`, not just
+ // `bufferSize`.
+ try {
+ semaphore.acquire((int) t.meta.fileSize());
+ t.readFully();
+ finishedTasks.put(t);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ },
+ tasks);
+
OutputStream outputStream = new OutputStream();
List<Result> results = new ArrayList<>();
-
- // copy all changelog files to a new big file
- for (Map.Entry<Integer, List<DataFileMeta>> entry : newFileChangelogFiles.entrySet()) {
- int bucket = entry.getKey();
- DataFilePathFactory dataFilePathFactory =
- pathFactory.createDataFilePathFactory(partition, bucket);
- for (DataFileMeta meta : entry.getValue()) {
- copyFile(
- outputStream,
- results,
- table,
- dataFilePathFactory.toPath(meta),
- bucket,
- false,
- meta);
- }
- }
- for (Map.Entry<Integer, List<DataFileMeta>> entry : compactChangelogFiles.entrySet()) {
- Integer bucket = entry.getKey();
- DataFilePathFactory dataFilePathFactory =
- pathFactory.createDataFilePathFactory(partition, bucket);
- for (DataFileMeta meta : entry.getValue()) {
- copyFile(
- outputStream,
- results,
- table,
- dataFilePathFactory.toPath(meta),
- bucket,
- true,
- meta);
- }
+ for (int i = 0; i < tasks.size(); i++) {
+ // copy all files into a new big file
+ ReadTask task = finishedTasks.take();
+ write(task, outputStream, results);
+ semaphore.release((int) task.meta.fileSize());
}
outputStream.out.close();
+ ThreadPoolUtils.awaitAllFutures(futures);
return produceNewCommittables(results, table, pathFactory, outputStream.path);
}
- private void copyFile(
- OutputStream outputStream,
- List<Result> results,
- FileStoreTable table,
- Path path,
- int bucket,
- boolean isCompactResult,
- DataFileMeta meta)
+ private void write(ReadTask task, OutputStream outputStream, List<Result> results)
throws Exception {
if (!outputStream.isInitialized) {
Path outputPath =
- new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
- outputStream.init(outputPath, table.fileIO().newOutputStream(outputPath, false));
+ new Path(task.path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
+ outputStream.init(outputPath, task.table.fileIO().newOutputStream(outputPath, false));
}
- long offset = outputStream.out.getPos();
- try (SeekableInputStream in = table.fileIO().newInputStream(path)) {
- IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Copying bytes from {} to {}", task.path,
outputStream.path);
}
- table.fileIO().deleteQuietly(path);
+ long offset = outputStream.out.getPos();
+ outputStream.out.write(task.result);
results.add(
new Result(
- bucket, isCompactResult, meta, offset, outputStream.out.getPos() - offset));
+ task.bucket,
+ task.isCompactResult,
+ task.meta,
+ offset,
+ outputStream.out.getPos() - offset));
}
private List<Committable> produceNewCommittables(
@@ -172,23 +223,23 @@ public class ChangelogCompactTask implements Serializable {
+ baseResult.bucket
+ "-"
+ baseResult.length;
- table.fileIO()
- .rename(
- changelogTempPath,
- dataFilePathFactory.toAlignedPath(
- realName
- + "."
- + CompactedChangelogReadOnlyFormat.getIdentifier(
- baseResult.meta.fileFormat()),
- baseResult.meta));
-
- List<Committable> newCommittables = new ArrayList<>();
+ Path realPath =
+ dataFilePathFactory.toAlignedPath(
+ realName
+ + "."
+ + CompactedChangelogReadOnlyFormat.getIdentifier(
+ baseResult.meta.fileFormat()),
+ baseResult.meta);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Rename {} to {}", changelogTempPath, realPath);
+ }
+ table.fileIO().rename(changelogTempPath, realPath);
Map<Integer, List<Result>> bucketedResults = new HashMap<>();
for (Result result : results) {
bucketedResults.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result);
}
-
+ List<Committable> newCommittables = new ArrayList<>();
for (Map.Entry<Integer, List<Result>> entry : bucketedResults.entrySet()) {
List<DataFileMeta> newFilesChangelog = new ArrayList<>();
List<DataFileMeta> compactChangelog = new ArrayList<>();
@@ -256,6 +307,39 @@ public class ChangelogCompactTask implements Serializable {
partition, newFileChangelogFiles, compactChangelogFiles);
}
+ private static class ReadTask {
+
+ private final FileStoreTable table;
+ private final Path path;
+ private final int bucket;
+ private final boolean isCompactResult;
+ private final DataFileMeta meta;
+
+ private byte[] result = null;
+
+ private ReadTask(
+ FileStoreTable table,
+ Path path,
+ int bucket,
+ boolean isCompactResult,
+ DataFileMeta meta) {
+ this.table = table;
+ this.path = path;
+ this.bucket = bucket;
+ this.isCompactResult = isCompactResult;
+ this.meta = meta;
+ }
+
+ private void readFully() {
+ try {
+ result = IOUtils.readFully(table.fileIO().newInputStream(path), true);
+ table.fileIO().deleteQuietly(path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
private static class OutputStream {
private Path path;
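The heart of the new doCompact is a semaphore-bounded producer/consumer: each reader
thread must acquire permits equal to its file's size before loading the bytes, and the
single writer releases those permits only after appending them to the big file, so at
most bufferSize bytes (plus one in-flight file per thread, as the comment above notes)
are ever held in memory. A stripped-down, self-contained sketch of the same pattern;
the SmallFile type and its in-memory "read" are illustrative:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.Semaphore;

    class BoundedCopyExample {
        static class SmallFile {
            final int size;
            SmallFile(int size) { this.size = size; }
            byte[] read() { return new byte[size]; } // stand-in for real I/O
        }

        static void copyAll(ExecutorService executor, List<SmallFile> files, int bufferBytes)
                throws Exception {
            Semaphore permits = new Semaphore(bufferBytes);
            BlockingQueue<byte[]> finished = new LinkedBlockingQueue<>();
            List<Future<?>> futures = new ArrayList<>();
            for (SmallFile f : files) {
                futures.add(
                        executor.submit(
                                () -> {
                                    permits.acquire(f.size); // wait for buffer space
                                    finished.put(f.read());
                                    return null;
                                }));
            }
            for (int i = 0; i < files.size(); i++) {
                byte[] bytes = finished.take(); // consume in completion order
                // ... append bytes to the combined output file here ...
                permits.release(bytes.length); // free space for the next reader
            }
            for (Future<?> future : futures) {
                future.get(); // surface any read failure
            }
        }
    }

Completion order is safe here because, as in the real task, each file's offset and
length are recorded as it is written, so readers need no ordering guarantees.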
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
index 260c25a315..9d5fb46e3e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
@@ -18,8 +18,12 @@
package org.apache.paimon.flink.compact.changelog;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ThreadPoolUtils;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -27,6 +31,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import java.util.List;
+import java.util.concurrent.ExecutorService;
/**
* Receive and process the {@link ChangelogCompactTask}s emitted by {@link
@@ -34,20 +39,40 @@ import java.util.List;
*/
public class ChangelogCompactWorkerOperator extends AbstractStreamOperator<Committable>
implements OneInputStreamOperator<Either<Committable, ChangelogCompactTask>, Committable> {
+
private final FileStoreTable table;
+ private transient ExecutorService executor;
+ private transient MemorySize bufferSize;
+
public ChangelogCompactWorkerOperator(FileStoreTable table) {
this.table = table;
}
+ @Override
+ public void open() throws Exception {
+ Options options = new Options(table.options());
+ int numThreads =
+ options.getOptional(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM)
+ .orElse(Runtime.getRuntime().availableProcessors());
+ executor =
+ ThreadPoolUtils.createCachedThreadPool(
+ numThreads, "changelog-compact-async-read-bytes");
+ bufferSize = options.get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE);
+ LOG.info(
+ "Creating {} threads and a buffer of {} bytes for changelog
compaction.",
+ numThreads,
+ bufferSize.getBytes());
+ }
+
+ @Override
public void processElement(StreamRecord<Either<Committable, ChangelogCompactTask>> record)
throws Exception {
-
if (record.getValue().isLeft()) {
output.collect(new StreamRecord<>(record.getValue().left()));
} else {
ChangelogCompactTask task = record.getValue().right();
- List<Committable> committables = task.doCompact(table);
+ List<Committable> committables = task.doCompact(table, executor, bufferSize);
committables.forEach(committable -> output.collect(new StreamRecord<>(committable)));
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 9c5bf11221..d99dd255e4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -248,7 +248,7 @@ public abstract class FlinkSink<T> implements Serializable {
"Changelog Compact Coordinator",
new EitherTypeInfo<>(
new CommittableTypeInfo(), new ChangelogTaskTypeInfo()),
- new ChangelogCompactCoordinateOperator(table))
+ new ChangelogCompactCoordinateOperator(table.coreOptions()))
.forceNonParallel()
.transform(
"Changelog Compact Worker",
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index c701cefb30..d1ed5dbc84 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -679,7 +679,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
+ "WITH ("
+ " 'bucket' = '10',\n"
+ " 'changelog-producer' = 'lookup',\n"
- + " 'changelog.precommit-compact' = 'true',\n"
+ + " 'precommit-compact' = 'true',\n"
+ " 'snapshot.num-retained.min' = '3',\n"
+ " 'snapshot.num-retained.max' = '3'\n"
+ ")");
@@ -773,7 +773,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
+ "WITH ("
+ " 'bucket' = '10',\n"
+ " 'changelog-producer' = 'lookup',\n"
- + " 'changelog.precommit-compact' = 'true'\n"
+ + " 'precommit-compact' = 'true'\n"
+ ")");
Path inputPath = new Path(path, "input");
@@ -1014,7 +1014,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
+ "'changelog-producer' = 'lookup', "
+ "'lookup-wait' = '%s', "
+ "'deletion-vectors.enabled' = '%s', "
- + "'changelog.precommit-compact' = '%s'",
+ + "'precommit-compact' = '%s'",
random.nextBoolean() ? "4mb" : "8mb",
random.nextBoolean(),
enableDeletionVectors,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
new file mode 100644
index 0000000000..4d5e9ac1b9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.Preconditions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChangelogCompactCoordinateOperator}. */
+public class ChangelogCompactCoordinateOperatorTest {
+
+ @Test
+ public void testPrepareSnapshotWithMultipleFiles() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+ ChangelogCompactCoordinateOperator operator =
+ new ChangelogCompactCoordinateOperator(new CoreOptions(options));
+ OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>>
+ testHarness = createTestHarness(operator);
+
+ testHarness.open();
+ testHarness.processElement(
+ new StreamRecord<>(
+ createCommittable(
+ 1,
+ BinaryRow.EMPTY_ROW,
+ 0,
+ Collections.emptyList(),
+ Arrays.asList(3, 2, 5, 4))));
+ testHarness.processElement(
+ new StreamRecord<>(
+ createCommittable(
+ 1,
+ BinaryRow.EMPTY_ROW,
+ 1,
+ Collections.emptyList(),
+ Arrays.asList(3, 3, 2, 2))));
+ testHarness.prepareSnapshotPreBarrier(1);
+ testHarness.processElement(
+ new StreamRecord<>(
+ createCommittable(
+ 2,
+ BinaryRow.EMPTY_ROW,
+ 0,
+ Collections.emptyList(),
+ Arrays.asList(2, 3))));
+ testHarness.prepareSnapshotPreBarrier(2);
+
+ List<Object> output = new ArrayList<>(testHarness.getOutput());
+ assertThat(output).hasSize(7);
+
+ Map<Integer, List<Integer>> expected = new HashMap<>();
+ expected.put(0, Arrays.asList(3, 2, 5));
+ assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, new HashMap<>(), expected);
+
+ expected.clear();
+ expected.put(0, Collections.singletonList(4));
+ expected.put(1, Arrays.asList(3, 3));
+ assertCompactionTask(output.get(2), 1, BinaryRow.EMPTY_ROW, new HashMap<>(), expected);
+
+ expected.clear();
+ expected.put(1, Arrays.asList(2, 2));
+ assertCompactionTask(output.get(4), 1, BinaryRow.EMPTY_ROW, new HashMap<>(), expected);
+
+ expected.clear();
+ expected.put(0, Arrays.asList(2, 3));
+ assertCompactionTask(output.get(6), 2, BinaryRow.EMPTY_ROW, new HashMap<>(), expected);
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testPrepareSnapshotWithSingleFile() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+ ChangelogCompactCoordinateOperator operator =
+ new ChangelogCompactCoordinateOperator(new CoreOptions(options));
+ OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>>
+ testHarness = createTestHarness(operator);
+
+ testHarness.open();
+ testHarness.processElement(
+ new StreamRecord<>(
+ createCommittable(
+ 1,
+ BinaryRow.EMPTY_ROW,
+ 0,
+ Arrays.asList(3, 5, 2),
+ Collections.emptyList())));
+ testHarness.prepareSnapshotPreBarrier(1);
+
+ List<Object> output = new ArrayList<>(testHarness.getOutput());
+ assertThat(output).hasSize(3);
+
+ Map<Integer, List<Integer>> expected = new HashMap<>();
+ expected.put(0, Arrays.asList(3, 5));
+ assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, expected, new HashMap<>());
+ assertCommittable(
+ output.get(2),
+ BinaryRow.EMPTY_ROW,
+ Collections.singletonList(2),
+ Collections.emptyList());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testPrepareSnapshotWithMultiplePartitions() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+ ChangelogCompactCoordinateOperator operator =
+ new ChangelogCompactCoordinateOperator(new CoreOptions(options));
+ OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>>
+ testHarness = createTestHarness(operator);
+
+ Function<Integer, BinaryRow> binaryRow =
+ i -> {
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeInt(0, i);
+ writer.complete();
+ return row;
+ };
+
+ testHarness.open();
+ testHarness.processElement(
+ new StreamRecord<>(
+ createCommittable(
+ 1,
+ binaryRow.apply(1),
+ 0,
+ Collections.emptyList(),
+ Arrays.asList(3, 2, 5, 4))));
+ testHarness.processElement(
+ new StreamRecord<>(
+ createCommittable(
+ 1,
+ binaryRow.apply(2),
+ 1,
+ Collections.emptyList(),
+ Arrays.asList(3, 3, 2, 2, 3))));
+ testHarness.prepareSnapshotPreBarrier(1);
+ testHarness.processElement(
+ new StreamRecord<>(
+ createCommittable(
+ 2,
+ binaryRow.apply(1),
+ 0,
+ Collections.emptyList(),
+ Arrays.asList(2, 3))));
+ testHarness.prepareSnapshotPreBarrier(2);
+
+ List<Object> output = new ArrayList<>(testHarness.getOutput());
+ assertThat(output).hasSize(8);
+
+ Map<Integer, List<Integer>> expected = new HashMap<>();
+ expected.put(0, Arrays.asList(3, 2, 5));
+ assertCompactionTask(output.get(0), 1, binaryRow.apply(1), new HashMap<>(), expected);
+
+ expected.clear();
+ expected.put(1, Arrays.asList(3, 3, 2));
+ assertCompactionTask(output.get(2), 1, binaryRow.apply(2), new HashMap<>(), expected);
+
+ assertCommittable(
+ output.get(4),
+ binaryRow.apply(1),
+ Collections.emptyList(),
+ Collections.singletonList(4));
+
+ expected.clear();
+ expected.put(1, Arrays.asList(2, 3));
+ assertCompactionTask(output.get(5), 1, binaryRow.apply(2), new HashMap<>(), expected);
+
+ expected.clear();
+ expected.put(0, Arrays.asList(2, 3));
+ assertCompactionTask(output.get(7), 2, binaryRow.apply(1), new HashMap<>(), expected);
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testSkipLargeFiles() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+ ChangelogCompactCoordinateOperator operator =
+ new ChangelogCompactCoordinateOperator(new CoreOptions(options));
+ OneInputStreamOperatorTestHarness<Committable, Either<Committable, ChangelogCompactTask>>
+ testHarness = createTestHarness(operator);
+
+ testHarness.open();
+ testHarness.processElement(
+ new StreamRecord<>(
+ createCommittable(
+ 1,
+ BinaryRow.EMPTY_ROW,
+ 0,
+ Collections.emptyList(),
+ Arrays.asList(3, 10, 5, 9))));
+ testHarness.prepareSnapshotPreBarrier(1);
+
+ List<Object> output = new ArrayList<>(testHarness.getOutput());
+ assertThat(output).hasSize(2);
+
+ Map<Integer, List<Integer>> expected = new HashMap<>();
+ expected.put(0, Arrays.asList(3, 5));
+ assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, new HashMap<>(), expected);
+ assertCommittable(
+ output.get(1), BinaryRow.EMPTY_ROW, Collections.emptyList(), Arrays.asList(10, 9));
+
+ testHarness.close();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void assertCommittable(
+ Object o,
+ BinaryRow partition,
+ List<Integer> newFilesChangelogMbs,
+ List<Integer> compactChangelogMbs) {
+ StreamRecord<Either<Committable, ChangelogCompactTask>> record =
+ (StreamRecord<Either<Committable, ChangelogCompactTask>>) o;
+ assertThat(record.getValue().isLeft()).isTrue();
+ Committable committable = record.getValue().left();
+
+ assertThat(committable.checkpointId()).isEqualTo(1);
+ CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+ assertThat(message.partition()).isEqualTo(partition);
+ assertThat(message.bucket()).isEqualTo(0);
+
+ assertSameSizes(message.newFilesIncrement().changelogFiles(), newFilesChangelogMbs);
+ assertSameSizes(message.compactIncrement().changelogFiles(), compactChangelogMbs);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void assertCompactionTask(
+ Object o,
+ long checkpointId,
+ BinaryRow partition,
+ Map<Integer, List<Integer>> newFilesChangelogMbs,
+ Map<Integer, List<Integer>> compactChangelogMbs) {
+ StreamRecord<Either<Committable, ChangelogCompactTask>> record =
+ (StreamRecord<Either<Committable, ChangelogCompactTask>>) o;
+ assertThat(record.getValue().isRight()).isTrue();
+ ChangelogCompactTask task = record.getValue().right();
+
+ assertThat(task.checkpointId()).isEqualTo(checkpointId);
+ assertThat(task.partition()).isEqualTo(partition);
+
+ assertThat(task.newFileChangelogFiles().keySet()).isEqualTo(newFilesChangelogMbs.keySet());
+ for (int bucket : task.newFileChangelogFiles().keySet()) {
+ assertSameSizes(
+ task.newFileChangelogFiles().get(bucket), newFilesChangelogMbs.get(bucket));
+ }
+ assertThat(task.compactChangelogFiles().keySet()).isEqualTo(compactChangelogMbs.keySet());
+ for (int bucket : task.compactChangelogFiles().keySet()) {
+ assertSameSizes(
+ task.compactChangelogFiles().get(bucket), compactChangelogMbs.get(bucket));
+ }
+ }
+
+ private void assertSameSizes(List<DataFileMeta> metas, List<Integer> mbs) {
+ assertThat(metas.stream().mapToLong(DataFileMeta::fileSize).toArray())
+ .containsExactlyInAnyOrder(
+ mbs.stream()
+ .mapToLong(mb -> MemorySize.ofMebiBytes(mb).getBytes())
+ .toArray());
+ }
+
+ private Committable createCommittable(
+ long checkpointId,
+ BinaryRow partition,
+ int bucket,
+ List<Integer> newFilesChangelogMbs,
+ List<Integer> compactChangelogMbs) {
+ CommitMessageImpl message =
+ new CommitMessageImpl(
+ partition,
+ bucket,
+ 2,
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ newFilesChangelogMbs.stream()
+ .map(this::createDataFileMetaOfSize)
+ .collect(Collectors.toList())),
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ compactChangelogMbs.stream()
+ .map(this::createDataFileMetaOfSize)
+ .collect(Collectors.toList())));
+ return new Committable(checkpointId, Committable.Kind.FILE, message);
+ }
+
+ private DataFileMeta createDataFileMetaOfSize(int mb) {
+ return DataFileMeta.forAppend(
+ UUID.randomUUID().toString(),
+ MemorySize.ofMebiBytes(mb).getBytes(),
+ 0,
+ SimpleStats.EMPTY_STATS,
+ 0,
+ 0,
+ 1,
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private OneInputStreamOperatorTestHarness<
+ Committable, Either<Committable, ChangelogCompactTask>>
+ createTestHarness(ChangelogCompactCoordinateOperator operator) throws Exception {
+ TypeSerializer serializer =
+ new EitherSerializer<>(
+ new CommittableTypeInfo().createSerializer(new ExecutionConfig()),
+ new ChangelogTaskTypeInfo().createSerializer(new ExecutionConfig()));
+ OneInputStreamOperatorTestHarness harness =
+ new OneInputStreamOperatorTestHarness(operator, 1, 1, 0);
+ harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer));
+ harness.getStreamConfig().serializeAllConfigs();
+ harness.setup(serializer);
+ return harness;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
index 7fcde6214f..580220e77d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
@@ -56,13 +56,24 @@ public class ChangelogCompactTaskSerializerTest {
put(1, newFiles(20));
}
},
+ new HashMap<>());
+ ChangelogCompactTask serializeTask =
+ serializer.deserialize(serializer.getVersion(), serializer.serialize(task));
+ assertThat(task).isEqualTo(serializeTask);
+
+ task =
+ new ChangelogCompactTask(
+ 2L,
+ partition,
+ 2,
+ new HashMap<>(),
new HashMap<Integer, List<DataFileMeta>>() {
{
put(0, newFiles(10));
put(1, newFiles(10));
}
});
- ChangelogCompactTask serializeTask = serializer.deserialize(2, serializer.serialize(task));
+ serializeTask = serializer.deserialize(2, serializer.serialize(task));
assertThat(task).isEqualTo(serializeTask);
}