This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3c0fc7df1c [core][flink] Introduce postpone bucket tables. (#5095)
3c0fc7df1c is described below
commit 3c0fc7df1cd19a4d902713fbb44e5e7c742448fc
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 20 16:04:13 2025 +0800
[core][flink] Introduce postpone bucket tables. (#5095)
---
docs/content/flink/procedures.md | 15 +
.../content/primary-key-table/data-distribution.md | 22 ++
.../shortcodes/generated/core_configuration.html | 2 +-
.../generated/flink_connector_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 4 +-
.../java/org/apache/paimon/table/BucketMode.java | 2 +
.../java/org/apache/paimon/AbstractFileStore.java | 13 +-
.../org/apache/paimon/AppendOnlyFileStore.java | 13 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 105 +++--
.../paimon/operation/AbstractFileStoreScan.java | 11 +
.../paimon/operation/FileStoreCommitImpl.java | 14 +-
.../org/apache/paimon/operation/FileStoreScan.java | 2 +
.../paimon/operation/KeyValueFileStoreScan.java | 8 +-
.../paimon/operation/MergeFileSplitRead.java | 3 +-
.../postpone/PostponeBucketFileStoreWrite.java | 118 ++++++
.../paimon/postpone/PostponeBucketWriter.java | 104 +++++
.../org/apache/paimon/schema/SchemaManager.java | 5 +
.../org/apache/paimon/schema/SchemaValidation.java | 7 +-
.../paimon/table/AbstractFileStoreTable.java | 3 +-
.../paimon/table/source/DataTableStreamScan.java | 7 +-
...pScanner.java => ChangelogFollowUpScanner.java} | 14 +-
.../CompactionChangelogFollowUpScanner.java | 54 ---
.../org/apache/paimon/utils/BatchRecordWriter.java | 2 +-
.../apache/paimon/utils/FileStorePathFactory.java | 7 +-
.../org/apache/paimon/schema/TableSchemaTest.java | 2 +-
.../CompactionChangelogFollowUpScannerTest.java | 7 +-
.../InputChangelogFollowUpScannerTest.java | 7 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 +
.../apache/paimon/flink/action/CompactAction.java | 156 +++++++-
.../paimon/flink/action/CompactActionFactory.java | 28 +-
.../apache/paimon/flink/action/RescaleAction.java | 103 +++++
.../paimon/flink/action/RescaleActionFactory.java | 70 ++++
.../postpone/PostponeBucketCompactSplitSource.java | 189 +++++++++
.../RemovePostponeBucketFilesOperator.java | 60 +++
.../RewritePostponeBucketCommittableOperator.java | 162 ++++++++
.../paimon/flink/procedure/CompactProcedure.java | 4 +-
.../paimon/flink/procedure/RescaleProcedure.java | 69 ++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 2 +-
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 12 +-
.../sink/PostponeBucketTableWriteOperator.java | 98 +++++
.../paimon/flink/sink/PostponeBucketWriteSink.java | 58 +++
.../paimon/flink/sink/TableWriteOperator.java | 16 +-
.../services/org.apache.paimon.factories.Factory | 2 +
.../paimon/flink/PostponeBucketTableITCase.java | 438 +++++++++++++++++++++
.../apache/paimon/flink/RescaleBucketITCase.java | 10 +-
45 files changed, 1864 insertions(+), 177 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index db364ff0e3..3fc2ed7d37 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -754,5 +754,20 @@ All available procedures are listed below.
CALL sys.compact_manifest(`table` => 'default.T')
</td>
</tr>
+      <tr>
+      <td>rescale</td>
+      <td>
+         CALL sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition')
+      </td>
+      <td>
+         Rescale one partition of a table. Arguments:
+            <li>identifier: The target table identifier. Cannot be empty.</li>
+            <li>bucket_num: The resulting bucket number after the rescale. Defaults to the current bucket number of the table. Cannot be empty for postpone bucket tables.</li>
+            <li>partition: Which partition to rescale. For partitioned tables this argument cannot be empty.</li>
+      </td>
+      <td>
+         CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, `partition` => 'dt=20250217,hh=08')
+      </td>
+      </tr>
</tbody>
</table>
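
As a usage illustration, here is a minimal sketch of invoking the new rescale procedure from a Flink batch job. The catalog name (paimon_catalog) and the session setup are assumptions for the example; only the CALL statement follows the procedure documented above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RescaleProcedureSketch {
    public static void main(String[] args) throws Exception {
        // Rescale rewrites a whole partition, so run it as a batch job.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql("USE CATALOG paimon_catalog"); // assumed pre-registered Paimon catalog

        // Rescale partition dt=20250217,hh=08 of default.T to 16 buckets.
        tEnv.executeSql(
                        "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, "
                                + "`partition` => 'dt=20250217,hh=08')")
                .await(); // wait for the rescale job to finish
    }
}
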
diff --git a/docs/content/primary-key-table/data-distribution.md
b/docs/content/primary-key-table/data-distribution.md
index baf3327ed9..edfb0133f8 100644
--- a/docs/content/primary-key-table/data-distribution.md
+++ b/docs/content/primary-key-table/data-distribution.md
@@ -87,6 +87,28 @@ If your upsert does not rely on too old data, you can consider configuring index
But please note that this may also cause data duplication.
+## Postpone Bucket
+
+Postpone bucket mode is configured by `'bucket' = '-2'`.
+This mode aims to solve the difficulty of determining a fixed bucket number in advance,
+and it supports different bucket numbers for different partitions.
+
+Currently, only Flink supports this mode.
+
+When writing records into the table,
+all records are first stored in the `bucket-postpone` directory of each partition
+and are not visible to readers.
+
+To move the records into their correct buckets and make them readable,
+you need to run a compaction job.
+See the `compact` [procedure]({{< ref "flink/procedures" >}}).
+The bucket number for partitions compacted for the first time
+is configured by the option `postpone.default-bucket-num`, whose default value is `4`.
+
+Finally, if the bucket number of some partition turns out to be too small,
+you can run a rescale job.
+See the `rescale` [procedure]({{< ref "flink/procedures" >}}).
+
## Pick Partition Fields
The following three types of fields may be defined as partition fields in the
warehouse:
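
To make the workflow above concrete, here is a hedged end-to-end sketch in Flink's Table API: create a postpone bucket table, write into it, then run the compaction that distributes the records into real buckets. The catalog, database and table names as well as the schema are illustrative assumptions; the 'bucket' = '-2' option and the procedures are the ones introduced by this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PostponeBucketSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql("USE CATALOG paimon_catalog"); // assumed pre-registered Paimon catalog

        // 'bucket' = '-2' enables postpone bucket mode for this primary key table.
        tEnv.executeSql(
                "CREATE TABLE T (k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED) "
                        + "WITH ('bucket' = '-2')");

        // These records land in the bucket-postpone directory and are not yet readable.
        tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b')").await();

        // Compaction moves the records into real buckets (postpone.default-bucket-num of them
        // for partitions compacted for the first time) and makes them readable.
        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
    }
}
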
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index e3e1c0f673..fc5c04477b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -48,7 +48,7 @@ under the License.
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
- <td>Bucket number for file store.<br />It should either be equal
to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket
mode).</td>
+ <td>Bucket number for file store.<br />It should either be equal
to -1 (dynamic bucket mode), -2 (postpone bucket mode), or it must be greater
than 0 (fixed bucket mode).</td>
</tr>
<tr>
<td><h5>bucket-key</h5></td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index eb15eb3145..14a96eed0d 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -98,6 +98,12 @@ under the License.
<td>Duration</td>
<td>You can specify time interval for partition, for example,
daily partition is '1 d', hourly partition is '1 h'.</td>
</tr>
+ <tr>
+ <td><h5>postpone.default-bucket-num</h5></td>
+ <td style="word-wrap: break-word;">4</td>
+ <td>Integer</td>
+ <td>Bucket number for the partitions compacted for the first time
in postpone bucket tables.</td>
+ </tr>
<tr>
<td><h5>precommit-compact</h5></td>
<td style="word-wrap: break-word;">false</td>
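
As a small illustration of how this option is consumed, the sketch below builds table options as they might appear in a CREATE TABLE ... WITH (...) clause and reads the value the way the compact action in this commit does. The concrete option values are arbitrary examples.

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;

public class PostponeDefaultBucketNumSketch {
    public static void main(String[] args) {
        // Illustrative table options for a postpone bucket table.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("bucket", "-2");
        tableOptions.put("postpone.default-bucket-num", "8");

        // The compact action reads the option from the table's options like this.
        Options options = new Options(tableOptions);
        int firstCompactionBucketNum = options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
        System.out.println(firstCompactionBucketNum); // 8 (the default would be 4)
    }
}
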
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 380cf84744..9e0c679b57 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -104,7 +104,9 @@ public class CoreOptions implements Serializable {
.text("Bucket number for file store.")
.linebreak()
.text(
- "It should either be equal to -1
(dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).")
+ "It should either be equal to -1
(dynamic bucket mode), "
+ + "-2 (postpone bucket
mode), "
+ + "or it must be greater
than 0 (fixed bucket mode).")
.build());
@Immutable
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
similarity index 97%
rename from paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
rename to paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
index c3b7ca1abd..74ff34613e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
+++ b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -60,4 +60,6 @@ public enum BucketMode {
BUCKET_UNAWARE;
public static final int UNAWARE_BUCKET = 0;
+
+ public static final int POSTPONE_BUCKET = -2;
}
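
For orientation, a tiny sketch of how the values of the 'bucket' table option map onto modes, matching the description updated in core_configuration.html above; the helper is hypothetical and only BucketMode.POSTPONE_BUCKET comes from this commit.

import org.apache.paimon.table.BucketMode;

public class BucketOptionSketch {

    /** Hypothetical helper: describe the meaning of the 'bucket' table option. */
    static String describe(int bucket) {
        if (bucket == BucketMode.POSTPONE_BUCKET) { // -2, introduced by this commit
            return "postpone bucket mode";
        } else if (bucket == -1) {
            return "dynamic bucket mode";
        } else if (bucket > 0) {
            return "fixed bucket mode (" + bucket + " buckets)";
        } else {
            return "invalid 'bucket' value: " + bucket;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(-2)); // postpone bucket mode
        System.out.println(describe(16)); // fixed bucket mode (16 buckets)
    }
}
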
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index d9a1929dec..b3398035b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -33,6 +33,7 @@ import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.PartitionExpire;
@@ -257,6 +258,14 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
return schemaManager.mergeSchema(rowType, allowExplicitCast);
}
+ protected abstract FileStoreScan newScan(ScanType scanType);
+
+ protected enum ScanType {
+ FOR_READ,
+ FOR_WRITE,
+ FOR_COMMIT
+ }
+
@Override
public FileStoreCommitImpl newCommit(String commitUser) {
return newCommit(commitUser, Collections.emptyList());
@@ -283,7 +292,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
manifestFileFactory(),
manifestListFactory(),
indexManifestFileFactory(),
- newScan(),
+ newScan(ScanType.FOR_COMMIT),
options.bucket(),
options.manifestTargetSize(),
options.manifestFullCompactionThresholdSize(),
@@ -363,7 +372,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
partitionExpireTime,
options.partitionExpireCheckInterval(),
PartitionExpireStrategy.createPartitionExpireStrategy(options,
partitionType()),
- newScan(),
+ newScan(ScanType.FOR_COMMIT),
newCommit(commitUser),
partitionHandler,
options.endInputCheckPartitionExpire(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index a06b98d7b3..dc5171a744 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -72,7 +72,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
@Override
public AppendOnlyFileStoreScan newScan() {
- return newScan(false);
+ return newScan(ScanType.FOR_READ);
}
@Override
@@ -108,7 +108,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
partitionType,
pathFactory(),
snapshotManager(),
- newScan(true).withManifestCacheFilter(manifestFilter),
+
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
options,
dvMaintainerFactory,
tableName);
@@ -122,14 +122,15 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
partitionType,
pathFactory(),
snapshotManager(),
- newScan(true).withManifestCacheFilter(manifestFilter),
+
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
options,
dvMaintainerFactory,
tableName);
}
}
- private AppendOnlyFileStoreScan newScan(boolean forWrite) {
+ @Override
+ protected AppendOnlyFileStoreScan newScan(ScanType scanType) {
BucketSelectConverter bucketSelectConverter =
predicate -> {
if (bucketMode() != BucketMode.HASH_FIXED) {
@@ -152,12 +153,12 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
};
return new AppendOnlyFileStoreScan(
- newManifestsReader(forWrite),
+ newManifestsReader(scanType == ScanType.FOR_WRITE),
bucketSelectConverter,
snapshotManager(),
schemaManager,
schema,
- manifestFileFactory(forWrite),
+ manifestFileFactory(scanType == ScanType.FOR_WRITE),
options.scanManifestParallelism(),
options.fileIndexReadEnabled());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index a969fca037..18316901bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -28,11 +28,13 @@ import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.BucketSelectConverter;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
@@ -112,7 +114,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
@Override
public KeyValueFileStoreScan newScan() {
- return newScan(false);
+ return newScan(ScanType.FOR_READ);
}
@Override
@@ -152,12 +154,13 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
}
@Override
- public KeyValueFileStoreWrite newWrite(String commitUser) {
+ public AbstractFileStoreWrite<KeyValue> newWrite(String commitUser) {
return newWrite(commitUser, null);
}
@Override
- public KeyValueFileStoreWrite newWrite(String commitUser,
ManifestCacheFilter manifestFilter) {
+ public AbstractFileStoreWrite<KeyValue> newWrite(
+ String commitUser, ManifestCacheFilter manifestFilter) {
IndexMaintainer.Factory<KeyValue> indexFactory = null;
if (bucketMode() == BucketMode.HASH_DYNAMIC) {
indexFactory = new
HashIndexMaintainer.Factory(newIndexFileHandler());
@@ -167,27 +170,42 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
deletionVectorsMaintainerFactory =
new
DeletionVectorsMaintainer.Factory(newIndexFileHandler());
}
- return new KeyValueFileStoreWrite(
- fileIO,
- schemaManager,
- schema,
- commitUser,
- partitionType,
- keyType,
- valueType,
- keyComparatorSupplier,
- () -> UserDefinedSeqComparator.create(valueType, options),
- logDedupEqualSupplier,
- mfFactory,
- pathFactory(),
- format2PathFactory(),
- snapshotManager(),
- newScan(true).withManifestCacheFilter(manifestFilter),
- indexFactory,
- deletionVectorsMaintainerFactory,
- options,
- keyValueFieldsExtractor,
- tableName);
+
+ if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
+ return new PostponeBucketFileStoreWrite(
+ fileIO,
+ schema,
+ partitionType,
+ keyType,
+ valueType,
+ format2PathFactory(),
+ snapshotManager(),
+
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+ options,
+ tableName);
+ } else {
+ return new KeyValueFileStoreWrite(
+ fileIO,
+ schemaManager,
+ schema,
+ commitUser,
+ partitionType,
+ keyType,
+ valueType,
+ keyComparatorSupplier,
+ () -> UserDefinedSeqComparator.create(valueType, options),
+ logDedupEqualSupplier,
+ mfFactory,
+ pathFactory(),
+ format2PathFactory(),
+ snapshotManager(),
+
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+ indexFactory,
+ deletionVectorsMaintainerFactory,
+ options,
+ keyValueFieldsExtractor,
+ tableName);
+ }
}
private Map<String, FileStorePathFactory> format2PathFactory() {
@@ -198,7 +216,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
return pathFactoryMap;
}
- private KeyValueFileStoreScan newScan(boolean forWrite) {
+ @Override
+ protected KeyValueFileStoreScan newScan(ScanType scanType) {
BucketSelectConverter bucketSelectConverter =
keyFilter -> {
if (bucketMode() != BucketMode.HASH_FIXED) {
@@ -210,24 +229,32 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
splitAnd(keyFilter),
keyType.getFieldNames(),
bucketKeyType.getFieldNames());
- if (bucketFilters.size() > 0) {
+ if (!bucketFilters.isEmpty()) {
return
BucketSelectConverter.create(and(bucketFilters), bucketKeyType);
}
return Optional.empty();
};
- return new KeyValueFileStoreScan(
- newManifestsReader(forWrite),
- bucketSelectConverter,
- snapshotManager(),
- schemaManager,
- schema,
- keyValueFieldsExtractor,
- manifestFileFactory(forWrite),
- options.scanManifestParallelism(),
- options.deletionVectorsEnabled(),
- options.mergeEngine(),
- options.changelogProducer(),
- options.fileIndexReadEnabled() &&
options.deletionVectorsEnabled());
+
+ KeyValueFileStoreScan scan =
+ new KeyValueFileStoreScan(
+ newManifestsReader(scanType == ScanType.FOR_WRITE),
+ bucketSelectConverter,
+ snapshotManager(),
+ schemaManager,
+ schema,
+ keyValueFieldsExtractor,
+ manifestFileFactory(scanType == ScanType.FOR_WRITE),
+ options.scanManifestParallelism(),
+ options.deletionVectorsEnabled(),
+ options.mergeEngine(),
+ options.changelogProducer(),
+ options.fileIndexReadEnabled() &&
options.deletionVectorsEnabled());
+
+ if (options.bucket() == BucketMode.POSTPONE_BUCKET && scanType ==
ScanType.FOR_READ) {
+ scan.onlyReadRealBuckets();
+ }
+
+ return scan;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 27ba4703b9..861128155d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -78,6 +78,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final TableSchema schema;
private Snapshot specifiedSnapshot = null;
+ private boolean onlyReadRealBuckets = false;
private Filter<Integer> bucketFilter = null;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
protected ScanMode scanMode = ScanMode.ALL;
@@ -136,6 +137,12 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan onlyReadRealBuckets() {
+ this.onlyReadRealBuckets = true;
+ return this;
+ }
+
@Override
public FileStoreScan withBucketFilter(Filter<Integer> bucketFilter) {
this.bucketFilter = bucketFilter;
@@ -497,6 +504,10 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
int bucket = bucketGetter.apply(row);
+ if (onlyReadRealBuckets && bucket < 0) {
+ return false;
+ }
+
if (bucketFilter != null && !bucketFilter.test(bucket)) {
return false;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index af7e450e8e..4a59de8dac 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -775,12 +775,14 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
if (latestSnapshot != null) {
- List<ManifestEntry> currentEntries =
- scan.withSnapshot(latestSnapshot)
- .withPartitionFilter(partitionFilter)
- .withKind(ScanMode.ALL)
- .plan()
- .files();
+ scan.withSnapshot(latestSnapshot)
+ .withPartitionFilter(partitionFilter)
+ .withKind(ScanMode.ALL);
+ if (numBucket != BucketMode.POSTPONE_BUCKET) {
+ // bucket = -2 can only be overwritten in postpone bucket
tables
+ scan.withBucketFilter(bucket -> bucket >= 0);
+ }
+ List<ManifestEntry> currentEntries = scan.plan().files();
for (ManifestEntry entry : currentEntries) {
changesWithOverwrite.add(
new ManifestEntry(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 99ae3ef47d..3e0bd25475 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -59,6 +59,8 @@ public interface FileStoreScan {
FileStoreScan withBucket(int bucket);
+ FileStoreScan onlyReadRealBuckets();
+
FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);
FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer>
bucketFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index e39ad2e3c2..22ba024460 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -54,16 +54,14 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
private final SimpleStatsEvolutions fieldKeyStatsConverters;
private final SimpleStatsEvolutions fieldValueStatsConverters;
private final BucketSelectConverter bucketSelectConverter;
-
- private Predicate keyFilter;
- private Predicate valueFilter;
private final boolean deletionVectorsEnabled;
private final MergeEngine mergeEngine;
private final ChangelogProducer changelogProducer;
-
private final boolean fileIndexReadEnabled;
- private final Map<Long, Predicate> schemaId2DataFilter = new HashMap<>();
+ private Predicate keyFilter;
+ private Predicate valueFilter;
+ private final Map<Long, Predicate> schemaId2DataFilter = new HashMap<>();
private boolean valueFilterForceEnabled = false;
public KeyValueFileStoreScan(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 23a3a576e4..67a15d2058 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -42,6 +42,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.types.DataField;
@@ -245,7 +246,7 @@ public class MergeFileSplitRead implements
SplitRead<KeyValue> {
throw new IllegalArgumentException("This read cannot accept split
with before files.");
}
- if (split.isStreaming()) {
+ if (split.isStreaming() || split.bucket() ==
BucketMode.POSTPONE_BUCKET) {
return createNoMergeReader(
split.partition(),
split.bucket(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
new file mode 100644
index 0000000000..b7cc166251
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.postpone;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+/** {@link FileStoreWrite} for {@code bucket = -2} tables. */
+public class PostponeBucketFileStoreWrite extends
AbstractFileStoreWrite<KeyValue> {
+
+ private final CoreOptions options;
+ private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
+
+ public PostponeBucketFileStoreWrite(
+ FileIO fileIO,
+ TableSchema schema,
+ RowType partitionType,
+ RowType keyType,
+ RowType valueType,
+ Map<String, FileStorePathFactory> format2PathFactory,
+ SnapshotManager snapshotManager,
+ FileStoreScan scan,
+ CoreOptions options,
+ String tableName) {
+ super(
+ snapshotManager,
+ scan,
+ null,
+ null,
+ tableName,
+ options,
+ options.bucket(),
+ partitionType,
+ options.writeMaxWritersToSpill(),
+ options.legacyPartitionName());
+
+ this.options = options;
+ this.writerFactoryBuilder =
+ KeyValueFileWriterFactory.builder(
+ fileIO,
+ schema.id(),
+ keyType,
+ valueType,
+ options.fileFormat(),
+ format2PathFactory,
+ options.targetFileSize(true));
+
+        // Ignoring previous files saves scanning time.
+        //
+        // For postpone bucket tables, we only append new files to bucket = -2 directories.
+        //
+        // Also, we don't need to know the current largest sequence id, because when compacting
+        // these files, we will read the records file by file without merging, and then give them
+        // to normal bucket writers.
+        //
+        // Because there is no merging when reading, sequence ids across files are useless.
+ withIgnorePreviousFiles(true);
+ }
+
+ @Override
+ protected PostponeBucketWriter createWriter(
+ @Nullable Long snapshotId,
+ BinaryRow partition,
+ int bucket,
+ List<DataFileMeta> restoreFiles,
+ long restoredMaxSeqNumber,
+ @Nullable CommitIncrement restoreIncrement,
+ ExecutorService compactExecutor,
+ @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+ Preconditions.checkArgument(bucket == BucketMode.POSTPONE_BUCKET);
+ KeyValueFileWriterFactory writerFactory =
+ writerFactoryBuilder.build(partition, bucket, options);
+ return new PostponeBucketWriter(writerFactory);
+ }
+
+ @Override
+ protected Function<WriterContainer<KeyValue>, Boolean>
createWriterCleanChecker() {
+ return createNoConflictAwareWriterCleanChecker();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
new file mode 100644
index 0000000000..47cafb9645
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.postpone;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.RecordWriter;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/** {@link RecordWriter} for {@code bucket = -2} tables. */
+public class PostponeBucketWriter implements RecordWriter<KeyValue> {
+
+ private final KeyValueFileWriterFactory writerFactory;
+ private RollingFileWriter<KeyValue, DataFileMeta> writer;
+
+ public PostponeBucketWriter(KeyValueFileWriterFactory writerFactory) {
+ this.writerFactory = writerFactory;
+ this.writer = null;
+ }
+
+ @Override
+ public void write(KeyValue record) throws Exception {
+ if (writer == null) {
+ writer = writerFactory.createRollingMergeTreeFileWriter(0,
FileSource.APPEND);
+ }
+ writer.write(record);
+ }
+
+ @Override
+ public void compact(boolean fullCompaction) throws Exception {}
+
+ @Override
+ public void addNewFiles(List<DataFileMeta> files) {}
+
+ @Override
+ public Collection<DataFileMeta> dataFiles() {
+ // this method is only for checkpointing, while this writer does not
need any checkpoint
+ return Collections.emptyList();
+ }
+
+ @Override
+ public long maxSequenceNumber() {
+ // see comments in the constructor of PostponeBucketFileStoreWrite
+ return 0;
+ }
+
+ @Override
+ public CommitIncrement prepareCommit(boolean waitCompaction) throws
Exception {
+ List<DataFileMeta> newFiles = Collections.emptyList();
+ if (writer != null) {
+ writer.close();
+ newFiles = writer.result();
+ writer = null;
+ }
+ return new CommitIncrement(
+ new DataIncrement(newFiles, Collections.emptyList(),
Collections.emptyList()),
+ CompactIncrement.emptyIncrement(),
+ null);
+ }
+
+ @Override
+ public boolean isCompacting() {
+ return false;
+ }
+
+ @Override
+ public void sync() throws Exception {}
+
+ @Override
+ public void withInsertOnly(boolean insertOnly) {}
+
+ @Override
+ public void close() throws Exception {
+ if (writer != null) {
+ writer.abort();
+ writer = null;
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 753bc34d95..98e3b48fe4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -35,6 +35,7 @@ import
org.apache.paimon.schema.SchemaChange.UpdateColumnNullability;
import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
import org.apache.paimon.schema.SchemaChange.UpdateComment;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
@@ -847,6 +848,10 @@ public class SchemaManager implements Serializable {
if (newBucket == -1) {
throw new UnsupportedOperationException("Cannot change bucket
to -1.");
}
+ if (oldBucket == BucketMode.POSTPONE_BUCKET) {
+ throw new UnsupportedOperationException(
+ "Cannot change bucket for postpone bucket tables.");
+ }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 3336d1e5c1..a0a217b39d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
@@ -605,7 +606,7 @@ public class SchemaValidation {
throw new RuntimeException(
"AppendOnlyTable of unware or dynamic bucket does not
support 'full-compaction.delta-commits'");
}
- } else if (bucket < 1) {
+ } else if (bucket < 1 && !isPostponeBucketTable(schema, bucket)) {
throw new RuntimeException("The number of buckets needs to be
greater than 0.");
} else {
if (schema.crossPartitionUpdate()) {
@@ -645,4 +646,8 @@ public class SchemaValidation {
}
}
}
+
+ private static boolean isPostponeBucketTable(TableSchema schema, int
bucket) {
+ return !schema.primaryKeys().isEmpty() && bucket ==
BucketMode.POSTPONE_BUCKET;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index b23e50de19..9ee95afa8c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -317,7 +317,8 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
});
}
- private FileStoreTable copyInternal(Map<String, String> dynamicOptions,
boolean tryTimeTravel) {
+ protected FileStoreTable copyInternal(
+ Map<String, String> dynamicOptions, boolean tryTimeTravel) {
Map<String, String> options = new HashMap<>(tableSchema.options());
// merge non-null dynamic options into schema.options
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 9dab573471..5d92d6f707 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -27,10 +27,9 @@ import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
+import org.apache.paimon.table.source.snapshot.ChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
-import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingContext;
import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -264,11 +263,9 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
followUpScanner = new DeltaFollowUpScanner();
break;
case INPUT:
- followUpScanner = new InputChangelogFollowUpScanner();
- break;
case FULL_COMPACTION:
case LOOKUP:
- followUpScanner = new CompactionChangelogFollowUpScanner();
+ followUpScanner = new ChangelogFollowUpScanner();
break;
default:
throw new UnsupportedOperationException(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ChangelogFollowUpScanner.java
similarity index 71%
rename from
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
rename to
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ChangelogFollowUpScanner.java
index 58edc40f45..2f0ece0b16 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ChangelogFollowUpScanner.java
@@ -18,28 +18,24 @@
package org.apache.paimon.table.source.snapshot;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.table.source.ScanMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** {@link FollowUpScanner} for {@link CoreOptions.ChangelogProducer#INPUT}
changelog producer. */
-public class InputChangelogFollowUpScanner implements FollowUpScanner {
+/** {@link FollowUpScanner} for tables with changelog producer. */
+public class ChangelogFollowUpScanner implements FollowUpScanner {
- private static final Logger LOG =
LoggerFactory.getLogger(InputChangelogFollowUpScanner.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ChangelogFollowUpScanner.class);
@Override
public boolean shouldScanSnapshot(Snapshot snapshot) {
- if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
+ if (snapshot.changelogManifestList() != null) {
return true;
}
- LOG.debug(
- "Next snapshot id {} is not APPEND, but is {}, check next
one.",
- snapshot.id(),
- snapshot.commitKind());
+ LOG.debug("Next snapshot id {} has no changelog, check next one.",
snapshot.id());
return false;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
deleted file mode 100644
index 2688b43468..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.ScanMode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link FollowUpScanner} for {@link
CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
- * producer.
- */
-public class CompactionChangelogFollowUpScanner implements FollowUpScanner {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(CompactionChangelogFollowUpScanner.class);
-
- @Override
- public boolean shouldScanSnapshot(Snapshot snapshot) {
- if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
- return true;
- }
-
- LOG.debug(
- "Next snapshot id {} is not COMPACT, but is {}, check next
one.",
- snapshot.id(),
- snapshot.commitKind());
- return false;
- }
-
- @Override
- public SnapshotReader.Plan scan(Snapshot snapshot, SnapshotReader
snapshotReader) {
- return
snapshotReader.withMode(ScanMode.CHANGELOG).withSnapshot(snapshot).read();
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BatchRecordWriter.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BatchRecordWriter.java
index 3c4c2bd390..cc874e2e28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BatchRecordWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BatchRecordWriter.java
@@ -24,6 +24,6 @@ import org.apache.paimon.io.BundleRecords;
/** Write {@link BundleRecords} directly. */
public interface BatchRecordWriter extends RecordWriter<InternalRow> {
- /** Add a batch elemens to the writer. */
+    /** Adds a batch of elements to the writer. */
void writeBundle(BundleRecords record) throws Exception;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 811a2a4e6d..e8e40b76c1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.ExternalPathProvider;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -175,7 +176,11 @@ public class FileStorePathFactory {
}
public Path relativeBucketPath(BinaryRow partition, int bucket) {
- Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucket);
+ String bucketName = String.valueOf(bucket);
+ if (bucket == BucketMode.POSTPONE_BUCKET) {
+ bucketName = "postpone";
+ }
+ Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucketName);
String partitionPath = getPartitionString(partition);
if (!partitionPath.isEmpty()) {
relativeBucketPath = new Path(partitionPath, relativeBucketPath);
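
To illustrate the resulting directory layout, here is a standalone sketch of the naming rule above (it mirrors relativeBucketPath rather than using the real class, and the partition value is an example).

public class BucketPathSketch {

    private static final String BUCKET_PATH_PREFIX = "bucket-";
    private static final int POSTPONE_BUCKET = -2; // BucketMode.POSTPONE_BUCKET

    /** Mirror of the rule above: bucket -2 maps to "bucket-postpone", other buckets keep their number. */
    static String relativeBucketPath(String partitionPath, int bucket) {
        String bucketName = bucket == POSTPONE_BUCKET ? "postpone" : String.valueOf(bucket);
        String bucketDir = BUCKET_PATH_PREFIX + bucketName;
        return partitionPath.isEmpty() ? bucketDir : partitionPath + "/" + bucketDir;
    }

    public static void main(String[] args) {
        System.out.println(relativeBucketPath("dt=20250217", -2)); // dt=20250217/bucket-postpone
        System.out.println(relativeBucketPath("dt=20250217", 0));  // dt=20250217/bucket-0
    }
}
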
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 25d4601c6b..7f0cd60425 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -212,7 +212,7 @@ public class TableSchemaTest {
TableSchema schema =
new TableSchema(1, fields, 10, partitionKeys, primaryKeys,
options, "");
- options.put(BUCKET.key(), "-2");
+ options.put(BUCKET.key(), "-10");
assertThatThrownBy(() -> validateTableSchema(schema))
.hasMessageContaining("The number of buckets needs to be
greater than 0.");
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index 1bf70b64b4..a51d24eaf4 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -35,7 +35,10 @@ import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link CompactionChangelogFollowUpScanner}. */
+/**
+ * Tests for {@link ChangelogFollowUpScanner} when changelog producer is {@link
+ * CoreOptions.ChangelogProducer#FULL_COMPACTION}.
+ */
public class CompactionChangelogFollowUpScannerTest extends ScannerTestBase {
@Test
@@ -66,7 +69,7 @@ public class CompactionChangelogFollowUpScannerTest extends
ScannerTestBase {
snapshotReader.withLevelFilter(level -> level ==
table.coreOptions().numLevels() - 1);
TableRead read = table.newRead();
- CompactionChangelogFollowUpScanner scanner = new
CompactionChangelogFollowUpScanner();
+ ChangelogFollowUpScanner scanner = new ChangelogFollowUpScanner();
Snapshot snapshot = snapshotManager.snapshot(1);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
index 09686a7c7d..14012068b9 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
@@ -35,7 +35,10 @@ import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link InputChangelogFollowUpScanner}. */
+/**
+ * Tests for {@link ChangelogFollowUpScanner} when changelog producer is {@link
+ * CoreOptions.ChangelogProducer#INPUT}.
+ */
public class InputChangelogFollowUpScannerTest extends ScannerTestBase {
@Test
@@ -59,7 +62,7 @@ public class InputChangelogFollowUpScannerTest extends
ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(3);
TableRead read = table.newRead();
- InputChangelogFollowUpScanner scanner = new
InputChangelogFollowUpScanner();
+ ChangelogFollowUpScanner scanner = new ChangelogFollowUpScanner();
Snapshot snapshot = snapshotManager.snapshot(1);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index cf1016a56a..195b5163f8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -460,6 +460,13 @@ public class FlinkConnectorOptions {
"Bounded mode for Paimon consumer. "
+ "By default, Paimon automatically
selects bounded mode based on the mode of the Flink job.");
+ public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
+ key("postpone.default-bucket-num")
+ .intType()
+ .defaultValue(4)
+ .withDescription(
+ "Bucket number for the partitions compacted for
the first time in postpone bucket tables.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 73c96b2c4b..9722847852 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -19,19 +19,37 @@
package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
+import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
+import
org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
+import org.apache.paimon.flink.sink.FixedBucketSink;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
@@ -40,8 +58,12 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -103,22 +125,23 @@ public class CompactAction extends TableActionBase {
@Override
public void build() throws Exception {
+ buildImpl();
+ }
+
+ private boolean buildImpl() throws Exception {
ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
FileStoreTable fileStoreTable = (FileStoreTable) table;
- switch (fileStoreTable.bucketMode()) {
- case BUCKET_UNAWARE:
- {
- buildForUnawareBucketCompaction(env, fileStoreTable,
isStreaming);
- break;
- }
- case HASH_FIXED:
- case HASH_DYNAMIC:
- default:
- {
- buildForTraditionalCompaction(env, fileStoreTable,
isStreaming);
- }
+
+ if (fileStoreTable.coreOptions().bucket() ==
BucketMode.POSTPONE_BUCKET) {
+ return buildForPostponeBucketCompaction(env, fileStoreTable,
isStreaming);
+ } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
+ buildForUnawareBucketCompaction(env, fileStoreTable, isStreaming);
+ return true;
+ } else {
+ buildForTraditionalCompaction(env, fileStoreTable, isStreaming);
+ return true;
}
}
@@ -207,9 +230,114 @@ public class CompactAction extends TableActionBase {
return predicate;
}
+ private boolean buildForPostponeBucketCompaction(
+ StreamExecutionEnvironment env, FileStoreTable table, boolean
isStreaming) {
+ Preconditions.checkArgument(
+ !isStreaming, "Postpone bucket compaction currently only
supports batch mode");
+ Preconditions.checkArgument(
+ partitions == null,
+ "Postpone bucket compaction currently does not support
specifying partitions");
+ Preconditions.checkArgument(
+ whereSql == null,
+ "Postpone bucket compaction currently does not support
predicates");
+
+ // change bucket to a positive value, so we can scan files from the
bucket = -2 directory
+ Map<String, String> bucketOptions = new HashMap<>(table.options());
+ bucketOptions.put(CoreOptions.BUCKET.key(), "1");
+ FileStoreTable fileStoreTable =
table.copy(table.schema().copy(bucketOptions));
+
+ List<BinaryRow> partitions =
+ fileStoreTable
+ .newScan()
+ .withBucketFilter(new PostponeBucketFilter())
+ .listPartitions();
+ if (partitions.isEmpty()) {
+ return false;
+ }
+
+ Options options = new Options(fileStoreTable.options());
+ InternalRowPartitionComputer partitionComputer =
+ new InternalRowPartitionComputer(
+ fileStoreTable.coreOptions().partitionDefaultName(),
+ fileStoreTable.rowType(),
+ fileStoreTable.partitionKeys().toArray(new String[0]),
+ fileStoreTable.coreOptions().legacyPartitionName());
+ for (BinaryRow partition : partitions) {
+ int bucketNum =
options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
+
+ Iterator<ManifestEntry> it =
+ fileStoreTable
+ .newSnapshotReader()
+
.withPartitionFilter(Collections.singletonList(partition))
+ .withBucketFilter(new NormalBucketFilter())
+ .readFileIterator();
+ if (it.hasNext()) {
+ bucketNum = it.next().totalBuckets();
+ }
+
+ bucketOptions = new HashMap<>(table.options());
+ bucketOptions.put(CoreOptions.BUCKET.key(),
String.valueOf(bucketNum));
+ FileStoreTable realTable =
table.copy(table.schema().copy(bucketOptions));
+
+ LinkedHashMap<String, String> partitionSpec =
+ partitionComputer.generatePartValues(partition);
+ Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
+ PostponeBucketCompactSplitSource.buildSource(
+ env,
+ realTable.fullName() + partitionSpec,
+ realTable.rowType(),
+ realTable
+ .newReadBuilder()
+ .withPartitionFilter(partitionSpec)
+ .withBucketFilter(new
PostponeBucketFilter()),
+
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
+
+ DataStream<InternalRow> partitioned =
+ FlinkStreamPartitioner.partition(
+ FlinkSinkBuilder.mapToInternalRow(
+ sourcePair.getLeft(), realTable.rowType()),
+ new RowDataChannelComputer(realTable.schema(),
false),
+ null);
+ FixedBucketSink sink = new FixedBucketSink(realTable, null, null);
+ String commitUser =
+
CoreOptions.createCommitUser(realTable.coreOptions().toConfiguration());
+ DataStream<Committable> written =
+ sink.doWrite(partitioned, commitUser,
partitioned.getParallelism())
+ .forward()
+ .transform(
+ "Rewrite compact committable",
+ new CommittableTypeInfo(),
+ new
RewritePostponeBucketCommittableOperator(realTable));
+ sink.doCommit(written.union(sourcePair.getRight()), commitUser);
+ }
+
+ return true;
+ }
+
+ private static class PostponeBucketFilter implements Filter<Integer>,
Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean test(Integer bucket) {
+ return bucket == BucketMode.POSTPONE_BUCKET;
+ }
+ }
+
+ private static class NormalBucketFilter implements Filter<Integer>,
Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean test(Integer bucket) {
+ return bucket >= 0;
+ }
+ }
+
@Override
public void run() throws Exception {
- build();
- execute("Compact job");
+ if (buildImpl()) {
+ execute("Compact job : " + table.fullName());
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index 238e23aaeb..dc9614ce34 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -101,12 +101,12 @@ public class CompactActionFactory implements
ActionFactory {
System.out.println("Syntax:");
System.out.println(
- " compact --warehouse <warehouse_path> --database
<database_name> \\\n"
- + "--table <table_name> [--partition <partition_name>]
\\\n"
- + "[--order_strategy <order_strategy>] \\\n"
- + "[--table_conf <key>=<value>] \\\n"
- + "[--order_by <order_columns>] \\\n"
- + "[--partition_idle_time <partition_idle_time>] \\\n"
+ " compact --warehouse <warehouse_path> --database
<database_name> \n"
+ + "--table <table_name> [--partition <partition_name>]
\n"
+ + "[--order_strategy <order_strategy>] \n"
+ + "[--table_conf <key>=<value>] \n"
+ + "[--order_by <order_columns>] \n"
+ + "[--partition_idle_time <partition_idle_time>] \n"
+ "[--compact_strategy <compact_strategy>]");
System.out.println(
" compact --warehouse s3://path/to/warehouse --database
<database_name> "
@@ -140,14 +140,14 @@ public class CompactActionFactory implements
ActionFactory {
+ "`full` : Only supports batch mode. All files will
be selected for merging."
+ "`minor`: Pick the set of files that need to be
merged based on specified conditions.");
System.out.println(
- " compact --warehouse s3:///path/to/warehouse \\\n"
- + "--database test_db \\\n"
- + "--table test_table \\\n"
- + "--order_strategy zorder \\\n"
- + "--order_by a,b,c \\\n"
- + "--table_conf sink.parallelism=9 \\\n"
- + "--catalog_conf s3.endpoint=https://****.com \\\n"
- + "--catalog_conf s3.access-key=***** \\\n"
+ " compact --warehouse s3:///path/to/warehouse \n"
+ + "--database test_db \n"
+ + "--table test_table \n"
+ + "--order_strategy zorder \n"
+ + "--order_by a,b,c \n"
+ + "--table_conf sink.parallelism=9 \n"
+ + "--catalog_conf s3.endpoint=https://****.com \n"
+ + "--catalog_conf s3.access-key=***** \n"
+ "--catalog_conf s3.secret-key=***** ");
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
new file mode 100644
index 0000000000..64a205aee0
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Action to rescale one partition of a table. */
+public class RescaleAction extends TableActionBase {
+
+ private @Nullable Integer bucketNum;
+ private Map<String, String> partition = new HashMap<>();
+
+ public RescaleAction(String databaseName, String tableName, Map<String,
String> catalogConfig) {
+ super(databaseName, tableName, catalogConfig);
+ }
+
+ public RescaleAction withBucketNum(int bucketNum) {
+ this.bucketNum = bucketNum;
+ return this;
+ }
+
+ public RescaleAction withPartition(Map<String, String> partition) {
+ this.partition = partition;
+ return this;
+ }
+
+ @Override
+ public void build() throws Exception {
+ Configuration flinkConf = new Configuration();
+ flinkConf.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH);
+ env.configure(flinkConf);
+
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ RowType partitionType = fileStoreTable.schema().logicalPartitionType();
+ Predicate partitionPredicate =
+ PartitionPredicate.createPartitionPredicate(
+ partitionType,
+ InternalRowPartitionComputer.convertSpecToInternal(
+ partition,
+ partitionType,
+
fileStoreTable.coreOptions().partitionDefaultName()));
+ DataStream<RowData> source =
+ new FlinkSourceBuilder(fileStoreTable)
+ .env(env)
+ .sourceBounded(true)
+ .predicate(partitionPredicate)
+ .build();
+
+ Map<String, String> bucketOptions = new
HashMap<>(fileStoreTable.options());
+ if (bucketNum == null) {
+ Preconditions.checkArgument(
+ fileStoreTable.coreOptions().bucket() !=
BucketMode.POSTPONE_BUCKET,
+ "When rescaling postpone bucket tables, you must provide
the resulting bucket number.");
+ } else {
+ bucketOptions.put(CoreOptions.BUCKET.key(),
String.valueOf(bucketNum));
+ }
+ FileStoreTable rescaledTable =
+
fileStoreTable.copy(fileStoreTable.schema().copy(bucketOptions));
+ new
FlinkSinkBuilder(rescaledTable).overwrite(partition).forRowData(source).build();
+ }
+
+ @Override
+ public void run() throws Exception {
+ build();
+ env.execute("Rescale Postpone Bucket : " + table.fullName());
+ }
+}
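
For orientation, a hedged sketch of driving the new action programmatically. It assumes the TableActionBase constructor resolves 'default.T' from the given catalog configuration; the warehouse path and all values below are placeholders:

    import org.apache.paimon.flink.action.RescaleAction;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class RescaleUsageSketch {
        public static void main(String[] args) throws Exception {
            // placeholder catalog configuration; only a warehouse path is assumed here
            Map<String, String> catalogConfig = new HashMap<>();
            catalogConfig.put("warehouse", "/path/to/warehouse");

            RescaleAction action =
                    new RescaleAction("default", "T", catalogConfig)
                            .withBucketNum(4) // mandatory for postpone bucket tables
                            .withPartition(Collections.singletonMap("pt", "1"));

            // builds a bounded batch job that reads partition pt=1 and overwrites it with 4 buckets
            action.run();
        }
    }
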
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
new file mode 100644
index 0000000000..8a5465ff41
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/** Factory to create {@link RescaleAction}. */
+public class RescaleActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "rescale";
+ private static final String BUCKET_NUM = "bucket_num";
+ private static final String PARTITION = "partition";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ RescaleAction action =
+ new RescaleAction(
+ params.getRequired(DATABASE),
+ params.getRequired(TABLE),
+ catalogConfigMap(params));
+
+ if (params.has(BUCKET_NUM)) {
+ action.withBucketNum(Integer.parseInt(params.get(BUCKET_NUM)));
+ }
+
+ if (params.has(PARTITION)) {
+ action.withPartition(getPartitions(params).get(0));
+ }
+
+ return Optional.of(action);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println("Action \"rescale\" rescales one partition of a
table.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " rescale --warehouse <warehouse_path> --database
<database_name> "
+ + "--table <table_name> [--bucket_num <bucket_num>] "
+ + "[--partition <partition>]");
+ System.out.println(
+ "The default value of argument bucket_num is the current
bucket number of the table. "
+ + "For postpone bucket tables, this argument must be
specified.");
+ System.out.println(
+ "Argument partition must be specified if the table is a
partitioned table.");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
new file mode 100644
index 0000000000..0bfab0d196
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.postpone;
+
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
+import org.apache.paimon.flink.source.operator.ReadOperator;
+import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Source for compacting postpone bucket tables. This source scans all files from the {@code
+ * bucket = -2} directory and distributes them to the readers.
+ */
+public class PostponeBucketCompactSplitSource extends
AbstractNonCoordinatedSource<Split> {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PostponeBucketCompactSplitSource.class);
+
+ private final ReadBuilder readBuilder;
+
+ public PostponeBucketCompactSplitSource(ReadBuilder readBuilder) {
+ this.readBuilder = readBuilder;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<Split, SimpleSourceSplit>
createReader(SourceReaderContext readerContext)
+ throws Exception {
+ return new Reader();
+ }
+
+ private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
+
+ private final TableScan scan = readBuilder.newScan();
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Split> output) throws
Exception {
+ try {
+ List<Split> splits = scan.plan().splits();
+
+ for (Split split : splits) {
+ DataSplit dataSplit = (DataSplit) split;
+ List<DataFileMeta> files = new
ArrayList<>(dataSplit.dataFiles());
+                    // we must replay the written records in the exact order
+
files.sort(Comparator.comparing(DataFileMeta::creationTime));
+ for (DataFileMeta meta : files) {
+ DataSplit s =
+ DataSplit.builder()
+ .withPartition(dataSplit.partition())
+ .withBucket(dataSplit.bucket())
+ .withBucketPath(dataSplit.bucketPath())
+
.withDataFiles(Collections.singletonList(meta))
+ .isStreaming(false)
+ .build();
+ output.collect(s);
+ }
+ }
+ } catch (EndOfScanException esf) {
+                LOG.info("Caught EndOfScanException, the stream is finished.");
+ return InputStatus.END_OF_INPUT;
+ }
+ return InputStatus.MORE_AVAILABLE;
+ }
+ }
+
+ public static Pair<DataStream<RowData>, DataStream<Committable>>
buildSource(
+ StreamExecutionEnvironment env,
+ String name,
+ RowType rowType,
+ ReadBuilder readBuilder,
+ @Nullable Integer parallelism) {
+ DataStream<Split> source =
+ env.fromSource(
+ new
PostponeBucketCompactSplitSource(readBuilder),
+ WatermarkStrategy.noWatermarks(),
+ "Compact split generator: " + name,
+ new JavaTypeInfo<>(Split.class))
+ .forceNonParallel();
+
+ FlinkStreamPartitioner<Split> partitioner =
+ new FlinkStreamPartitioner<>(new SplitChannelComputer());
+ PartitionTransformation<Split> partitioned =
+ new PartitionTransformation<>(source.getTransformation(),
partitioner);
+ if (parallelism != null) {
+ partitioned.setParallelism(parallelism);
+ }
+
+ return Pair.of(
+ new DataStream<>(source.getExecutionEnvironment(), partitioned)
+ .transform(
+ "Compact split reader: " + name,
+
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(rowType)),
+ new ReadOperator(readBuilder, null)),
+ source.forward()
+ .transform(
+ "Remove new files",
+ new CommittableTypeInfo(),
+ new RemovePostponeBucketFilesOperator())
+ .forceNonParallel());
+ }
+
+ private static class SplitChannelComputer implements
ChannelComputer<Split> {
+
+ private transient int numChannels;
+ private transient Pattern pattern;
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ // see PostponeBucketTableWriteOperator
+ this.pattern = Pattern.compile("-s-(\\d+?)-");
+ }
+
+ @Override
+ public int channel(Split record) {
+ DataSplit dataSplit = (DataSplit) record;
+ String fileName = dataSplit.dataFiles().get(0).fileName();
+
+ Matcher matcher = pattern.matcher(fileName);
+ Preconditions.checkState(
+ matcher.find(),
+ "Data file name does not match the pattern. This is
unexpected.");
+ int subtaskId = Integer.parseInt(matcher.group(1));
+
+ // send records written by the same subtask to the same subtask
+ // to make sure we replay the written records in the exact order
+ return (Math.abs(dataSplit.partition().hashCode()) + subtaskId) %
numChannels;
+ }
+ }
+}
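
To make the routing concrete, a small standalone sketch of how SplitChannelComputer recovers the writing subtask from a data file name. The writer (PostponeBucketTableWriteOperator, later in this commit) prefixes files with "<data-file-prefix>-u-<commitUser>-s-<subtaskId>-w-", so the pattern "-s-(\d+?)-" pulls the subtask id back out; the file name used here is invented for illustration:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SubtaskRoutingSketch {
        public static void main(String[] args) {
            Pattern pattern = Pattern.compile("-s-(\\d+?)-");
            // hypothetical file name following the writer's prefix format
            String fileName = "data--u-4f2a-s-3-w-0.parquet";

            Matcher matcher = pattern.matcher(fileName);
            if (matcher.find()) {
                int subtaskId = Integer.parseInt(matcher.group(1)); // 3
                int numChannels = 4;
                int partitionHash = 7; // stands in for Math.abs(partition.hashCode())
                // same writer subtask and partition always map to the same channel,
                // so the written records are replayed in their original order
                System.out.println((partitionHash + subtaskId) % numChannels); // prints 2
            }
        }
    }
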
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
new file mode 100644
index 0000000000..63b44d0f85
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.postpone;
+
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+
+/**
+ * Operator used with {@link PostponeBucketCompactSplitSource} to remove files in the {@code
+ * bucket = -2} directory.
+ */
+public class RemovePostponeBucketFilesOperator extends
BoundedOneInputOperator<Split, Committable> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(StreamRecord<Split> element) throws Exception {
+ DataSplit dataSplit = (DataSplit) element.getValue();
+ CommitMessageImpl message =
+ new CommitMessageImpl(
+ dataSplit.partition(),
+ dataSplit.bucket(),
+ DataIncrement.emptyIncrement(),
+ new CompactIncrement(
+ dataSplit.dataFiles(),
+ Collections.emptyList(),
+ Collections.emptyList()));
+ output.collect(
+ new StreamRecord<>(
+ new Committable(Long.MAX_VALUE, Committable.Kind.FILE,
message)));
+ }
+
+ @Override
+ public void endInput() throws Exception {}
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
new file mode 100644
index 0000000000..f387222e4f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.postpone;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.FileStorePathFactory;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rewrites committables from the postpone bucket table compactor. It moves all new files into
+ * compact results and deletes unused new files, because the compactor only produces compact
+ * snapshots.
+ */
+public class RewritePostponeBucketCommittableOperator
+ extends BoundedOneInputOperator<Committable, Committable> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreTable table;
+
+ private transient FileStorePathFactory pathFactory;
+ private transient Map<BinaryRow, Map<Integer, BucketFiles>> bucketFiles;
+
+ public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
+ this.table = table;
+ }
+
+ @Override
+ public void open() throws Exception {
+ pathFactory = table.store().pathFactory();
+ bucketFiles = new HashMap<>();
+ }
+
+ @Override
+ public void processElement(StreamRecord<Committable> element) throws
Exception {
+ Committable committable = element.getValue();
+        if (committable.kind() != Committable.Kind.FILE) {
+            output.collect(element);
+            return;
+        }
+
+ CommitMessageImpl message = (CommitMessageImpl)
committable.wrappedCommittable();
+ bucketFiles
+ .computeIfAbsent(message.partition(), p -> new HashMap<>())
+ .computeIfAbsent(
+ message.bucket(),
+ b ->
+ new BucketFiles(
+ pathFactory.createDataFilePathFactory(
+ message.partition(),
message.bucket()),
+ table.fileIO()))
+ .update(message);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ emitAll(Long.MAX_VALUE);
+ }
+
+ protected void emitAll(long checkpointId) {
+ for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry :
+ bucketFiles.entrySet()) {
+ for (Map.Entry<Integer, BucketFiles> bucketEntry :
+ partitionEntry.getValue().entrySet()) {
+ CommitMessageImpl message =
+ new CommitMessageImpl(
+ partitionEntry.getKey(),
+ bucketEntry.getKey(),
+ DataIncrement.emptyIncrement(),
+ bucketEntry.getValue().makeIncrement());
+ output.collect(
+ new StreamRecord<>(
+ new Committable(checkpointId,
Committable.Kind.FILE, message)));
+ }
+ }
+ bucketFiles.clear();
+ }
+
+ private static class BucketFiles {
+
+ private final DataFilePathFactory pathFactory;
+ private final FileIO fileIO;
+
+ private final Map<String, DataFileMeta> newFiles;
+ private final List<DataFileMeta> compactBefore;
+ private final List<DataFileMeta> compactAfter;
+ private final List<DataFileMeta> changelogFiles;
+
+ private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
+ this.pathFactory = pathFactory;
+ this.fileIO = fileIO;
+
+ this.newFiles = new LinkedHashMap<>();
+ this.compactBefore = new ArrayList<>();
+ this.compactAfter = new ArrayList<>();
+ this.changelogFiles = new ArrayList<>();
+ }
+
+ private void update(CommitMessageImpl message) {
+ for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
+ newFiles.put(file.fileName(), file);
+ }
+
+ Map<String, Path> toDelete = new HashMap<>();
+ for (DataFileMeta file :
message.compactIncrement().compactBefore()) {
+ if (newFiles.containsKey(file.fileName())) {
+ toDelete.put(file.fileName(), pathFactory.toPath(file));
+ newFiles.remove(file.fileName());
+ } else {
+ compactBefore.add(file);
+ }
+ }
+
+ for (DataFileMeta file :
message.compactIncrement().compactAfter()) {
+ compactAfter.add(file);
+ toDelete.remove(file.fileName());
+ }
+
+
changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
+ changelogFiles.addAll(message.compactIncrement().changelogFiles());
+
+ toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
+ }
+
+ private CompactIncrement makeIncrement() {
+ List<DataFileMeta> realCompactAfter = new
ArrayList<>(newFiles.values());
+ realCompactAfter.addAll(compactAfter);
+ return new CompactIncrement(compactBefore, realCompactAfter,
changelogFiles);
+ }
+ }
+}
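
The BucketFiles bookkeeping above decides which files actually reach the committed compact snapshot. A simplified standalone sketch of the same idea, using plain strings instead of DataFileMeta (all names invented): a file written by the compactor and compacted away within the same job is deleted and never committed, while surviving new files are promoted into the compact result.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class RewriteBookkeepingSketch {
        public static void main(String[] args) {
            Map<String, String> newFiles = new LinkedHashMap<>(); // fileName -> path
            List<String> compactBefore = new ArrayList<>();
            List<String> compactAfter = new ArrayList<>();
            List<String> toDelete = new ArrayList<>();

            // commit message 1: the compactor rewrote postponed data into f1 and f2
            newFiles.put("f1", "/pt=1/bucket-0/f1");
            newFiles.put("f2", "/pt=1/bucket-0/f2");

            // commit message 2: a later round in the same job compacted f1 into f3
            for (String before : new String[] {"f1"}) {
                if (newFiles.containsKey(before)) {
                    // f1 was never committed, so it is simply deleted from disk
                    toDelete.add(newFiles.remove(before));
                } else {
                    // an already committed file must be removed through the snapshot
                    compactBefore.add(before);
                }
            }
            // the real operator also drops compactAfter file names from the delete set
            compactAfter.add("f3");

            // final increment: surviving new files plus compact results become compactAfter
            List<String> realCompactAfter = new ArrayList<>(newFiles.keySet());
            realCompactAfter.addAll(compactAfter);
            System.out.println(realCompactAfter); // [f2, f3]
            System.out.println(toDelete);         // [/pt=1/bucket-0/f1]
        }
    }
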
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index b80693b39e..74ae5f786d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -99,7 +99,7 @@ public class CompactProcedure extends ProcedureBase {
if (checkCompactStrategy(compactStrategy)) {
action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
}
- jobName = "Compact Job";
+ jobName = "Compact Job : " + identifier.getFullName();
} else if (!isNullOrWhitespaceOnly(orderStrategy)
&& !isNullOrWhitespaceOnly(orderByColumns)) {
Preconditions.checkArgument(
@@ -113,7 +113,7 @@ public class CompactProcedure extends ProcedureBase {
tableConf)
.withOrderStrategy(orderStrategy)
.withOrderColumns(orderByColumns.split(","));
- jobName = "Sort Compact Job";
+ jobName = "Sort Compact Job : " + identifier.getFullName();
} else {
throw new IllegalArgumentException(
"You must specify 'order strategy' and 'order by columns'
both.");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
new file mode 100644
index 0000000000..c7d2a1f2e7
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.RescaleAction;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import javax.annotation.Nullable;
+
+/** Procedure to rescale one partition of a table. */
+public class RescaleProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "rescale";
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "bucket_num", type =
@DataTypeHint("INT"), isOptional = true),
+ @ArgumentHint(name = "partition", type =
@DataTypeHint("STRING"), isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ @Nullable Integer bucketNum,
+ @Nullable String partition)
+ throws Exception {
+ Identifier identifier = Identifier.fromString(tableId);
+ String databaseName = identifier.getDatabaseName();
+ String tableName = identifier.getObjectName();
+
+ RescaleAction action = new RescaleAction(databaseName, tableName,
catalog.options());
+ if (bucketNum != null) {
+ action.withBucketNum(bucketNum);
+ }
+ if (partition != null) {
+
action.withPartition(ParameterUtils.getPartitions(partition).get(0));
+ }
+
+ return execute(
+ procedureContext, action, "Rescale Postpone Bucket : " +
identifier.getFullName());
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 2098729858..fb0f8184b5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -262,7 +262,7 @@ public abstract class FlinkSink<T> implements Serializable {
return written;
}
- protected DataStreamSink<?> doCommit(DataStream<Committable> written,
String commitUser) {
+ public DataStreamSink<?> doCommit(DataStream<Committable> written, String
commitUser) {
StreamExecutionEnvironment env = written.getExecutionEnvironment();
ReadableConfig conf = env.getConfiguration();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index d99a7d818f..35a7d744c9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -234,7 +234,11 @@ public class FlinkSinkBuilder {
BucketMode bucketMode = table.bucketMode();
switch (bucketMode) {
case HASH_FIXED:
- return buildForFixedBucket(input);
+ if (table.coreOptions().bucket() ==
BucketMode.POSTPONE_BUCKET) {
+ return buildPostponeBucketSink(input);
+ } else {
+ return buildForFixedBucket(input);
+ }
case HASH_DYNAMIC:
return buildDynamicBucketSink(input, false);
case CROSS_PARTITION:
@@ -246,7 +250,7 @@ public class FlinkSinkBuilder {
}
}
- protected DataStream<InternalRow> mapToInternalRow(
+ public static DataStream<InternalRow> mapToInternalRow(
DataStream<RowData> input, org.apache.paimon.types.RowType
rowType) {
SingleOutputStreamOperator<InternalRow> result =
input.transform(
@@ -291,6 +295,10 @@ public class FlinkSinkBuilder {
return sink.sinkFrom(partitioned);
}
+ private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow>
input) {
+ return new PostponeBucketWriteSink(table,
overwritePartition).sinkFrom(input, parallelism);
+ }
+
private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow>
input) {
checkArgument(
table.primaryKeys().isEmpty(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketTableWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketTableWriteOperator.java
new file mode 100644
index 0000000000..a233c6a6cc
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketTableWriteOperator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** {@link TableWriteOperator} for writing records in postpone bucket table. */
+public class PostponeBucketTableWriteOperator extends
TableWriteOperator<InternalRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ public PostponeBucketTableWriteOperator(
+ StreamOperatorParameters<Committable> parameters,
+ FileStoreTable table,
+ StoreSinkWrite.Provider storeSinkWriteProvider,
+ String initialCommitUser) {
+ super(parameters, table, storeSinkWriteProvider, initialCommitUser);
+ }
+
+ @Override
+ protected boolean containLogSystem() {
+ return false;
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(
+ CoreOptions.DATA_FILE_PREFIX.key(),
+ String.format(
+ "%s-u-%s-s-%d-w-",
+ table.coreOptions().dataFilePrefix(),
+ getCommitUser(context),
+
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext())));
+ table = table.copy(dynamicOptions);
+
+ super.initializeState(context);
+ }
+
+ @Override
+ public void processElement(StreamRecord<InternalRow> element) throws
Exception {
+ write.write(element.getValue(), BucketMode.POSTPONE_BUCKET);
+ }
+
+ /** Factory to create {@link PostponeBucketTableWriteOperator}. */
+ public static class Factory extends
TableWriteOperator.Factory<InternalRow> {
+
+ protected Factory(
+ FileStoreTable table,
+ StoreSinkWrite.Provider storeSinkWriteProvider,
+ String initialCommitUser) {
+ super(table, storeSinkWriteProvider, initialCommitUser);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends StreamOperator<Committable>> T createStreamOperator(
+ StreamOperatorParameters<Committable> parameters) {
+ return (T)
+ new PostponeBucketTableWriteOperator(
+ parameters, table, storeSinkWriteProvider,
initialCommitUser);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Class<? extends StreamOperator>
getStreamOperatorClass(ClassLoader classLoader) {
+ return PostponeBucketTableWriteOperator.class;
+ }
+ }
+}
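
The dynamic data-file prefix set in initializeState is what later lets the compact split source route files by writer subtask. A one-method illustration of the composed prefix, using invented values (the default prefix "data-" is an assumption):

    public class DataFilePrefixSketch {
        public static void main(String[] args) {
            String dataFilePrefix = "data-"; // assumed default of CoreOptions.DATA_FILE_PREFIX
            String commitUser = "4f2a";      // hypothetical commit user
            int subtask = 3;

            // same format string as PostponeBucketTableWriteOperator#initializeState
            String prefix = String.format("%s-u-%s-s-%d-w-", dataFilePrefix, commitUser, subtask);
            System.out.println(prefix); // data--u-4f2a-s-3-w-
        }
    }
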
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketWriteSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketWriteSink.java
new file mode 100644
index 0000000000..4c6d271f72
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketWriteSink.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.createCommitUser;
+
+/** {@link FlinkSink} for writing records into postpone bucket table. */
+public class PostponeBucketWriteSink extends FlinkWriteSink<InternalRow> {
+
+ public PostponeBucketWriteSink(
+ FileStoreTable table, @Nullable Map<String, String>
overwritePartition) {
+ super(table, overwritePartition);
+ }
+
+ @Override
+ protected OneInputStreamOperatorFactory<InternalRow, Committable>
createWriteOperatorFactory(
+ StoreSinkWrite.Provider writeProvider, String commitUser) {
+ return new PostponeBucketTableWriteOperator.Factory(table,
writeProvider, commitUser);
+ }
+
+ public DataStreamSink<?> sinkFrom(
+ DataStream<InternalRow> input, @Nullable Integer parallelism) {
+ String initialCommitUser =
createCommitUser(table.coreOptions().toConfiguration());
+ DataStream<Committable> written =
+ doWrite(
+ input,
+ initialCommitUser,
+ parallelism == null ? input.getParallelism() :
parallelism);
+ return doCommit(written, initialCommitUser);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index fd876698c0..ec69ce59f6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -43,6 +43,7 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
private final StoreSinkWrite.Provider storeSinkWriteProvider;
private final String initialCommitUser;
+ private transient String commitUser;
private transient StoreSinkWriteState state;
protected transient StoreSinkWrite write;
@@ -90,11 +91,16 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
}
protected String getCommitUser(StateInitializationContext context) throws
Exception {
- // Each job can only have one username and this name must be
consistent across restarts.
- // We cannot use job id as commit username here because user may
change job id by creating
- // a savepoint, stop the job and then resume from savepoint.
- return StateUtils.getSingleValueFromState(
- context, "commit_user_state", String.class, initialCommitUser);
+ if (commitUser == null) {
+ // Each job can only have one username and this name must be
consistent across restarts.
+            // We cannot use job id as commit username here because the user may change the job
+            // id by creating a savepoint, stopping the job and then resuming from the savepoint.
+ commitUser =
+ StateUtils.getSingleValueFromState(
+ context, "commit_user_state", String.class,
initialCommitUser);
+ }
+
+ return commitUser;
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index efaa25627d..ae624f848d 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -45,6 +45,7 @@ org.apache.paimon.flink.action.RewriteFileIndexActionFactory
org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory
org.apache.paimon.flink.action.ClearConsumerActionFactory
+org.apache.paimon.flink.action.RescaleActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -87,3 +88,4 @@ org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
org.apache.paimon.flink.procedure.ClearConsumersProcedure
org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure
+org.apache.paimon.flink.procedure.RescaleProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
new file mode 100644
index 0000000000..d274a02cd8
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for postpone bucket tables. */
+public class PostponeBucketTableITCase extends AbstractTestBase {
+
+ private static final int TIMEOUT = 120;
+
+ @Test
+ public void testWriteThenCompact() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'bucket' = '-2'\n"
+ + ")");
+
+ int numPartitions = 3;
+ int numKeys = 100;
+ List<String> values = new ArrayList<>();
+ for (int i = 0; i < numPartitions; i++) {
+ for (int j = 0; j < numKeys; j++) {
+ values.add(String.format("(%d, %d, %d)", i, j, i * numKeys +
j));
+ }
+ }
+ tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ",
values)).await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
+
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < numPartitions; i++) {
+ expected.add(
+ String.format(
+ "+I[%d, %d]",
+ i, (i * numKeys + i * numKeys + numKeys - 1) *
numKeys / 2));
+ }
+ String query = "SELECT pt, SUM(v) FROM T GROUP BY pt";
+
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expected);
+
+ values.clear();
+ int changedPartition = 1;
+ for (int j = 0; j < numKeys; j++) {
+ values.add(
+ String.format(
+ "(%d, %d, %d)",
+ changedPartition, j, -(changedPartition * numKeys
+ j)));
+ }
+ tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ",
values)).await();
+
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expected);
+
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+ expected.clear();
+ for (int i = 0; i < numPartitions; i++) {
+ int val = (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2;
+ if (i == changedPartition) {
+ val *= -1;
+ }
+ expected.add(String.format("+I[%d, %d]", i, val));
+ }
+
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expected);
+ }
+
+ @Test
+ public void testOverwrite() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'bucket' = '-2'\n"
+ + ")");
+
+ tEnv.executeSql(
+ "INSERT INTO T VALUES (1, 10, 110), (1, 20, 120), (2,
10, 210), (2, 20, 220)")
+ .await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+ assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]",
"+I[20, 220, 2]");
+
+        // the new record is not compacted yet, so the query result is unchanged
+ tEnv.executeSql("INSERT INTO T VALUES (2, 40, 240)").await();
+ assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]",
"+I[20, 220, 2]");
+
+ tEnv.executeSql("INSERT OVERWRITE T VALUES (2, 20, 221), (2, 30,
230)").await();
+ assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+ .containsExactlyInAnyOrder("+I[10, 110, 1]", "+I[20, 120, 1]");
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        // overwrite should also clean up files in the bucket = -2 directory,
+        // which contain the record with key = 40
+ assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[20, 221, 2]",
"+I[30, 230, 2]");
+ }
+
+ @Timeout(TIMEOUT)
+ @Test
+ public void testLookupChangelogProducer() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment bEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ String createCatalogSql =
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")";
+ bEnv.executeSql(createCatalogSql);
+ bEnv.executeSql("USE CATALOG mycat");
+ bEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'bucket' = '-2',\n"
+ + " 'changelog-producer' = 'lookup'\n"
+ + ")");
+
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .parallelism(1)
+ .checkpointIntervalMs(1000)
+ .build();
+ sEnv.executeSql(createCatalogSql);
+ sEnv.executeSql("USE CATALOG mycat");
+ TableResult streamingSelect = sEnv.executeSql("SELECT k, v, pt FROM
T");
+ JobClient client = streamingSelect.getJobClient().get();
+ CloseableIterator<Row> it = streamingSelect.collect();
+
+ bEnv.executeSql(
+ "INSERT INTO T VALUES (1, 10, 110), (1, 20, 120), (2,
10, 210), (2, 20, 220)")
+ .await();
+ assertThat(collect(bEnv.executeSql("SELECT * FROM T"))).isEmpty();
+ bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+ assertThat(collect(bEnv.executeSql("SELECT k, v, pt FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]",
"+I[20, 220, 2]");
+ assertThat(collect(client, it, 4))
+ .containsExactlyInAnyOrder(
+ "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]",
"+I[20, 220, 2]");
+
+ bEnv.executeSql("INSERT INTO T VALUES (1, 20, 121), (2, 30,
230)").await();
+ bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+ assertThat(collect(bEnv.executeSql("SELECT k, v, pt FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[10, 110, 1]",
+ "+I[20, 121, 1]",
+ "+I[10, 210, 2]",
+ "+I[20, 220, 2]",
+ "+I[30, 230, 2]");
+ assertThat(collect(client, it, 3))
+ .containsExactlyInAnyOrder("-U[20, 120, 1]", "+U[20, 121, 1]",
"+I[30, 230, 2]");
+
+ it.close();
+ }
+
+ @Test
+ public void testRescaleBucket() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'bucket' = '-2',\n"
+ + " 'postpone.default-bucket-num' = '2'\n"
+ + ")");
+
+ int numPartitions = 3;
+ int numKeys = 100;
+ List<String> values = new ArrayList<>();
+ for (int i = 0; i < numPartitions; i++) {
+ for (int j = 0; j < numKeys; j++) {
+ values.add(String.format("(%d, %d, %d)", i, j, i * numKeys +
j));
+ }
+ }
+ tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ",
values)).await();
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+ List<String> expectedBuckets = new ArrayList<>();
+ for (int i = 0; i < numPartitions; i++) {
+ expectedBuckets.add(String.format("+I[{%d}, 2]", i));
+ }
+ String bucketSql =
+ "SELECT `partition`, COUNT(DISTINCT bucket) FROM `T$files`
GROUP BY `partition`";
+
assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
+
+ List<String> expectedData = new ArrayList<>();
+ for (int i = 0; i < numPartitions; i++) {
+ expectedData.add(
+ String.format(
+ "+I[%d, %d]",
+ i, (i * numKeys + i * numKeys + numKeys - 1) *
numKeys / 2));
+ }
+ String query = "SELECT pt, SUM(v) FROM T GROUP BY pt";
+
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
+
+        // before rescaling, write some files into the bucket = -2 directory;
+        // these files should not be touched by rescaling
+ values.clear();
+ int changedPartition = 1;
+ for (int j = 0; j < numKeys; j++) {
+ values.add(
+ String.format(
+ "(%d, %d, %d)",
+ changedPartition, j, -(changedPartition * numKeys
+ j)));
+ }
+ tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ",
values)).await();
+
+ tEnv.executeSql(
+ "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 4,
`partition` => 'pt="
+ + changedPartition
+ + "')");
+ expectedBuckets.clear();
+ for (int i = 0; i < numPartitions; i++) {
+ expectedBuckets.add(String.format("+I[{%d}, %d]", i, i ==
changedPartition ? 4 : 2));
+ }
+
assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
+
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
+
+        // rescaling should not have touched the files in the bucket = -2 directory,
+        // so compacting now merges them into the rescaled buckets
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
+
+ expectedData.clear();
+ for (int i = 0; i < numPartitions; i++) {
+ int val = (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2;
+ if (i == changedPartition) {
+ val *= -1;
+ }
+ expectedData.add(String.format("+I[%d, %d]", i, val));
+ }
+
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
+ }
+
+ @Timeout(TIMEOUT)
+ @Test
+ public void testInputChangelogProducer() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .parallelism(1)
+ .checkpointIntervalMs(500)
+ .build();
+ String createCatalog =
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")";
+ sEnv.executeSql(createCatalog);
+ sEnv.executeSql("USE CATALOG mycat");
+ sEnv.executeSql(
+ "CREATE TEMPORARY TABLE S (\n"
+ + " i INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'fields.i.kind' = 'sequence',\n"
+ + " 'fields.i.start' = '0',\n"
+ + " 'fields.i.end' = '199',\n"
+ + " 'number-of-rows' = '200',\n"
+ + " 'rows-per-second' = '50'\n"
+ + ")");
+ sEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'bucket' = '-2',\n"
+ + " 'changelog-producer' = 'input',\n"
+ + " 'continuous.discovery-interval' = '1ms'\n"
+ + ")");
+ sEnv.executeSql(
+ "CREATE TEMPORARY VIEW V AS SELECT MOD(i, 2) AS x, IF(MOD(i,
2) = 0, 1, 1000) AS y FROM S");
+ sEnv.executeSql("INSERT INTO T SELECT SUM(y), x FROM V GROUP BY
x").await();
+
+ TableEnvironment bEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .parallelism(2)
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+ bEnv.executeSql(createCatalog);
+ bEnv.executeSql("USE CATALOG mycat");
+ bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+ // if the read order when compacting is wrong, this check will fail
+ assertThat(collect(bEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder("+U[100, 0]", "+U[100000, 1]");
+ TableResult streamingSelect =
+ sEnv.executeSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id' = '1') */");
+ JobClient client = streamingSelect.getJobClient().get();
+ CloseableIterator<Row> it = streamingSelect.collect();
+        // if the number of changelog records is not sufficient, this call will fail
+ collect(client, it, 400 - 2);
+ }
+
+ private List<String> collect(TableResult result) throws Exception {
+ List<String> ret = new ArrayList<>();
+ try (CloseableIterator<Row> it = result.collect()) {
+ while (it.hasNext()) {
+ ret.add(it.next().toString());
+ }
+ }
+ return ret;
+ }
+
+ private List<String> collect(JobClient client, CloseableIterator<Row> it,
int limit)
+ throws Exception {
+ AtomicBoolean shouldStop = new AtomicBoolean(false);
+ Thread timerThread =
+ new Thread(
+ () -> {
+ try {
+ for (int i = 0; i < TIMEOUT; i++) {
+ Thread.sleep(1000);
+ if (shouldStop.get()) {
+ return;
+ }
+ }
+ client.cancel().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ timerThread.start();
+
+ List<String> ret = new ArrayList<>();
+ for (int i = 0; i < limit && it.hasNext(); i++) {
+ ret.add(it.next().toString());
+ }
+
+ shouldStop.set(true);
+ timerThread.join();
+ return ret;
+ }
+}
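
The expected sums in these tests come from an arithmetic series: partition i holds the values i * numKeys + j for j in [0, numKeys), so the total is (first + last) * count / 2. A quick standalone check of the expression used above (class name invented):

    public class ExpectedSumSketch {
        public static void main(String[] args) {
            int numKeys = 100;
            int i = 1; // partition id

            int bruteForce = 0;
            for (int j = 0; j < numKeys; j++) {
                bruteForce += i * numKeys + j;
            }
            // same expression as the test: (first + last) * count / 2
            int formula = (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2;

            System.out.println(bruteForce); // 14950
            System.out.println(formula);    // 14950
        }
    }
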
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index d5747d2e28..d44ba03f12 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -26,6 +26,7 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
@@ -35,6 +36,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.assertj.core.api.Assertions.assertThat;
@@ -204,7 +206,13 @@ public class RescaleBucketITCase extends CatalogITCaseBase
{
"Try to write table with a new bucket num 4, but the
previous bucket num is 2. "
+ "Please switch to batch mode, and perform
INSERT OVERWRITE to rescale current data layout first.");
- batchSql(rescaleOverwriteSql, tableName, tableName);
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ batchSql(rescaleOverwriteSql, tableName, tableName);
+ } else {
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ batchSql(String.format("CALL sys.rescale(`table` =>
'default.%s')", tableName));
+ }
+
snapshot = findLatestSnapshot(tableName);
assertThat(snapshot).isNotNull();
assertThat(snapshot.id()).isEqualTo(2L);