This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 23eb70a2b [core] Introduce full-compaction.delta-commits and read
full-compacted mode (#791)
23eb70a2b is described below
commit 23eb70a2bba3fde75ee1026e2e3f5b583f684292
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 3 15:37:03 2023 +0800
[core] Introduce full-compaction.delta-commits and read full-compacted mode
(#791)
---
docs/content/concepts/primary-key-table.md | 2 +-
docs/content/maintenance/expiring-snapshots.md | 2 +-
docs/content/maintenance/manage-partition.md | 2 +-
docs/content/maintenance/read-performance.md | 57 ++++
docs/content/maintenance/rescale-bucket.md | 2 +-
.../shortcodes/generated/core_configuration.html | 20 +-
.../generated/flink_connector_configuration.html | 18 +-
.../main/java/org/apache/paimon/CoreOptions.java | 13 +-
.../paimon/append/AppendOnlyCompactManager.java | 175 +++++-------
.../paimon/operation/AppendOnlyFileStoreWrite.java | 2 -
.../paimon/table/source/AbstractDataTableScan.java | 15 +-
.../snapshot/FullCompactedStartingScanner.java | 71 +++++
.../org/apache/paimon/utils/SnapshotManager.java | 8 +-
.../append/AppendOnlyCompactManagerTest.java | 6 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 26 +-
.../apache/paimon/append/FullCompactTaskTest.java | 152 ++++++++++
.../paimon/append/IterativeCompactTaskTest.java | 307 ---------------------
.../apache/paimon/format/FileFormatSuffixTest.java | 10 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 38 +++
.../apache/paimon/flink/FlinkConnectorOptions.java | 3 +
.../org/apache/paimon/flink/sink/FlinkSink.java | 61 ++--
.../flink/sink/FullChangelogStoreSinkWrite.java | 4 +-
...ite.java => GlobalFullCompactionSinkWrite.java} | 69 ++---
.../flink/sink/LookupChangelogStoreSinkWrite.java | 51 ----
.../apache/paimon/flink/sink/StoreSinkWrite.java | 2 +-
.../paimon/flink/sink/StoreSinkWriteImpl.java | 12 +-
.../ChangelogWithKeyFileStoreTableITCase.java | 7 +-
.../cdc/SchemaAwareStoreWriteOperatorTest.java | 3 +-
28 files changed, 526 insertions(+), 612 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index 2cf5586af..9a4175b1c 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -196,7 +196,7 @@ which can decouple data writing and changelog generation,
and is more suitable f
By specifying `'changelog-producer' = 'full-compaction'`, Paimon will compare
the results between full compactions and produce the differences as changelog.
The latency of changelog is affected by the frequency of full compactions.
-By specifying `changelog-producer.compaction-interval` table property (default
value `0s`), users can define the maximum interval between two full compactions
to ensure latency. This is set to 0 by default, so each checkpoint will have a
full compression and generate a change log.
+By specifying the `full-compaction.delta-commits` table property, full compaction
will be constantly triggered after the given number of delta commits (checkpoints).
This is set to 1 by default, so each checkpoint will trigger a full compaction
and generate a changelog.
{{< img src="/img/changelog-producer-full-compaction.png">}}
diff --git a/docs/content/maintenance/expiring-snapshots.md
b/docs/content/maintenance/expiring-snapshots.md
index 479316b52..fddaa045a 100644
--- a/docs/content/maintenance/expiring-snapshots.md
+++ b/docs/content/maintenance/expiring-snapshots.md
@@ -1,6 +1,6 @@
---
title: "Expiring Snapshots"
-weight: 2
+weight: 3
type: docs
aliases:
- /maintenance/expiring-snapshots.html
diff --git a/docs/content/maintenance/manage-partition.md
b/docs/content/maintenance/manage-partition.md
index c02c39113..126cc7dbd 100644
--- a/docs/content/maintenance/manage-partition.md
+++ b/docs/content/maintenance/manage-partition.md
@@ -1,6 +1,6 @@
---
title: "Manage Partition"
-weight: 4
+weight: 5
type: docs
aliases:
- /maintenance/manage-partition.html
diff --git a/docs/content/maintenance/read-performance.md
b/docs/content/maintenance/read-performance.md
new file mode 100644
index 000000000..740d223bf
--- /dev/null
+++ b/docs/content/maintenance/read-performance.md
@@ -0,0 +1,57 @@
+---
+title: "Read Performance"
+weight: 2
+type: docs
+aliases:
+- /maintenance/read-performance.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Read Performance
+
+## Full Compaction
+
+Configure 'full-compaction.delta-commits' to perform full compaction periodically
during Flink writes.
+It also ensures that partitions are fully compacted before writing ends.
+
+### Primary Key Table
+
+For Primary Key Tables, Paimon uses a 'MergeOnRead' approach: when reading data,
multiple layers of LSM data are merged,
+and read parallelism is limited by the number of buckets.
Although Paimon's merge is efficient,
+it still cannot match the read performance of an ordinary append-only table.
+
+If you need fast queries in certain scenarios and can accept reading somewhat
older data, you can:
+
+1. Configure 'full-compaction.delta-commits' so that full compaction is
performed periodically when writing data (currently only Flink).
+2. Set 'scan.mode' to 'compacted-full' so that reads pick the snapshot
produced by the latest full compaction, which gives good read performance.
+
+You can flexibly balance query performance and data latency when reading.
+
+### Append Only Table
+
+Small files can slow reading and affect DFS stability. By default, when there
are more than 'compaction.max.file-num'
+(default 50) small files in a single bucket, a compaction is triggered.
However, when there are multiple buckets, many
+small files will be generated.
+
+You can use full compaction to reduce the number of small files. Full compaction
will eliminate most small files.
+
+## Format
+
+Paimon has some query optimizations for Parquet reading, so Parquet will be
slightly faster than ORC.
diff --git a/docs/content/maintenance/rescale-bucket.md
b/docs/content/maintenance/rescale-bucket.md
index 4878a837f..89e510333 100644
--- a/docs/content/maintenance/rescale-bucket.md
+++ b/docs/content/maintenance/rescale-bucket.md
@@ -1,6 +1,6 @@
---
title: "Rescale Bucket"
-weight: 3
+weight: 4
type: docs
aliases:
- /maintenance/rescale-bucket.html
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 5c023d244..61a430146 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -38,12 +38,6 @@
<td>Boolean</td>
<td>Whether to force a compaction before commit.</td>
</tr>
- <tr>
- <td><h5>compaction.early-max.file-num</h5></td>
- <td style="word-wrap: break-word;">50</td>
- <td>Integer</td>
- <td>For file set [f_0,...,f_N], the maximum file number to trigger
a compaction for append-only table, even if sum(size(f_i)) < targetFileSize.
This value avoids pending too much small files, which slows down the
performance.</td>
- </tr>
<tr>
<td><h5>compaction.max-size-amplification-percent</h5></td>
<td style="word-wrap: break-word;">200</td>
@@ -56,6 +50,12 @@
<td>Integer</td>
<td>The maximum sorted run number to pick for compaction. This
value avoids merging too much sorted runs at the same time during compaction,
which may lead to OutOfMemoryError.</td>
</tr>
+ <tr>
+ <td><h5>compaction.max.file-num</h5></td>
+ <td style="word-wrap: break-word;">50</td>
+ <td>Integer</td>
+ <td>For file set [f_0,...,f_N], the maximum file number to trigger
a compaction for append-only table, even if sum(size(f_i)) < targetFileSize.
This value avoids pending too much small files, which slows down the
performance.</td>
+ </tr>
<tr>
<td><h5>compaction.min.file-num</h5></td>
<td style="word-wrap: break-word;">5</td>
@@ -92,6 +92,12 @@
<td><p>Enum</p></td>
<td>Specify the message format of data files, currently orc,
parquet and avro are supported.<br /><br />Possible values:<ul><li>"orc": ORC
file format.</li><li>"parquet": Parquet file format.</li><li>"avro": Avro file
format.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>full-compaction.delta-commits</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Full compaction will be constantly triggered after delta
commits.</td>
+ </tr>
<tr>
<td><h5>local-sort.max-num-file-handles</h5></td>
<td style="word-wrap: break-word;">128</td>
@@ -276,7 +282,7 @@
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
<td><p>Enum</p></td>
- <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual
startup mode will be "from-snapshot". Otherwise the actual startup mode will be
"latest-full".</li><li>"latest-full": For streaming sources, produces the
latest snapshot on the table upon f [...]
+ <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp", and if "scan.snapshot-id" is set the actual
startup mode will be "from-snapshot". Otherwise the actual startup mode will be
"latest-full".</li><li>"latest-full": For streaming sources, produces the
latest snapshot on the table upon f [...]
</tr>
<tr>
<td><h5>scan.plan-sort-partition</h5></td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 11cfa050b..6d48d12f5 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -8,12 +8,6 @@
</tr>
</thead>
<tbody>
- <tr>
- <td><h5>changelog-producer.compaction-interval</h5></td>
- <td style="word-wrap: break-word;">0 ms</td>
- <td>Duration</td>
- <td>When changelog-producer is set to FULL_COMPACTION, full
compaction will be constantly triggered after this interval.</td>
- </tr>
<tr>
<td><h5>changelog-producer.lookup-wait</h5></td>
<td style="word-wrap: break-word;">true</td>
@@ -26,18 +20,18 @@
<td>String</td>
<td>The log system used to keep changes of the table.<br /><br
/>Possible values:<br /><ul><li>"none": No log system, the data is written only
to file store, and the streaming read will be directly read from the file
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written
to file store and kafka, and the streaming read will be read from
kafka.</li></ul></td>
</tr>
- <tr>
- <td><h5>scan.parallelism</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>Integer</td>
- <td>Define a custom parallelism for the scan source. By default,
if this option is not defined, the planner will derive the parallelism for each
statement individually by also considering the global configuration. If user
enable the scan.infer-parallelism, the planner will derive the parallelism by
inferred parallelism.</td>
- </tr>
<tr>
<td><h5>scan.infer-parallelism</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If it is false, parallelism of source are set by
scan.parallelism. Otherwise, source parallelism is inferred from splits number
(batch mode) or bucket number(streaming mode).</td>
</tr>
+ <tr>
+ <td><h5>scan.parallelism</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Define a custom parallelism for the scan source. By default,
if this option is not defined, the planner will derive the parallelism for each
statement individually by also considering the global configuration. If user
enable the scan.infer-parallelism, the planner will derive the parallelism by
inferred parallelism.</td>
+ </tr>
<tr>
<td><h5>scan.split-enumerator.batch-size</h5></td>
<td style="word-wrap: break-word;">10</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 536bd3b47..865ba1b3a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -300,9 +300,10 @@ public class CoreOptions implements Serializable {
+ "which is not cost-effective.");
public static final ConfigOption<Integer> COMPACTION_MAX_FILE_NUM =
- key("compaction.early-max.file-num")
+ key("compaction.max.file-num")
.intType()
.defaultValue(50)
+ .withDeprecatedKeys("compaction.early-max.file-num")
.withDescription(
"For file set [f_0,...,f_N], the maximum file
number to trigger a compaction "
+ "for append-only table, even if
sum(size(f_i)) < targetFileSize. This value "
@@ -551,6 +552,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Should assert disorder files, this just for
compatibility with older versions.");
+ public static final ConfigOption<Integer> FULL_COMPACTION_DELTA_COMMITS =
+ key("full-compaction.delta-commits")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Full compaction will be constantly triggered
after delta commits.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -860,7 +868,8 @@ public class CoreOptions implements Serializable {
"For streaming sources, produces a snapshot after the latest
compaction on the table "
+ "upon first startup, and continue to read the latest
changes. "
+ "For batch sources, just produce a snapshot after
the latest compaction "
- + "but does not read new changes."),
+ + "but does not read new changes. Snapshots of full
compaction are picked "
+ + "when scheduled full-compaction is enabled."),
FROM_TIMESTAMP(
"from-timestamp",
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
index 4ff3bb795..b598ca181 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
@@ -23,21 +23,16 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactFutureManager;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -45,36 +40,32 @@ import java.util.concurrent.ExecutorService;
/** Compact manager for {@link AppendOnlyFileStore}. */
public class AppendOnlyCompactManager extends CompactFutureManager {
- private final FileIO fileIO;
+ private static final int FULL_COMPACT_MIN_FILE = 3;
+
private final ExecutorService executor;
private final TreeSet<DataFileMeta> toCompact;
private final int minFileNum;
private final int maxFileNum;
private final long targetFileSize;
private final CompactRewriter rewriter;
- private final DataFilePathFactory pathFactory;
- private final boolean assertDisorder;
+
+ private List<DataFileMeta> compacting;
public AppendOnlyCompactManager(
- FileIO fileIO,
ExecutorService executor,
List<DataFileMeta> restored,
int minFileNum,
int maxFileNum,
long targetFileSize,
CompactRewriter rewriter,
- DataFilePathFactory pathFactory,
boolean assertDisorder) {
- this.fileIO = fileIO;
this.executor = executor;
- this.assertDisorder = assertDisorder;
this.toCompact = new TreeSet<>(fileComparator(assertDisorder));
this.toCompact.addAll(restored);
this.minFileNum = minFileNum;
this.maxFileNum = maxFileNum;
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
- this.pathFactory = pathFactory;
}
@Override
@@ -91,28 +82,24 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
taskFuture == null,
"A compaction task is still running while the user "
+ "forces a new compaction. This is unexpected.");
- taskFuture =
- executor.submit(
- new AppendOnlyCompactManager.IterativeCompactTask(
- fileIO,
- toCompact,
- targetFileSize,
- minFileNum,
- maxFileNum,
- rewriter,
- pathFactory,
- assertDisorder));
+ if (toCompact.size() < FULL_COMPACT_MIN_FILE) {
+ return;
+ }
+
+ taskFuture = executor.submit(new FullCompactTask(toCompact,
targetFileSize, rewriter));
+ compacting = new ArrayList<>(toCompact);
+ toCompact.clear();
}
private void triggerCompactionWithBestEffort() {
if (taskFuture != null) {
return;
}
- pickCompactBefore()
- .ifPresent(
- (inputs) ->
- taskFuture =
- executor.submit(new
AutoCompactTask(inputs, rewriter)));
+ Optional<List<DataFileMeta>> picked = pickCompactBefore();
+ if (picked.isPresent()) {
+ compacting = picked.get();
+ taskFuture = executor.submit(new AutoCompactTask(compacting,
rewriter));
+ }
}
@Override
@@ -126,8 +113,13 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
}
@Override
- public Collection<DataFileMeta> allFiles() {
- return toCompact;
+ public List<DataFileMeta> allFiles() {
+ List<DataFileMeta> allFiles = new ArrayList<>();
+ if (compacting != null) {
+ allFiles.addAll(compacting);
+ }
+ allFiles.addAll(toCompact);
+ return allFiles;
}
/** Finish current task, and update result files to {@link #toCompact}. */
@@ -135,27 +127,23 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
public Optional<CompactResult> getCompactionResult(boolean blocking)
throws ExecutionException, InterruptedException {
Optional<CompactResult> result = innerGetCompactionResult(blocking);
- result.ifPresent(
- r -> {
- if (!r.after().isEmpty()) {
- // if the last compacted file is still small,
- // add it back to the head
- DataFileMeta lastFile = r.after().get(r.after().size()
- 1);
- if (lastFile.fileSize() < targetFileSize) {
- toCompact.add(lastFile);
- }
- }
- });
+ if (result.isPresent()) {
+ CompactResult compactResult = result.get();
+ if (!compactResult.after().isEmpty()) {
+ // if the last compacted file is still small,
+ // add it back to the head
+ DataFileMeta lastFile =
compactResult.after().get(compactResult.after().size() - 1);
+ if (lastFile.fileSize() < targetFileSize) {
+ toCompact.add(lastFile);
+ }
+ }
+ compacting = null;
+ }
return result;
}
@VisibleForTesting
Optional<List<DataFileMeta>> pickCompactBefore() {
- return pick(toCompact, targetFileSize, minFileNum, maxFileNum);
- }
-
- private static Optional<List<DataFileMeta>> pick(
- TreeSet<DataFileMeta> toCompact, long targetFileSize, int
minFileNum, int maxFileNum) {
if (toCompact.isEmpty()) {
return Optional.empty();
}
@@ -192,82 +180,51 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
@Override
public void close() throws IOException {}
- /**
- * A {@link CompactTask} impl for full compaction of append-only table.
- *
- * <p>This task accepts a pre-scanned file list as input and pick the
candidate files to compact
- * iteratively until reach the end of the input. There might be multiple
times of rewrite
- * happens during one task.
- */
- public static class IterativeCompactTask extends CompactTask {
+ /** A {@link CompactTask} impl for full compaction of append-only table. */
+ public static class FullCompactTask extends CompactTask {
- private final FileIO fileIO;
- private final Collection<DataFileMeta> inputs;
+ private final LinkedList<DataFileMeta> inputs;
private final long targetFileSize;
- private final int minFileNum;
- private final int maxFileNum;
private final CompactRewriter rewriter;
- private final DataFilePathFactory factory;
- private final boolean assertDisorder;
-
- public IterativeCompactTask(
- FileIO fileIO,
- Collection<DataFileMeta> inputs,
- long targetFileSize,
- int minFileNum,
- int maxFileNum,
- CompactRewriter rewriter,
- DataFilePathFactory factory,
- boolean assertDisorder) {
- this.fileIO = fileIO;
- this.inputs = inputs;
+
+ public FullCompactTask(
+ Collection<DataFileMeta> inputs, long targetFileSize,
CompactRewriter rewriter) {
+ this.inputs = new LinkedList<>(inputs);
this.targetFileSize = targetFileSize;
- this.minFileNum = minFileNum;
- this.maxFileNum = maxFileNum;
this.rewriter = rewriter;
- this.factory = factory;
- this.assertDisorder = assertDisorder;
}
@Override
protected CompactResult doCompact() throws Exception {
- TreeSet<DataFileMeta> toCompact = new
TreeSet<>(fileComparator(assertDisorder));
- toCompact.addAll(inputs);
- Set<DataFileMeta> compactBefore = new LinkedHashSet<>();
- List<DataFileMeta> compactAfter = new ArrayList<>();
- while (!toCompact.isEmpty()) {
- Optional<List<DataFileMeta>> candidates =
- AppendOnlyCompactManager.pick(
- toCompact, targetFileSize, minFileNum,
maxFileNum);
- if (candidates.isPresent()) {
- List<DataFileMeta> before = candidates.get();
- compactBefore.addAll(before);
- List<DataFileMeta> after = rewriter.rewrite(before);
- compactAfter.addAll(after);
- DataFileMeta lastFile = after.get(after.size() - 1);
- if (lastFile.fileSize() < targetFileSize) {
- toCompact.add(lastFile);
- }
- } else {
- break;
+ // remove large files
+ while (!inputs.isEmpty()) {
+ DataFileMeta file = inputs.peekFirst();
+ if (file.fileSize() >= targetFileSize) {
+ inputs.poll();
+ continue;
}
+ break;
}
- // remove and delete intermediate files
- Iterator<DataFileMeta> afterIterator = compactAfter.iterator();
- while (afterIterator.hasNext()) {
- DataFileMeta file = afterIterator.next();
- if (compactBefore.contains(file)) {
- compactBefore.remove(file);
- afterIterator.remove();
- delete(file);
+
+ // compute small files
+ int big = 0;
+ int small = 0;
+ for (DataFileMeta file : inputs) {
+ if (file.fileSize() >= targetFileSize) {
+ big++;
+ } else {
+ small++;
}
}
- return result(new ArrayList<>(compactBefore), compactAfter);
- }
- @VisibleForTesting
- void delete(DataFileMeta tmpFile) {
- fileIO.deleteQuietly(factory.toPath(tmpFile.fileName()));
+ // do compaction
+ List<DataFileMeta> compactBefore = new ArrayList<>();
+ List<DataFileMeta> compactAfter = new ArrayList<>();
+ if (small > big && inputs.size() >= FULL_COMPACT_MIN_FILE) {
+ compactBefore = new ArrayList<>(inputs);
+ compactAfter = rewriter.rewrite(inputs);
+ }
+ return result(new ArrayList<>(compactBefore), compactAfter);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 3eac01f34..52f0fc2e9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -105,14 +105,12 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
skipCompaction
? new NoopCompactManager()
: new AppendOnlyCompactManager(
- fileIO,
compactExecutor,
restoredFiles,
compactionMinFileNum,
compactionMaxFileNum,
targetFileSize,
compactRewriter(partition, bucket),
- factory,
assertDisorder);
return new AppendOnlyWriter(
fileIO,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 7c4f3bad7..345535d7e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -19,12 +19,14 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
+import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -32,6 +34,8 @@ import
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.utils.Preconditions;
+import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+
/** An abstraction layer above {@link FileStoreScan} to provide input split
generation. */
public abstract class AbstractDataTableScan implements DataTableScan {
@@ -63,7 +67,16 @@ public abstract class AbstractDataTableScan implements
DataTableScan {
? new ContinuousLatestStartingScanner()
: new FullStartingScanner();
case COMPACTED_FULL:
- return new CompactedStartingScanner();
+ if (options.changelogProducer() ==
ChangelogProducer.FULL_COMPACTION
+ ||
options.toConfiguration().contains(FULL_COMPACTION_DELTA_COMMITS)) {
+ int deltaCommits =
+ options.toConfiguration()
+ .getOptional(FULL_COMPACTION_DELTA_COMMITS)
+ .orElse(1);
+ return new FullCompactedStartingScanner(deltaCommits);
+ } else {
+ return new CompactedStartingScanner();
+ }
case FROM_TIMESTAMP:
Long startupMillis = options.scanTimestampMills();
Preconditions.checkNotNull(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
new file mode 100644
index 000000000..bb7b38b4b
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.CoreOptions.StartupMode;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link StartingScanner} for the {@link StartupMode#COMPACTED_FULL} startup
mode with
+ * 'full-compaction.delta-commits'.
+ */
+public class FullCompactedStartingScanner implements StartingScanner {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FullCompactedStartingScanner.class);
+
+ private final int deltaCommits;
+
+ public FullCompactedStartingScanner(int deltaCommits) {
+ this.deltaCommits = deltaCommits;
+ }
+
+ @Override
+ @Nullable
+ public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
+ Long startingSnapshotId = snapshotManager.pickSnapshot(this::picked);
+ if (startingSnapshotId == null) {
+ LOG.debug("There is currently no compact snapshot. Waiting for
snapshot generation.");
+ return null;
+ }
+ return new Result(
+ startingSnapshotId,
+ snapshotSplitReader
+ .withKind(ScanKind.ALL)
+ .withSnapshot(startingSnapshotId)
+ .splits());
+ }
+
+ private boolean picked(Snapshot snapshot) {
+ long identifier = snapshot.commitIdentifier();
+ return snapshot.commitKind() == CommitKind.COMPACT
+ && isFullCompactedIdentifier(identifier, deltaCommits);
+ }
+
+ public static boolean isFullCompactedIdentifier(long identifier, int
deltaCommits) {
+ return identifier % deltaCommits == 0 || identifier == Long.MAX_VALUE;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 9d0bf13e7..8ef5c8d5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -19,6 +19,7 @@
package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -31,6 +32,7 @@ import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
+import java.util.function.Predicate;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
@@ -98,6 +100,10 @@ public class SnapshotManager implements Serializable {
}
public @Nullable Long latestCompactedSnapshotId() {
+ return pickSnapshot(s -> s.commitKind() == CommitKind.COMPACT);
+ }
+
+ public @Nullable Long pickSnapshot(Predicate<Snapshot> predicate) {
Long latestId = latestSnapshotId();
Long earliestId = earliestSnapshotId();
if (latestId == null || earliestId == null) {
@@ -107,7 +113,7 @@ public class SnapshotManager implements Serializable {
for (long snapshotId = latestId; snapshotId >= earliestId;
snapshotId--) {
if (snapshotExists(snapshotId)) {
Snapshot snapshot = snapshot(snapshotId);
- if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ if (predicate.test(snapshot)) {
return snapshot.id();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
index 045902c7a..695e411df 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
@@ -18,7 +18,6 @@
package org.apache.paimon.append;
-import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.junit.jupiter.api.Test;
@@ -26,7 +25,6 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -242,14 +240,12 @@ public class AppendOnlyCompactManagerTest {
long targetFileSize = 1024;
AppendOnlyCompactManager manager =
new AppendOnlyCompactManager(
- LocalFileIO.create(),
null, // not used
- new LinkedList<>(toCompactBeforePick),
+ toCompactBeforePick,
minFileNum,
maxFileNum,
targetFileSize,
null, // not used
- null, // not used
false);
Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
assertThat(actual.isPresent()).isEqualTo(expectedPresent);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index a0982e2fa..5ac63cb9d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -45,11 +45,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Executors;
@@ -236,10 +234,10 @@ public class AppendOnlyWriterTest {
// increase target file size to test compaction
long targetFileSize = 1024 * 1024L;
- Pair<AppendOnlyWriter, TreeSet<DataFileMeta>> writerAndToCompact =
+ Pair<AppendOnlyWriter, List<DataFileMeta>> writerAndToCompact =
createWriter(targetFileSize, true,
firstInc.newFilesIncrement().newFiles());
writer = writerAndToCompact.getLeft();
- TreeSet<DataFileMeta> toCompact = writerAndToCompact.getRight();
+ List<DataFileMeta> toCompact = writerAndToCompact.getRight();
assertThat(toCompact).containsExactlyElementsOf(firstInc.newFilesIncrement().newFiles());
writer.write(row(id, String.format("%03d", id), PART));
writer.sync();
@@ -262,20 +260,6 @@ public class AppendOnlyWriterTest {
assertThat(compactBefore.get(compactBefore.size() -
1).maxSequenceNumber())
.isEqualTo(compactAfter.get(compactAfter.size() -
1).maxSequenceNumber());
assertThat(secInc.newFilesIncrement().newFiles()).hasSize(1);
-
- /* check toCompact[round + 1] is composed of
- * <1> the compactAfter[round] (due to small size)
- * <2> the rest of toCompact[round]
- * <3> the newFiles[round]
- * with strict order
- */
- List<DataFileMeta> toCompactResult = new ArrayList<>(compactAfter);
- toCompactResult.addAll(
- firstInc.newFilesIncrement()
- .newFiles()
- .subList(4,
firstInc.newFilesIncrement().newFiles().size()));
- toCompactResult.addAll(secInc.newFilesIncrement().newFiles());
- assertThat(toCompact).containsExactlyElementsOf(toCompactResult);
}
private FieldStats initStats(Integer min, Integer max, long nullCount) {
@@ -303,13 +287,12 @@ public class AppendOnlyWriterTest {
return createWriter(targetFileSize, false,
Collections.emptyList()).getLeft();
}
- private Pair<AppendOnlyWriter, TreeSet<DataFileMeta>> createWriter(
+ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
long targetFileSize, boolean forceCompact, List<DataFileMeta>
scannedFiles) {
FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options());
LinkedList<DataFileMeta> toCompact = new LinkedList<>(scannedFiles);
AppendOnlyCompactManager compactManager =
new AppendOnlyCompactManager(
- LocalFileIO.create(),
Executors.newSingleThreadScheduledExecutor(
new
ExecutorThreadFactory("compaction-thread")),
toCompact,
@@ -321,7 +304,6 @@ public class AppendOnlyWriterTest {
? Collections.emptyList()
: Collections.singletonList(
generateCompactAfter(compactBefore)),
- pathFactory,
false);
AppendOnlyWriter writer =
new AppendOnlyWriter(
@@ -335,7 +317,7 @@ public class AppendOnlyWriterTest {
forceCompact,
pathFactory,
null);
- return Pair.of(writer, (TreeSet<DataFileMeta>)
compactManager.allFiles());
+ return Pair.of(writer, compactManager.allFiles());
}
private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
new file mode 100644
index 000000000..dee5ba4f1
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.io.DataFileMeta;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.io.DataFileTestUtils.newFile;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for {@link AppendOnlyCompactManager.FullCompactTask}. */
+public class FullCompactTaskTest {
+
+ /** Target output file size handed to both the task under test and the mock rewriter. */
+ private static final long TARGET_FILE_SIZE = 1024L;
+
+ /** Inputs for which the task is expected to compact nothing (empty before and after). */
+ @Test
+ public void testNoCompact() {
+ // empty
+ innerTest(Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
+
+ // single small file
+ innerTest(
+ Collections.singletonList(newFile(1L, 10L)),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // single large file
+ innerTest(
+ Collections.singletonList(newFile(1L, 1024L)),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // large files
+ innerTest(
+ Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2048L),
newFile(2049, 3200)),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // large files more than small files
+ innerTest(
+ Arrays.asList(
+ newFile(1L, 1024L),
+ newFile(1025L, 2048L),
+ newFile(2049, 3200),
+ newFile(3201, 3220),
+ newFile(3221, 3280)),
+ Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ /** Inputs for which the task is expected to rewrite some or all of the files. */
+ @Test
+ public void testCompact() {
+ // all five inputs are rewritten; the mock rewriter re-splits them into outputs covering
+ // TARGET_FILE_SIZE sequence numbers each, plus a smaller tail output
+ innerTest(
+ Arrays.asList(
+ newFile(1L, 500L),
+ newFile(501L, 1000L),
+ newFile(1001L, 1010L),
+ newFile(1011L, 1024L),
+ newFile(1025L, 3000L)),
+ Arrays.asList(
+ newFile(1L, 500L),
+ newFile(501L, 1000L),
+ newFile(1001L, 1010L),
+ newFile(1011L, 1024L),
+ newFile(1025L, 3000L)),
+ Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2048L),
newFile(2049, 3000)));
+
+ // no head: the large leading file newFile(1L, 1025) is expected to be left out of the
+ // compaction; only the three trailing files are rewritten into a single output
+ innerTest(
+ Arrays.asList(
+ newFile(1L, 1025),
+ newFile(1026, 1080),
+ newFile(1090, 1500),
+ newFile(1600, 1900)),
+ Arrays.asList(newFile(1026, 1080), newFile(1090, 1500),
newFile(1600, 1900)),
+ Collections.singletonList(newFile(1026, 1900)));
+ }
+
+ /**
+ * Runs a {@link MockFullCompactTask} over {@code compactFiles} and asserts, ignoring order,
+ * which files it reports as compacted ({@code before}) and produced ({@code after}). Any
+ * exception thrown by {@code doCompact()} fails the test.
+ */
+ private void innerTest(
+ List<DataFileMeta> compactFiles,
+ List<DataFileMeta> expectBefore,
+ List<DataFileMeta> expectAfter) {
+ MockFullCompactTask task =
+ new MockFullCompactTask(compactFiles, TARGET_FILE_SIZE,
rewriter());
+ try {
+ CompactResult actual = task.doCompact();
+
assertThat(actual.before()).containsExactlyInAnyOrderElementsOf(expectBefore);
+
assertThat(actual.after()).containsExactlyInAnyOrderElementsOf(expectAfter);
+ } catch (Exception e) {
+ fail("This should not happen", e);
+ }
+ }
+
+ /**
+ * A Mock {@link AppendOnlyCompactManager.FullCompactTask} to test. It adds no behavior of its
+ * own and only forwards its constructor arguments; presumably it exists so this test can
+ * construct the task and call {@code doCompact()} directly.
+ */
+ private static class MockFullCompactTask extends
AppendOnlyCompactManager.FullCompactTask {
+
+ public MockFullCompactTask(
+ Collection<DataFileMeta> inputs,
+ long targetFileSize,
+ AppendOnlyCompactManager.CompactRewriter rewriter) {
+ super(inputs, targetFileSize, rewriter);
+ }
+ }
+
+ /**
+ * Mock {@link AppendOnlyCompactManager.CompactRewriter}. It walks the input files in order.
+ * Whenever the running total of input file sizes reaches TARGET_FILE_SIZE, it emits a new file
+ * covering TARGET_FILE_SIZE sequence numbers. After the last input it emits a tail file for
+ * any remaining sequence numbers up to that input's max sequence number.
+ *
+ * <p>NOTE(review): totalFileSize is never reduced after a file is emitted, so once the running
+ * total reaches TARGET_FILE_SIZE every later input emits another full-width file. File sizes
+ * come from {@code newFile}; presumably maxSeq - minSeq + 1 — confirm in DataFileTestUtils.
+ */
+ private AppendOnlyCompactManager.CompactRewriter rewriter() {
+ return compactBefore -> {
+ List<DataFileMeta> compactAfter = new ArrayList<>();
+ long totalFileSize = 0L;
+ long minSeq = -1L;
+ for (int i = 0; i < compactBefore.size(); i++) {
+ DataFileMeta file = compactBefore.get(i);
+ if (i == 0) {
+ minSeq = file.minSequenceNumber();
+ }
+ totalFileSize += file.fileSize();
+ if (totalFileSize >= TARGET_FILE_SIZE) {
+ compactAfter.add(newFile(minSeq, minSeq + TARGET_FILE_SIZE
- 1));
+ minSeq += TARGET_FILE_SIZE;
+ }
+ // last input: flush whatever sequence numbers are not yet covered by an output
+ if (i == compactBefore.size() - 1 && minSeq <=
file.maxSequenceNumber()) {
+ compactAfter.add(newFile(minSeq,
file.maxSequenceNumber()));
+ }
+ }
+ return compactAfter;
+ };
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/IterativeCompactTaskTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/IterativeCompactTaskTest.java
deleted file mode 100644
index d79bbd7b2..000000000
---
a/paimon-core/src/test/java/org/apache/paimon/append/IterativeCompactTaskTest.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.append;
-
-import org.apache.paimon.compact.CompactResult;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMeta;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.paimon.io.DataFileTestUtils.newFile;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.fail;
-
-/** Test for {@link IterativeCompactTaskTest}. */
-public class IterativeCompactTaskTest {
-
- private static final long TARGET_FILE_SIZE = 1024L;
- private static final int MIN_FILE_NUM = 3;
- private static final int MAX_FILE_NUM = 10;
-
- @Test
- public void testNoCompact() {
- // empty
- innerTest(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList());
-
- // single small file
- innerTest(
- Collections.singletonList(newFile(1L, 10L)),
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList());
-
- // single large file
- innerTest(
- Collections.singletonList(newFile(1L, 1024L)),
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList());
-
- // almost-full files
- innerTest(
- Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2048L)),
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList());
-
- // large files
- innerTest(
- Arrays.asList(newFile(1L, 1000L), newFile(1001L, 1100L)),
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList());
- }
-
- @Test
- public void testCompactOnce() {
- // small files on the head
- innerTest(
- Arrays.asList(
- newFile(1L, 500L),
- newFile(501L, 1000L),
- newFile(1001L, 1010L),
- newFile(1011L, 1024L),
- newFile(1025L, 3000L)),
- Arrays.asList(
- newFile(1L, 500L),
- newFile(501L, 1000L),
- newFile(1001L, 1010L),
- newFile(1011L, 1024L)),
- Collections.singletonList(newFile(1L, 1024L)),
- Collections.emptyList());
-
- innerTest(
- Arrays.asList(
- newFile(1L, 500L),
- newFile(501L, 1000L),
- newFile(1001L, 1010L),
- newFile(1011L, 2000L),
- newFile(2001L, 3000L)),
- Arrays.asList(
- newFile(1L, 500L),
- newFile(501L, 1000L),
- newFile(1001L, 1010L),
- newFile(1011L, 2000L)),
- Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2000L)),
- Collections.emptyList());
-
- // small files in the middle
- innerTest(
- Arrays.asList(
- newFile(1L, 2000L),
- newFile(2001L, 4000L),
- newFile(4001L, 4500L),
- newFile(4501L, 4600L),
- newFile(4601L, 4700L),
- newFile(4701L, 5024L),
- newFile(5025L, 7000L)),
- Arrays.asList(
- newFile(4001L, 4500L),
- newFile(4501L, 4600L),
- newFile(4601L, 4700L),
- newFile(4701L, 5024L)),
- Collections.singletonList(newFile(4001L, 5024L)),
- Collections.emptyList());
-
- // small files on the tail
- innerTest(
- Arrays.asList(
- newFile(1L, 2000L),
- newFile(2001L, 4000L),
- newFile(4001L, 4010L),
- newFile(4011L, 4020L),
- newFile(4021L, 4030L),
- newFile(4031L, 4040L),
- newFile(4041L, 4050L),
- newFile(4051L, 4060L),
- newFile(4061L, 4070L),
- newFile(4071L, 4080L),
- newFile(4081L, 4090L),
- newFile(4091L, 4110L)),
- Arrays.asList(
- newFile(4001L, 4010L),
- newFile(4011L, 4020L),
- newFile(4021L, 4030L),
- newFile(4031L, 4040L),
- newFile(4041L, 4050L),
- newFile(4051L, 4060L),
- newFile(4061L, 4070L),
- newFile(4071L, 4080L),
- newFile(4081L, 4090L),
- newFile(4091L, 4110L)),
- Collections.singletonList(newFile(4001L, 4110L)),
- Collections.emptyList());
- }
-
- @Test
- public void testCompactMultiple() {
- // continuous compact
- innerTest(
- Arrays.asList(
- newFile(1L, 2000L),
- newFile(2001L, 4000L),
- // 4001~4010, ..., 4091~4110
- newFile(4001L, 4010L),
- newFile(4011L, 4020L),
- newFile(4021L, 4030L),
- newFile(4031L, 4040L),
- newFile(4041L, 4050L),
- newFile(4051L, 4060L),
- newFile(4061L, 4070L),
- newFile(4071L, 4080L),
- newFile(4081L, 4090L),
- newFile(4091L, 4110L),
- // 4001~4110, 5015~5024
- newFile(4111L, 5000L),
- newFile(5001L, 5014L),
- newFile(5015L, 5024L)),
- Arrays.asList(
- newFile(4001L, 4010L),
- newFile(4011L, 4020L),
- newFile(4021L, 4030L),
- newFile(4031L, 4040L),
- newFile(4041L, 4050L),
- newFile(4051L, 4060L),
- newFile(4061L, 4070L),
- newFile(4071L, 4080L),
- newFile(4081L, 4090L),
- newFile(4091L, 4110L),
- newFile(4111L, 5000L),
- newFile(5001L, 5014L),
- newFile(5015L, 5024L)),
- Collections.singletonList(newFile(4001L, 5024L)),
- Collections.singletonList(newFile(4001L, 4110L)));
-
- // alternate compact
- innerTest(
- Arrays.asList(
- newFile(1L, 2000L),
- newFile(2001L, 4000L),
- // 4001~4500, ..., 4701~6000
- newFile(4001L, 4500L),
- newFile(4501L, 4600L),
- newFile(4601L, 4700L),
- newFile(4701L, 6000L),
- newFile(6001L, 7500L),
- // 7501~8000, 8201~8900
- newFile(7501L, 8000L),
- newFile(8001L, 8200L),
- newFile(8201L, 8900L),
- newFile(8901L, 9550L)),
- Arrays.asList(
- newFile(4001L, 4500L),
- newFile(4501L, 4600L),
- newFile(4601L, 4700L),
- newFile(4701L, 6000L),
- newFile(7501L, 8000L),
- newFile(8001L, 8200L),
- newFile(8201L, 8900L)),
- Arrays.asList(
- newFile(4001L, 5024L),
- newFile(5025L, 6000L),
- newFile(7501L, 8524L),
- newFile(8525L, 8900L)),
- Collections.emptyList());
- }
-
- private void innerTest(
- List<DataFileMeta> compactFiles,
- List<DataFileMeta> expectBefore,
- List<DataFileMeta> expectAfter,
- List<DataFileMeta> expectDeleted) {
- MockIterativeCompactTask task =
- new MockIterativeCompactTask(
- compactFiles, TARGET_FILE_SIZE, MIN_FILE_NUM,
MAX_FILE_NUM, rewriter());
- try {
- CompactResult actual = task.doCompact();
-
assertThat(actual.before()).containsExactlyInAnyOrderElementsOf(expectBefore);
-
assertThat(actual.after()).containsExactlyInAnyOrderElementsOf(expectAfter);
-
- // assert the temporary files are deleted
-
assertThat(task.deleted).containsExactlyInAnyOrderElementsOf(expectDeleted);
- } catch (Exception e) {
- fail("This should not happen", e);
- }
- }
-
- /** A Mock {@link AppendOnlyCompactManager.IterativeCompactTask} to test.
*/
- private static class MockIterativeCompactTask
- extends AppendOnlyCompactManager.IterativeCompactTask {
-
- private final Set<DataFileMeta> deleted;
-
- public MockIterativeCompactTask(
- List<DataFileMeta> inputs,
- long targetFileSize,
- int minFileNum,
- int maxFileNum,
- AppendOnlyCompactManager.CompactRewriter rewriter) {
- super(
- LocalFileIO.create(),
- inputs,
- targetFileSize,
- minFileNum,
- maxFileNum,
- rewriter,
- null,
- false);
- deleted = new HashSet<>();
- }
-
- @Override
- void delete(DataFileMeta tmpFile) {
- deleted.add(tmpFile);
- }
- }
-
- private AppendOnlyCompactManager.CompactRewriter rewriter() {
- return compactBefore -> {
- List<DataFileMeta> compactAfter = new ArrayList<>();
- long totalFileSize = 0L;
- long minSeq = -1L;
- for (int i = 0; i < compactBefore.size(); i++) {
- DataFileMeta file = compactBefore.get(i);
- if (i == 0) {
- minSeq = file.minSequenceNumber();
- }
- totalFileSize += file.fileSize();
- if (totalFileSize >= TARGET_FILE_SIZE) {
- compactAfter.add(newFile(minSeq, minSeq + TARGET_FILE_SIZE
- 1));
- minSeq += TARGET_FILE_SIZE;
- }
- if (i == compactBefore.size() - 1 && minSeq <=
file.maxSequenceNumber()) {
- compactAfter.add(newFile(minSeq,
file.maxSequenceNumber()));
- }
- }
- return compactAfter;
- };
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 6e85d546b..60eb362c6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -69,15 +69,7 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
SCHEMA,
0,
new AppendOnlyCompactManager(
- LocalFileIO.create(),
- null,
- toCompact,
- 4,
- 10,
- 10,
- null,
- dataFilePathFactory,
- false), // not used
+ null, toCompact, 4, 10, 10, null, false), //
not used
false,
dataFilePathFactory,
null);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index 4dc4298d3..c957ddf28 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -40,6 +40,7 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -66,6 +67,7 @@ import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
+import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -816,6 +818,42 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
+ " you can configure
'fields.${field_name}.ignore-retract'='true'");
}
+ /**
+ * Verifies the "compacted-full" scan mode with FULL_COMPACTION_DELTA_COMMITS = 2 on a single
+ * bucket. The batch read is expected to return the row as of the compaction committed with
+ * identifier 2 (value 200), not the later compaction committed with identifier 3 (value 300).
+ *
+ * <p>NOTE(review): presumably identifier 3 is not treated as full-compacted because it is not
+ * aligned to delta-commits = 2 — confirm against
+ * FullCompactedStartingScanner.isFullCompactedIdentifier.
+ */
+ @Test
+ public void testFullCompactedRead() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "2");
+ options.put(CoreOptions.SCAN_MODE.key(), "compacted-full");
+ options.put(BUCKET.key(), "1");
+ FileStoreTable table = createFileStoreTable().copy(options);
+
+ StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = writeBuilder.newWrite();
+ StreamTableCommit commit = writeBuilder.newCommit();
+
+ // commits 0 and 1: plain writes without compaction
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // commit 2: compaction of partition 1, bucket 0 (the boolean presumably requests a full
+ // compaction — confirm against StreamTableWrite#compact)
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ // commit 3: a newer write followed by another compaction; it must not be visible below
+ write.write(rowData(1, 10, 300L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ write.close();
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+ assertThat(
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ BATCH_ROW_TO_STRING))
+
.containsExactly("1|10|200|binary|varbinary|mapKey:mapVal|multiset");
+ }
+
@Override
protected FileStoreTable createFileStoreTable(Consumer<Options> configure)
throws Exception {
return createFileStoreTable(configure, ROW_TYPE);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 91dd1b8be..a1302cdfd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.description.DescribedEnum;
@@ -111,6 +112,8 @@ public class FlinkConnectorOptions {
+ " Downstream can see
intermediate state.")
.build());
+ @Deprecated
+ @ExcludeFromDocumentation("Deprecated")
public static final ConfigOption<Duration>
CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL =
key("changelog-producer.compaction-interval")
.durationType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index d9565f9af..f0fe07081 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
@@ -40,6 +41,7 @@ import org.apache.flink.util.function.SerializableFunction;
import java.io.Serializable;
import java.util.UUID;
+import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
@@ -60,34 +62,47 @@ public abstract class FlinkSink<T> implements Serializable {
}
+ /**
+ * Chooses the {@link StoreSinkWrite} implementation used by this sink.
+ *
+ * <ul>
+ * <li>write-only tables: {@link StoreSinkWriteImpl} with waitCompaction = false;
+ * <li>FULL_COMPACTION changelog producer with the deprecated
+ * CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL explicitly set: the legacy
+ * interval-based {@link FullChangelogStoreSinkWrite};
+ * <li>FULL_COMPACTION changelog producer, or FULL_COMPACTION_DELTA_COMMITS set:
+ * {@link GlobalFullCompactionSinkWrite}, with delta commits defaulting to 1;
+ * <li>otherwise: {@link StoreSinkWriteImpl}.
+ * </ul>
+ *
+ * <p>waitCompaction is true only for the LOOKUP changelog producer with
+ * CHANGELOG_PRODUCER_LOOKUP_WAIT enabled.
+ */
protected StoreSinkWrite.Provider createWriteProvider(String
initialCommitUser) {
- if (!table.coreOptions().writeOnly()) {
+ boolean waitCompaction;
+ if (table.coreOptions().writeOnly()) {
+ waitCompaction = false;
+ } else {
Options options = table.coreOptions().toConfiguration();
- switch (table.coreOptions().changelogProducer()) {
- case FULL_COMPACTION:
- long fullCompactionThresholdMs =
-
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)
- .toMillis();
- return (table, context, ioManager) ->
- new FullChangelogStoreSinkWrite(
- table,
- context,
- initialCommitUser,
- ioManager,
- isOverwrite,
- fullCompactionThresholdMs);
- case LOOKUP:
- if (options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT)) {
- return (table, context, ioManager) ->
- new LookupChangelogStoreSinkWrite(
- table, context, initialCommitUser,
ioManager, isOverwrite);
- }
- break;
- default:
+ ChangelogProducer changelogProducer =
table.coreOptions().changelogProducer();
+ // Backward compatibility: an explicitly configured (deprecated) trigger interval keeps
+ // the old interval-based full-compaction writer for the FULL_COMPACTION producer.
+ if (changelogProducer == ChangelogProducer.FULL_COMPACTION
+ &&
options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
+ long fullCompactionThresholdMs =
+
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
+ return (table, context, ioManager) ->
+ new FullChangelogStoreSinkWrite(
+ table,
+ context,
+ initialCommitUser,
+ ioManager,
+ isOverwrite,
+ fullCompactionThresholdMs);
+ }
+
+ waitCompaction =
+ changelogProducer == ChangelogProducer.LOOKUP
+ && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
+ // FULL_COMPACTION always uses the commit-count based global full compaction; other
+ // producers use it only when FULL_COMPACTION_DELTA_COMMITS is set explicitly.
+ if (changelogProducer == ChangelogProducer.FULL_COMPACTION
+ || options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
+ int deltaCommits =
options.getOptional(FULL_COMPACTION_DELTA_COMMITS).orElse(1);
+ return (table, context, ioManager) ->
+ new GlobalFullCompactionSinkWrite(
+ table,
+ context,
+ initialCommitUser,
+ ioManager,
+ isOverwrite,
+ waitCompaction,
+ deltaCommits);
}
}
return (table, context, ioManager) ->
- new StoreSinkWriteImpl(table, context, initialCommitUser,
ioManager, isOverwrite);
+ new StoreSinkWriteImpl(
+ table, context, initialCommitUser, ioManager,
isOverwrite, waitCompaction);
}
public DataStreamSink<?> sinkFrom(DataStream<T> input) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
index c733c6435..932a8bb01 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
@@ -54,6 +54,8 @@ import java.util.TreeSet;
/**
* {@link StoreSinkWrite} for {@link
CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
* producer. This writer will perform full compaction once in a while.
+ *
+ * @deprecated use {@link GlobalFullCompactionSinkWrite}.
*/
public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
@@ -80,7 +82,7 @@ public class FullChangelogStoreSinkWrite extends
StoreSinkWriteImpl {
boolean isOverwrite,
long fullCompactionThresholdMs)
throws Exception {
- super(table, context, initialCommitUser, ioManager, isOverwrite);
+ super(table, context, initialCommitUser, ioManager, isOverwrite,
false);
this.snapshotManager = table.snapshotManager();
this.fullCompactionThresholdMs = fullCompactionThresholdMs;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
similarity index 79%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index c733c6435..ac666ff12 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
@@ -51,41 +50,43 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import static
org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner.isFullCompactedIdentifier;
+
/**
- * {@link StoreSinkWrite} for {@link
CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
- * producer. This writer will perform full compaction once in a while.
+ * {@link StoreSinkWrite} for execute full compaction globally. All writers
will be full compaction
+ * at the same time (in the specified checkpoint).
*/
-public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
+public class GlobalFullCompactionSinkWrite extends StoreSinkWriteImpl {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GlobalFullCompactionSinkWrite.class);
- private static final Logger LOG =
LoggerFactory.getLogger(FullChangelogStoreSinkWrite.class);
+ private final int deltaCommits;
private final SnapshotManager snapshotManager;
- private final long fullCompactionThresholdMs;
private final Set<Tuple2<BinaryRow, Integer>> currentWrittenBuckets;
private final NavigableMap<Long, Set<Tuple2<BinaryRow, Integer>>>
writtenBuckets;
private final ListState<Tuple3<Long, BinaryRow, Integer>>
writtenBucketState;
- private Long currentFirstWriteMs;
- private final NavigableMap<Long, Long> firstWriteMs;
- private final ListState<Tuple2<Long, Long>> firstWriteMsState;
-
private final TreeSet<Long> commitIdentifiersToCheck;
- public FullChangelogStoreSinkWrite(
+ public GlobalFullCompactionSinkWrite(
FileStoreTable table,
StateInitializationContext context,
String initialCommitUser,
IOManager ioManager,
boolean isOverwrite,
- long fullCompactionThresholdMs)
+ boolean waitCompaction,
+ int deltaCommits)
throws Exception {
- super(table, context, initialCommitUser, ioManager, isOverwrite);
+ super(table, context, initialCommitUser, ioManager, isOverwrite,
waitCompaction);
+
+ this.deltaCommits = deltaCommits;
this.snapshotManager = table.snapshotManager();
- this.fullCompactionThresholdMs = fullCompactionThresholdMs;
currentWrittenBuckets = new HashSet<>();
+ @SuppressWarnings("unchecked")
TupleSerializer<Tuple3<Long, BinaryRow, Integer>>
writtenBucketStateSerializer =
new TupleSerializer<>(
(Class<Tuple3<Long, BinaryRow, Integer>>) (Class<?>)
Tuple3.class,
@@ -109,19 +110,6 @@ public class FullChangelogStoreSinkWrite extends
StoreSinkWriteImpl {
.computeIfAbsent(t.f0, k -> new
HashSet<>())
.add(Tuple2.of(t.f1, t.f2)));
- currentFirstWriteMs = null;
- TupleSerializer<Tuple2<Long, Long>> firstWriteMsStateSerializer =
- new TupleSerializer<>(
- (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
- new TypeSerializer[] {LongSerializer.INSTANCE,
LongSerializer.INSTANCE});
- firstWriteMsState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- "first_write_ms",
firstWriteMsStateSerializer));
- firstWriteMs = new TreeMap<>();
- firstWriteMsState.get().forEach(t -> firstWriteMs.put(t.f0, t.f1));
-
commitIdentifiersToCheck = new TreeSet<>();
}
@@ -148,10 +136,6 @@ public class FullChangelogStoreSinkWrite extends
StoreSinkWriteImpl {
if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
currentWrittenBuckets.add(Tuple2.of(partition.copy(), bucket));
}
-
- if (currentFirstWriteMs == null) {
- currentFirstWriteMs = System.currentTimeMillis();
- }
}
@Override
@@ -159,14 +143,12 @@ public class FullChangelogStoreSinkWrite extends
StoreSinkWriteImpl {
throws IOException {
checkSuccessfulFullCompaction();
- // check what buckets we've modified during this checkpoint interval
+ // collects what buckets we've modified during this checkpoint interval
if (!currentWrittenBuckets.isEmpty()) {
writtenBuckets
.computeIfAbsent(checkpointId, k -> new HashSet<>())
.addAll(currentWrittenBuckets);
currentWrittenBuckets.clear();
- firstWriteMs.putIfAbsent(checkpointId, currentFirstWriteMs);
- currentFirstWriteMs = null;
}
if (LOG.isDebugEnabled()) {
@@ -180,10 +162,7 @@ public class FullChangelogStoreSinkWrite extends
StoreSinkWriteImpl {
}
}
- if (!writtenBuckets.isEmpty() // there should be something to compact
- && System.currentTimeMillis() -
firstWriteMs.firstEntry().getValue()
- >= fullCompactionThresholdMs // time without full
compaction exceeds
- ) {
+ if (!writtenBuckets.isEmpty() &&
isFullCompactedIdentifier(checkpointId, deltaCommits)) {
doCompaction = true;
}
@@ -191,7 +170,7 @@ public class FullChangelogStoreSinkWrite extends
StoreSinkWriteImpl {
if (LOG.isDebugEnabled()) {
LOG.debug("Submit full compaction for checkpoint #{}",
checkpointId);
}
- submitFullCompaction();
+ submitFullCompaction(checkpointId);
commitIdentifiersToCheck.add(checkpointId);
}
@@ -232,7 +211,6 @@ public class FullChangelogStoreSinkWrite extends
StoreSinkWriteImpl {
commitIdentifier);
}
writtenBuckets.headMap(commitIdentifier, true).clear();
- firstWriteMs.headMap(commitIdentifier, true).clear();
commitIdentifiersToCheck.headSet(commitIdentifier).clear();
break;
}
@@ -240,7 +218,10 @@ public class FullChangelogStoreSinkWrite extends
StoreSinkWriteImpl {
}
}
- private void submitFullCompaction() {
+ private void submitFullCompaction(long currentCheckpointId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Submit full compaction for checkpoint #{}",
currentCheckpointId);
+ }
Set<Tuple2<BinaryRow, Integer>> compactedBuckets = new HashSet<>();
writtenBuckets.forEach(
(checkpointId, buckets) -> {
@@ -269,11 +250,5 @@ public class FullChangelogStoreSinkWrite extends
StoreSinkWriteImpl {
}
}
writtenBucketState.update(writtenBucketList);
-
- List<Tuple2<Long, Long>> firstWriteMsList = new ArrayList<>();
- for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
- firstWriteMsList.add(Tuple2.of(entry.getKey(), entry.getValue()));
- }
- firstWriteMsState.update(firstWriteMsList);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupChangelogStoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupChangelogStoreSinkWrite.java
deleted file mode 100644
index 314a4d725..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupChangelogStoreSinkWrite.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.FileStoreTable;
-
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.StateInitializationContext;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * {@link StoreSinkWrite} for {@link CoreOptions.ChangelogProducer#LOOKUP}
changelog producer. This
- * writer will wait compaction in {@link #prepareCommit}.
- */
-public class LookupChangelogStoreSinkWrite extends StoreSinkWriteImpl {
-
- public LookupChangelogStoreSinkWrite(
- FileStoreTable table,
- StateInitializationContext context,
- String initialCommitUser,
- IOManager ioManager,
- boolean isOverwrite)
- throws Exception {
- super(table, context, initialCommitUser, ioManager, isOverwrite);
- }
-
- @Override
- public List<Committable> prepareCommit(boolean doCompaction, long
checkpointId)
- throws IOException {
- return super.prepareCommit(true, checkpointId);
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index ff6dce668..4e3e3a60a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -45,7 +45,7 @@ public interface StoreSinkWrite {
void notifyNewFiles(long snapshotId, BinaryRow partition, int bucket,
List<DataFileMeta> files);
- List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
throws IOException;
+ List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException;
void snapshotState(StateSnapshotContext context) throws Exception;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 5c24bd035..e5988cca9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -45,6 +45,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
private static final Logger LOG =
LoggerFactory.getLogger(StoreSinkWriteImpl.class);
protected final String commitUser;
+ private final boolean waitCompaction;
+
protected TableWriteImpl<?> write;
public StoreSinkWriteImpl(
@@ -52,7 +54,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
StateInitializationContext context,
String initialCommitUser,
IOManager ioManager,
- boolean isOverwrite)
+ boolean isOverwrite,
+ boolean waitCompaction)
throws Exception {
// Each job can only have one user name and this name must be
consistent across restarts.
// We cannot use job id as commit user name here because user may
change job id by creating
@@ -76,6 +79,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
.withOverwrite(isOverwrite);
}
+
+ this.waitCompaction = waitCompaction;
}
@Override
@@ -108,12 +113,13 @@ public class StoreSinkWriteImpl implements StoreSinkWrite
{
}
@Override
- public List<Committable> prepareCommit(boolean doCompaction, long
checkpointId)
+ public List<Committable> prepareCommit(boolean waitCompaction, long
checkpointId)
throws IOException {
List<Committable> committables = new ArrayList<>();
if (write != null) {
try {
- for (CommitMessage committable :
write.prepareCommit(doCompaction, checkpointId)) {
+ for (CommitMessage committable :
+ write.prepareCommit(this.waitCompaction ||
waitCompaction, checkpointId)) {
committables.add(
new Committable(checkpointId,
Committable.Kind.FILE, committable));
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
index 8323d74ca..8f7e254c4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
@@ -101,7 +101,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
innerTestChangelogProducing(
Arrays.asList(
"'changelog-producer' = 'full-compaction'",
- "'changelog-producer.compaction-interval' = '1s'"));
+ "'full-compaction.delta-commits' = '3'"));
}
@Test
@@ -125,7 +125,6 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
+ ") WITH ("
+ " 'bucket' = '1',"
+ " 'changelog-producer' = 'full-compaction',"
- + " 'changelog-producer.compaction-interval' = '2s',"
+ " 'write-only' = 'true'"
+ ")");
@@ -335,7 +334,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
"'write-buffer-size' = '%s',",
random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'full-compaction',"
- + "'changelog-producer.compaction-interval' = '1s'");
+ + "'full-compaction.delta-commits' = '3'");
// sleep for a random amount of time to check
// if we can first read complete records then read incremental records
correctly
@@ -378,7 +377,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
"'write-buffer-size' = '%s',",
random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'full-compaction',"
- + "'changelog-producer.compaction-interval' = '2s',"
+ + "'full-compaction.delta-commits' = '3',"
+ "'write-only' = 'true'");
// sleep for a random amount of time to check
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
index 0fb5068e3..50cc9a95c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
@@ -216,7 +216,8 @@ public class SchemaAwareStoreWriteOperatorTest {
table,
null,
(t, context, ioManager) ->
- new StoreSinkWriteImpl(t, context, commitUser,
ioManager, false));
+ new StoreSinkWriteImpl(
+ t, context, commitUser, ioManager,
false, false));
TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
TypeSerializer<Committable> outputSerializer =
new CommittableTypeInfo().createSerializer(new
ExecutionConfig());