This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 406ab33d4 [flink] Remove state from append only unaware bucket writer (#4219)
406ab33d4 is described below
commit 406ab33d48c6c328732a74c38a27049e8d967045
Author: tsreaper <[email protected]>
AuthorDate: Tue Sep 24 09:32:24 2024 +0800
[flink] Remove state from append only unaware bucket writer (#4219)
---
.../org/apache/paimon/AppendOnlyFileStore.java | 45 ++++++---
.../paimon/operation/AbstractFileStoreWrite.java | 70 ++++++++------
.../paimon/operation/AppendOnlyFileStoreWrite.java | 105 +++++----------------
.../AppendOnlyFixedBucketFileStoreWrite.java | 104 ++++++++++++++++++++
.../AppendOnlyUnawareBucketFileStoreWrite.java | 89 +++++++++++++++++
.../paimon/operation/KeyValueFileStoreWrite.java | 10 +-
.../paimon/operation/MemoryFileStoreWrite.java | 2 -
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 3 +-
.../sink/MultiTablesStoreCompactOperator.java | 6 +-
.../paimon/flink/sink/NoopStoreSinkWriteState.java | 54 +++++++++++
.../paimon/flink/sink/RowUnawareBucketSink.java | 21 ++++-
.../paimon/flink/sink/StoreCompactOperator.java | 45 ++++-----
.../apache/paimon/flink/sink/StoreSinkWrite.java | 8 ++
.../paimon/flink/sink/StoreSinkWriteState.java | 97 ++-----------------
...riteState.java => StoreSinkWriteStateImpl.java} | 72 ++++----------
.../paimon/flink/sink/TableWriteOperator.java | 44 ++++-----
.../flink/UnawareBucketAppendOnlyTableITCase.java | 99 +++++++++++++++++++
.../apache/paimon/flink/sink/FlinkSinkTest.java | 30 +++---
.../flink/sink/StoreCompactOperatorTest.java | 50 ++++------
19 files changed, 583 insertions(+), 371 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 289f0bde7..b0e3ea5f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -25,6 +25,8 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.operation.AppendOnlyFixedBucketFileStoreWrite;
+import org.apache.paimon.operation.AppendOnlyUnawareBucketFileStoreWrite;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
@@ -94,21 +96,36 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
- return new AppendOnlyFileStoreWrite(
- fileIO,
- newRead(),
- schema.id(),
- commitUser,
- rowType,
- pathFactory(),
- snapshotManager(),
- newScan(true).withManifestCacheFilter(manifestFilter),
- options,
- bucketMode(),
+ DeletionVectorsMaintainer.Factory dvMaintainerFactory =
options.deletionVectorsEnabled()
?
DeletionVectorsMaintainer.factory(newIndexFileHandler())
- : null,
- tableName);
+ : null;
+ if (bucketMode() == BucketMode.BUCKET_UNAWARE) {
+ return new AppendOnlyUnawareBucketFileStoreWrite(
+ fileIO,
+ newRead(),
+ schema.id(),
+ rowType,
+ pathFactory(),
+ snapshotManager(),
+ newScan(true).withManifestCacheFilter(manifestFilter),
+ options,
+ dvMaintainerFactory,
+ tableName);
+ } else {
+ return new AppendOnlyFixedBucketFileStoreWrite(
+ fileIO,
+ newRead(),
+ schema.id(),
+ commitUser,
+ rowType,
+ pathFactory(),
+ snapshotManager(),
+ newScan(true).withManifestCacheFilter(manifestFilter),
+ options,
+ dvMaintainerFactory,
+ tableName);
+ }
}
private AppendOnlyFileStoreScan newScan(boolean forWrite) {
@@ -129,7 +146,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
splitAnd(predicate),
rowType.getFieldNames(),
bucketKeyType.getFieldNames());
- if (bucketFilters.size() > 0) {
+ if (!bucketFilters.isEmpty()) {
setBucketKeyFilter(and(bucketFilters));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 195a0b48d..2e502e96f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.function.Function;
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
@@ -64,7 +65,6 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractFileStoreWrite.class);
- private final String commitUser;
protected final SnapshotManager snapshotManager;
private final FileStoreScan scan;
private final int writerNumberMax;
@@ -85,14 +85,12 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
private boolean isInsertOnly;
protected AbstractFileStoreWrite(
- String commitUser,
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<T> indexFactory,
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName,
int writerNumberMax) {
- this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;
@@ -169,7 +167,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
@Override
public List<CommitMessage> prepareCommit(boolean waitCompaction, long
commitIdentifier)
throws Exception {
- long latestCommittedIdentifier;
+ Function<WriterContainer<T>, Boolean> writerCleanChecker;
if (writers.values().stream()
.map(Map::values)
.flatMap(Collection::stream)
@@ -177,20 +175,10 @@ public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
.max()
.orElse(Long.MIN_VALUE)
== Long.MIN_VALUE) {
- // Optimization for the first commit.
- //
- // If this is the first commit, no writer has previous modified
commit, so the value of
- // `latestCommittedIdentifier` does not matter.
- //
- // Without this optimization, we may need to scan through all
snapshots only to find
- // that there is no previous snapshot by this user, which is very
inefficient.
- latestCommittedIdentifier = Long.MIN_VALUE;
+ // If this is the first commit, no writer should be cleaned.
+ writerCleanChecker = writerContainer -> false;
} else {
- latestCommittedIdentifier =
- snapshotManager
- .latestSnapshotOfUser(commitUser)
- .map(Snapshot::commitIdentifier)
- .orElse(Long.MIN_VALUE);
+ writerCleanChecker = createWriterCleanChecker();
}
List<CommitMessage> result = new ArrayList<>();
@@ -226,14 +214,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
result.add(committable);
if (committable.isEmpty()) {
- // Condition 1: There is no more record waiting to be
committed. Note that the
- // condition is < (instead of <=), because each commit
identifier may have
- // multiple snapshots. We must make sure all snapshots of
this identifier are
- // committed.
- // Condition 2: No compaction is in progress. That is, no
more changelog will be
- // produced.
- if (writerContainer.lastModifiedCommitIdentifier <
latestCommittedIdentifier
- && !writerContainer.writer.isCompacting()) {
+ if (writerCleanChecker.apply(writerContainer)) {
// Clear writer if no update, and if its latest
modification has committed.
//
// We need a mechanism to clear writers, otherwise
there will be more and
@@ -242,12 +223,10 @@ public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
LOG.debug(
"Closing writer for partition {}, bucket
{}. "
+ "Writer's last modified
identifier is {}, "
- + "while latest committed
identifier is {}, "
- + "current commit identifier is
{}.",
+ + "while current commit identifier
is {}.",
partition,
bucket,
writerContainer.lastModifiedCommitIdentifier,
- latestCommittedIdentifier,
commitIdentifier);
}
writerContainer.writer.close();
@@ -266,6 +245,41 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
return result;
}
+ // This abstract function returns a whole function (instead of just a
boolean value),
+ // because we do not want to introduce `commitUser` into this base class.
+ //
+ // For writers with no conflicts, `commitUser` might be some random value.
+ protected abstract Function<WriterContainer<T>, Boolean>
createWriterCleanChecker();
+
+ protected static <T>
+ Function<WriterContainer<T>, Boolean>
createConflictAwareWriterCleanChecker(
+ String commitUser, SnapshotManager snapshotManager) {
+ long latestCommittedIdentifier =
+ snapshotManager
+ .latestSnapshotOfUser(commitUser)
+ .map(Snapshot::commitIdentifier)
+ .orElse(Long.MIN_VALUE);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Latest committed identifier is {}",
latestCommittedIdentifier);
+ }
+
+ // Condition 1: There is no more record waiting to be committed. Note
that the
+ // condition is < (instead of <=), because each commit identifier may
have
+ // multiple snapshots. We must make sure all snapshots of this
identifier are
+ // committed.
+ //
+ // Condition 2: No compaction is in progress. That is, no more
changelog will be
+ // produced.
+ return writerContainer ->
+ writerContainer.lastModifiedCommitIdentifier <
latestCommittedIdentifier
+ && !writerContainer.writer.isCompacting();
+ }
+
+ protected static <T>
+ Function<WriterContainer<T>, Boolean>
createNoConflictAwareWriterCleanChecker() {
+ return writerContainer -> true;
+ }
+
@Override
public void close() throws Exception {
for (Map<Integer, WriterContainer<T>> bucketWriters :
writers.values()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 203a9ff35..40fe5dbfa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -21,10 +21,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.AppendOnlyWriter;
-import org.apache.paimon.append.BucketedAppendCompactManager;
import org.apache.paimon.compact.CompactManager;
-import org.apache.paimon.compact.NoopCompactManager;
-import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVector;
@@ -36,10 +33,8 @@ import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.SimpleColStatsCollector;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExceptionUtils;
@@ -64,7 +59,7 @@ import java.util.concurrent.ExecutorService;
import java.util.function.Function;
/** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
-public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
+public abstract class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
implements BundleFileStoreWriter {
private static final Logger LOG =
LoggerFactory.getLogger(AppendOnlyFileStoreWrite.class);
@@ -75,59 +70,30 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
private final RowType rowType;
private final FileFormat fileFormat;
private final FileStorePathFactory pathFactory;
- private final long targetFileSize;
- private final int compactionMinFileNum;
- private final int compactionMaxFileNum;
- private final boolean commitForceCompact;
- private final String fileCompression;
- private final CompressOptions spillCompression;
- private final boolean useWriteBuffer;
- private final boolean spillable;
- private final MemorySize maxDiskSize;
+
private final SimpleColStatsCollector.Factory[] statsCollectors;
private final FileIndexOptions fileIndexOptions;
- private final BucketMode bucketMode;
private boolean forceBufferSpill = false;
- private final boolean skipCompaction;
public AppendOnlyFileStoreWrite(
FileIO fileIO,
RawFileSplitRead read,
long schemaId,
- String commitUser,
RowType rowType,
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
- BucketMode bucketMode,
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName) {
- super(commitUser, snapshotManager, scan, options, null,
dvMaintainerFactory, tableName);
+ super(snapshotManager, scan, options, null, dvMaintainerFactory,
tableName);
this.fileIO = fileIO;
this.read = read;
this.schemaId = schemaId;
this.rowType = rowType;
this.fileFormat = options.fileFormat();
this.pathFactory = pathFactory;
- this.bucketMode = bucketMode;
- this.targetFileSize = options.targetFileSize(false);
- this.compactionMinFileNum = options.compactionMinFileNum();
- this.compactionMaxFileNum = options.compactionMaxFileNum().orElse(5);
- this.commitForceCompact = options.commitForceCompact();
- // AppendOnlyFileStoreWrite is sensitive with bucket mode. It will act
difference in
- // unaware-bucket mode (no compaction and force empty-writer).
- if (bucketMode == BucketMode.BUCKET_UNAWARE) {
- super.withIgnorePreviousFiles(true);
- this.skipCompaction = true;
- } else {
- this.skipCompaction = options.writeOnly();
- }
- this.fileCompression = options.fileCompression();
- this.spillCompression = options.spillCompressOptions();
- this.useWriteBuffer = options.useWriteBufferForAppend();
- this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(),
isStreamingMode);
- this.maxDiskSize = options.writeBufferSpillDiskSize();
+
this.statsCollectors =
StatsCollectorFactories.createStatsFactories(options,
rowType.getFieldNames());
this.fileIndexOptions = options.indexColumnsOptions();
@@ -143,50 +109,38 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
- CompactManager compactManager = new NoopCompactManager();
- if (!skipCompaction) {
- Function<String, DeletionVector> dvFactory =
- dvMaintainer != null
- ? f ->
dvMaintainer.deletionVectorOf(f).orElse(null)
- : null;
- compactManager =
- new BucketedAppendCompactManager(
- compactExecutor,
- restoredFiles,
- dvMaintainer,
- compactionMinFileNum,
- compactionMaxFileNum,
- targetFileSize,
- files -> compactRewrite(partition, bucket,
dvFactory, files),
- compactionMetrics == null
- ? null
- :
compactionMetrics.createReporter(partition, bucket));
- }
-
return new AppendOnlyWriter(
fileIO,
ioManager,
schemaId,
fileFormat,
- targetFileSize,
+ options.targetFileSize(false),
rowType,
restoredMaxSeqNumber,
- compactManager,
+ getCompactManager(partition, bucket, restoredFiles,
compactExecutor, dvMaintainer),
// it is only for new files, no dv
files -> createFilesIterator(partition, bucket, files, null),
- commitForceCompact,
+ options.commitForceCompact(),
pathFactory.createDataFilePathFactory(partition, bucket),
restoreIncrement,
- useWriteBuffer || forceBufferSpill,
- spillable || forceBufferSpill,
- fileCompression,
- spillCompression,
+ options.useWriteBufferForAppend() || forceBufferSpill,
+ options.writeBufferSpillable(fileIO.isObjectStore(),
isStreamingMode)
+ || forceBufferSpill,
+ options.fileCompression(),
+ options.spillCompressOptions(),
statsCollectors,
- maxDiskSize,
+ options.writeBufferSpillDiskSize(),
fileIndexOptions,
options.asyncFileWrite());
}
+ protected abstract CompactManager getCompactManager(
+ BinaryRow partition,
+ int bucket,
+ List<DataFileMeta> restoredFiles,
+ ExecutorService compactExecutor,
+ @Nullable DeletionVectorsMaintainer dvMaintainer);
+
public List<DataFileMeta> compactRewrite(
BinaryRow partition,
int bucket,
@@ -199,10 +153,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
Exception collectedExceptions = null;
RowDataRollingFileWriter rewriter =
createRollingFileWriter(
- partition,
- bucket,
- new LongCounter(toCompact.get(0).minSequenceNumber()),
- FileSource.COMPACT);
+ partition, bucket, new
LongCounter(toCompact.get(0).minSequenceNumber()));
List<IOExceptionSupplier<DeletionVector>> dvFactories = null;
if (dvFactory != null) {
dvFactories = new ArrayList<>(toCompact.size());
@@ -228,19 +179,19 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
}
private RowDataRollingFileWriter createRollingFileWriter(
- BinaryRow partition, int bucket, LongCounter seqNumCounter,
FileSource fileSource) {
+ BinaryRow partition, int bucket, LongCounter seqNumCounter) {
return new RowDataRollingFileWriter(
fileIO,
schemaId,
fileFormat,
- targetFileSize,
+ options.targetFileSize(false),
rowType,
pathFactory.createDataFilePathFactory(partition, bucket),
seqNumCounter,
- fileCompression,
+ options.fileCompression(),
statsCollectors,
fileIndexOptions,
- fileSource,
+ FileSource.COMPACT,
options.asyncFileWrite());
}
@@ -253,12 +204,6 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
return new RecordReaderIterator<>(read.createReader(partition, bucket,
files, dvFactories));
}
- @Override
- public void withIgnorePreviousFiles(boolean ignorePrevious) {
- // in unaware bucket mode, we need all writers to be empty
- super.withIgnorePreviousFiles(ignorePrevious || bucketMode ==
BucketMode.BUCKET_UNAWARE);
- }
-
@Override
protected void forceBufferSpill() throws Exception {
if (ioManager == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java
new file mode 100644
index 000000000..e169c0165
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.BucketedAppendCompactManager;
+import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.compact.NoopCompactManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+/** {@link AppendOnlyFileStoreWrite} for {@link
org.apache.paimon.table.BucketMode#HASH_FIXED}. */
+public class AppendOnlyFixedBucketFileStoreWrite extends
AppendOnlyFileStoreWrite {
+
+ private final String commitUser;
+
+ public AppendOnlyFixedBucketFileStoreWrite(
+ FileIO fileIO,
+ RawFileSplitRead read,
+ long schemaId,
+ String commitUser,
+ RowType rowType,
+ FileStorePathFactory pathFactory,
+ SnapshotManager snapshotManager,
+ FileStoreScan scan,
+ CoreOptions options,
+ @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+ String tableName) {
+ super(
+ fileIO,
+ read,
+ schemaId,
+ rowType,
+ pathFactory,
+ snapshotManager,
+ scan,
+ options,
+ dvMaintainerFactory,
+ tableName);
+ this.commitUser = commitUser;
+ }
+
+ @Override
+ protected CompactManager getCompactManager(
+ BinaryRow partition,
+ int bucket,
+ List<DataFileMeta> restoredFiles,
+ ExecutorService compactExecutor,
+ @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ if (options.writeOnly()) {
+ return new NoopCompactManager();
+ } else {
+ Function<String, DeletionVector> dvFactory =
+ dvMaintainer != null
+ ? f ->
dvMaintainer.deletionVectorOf(f).orElse(null)
+ : null;
+ return new BucketedAppendCompactManager(
+ compactExecutor,
+ restoredFiles,
+ dvMaintainer,
+ options.compactionMinFileNum(),
+ options.compactionMaxFileNum().orElse(5),
+ options.targetFileSize(false),
+ files -> compactRewrite(partition, bucket, dvFactory,
files),
+ compactionMetrics == null
+ ? null
+ : compactionMetrics.createReporter(partition,
bucket));
+ }
+ }
+
+ @Override
+ protected Function<WriterContainer<InternalRow>, Boolean>
createWriterCleanChecker() {
+ return createConflictAwareWriterCleanChecker(commitUser,
snapshotManager);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java
new file mode 100644
index 000000000..f33b207bb
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.compact.NoopCompactManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+/**
+ * {@link AppendOnlyFileStoreWrite} for {@link
org.apache.paimon.table.BucketMode#BUCKET_UNAWARE}.
+ */
+public class AppendOnlyUnawareBucketFileStoreWrite extends
AppendOnlyFileStoreWrite {
+
+ public AppendOnlyUnawareBucketFileStoreWrite(
+ FileIO fileIO,
+ RawFileSplitRead read,
+ long schemaId,
+ RowType rowType,
+ FileStorePathFactory pathFactory,
+ SnapshotManager snapshotManager,
+ FileStoreScan scan,
+ CoreOptions options,
+ @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+ String tableName) {
+ super(
+ fileIO,
+ read,
+ schemaId,
+ rowType,
+ pathFactory,
+ snapshotManager,
+ scan,
+ options,
+ dvMaintainerFactory,
+ tableName);
+ super.withIgnorePreviousFiles(true);
+ }
+
+ @Override
+ protected CompactManager getCompactManager(
+ BinaryRow partition,
+ int bucket,
+ List<DataFileMeta> restoredFiles,
+ ExecutorService compactExecutor,
+ @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ return new NoopCompactManager();
+ }
+
+ @Override
+ public void withIgnorePreviousFiles(boolean ignorePrevious) {
+ // in unaware bucket mode, we need all writers to be empty
+ super.withIgnorePreviousFiles(true);
+ }
+
+ @Override
+ protected Function<WriterContainer<InternalRow>, Boolean>
createWriterCleanChecker() {
+ return createNoConflictAwareWriterCleanChecker();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index b62cdabae..66819255b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -83,6 +83,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
@@ -106,6 +107,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final RowType keyType;
private final RowType valueType;
private final RowType partitionType;
+ private final String commitUser;
@Nullable private final RecordLevelExpire recordLevelExpire;
@Nullable private Cache<String, LookupFile> lookupFileCache;
@@ -131,7 +133,6 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
KeyValueFieldsExtractor extractor,
String tableName) {
super(
- commitUser,
snapshotManager,
scan,
options,
@@ -142,6 +143,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
this.partitionType = partitionType;
this.keyType = keyType;
this.valueType = valueType;
+ this.commitUser = commitUser;
+
this.udsComparatorSupplier = udsComparatorSupplier;
this.readerFactoryBuilder =
KeyValueFileReaderFactory.builder(
@@ -389,6 +392,11 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
lookupFileCache);
}
+ @Override
+ protected Function<WriterContainer<KeyValue>, Boolean>
createWriterCleanChecker() {
+ return createConflictAwareWriterCleanChecker(commitUser,
snapshotManager);
+ }
+
@Override
public void close() throws Exception {
super.close();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 58f7174bf..386a72cf5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -59,7 +59,6 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
private WriterBufferMetric writerBufferMetric;
public MemoryFileStoreWrite(
- String commitUser,
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
@@ -67,7 +66,6 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName) {
super(
- commitUser,
snapshotManager,
scan,
indexFactory,
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index a604b3045..7d72fe3e8 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -28,6 +28,7 @@ import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
+import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
@@ -98,7 +99,7 @@ public class CdcRecordStoreMultiWriteOperator
context, "commit_user_state", String.class,
initialCommitUser);
// TODO: should use CdcRecordMultiChannelComputer to filter
- state = new StoreSinkWriteState(context, (tableName, partition,
bucket) -> true);
+ state = new StoreSinkWriteStateImpl(context, (tableName, partition,
bucket) -> true);
tables = new HashMap<>();
writes = new HashMap<>();
compactExecutor =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 513b694fb..7cb5d30c2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -95,15 +95,15 @@ public class MultiTablesStoreCompactOperator
catalog = catalogLoader.load();
- // Each job can only have one user name and this name must be
consistent across restarts.
- // We cannot use job id as commit user name here because user may
change job id by creating
+ // Each job can only have one username and this name must be
consistent across restarts.
+ // We cannot use job id as commit username here because user may
change job id by creating
// a savepoint, stop the job and then resume from savepoint.
commitUser =
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class,
initialCommitUser);
state =
- new StoreSinkWriteState(
+ new StoreSinkWriteStateImpl(
context,
(tableName, partition, bucket) ->
ChannelComputer.select(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
new file mode 100644
index 000000000..f975bce6e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * A {@link StoreSinkWriteState} which stores nothing. Currently only used for
append only unaware
+ * bucket table writers.
+ */
+public class NoopStoreSinkWriteState implements StoreSinkWriteState {
+
+ private final StateValueFilter stateValueFilter;
+
+ public NoopStoreSinkWriteState(StateValueFilter stateValueFilter) {
+ this.stateValueFilter = stateValueFilter;
+ }
+
+ @Override
+ public StateValueFilter stateValueFilter() {
+ return stateValueFilter;
+ }
+
+ @Override
+ public @Nullable List<StateValue> get(String tableName, String key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void put(String tableName, String key, List<StateValue>
stateValues) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void snapshotState() throws Exception {}
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
index b670b905d..1cd10390c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import java.util.Map;
@@ -39,6 +40,24 @@ public class RowUnawareBucketSink extends
UnawareBucketSink<InternalRow> {
@Override
protected OneInputStreamOperator<InternalRow, Committable>
createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
- return new RowDataStoreWriteOperator(table, logSinkFunction,
writeProvider, commitUser);
+ return new RowDataStoreWriteOperator(table, logSinkFunction,
writeProvider, commitUser) {
+
+ @Override
+ protected StoreSinkWriteState createState(
+ StateInitializationContext context,
+ StoreSinkWriteState.StateValueFilter stateFilter)
+ throws Exception {
+ // No conflicts will occur in append only unaware bucket
writer, so no state is
+ // needed.
+ return new NoopStoreSinkWriteState(stateFilter);
+ }
+
+ @Override
+ protected String getCommitUser(StateInitializationContext context)
throws Exception {
+ // No conflicts will occur in append only unaware bucket
writer, so commitUser does
+ // not matter.
+ return commitUser;
+ }
+ };
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 184288490..bc7bb350d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
@@ -29,7 +28,6 @@ import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -77,39 +75,30 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
- // Each job can only have one user name and this name must be
consistent across restarts.
- // We cannot use job id as commit user name here because user may
change job id by creating
+ // Each job can only have one username and this name must be
consistent across restarts.
+ // We cannot use job id as commit username here because user may
change job id by creating
// a savepoint, stop the job and then resume from savepoint.
String commitUser =
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class,
initialCommitUser);
- initStateAndWriter(
- context,
- (tableName, partition, bucket) ->
- ChannelComputer.select(
- partition,
- bucket,
- getRuntimeContext().getNumberOfParallelSubtasks())
- == getRuntimeContext().getIndexOfThisSubtask(),
- getContainingTask().getEnvironment().getIOManager(),
- commitUser);
- }
-
- @VisibleForTesting
- void initStateAndWriter(
- StateInitializationContext context,
- StoreSinkWriteState.StateValueFilter stateFilter,
- IOManager ioManager,
- String commitUser)
- throws Exception {
- // We put state and write init in this method for convenient testing.
Without construct a
- // runtime context, we can test to construct a writer here
- state = new StoreSinkWriteState(context, stateFilter);
-
+ state =
+ new StoreSinkWriteStateImpl(
+ context,
+ (tableName, partition, bucket) ->
+ ChannelComputer.select(
+ partition,
+ bucket,
+ getRuntimeContext().getNumberOfParallelSubtasks())
+ == getRuntimeContext().getIndexOfThisSubtask());
write =
storeSinkWriteProvider.provide(
- table, commitUser, state, ioManager, memoryPool,
getMetricGroup());
+ table,
+ commitUser,
+ state,
+ getContainingTask().getEnvironment().getIOManager(),
+ memoryPool,
+ getMetricGroup());
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index a432a5545..3a5a8df5c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -84,6 +84,10 @@ public interface StoreSinkWrite {
@FunctionalInterface
interface Provider extends Serializable {
+ /**
+ * TODO: The argument list has become too complicated. Build {@link TableWriteImpl} directly
+ * in caller and simplify the argument list.
+ */
StoreSinkWrite provide(
FileStoreTable table,
String commitUser,
@@ -97,6 +101,10 @@ public interface StoreSinkWrite {
@FunctionalInterface
interface WithWriteBufferProvider extends Serializable {
+ /**
+ * TODO: The argument list has become too complicated. Build {@link TableWriteImpl} directly
+ * in caller and simplify the argument list.
+ */
StoreSinkWrite provide(
FileStoreTable table,
String commitUser,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
index 072d6a1b9..8626a01a6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -19,107 +19,31 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.utils.SerializationUtils;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.runtime.state.StateInitializationContext;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
/**
- * States for {@link StoreSinkWrite}s.
- *
- * <p>States are positioned first by table name and then by key name. This
class should be initiated
- * in a sink operator and then given to {@link StoreSinkWrite}.
+ * States for {@link StoreSinkWrite}s. It's a wrapper to conveniently modify
states for each table
+ * stored in Flink states.
*/
-public class StoreSinkWriteState {
-
- private final StateValueFilter stateValueFilter;
-
- private final ListState<Tuple5<String, String, byte[], Integer, byte[]>>
listState;
- private final Map<String, Map<String, List<StateValue>>> map;
-
- @SuppressWarnings("unchecked")
- public StoreSinkWriteState(
- StateInitializationContext context, StateValueFilter
stateValueFilter)
- throws Exception {
- this.stateValueFilter = stateValueFilter;
- TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>>
listStateSerializer =
- new TupleSerializer<>(
- (Class<Tuple5<String, String, byte[], Integer,
byte[]>>)
- (Class<?>) Tuple5.class,
- new TypeSerializer[] {
- StringSerializer.INSTANCE,
- StringSerializer.INSTANCE,
- BytePrimitiveArraySerializer.INSTANCE,
- IntSerializer.INSTANCE,
- BytePrimitiveArraySerializer.INSTANCE
- });
- listState =
- context.getOperatorStateStore()
- .getUnionListState(
- new ListStateDescriptor<>(
- "paimon_store_sink_write_state",
listStateSerializer));
-
- map = new HashMap<>();
- for (Tuple5<String, String, byte[], Integer, byte[]> tuple :
listState.get()) {
- BinaryRow partition =
SerializationUtils.deserializeBinaryRow(tuple.f2);
- if (stateValueFilter.filter(tuple.f0, partition, tuple.f3)) {
- map.computeIfAbsent(tuple.f0, k -> new HashMap<>())
- .computeIfAbsent(tuple.f1, k -> new ArrayList<>())
- .add(new StateValue(partition, tuple.f3, tuple.f4));
- }
- }
- }
+public interface StoreSinkWriteState {
- public StateValueFilter stateValueFilter() {
- return stateValueFilter;
- }
+ StoreSinkWriteState.StateValueFilter stateValueFilter();
- public @Nullable List<StateValue> get(String tableName, String key) {
- Map<String, List<StateValue>> innerMap = map.get(tableName);
- return innerMap == null ? null : innerMap.get(key);
- }
+ @Nullable
+ List<StoreSinkWriteState.StateValue> get(String tableName, String key);
- public void put(String tableName, String key, List<StateValue>
stateValues) {
- map.computeIfAbsent(tableName, k -> new HashMap<>()).put(key,
stateValues);
- }
+ void put(String tableName, String key,
List<StoreSinkWriteState.StateValue> stateValues);
- public void snapshotState() throws Exception {
- List<Tuple5<String, String, byte[], Integer, byte[]>> list = new
ArrayList<>();
- for (Map.Entry<String, Map<String, List<StateValue>>> tables :
map.entrySet()) {
- for (Map.Entry<String, List<StateValue>> entry :
tables.getValue().entrySet()) {
- for (StateValue stateValue : entry.getValue()) {
- list.add(
- Tuple5.of(
- tables.getKey(),
- entry.getKey(),
- SerializationUtils.serializeBinaryRow(stateValue.partition()),
- stateValue.bucket(),
- stateValue.value()));
- }
- }
- }
- listState.update(list);
- }
+ void snapshotState() throws Exception;
/**
* A state value for {@link StoreSinkWrite}. All state values should be
given a partition and a
* bucket so that they can be redistributed once the sink parallelism is
changed.
*/
- public static class StateValue {
+ class StateValue {
private final BinaryRow partition;
private final int bucket;
@@ -148,8 +72,7 @@ public class StoreSinkWriteState {
* Given the table name, partition and bucket of a {@link StateValue} in a
union list state,
* decide whether to keep this {@link StateValue} in this subtask.
*/
- public interface StateValueFilter {
-
+ interface StateValueFilter {
boolean filter(String tableName, BinaryRow partition, int bucket);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
similarity index 67%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
index 072d6a1b9..a01cbcb68 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
@@ -39,21 +39,22 @@ import java.util.List;
import java.util.Map;
/**
- * States for {@link StoreSinkWrite}s.
+ * Default implementation for {@link StoreSinkWriteState}.
*
* <p>States are positioned first by table name and then by key name. This
class should be initiated
* in a sink operator and then given to {@link StoreSinkWrite}.
*/
-public class StoreSinkWriteState {
+public class StoreSinkWriteStateImpl implements StoreSinkWriteState {
- private final StateValueFilter stateValueFilter;
+ private final StoreSinkWriteState.StateValueFilter stateValueFilter;
private final ListState<Tuple5<String, String, byte[], Integer, byte[]>>
listState;
- private final Map<String, Map<String, List<StateValue>>> map;
+ private final Map<String, Map<String,
List<StoreSinkWriteState.StateValue>>> map;
@SuppressWarnings("unchecked")
- public StoreSinkWriteState(
- StateInitializationContext context, StateValueFilter
stateValueFilter)
+ public StoreSinkWriteStateImpl(
+ StateInitializationContext context,
+ StoreSinkWriteState.StateValueFilter stateValueFilter)
throws Exception {
this.stateValueFilter = stateValueFilter;
TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>>
listStateSerializer =
@@ -79,29 +80,34 @@ public class StoreSinkWriteState {
if (stateValueFilter.filter(tuple.f0, partition, tuple.f3)) {
map.computeIfAbsent(tuple.f0, k -> new HashMap<>())
.computeIfAbsent(tuple.f1, k -> new ArrayList<>())
- .add(new StateValue(partition, tuple.f3, tuple.f4));
+ .add(new StoreSinkWriteState.StateValue(partition,
tuple.f3, tuple.f4));
}
}
}
- public StateValueFilter stateValueFilter() {
+ @Override
+ public StoreSinkWriteState.StateValueFilter stateValueFilter() {
return stateValueFilter;
}
- public @Nullable List<StateValue> get(String tableName, String key) {
- Map<String, List<StateValue>> innerMap = map.get(tableName);
+ @Override
+ public @Nullable List<StoreSinkWriteState.StateValue> get(String
tableName, String key) {
+ Map<String, List<StoreSinkWriteState.StateValue>> innerMap =
map.get(tableName);
return innerMap == null ? null : innerMap.get(key);
}
- public void put(String tableName, String key, List<StateValue>
stateValues) {
+ public void put(
+ String tableName, String key, List<StoreSinkWriteState.StateValue>
stateValues) {
map.computeIfAbsent(tableName, k -> new HashMap<>()).put(key,
stateValues);
}
public void snapshotState() throws Exception {
List<Tuple5<String, String, byte[], Integer, byte[]>> list = new
ArrayList<>();
- for (Map.Entry<String, Map<String, List<StateValue>>> tables :
map.entrySet()) {
- for (Map.Entry<String, List<StateValue>> entry :
tables.getValue().entrySet()) {
- for (StateValue stateValue : entry.getValue()) {
+ for (Map.Entry<String, Map<String,
List<StoreSinkWriteState.StateValue>>> tables :
+ map.entrySet()) {
+ for (Map.Entry<String, List<StoreSinkWriteState.StateValue>> entry
:
+ tables.getValue().entrySet()) {
+ for (StoreSinkWriteState.StateValue stateValue :
entry.getValue()) {
list.add(
Tuple5.of(
tables.getKey(),
@@ -114,42 +120,4 @@ public class StoreSinkWriteState {
}
listState.update(list);
}
-
- /**
- * A state value for {@link StoreSinkWrite}. All state values should be
given a partition and a
- * bucket so that they can be redistributed once the sink parallelism is
changed.
- */
- public static class StateValue {
-
- private final BinaryRow partition;
- private final int bucket;
- private final byte[] value;
-
- public StateValue(BinaryRow partition, int bucket, byte[] value) {
- this.partition = partition;
- this.bucket = bucket;
- this.value = value;
- }
-
- public BinaryRow partition() {
- return partition;
- }
-
- public int bucket() {
- return bucket;
- }
-
- public byte[] value() {
- return value;
- }
- }
-
- /**
- * Given the table name, partition and bucket of a {@link StateValue} in a
union list state,
- * decide whether to keep this {@link StateValue} in this subtask.
- */
- public interface StateValueFilter {
-
- boolean filter(String tableName, BinaryRow partition, int bucket);
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index f38e0ad6b..f0f5e3bc9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -24,7 +24,6 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -56,13 +55,6 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
- // Each job can only have one user name and this name must be
consistent across restarts.
- // We cannot use job id as commit user name here because user may
change job id by creating
- // a savepoint, stop the job and then resume from savepoint.
- String commitUser =
- StateUtils.getSingleValueFromState(
- context, "commit_user_state", String.class,
initialCommitUser);
-
boolean containLogSystem = containLogSystem();
int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
StateValueFilter stateFilter =
@@ -74,27 +66,29 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
return task == getRuntimeContext().getIndexOfThisSubtask();
};
- initStateAndWriter(
- context,
- stateFilter,
- getContainingTask().getEnvironment().getIOManager(),
- commitUser);
+ state = createState(context, stateFilter);
+ write =
+ storeSinkWriteProvider.provide(
+ table,
+ getCommitUser(context),
+ state,
+ getContainingTask().getEnvironment().getIOManager(),
+ memoryPool,
+ getMetricGroup());
}
- @VisibleForTesting
- void initStateAndWriter(
- StateInitializationContext context,
- StateValueFilter stateFilter,
- IOManager ioManager,
- String commitUser)
+ protected StoreSinkWriteState createState(
+ StateInitializationContext context,
StoreSinkWriteState.StateValueFilter stateFilter)
throws Exception {
- // We put state and write init in this method for convenient testing.
Without construct a
- // runtime context, we can test to construct a writer here
- state = new StoreSinkWriteState(context, stateFilter);
+ return new StoreSinkWriteStateImpl(context, stateFilter);
+ }
- write =
- storeSinkWriteProvider.provide(
- table, commitUser, state, ioManager, memoryPool,
getMetricGroup());
+ protected String getCommitUser(StateInitializationContext context) throws
Exception {
+ // Each job can only have one username and this name must be
consistent across restarts.
+ // We cannot use job id as commit username here because user may
change job id by creating
+ // a savepoint, stop the job and then resume from savepoint.
+ return StateUtils.getSingleValueFromState(
+ context, "commit_user_state", String.class, initialCommitUser);
}
protected abstract boolean containLogSystem();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index b14f1c041..cb323542d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -19,14 +19,25 @@
package org.apache.paimon.flink;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.utils.FailingFileIO;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.File;
import java.time.Duration;
@@ -346,6 +357,94 @@ public class UnawareBucketAppendOnlyTableITCase extends
CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of(2, "c", "BBB"), Row.of(3,
"c", "BBB"));
}
+ @Timeout(60)
+ @Test
+ public void testStatelessWriter() throws Exception {
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(), new Path(path,
"default.db/append_table"));
+
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder()
+ .streamingMode()
+ .parallelism(2)
+ .checkpointIntervalMs(500)
+ .build();
+ DataStream<Integer> source =
+ env.addSource(new
TestStatelessWriterSource(table)).setParallelism(2).forward();
+
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ tEnv.registerCatalog("mycat", sEnv.getCatalog("PAIMON").get());
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.createTemporaryView("S", tEnv.fromDataStream(source).as("id"));
+
+ tEnv.executeSql("INSERT INTO append_table SELECT id, 'test' FROM
S").await();
+ assertThat(batchSql("SELECT * FROM append_table"))
+ .containsExactlyInAnyOrder(Row.of(1, "test"), Row.of(2,
"test"));
+ }
+
+ private static class TestStatelessWriterSource extends
RichParallelSourceFunction<Integer> {
+
+ private final FileStoreTable table;
+
+ private volatile boolean isRunning = true;
+
+ private TestStatelessWriterSource(FileStoreTable table) {
+ this.table = table;
+ }
+
+ @Override
+ public void run(SourceContext<Integer> sourceContext) throws Exception
{
+ int taskId = getRuntimeContext().getIndexOfThisSubtask();
+ // wait some time in parallelism #2,
+ // so that it does not commit in the same checkpoint with
parallelism #1
+ int waitCount = (taskId == 0 ? 0 : 10);
+
+ while (isRunning) {
+ synchronized (sourceContext.getCheckpointLock()) {
+ if (taskId == 0) {
+ if (waitCount == 0) {
+ sourceContext.collect(1);
+ } else if (countNumRecords() >= 1) {
+ // wait for the record to commit before exiting
+ break;
+ }
+ } else {
+ int numRecords = countNumRecords();
+ if (numRecords >= 1) {
+ if (waitCount == 0) {
+ sourceContext.collect(2);
+ } else if (countNumRecords() >= 2) {
+ // make sure the next checkpoint is successful
+ break;
+ }
+ }
+ }
+ waitCount--;
+ }
+ Thread.sleep(1000);
+ }
+ }
+
+ private int countNumRecords() throws Exception {
+ int ret = 0;
+ RecordReader<InternalRow> reader =
+
table.newRead().createReader(table.newSnapshotReader().read());
+ try (RecordReaderIterator<InternalRow> it = new
RecordReaderIterator<>(reader)) {
+ while (it.hasNext()) {
+ it.next();
+ ret++;
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+ }
+
@Override
protected List<String> ddl() {
return Arrays.asList(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 84246d00a..c33556834 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -36,17 +36,15 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -90,20 +88,14 @@ public class FlinkSinkTest {
((OneInputTransformation)
written.getTransformation())
.getOperatorFactory())
.getOperator());
- StateInitializationContextImpl context =
- new StateInitializationContextImpl(
- null,
- new MockOperatorStateStore() {
- @Override
- public <S> ListState<S> getUnionListState(
- ListStateDescriptor<S> stateDescriptor)
throws Exception {
- return getListState(stateDescriptor);
- }
- },
- null,
- null,
- null);
- operator.initStateAndWriter(context, (a, b, c) -> true, new
IOManagerAsync(), "123");
+
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+ harness.setup(serializer);
+ harness.initializeEmptyState();
+
return ((KeyValueFileStoreWrite) ((StoreSinkWriteImpl)
operator.write).write.getWrite())
.bufferSpillable();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index eb7756718..3f2daedff 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -27,12 +27,10 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.SinkRecord;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.state.StateInitializationContextImpl;
-import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
@@ -50,36 +48,28 @@ public class StoreCompactOperatorTest extends TableTestBase
{
CompactRememberStoreWrite compactRememberStoreWrite =
new CompactRememberStoreWrite(streamingMode);
- StoreCompactOperator storeCompactOperator =
+ StoreCompactOperator operator =
new StoreCompactOperator(
- (FileStoreTable) getTableDefault(),
+ getTableDefault(),
(table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
compactRememberStoreWrite,
"10086");
- storeCompactOperator.open();
- StateInitializationContextImpl context =
- new StateInitializationContextImpl(
- null,
- new MockOperatorStateStore() {
- @Override
- public <S> ListState<S> getUnionListState(
- ListStateDescriptor<S> stateDescriptor)
throws Exception {
- return getListState(stateDescriptor);
- }
- },
- null,
- null,
- null);
- storeCompactOperator.initStateAndWriter(
- context, (a, b, c) -> true, new IOManagerAsync(), "123");
-
- storeCompactOperator.processElement(new StreamRecord<>(data(0)));
- storeCompactOperator.processElement(new StreamRecord<>(data(0)));
- storeCompactOperator.processElement(new StreamRecord<>(data(1)));
- storeCompactOperator.processElement(new StreamRecord<>(data(1)));
- storeCompactOperator.processElement(new StreamRecord<>(data(2)));
- storeCompactOperator.prepareCommit(true, 1);
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ OneInputStreamOperatorTestHarness<RowData, Committable> harness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+ harness.setup(serializer);
+ harness.initializeEmptyState();
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(data(0)));
+ harness.processElement(new StreamRecord<>(data(0)));
+ harness.processElement(new StreamRecord<>(data(1)));
+ harness.processElement(new StreamRecord<>(data(1)));
+ harness.processElement(new StreamRecord<>(data(2)));
+
+ operator.prepareCommit(true, 1);
Assertions.assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
}