This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2d4b8a420 [core] Introduce TableCommit.abort (#898)
2d4b8a420 is described below
commit 2d4b8a42035c944530140c1dd05580bdd67a7c62
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 13 19:39:14 2023 +0800
[core] Introduce TableCommit.abort (#898)
---
.../apache/paimon/operation/FileStoreCommit.java | 4 ++++
.../paimon/operation/FileStoreCommitImpl.java | 25 ++++++++++++++++++++++
.../org/apache/paimon/table/sink/TableCommit.java | 8 ++++++-
.../apache/paimon/table/sink/TableCommitImpl.java | 5 +++++
.../paimon/table/FileStoreTableTestBase.java | 16 ++++++++++++++
.../apache/paimon/flink/PreAggregationITCase.java | 19 +++++++++++-----
6 files changed, 71 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index ac1d17aba..d7c68dd5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.sink.CommitMessage;
import java.util.List;
import java.util.Map;
@@ -72,4 +73,7 @@ public interface FileStoreCommit {
* @param partitions A list of partition {@link Map}s. NOTE: cannot be
empty!
*/
void dropPartitions(List<Map<String, String>> partitions, long
commitIdentifier);
+
+ /** Abort an unsuccessful commit. The data files will be deleted. */
+ void abort(List<CommitMessage> commitMessages);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index da0d75d56..2084701b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
@@ -39,6 +40,7 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
@@ -373,6 +375,29 @@ public class FileStoreCommitImpl implements FileStoreCommit {
Collections.emptyMap());
}
+ @Override
+ public void abort(List<CommitMessage> commitMessages) {
+ Map<Pair<BinaryRow, Integer>, DataFilePathFactory> factoryMap = new
HashMap<>();
+ for (CommitMessage message : commitMessages) {
+ DataFilePathFactory pathFactory =
+ factoryMap.computeIfAbsent(
+ Pair.of(message.partition(), message.bucket()),
+ k ->
+ this.pathFactory.createDataFilePathFactory(
+ k.getKey(), k.getValue()));
+ CommitMessageImpl commitMessage = (CommitMessageImpl) message;
+ List<DataFileMeta> toDelete = new ArrayList<>();
+ toDelete.addAll(commitMessage.newFilesIncrement().newFiles());
+
toDelete.addAll(commitMessage.newFilesIncrement().changelogFiles());
+ toDelete.addAll(commitMessage.compactIncrement().compactAfter());
+ toDelete.addAll(commitMessage.compactIncrement().changelogFiles());
+
+ for (DataFileMeta file : toDelete) {
+ fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+ }
+ }
+ }
+
private void collectChanges(
List<CommitMessage> commitMessages,
List<ManifestEntry> appendTableFiles,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
index 2694d66b0..8845b7fa1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.table.Table;
+import java.util.List;
+
/**
* Create and commit snapshots of a {@link Table}. Snapshots are produced from
{@link
* CommitMessage}s, which themselves are generated by {@link TableWrite}.
@@ -31,4 +33,8 @@ import org.apache.paimon.table.Table;
* @since 0.4.0
*/
@Public
-public interface TableCommit extends AutoCloseable {}
+public interface TableCommit extends AutoCloseable {
+
+ /** Abort an unsuccessful commit. The data files will be deleted. */
+ void abort(List<CommitMessage> commitMessages);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 4b411e7a4..9fe8ea3fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -159,4 +159,9 @@ public class TableCommitImpl implements InnerTableCommit {
batchCommitted = true;
commit(BatchWriteBuilder.COMMIT_IDENTIFIER, commitMessages);
}
+
+ @Override
+ public void abort(List<CommitMessage> commitMessages) {
+ commit.abort(commitMessages);
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index bf99f93f7..edf58e0d5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -29,7 +29,9 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
@@ -321,6 +323,20 @@ public abstract class FileStoreTableTestBase {
assertThat(((DataSplit) splits.get(0)).bucket()).isEqualTo(1);
}
+ @Test
+ public void testAbort() throws Exception {
+ FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET,
1));
+ StreamTableWrite write = table.newWrite(commitUser);
+ write.write(rowData(1, 2, 3L));
+ List<CommitMessage> messages = write.prepareCommit(true, 0);
+ table.newCommit(commitUser).abort(messages);
+
+ FileStatus[] files =
+ LocalFileIO.create().listStatus(new Path(tablePath +
"/pt=1/bucket-0"));
+ assertThat(files).isEmpty();
+ write.close();
+ }
+
@Test
public void testReadFilter() throws Exception {
FileStoreTable table = createFileStoreTable();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index 9268f9530..bb14b9a27 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
@@ -841,14 +842,22 @@ public class PreAggregationITCase {
sql("CREATE TABLE INPUT (" + "k INT," + "b INT," + "PRIMARY KEY
(k) NOT ENFORCED);");
CloseableIterator<Row> insert =
streamSqlIter("INSERT INTO T SELECT k, SUM(b) FROM INPUT
GROUP BY k;");
+ BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT *
FROM T");
+
sql("INSERT INTO INPUT VALUES (1, 1), (2, 2)");
+ assertThat(select.collect(2))
+ .containsExactlyInAnyOrder(
+ Row.of(1, BigDecimal.valueOf(100, 2)),
+ Row.of(2, BigDecimal.valueOf(200, 2)));
+
sql("INSERT INTO INPUT VALUES (1, 3), (2, 4)");
- BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT *
FROM T");
- List<Row> result = select.collect(2);
- assertThat(result)
+ assertThat(select.collect(4))
.containsExactlyInAnyOrder(
- Row.of(1, BigDecimal.valueOf(300, 2)),
- Row.of(2, BigDecimal.valueOf(400, 2)));
+ Row.ofKind(RowKind.UPDATE_BEFORE, 1,
BigDecimal.valueOf(100, 2)),
+ Row.ofKind(RowKind.UPDATE_AFTER, 1,
BigDecimal.valueOf(300, 2)),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 2,
BigDecimal.valueOf(200, 2)),
+ Row.ofKind(RowKind.UPDATE_AFTER, 2,
BigDecimal.valueOf(400, 2)));
+
select.close();
insert.close();
}