This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d4b8a420 [core] Introduce TableCommit.abort (#898)
2d4b8a420 is described below

commit 2d4b8a42035c944530140c1dd05580bdd67a7c62
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 13 19:39:14 2023 +0800

    [core] Introduce TableCommit.abort (#898)
---
 .../apache/paimon/operation/FileStoreCommit.java   |  4 ++++
 .../paimon/operation/FileStoreCommitImpl.java      | 25 ++++++++++++++++++++++
 .../org/apache/paimon/table/sink/TableCommit.java  |  8 ++++++-
 .../apache/paimon/table/sink/TableCommitImpl.java  |  5 +++++
 .../paimon/table/FileStoreTableTestBase.java       | 16 ++++++++++++++
 .../apache/paimon/flink/PreAggregationITCase.java  | 19 +++++++++++-----
 6 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index ac1d17aba..d7c68dd5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.sink.CommitMessage;
 
 import java.util.List;
 import java.util.Map;
@@ -72,4 +73,7 @@ public interface FileStoreCommit {
      * @param partitions A list of partition {@link Map}s. NOTE: cannot be empty!
      */
     void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier);
+
+    /** Abort an unsuccessful commit. The data files will be deleted. */
+    void abort(List<CommitMessage> commitMessages);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index da0d75d56..2084701b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -39,6 +40,7 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SnapshotManager;
@@ -373,6 +375,29 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 Collections.emptyMap());
     }
 
+    @Override
+    public void abort(List<CommitMessage> commitMessages) {
+        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> factoryMap = new HashMap<>();
+        for (CommitMessage message : commitMessages) {
+            DataFilePathFactory pathFactory =
+                    factoryMap.computeIfAbsent(
+                            Pair.of(message.partition(), message.bucket()),
+                            k ->
+                                    this.pathFactory.createDataFilePathFactory(
+                                            k.getKey(), k.getValue()));
+            CommitMessageImpl commitMessage = (CommitMessageImpl) message;
+            List<DataFileMeta> toDelete = new ArrayList<>();
+            toDelete.addAll(commitMessage.newFilesIncrement().newFiles());
+            toDelete.addAll(commitMessage.newFilesIncrement().changelogFiles());
+            toDelete.addAll(commitMessage.compactIncrement().compactAfter());
+            toDelete.addAll(commitMessage.compactIncrement().changelogFiles());
+
+            for (DataFileMeta file : toDelete) {
+                fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+            }
+        }
+    }
+
     private void collectChanges(
             List<CommitMessage> commitMessages,
             List<ManifestEntry> appendTableFiles,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
index 2694d66b0..8845b7fa1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommit.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.table.Table;
 
+import java.util.List;
+
 /**
  * Create and commit snapshots of a {@link Table}. Snapshots are produced from {@link
  * CommitMessage}s, which themselves are generated by {@link TableWrite}.
@@ -31,4 +33,8 @@ import org.apache.paimon.table.Table;
  * @since 0.4.0
  */
 @Public
-public interface TableCommit extends AutoCloseable {}
+public interface TableCommit extends AutoCloseable {
+
+    /** Abort an unsuccessful commit. The data files will be deleted. */
+    void abort(List<CommitMessage> commitMessages);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 4b411e7a4..9fe8ea3fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -159,4 +159,9 @@ public class TableCommitImpl implements InnerTableCommit {
         batchCommitted = true;
         commit(BatchWriteBuilder.COMMIT_IDENTIFIER, commitMessages);
     }
+
+    @Override
+    public void abort(List<CommitMessage> commitMessages) {
+        commit.abort(commitMessages);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index bf99f93f7..edf58e0d5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -29,7 +29,9 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
@@ -321,6 +323,20 @@ public abstract class FileStoreTableTestBase {
         assertThat(((DataSplit) splits.get(0)).bucket()).isEqualTo(1);
     }
 
+    @Test
+    public void testAbort() throws Exception {
+        FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 1));
+        StreamTableWrite write = table.newWrite(commitUser);
+        write.write(rowData(1, 2, 3L));
+        List<CommitMessage> messages = write.prepareCommit(true, 0);
+        table.newCommit(commitUser).abort(messages);
+
+        FileStatus[] files =
+                LocalFileIO.create().listStatus(new Path(tablePath + "/pt=1/bucket-0"));
+        assertThat(files).isEmpty();
+        write.close();
+    }
+
     @Test
     public void testReadFilter() throws Exception {
         FileStoreTable table = createFileStoreTable();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index 9268f9530..bb14b9a27 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
 
@@ -841,14 +842,22 @@ public class PreAggregationITCase {
             sql("CREATE TABLE INPUT (" + "k INT," + "b INT," + "PRIMARY KEY (k) NOT ENFORCED);");
             CloseableIterator<Row> insert =
                     streamSqlIter("INSERT INTO T SELECT k, SUM(b) FROM INPUT GROUP BY k;");
+            BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT * FROM T");
+
             sql("INSERT INTO INPUT VALUES (1, 1), (2, 2)");
+            assertThat(select.collect(2))
+                    .containsExactlyInAnyOrder(
+                            Row.of(1, BigDecimal.valueOf(100, 2)),
+                            Row.of(2, BigDecimal.valueOf(200, 2)));
+
             sql("INSERT INTO INPUT VALUES (1, 3), (2, 4)");
-            BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT * FROM T");
-            List<Row> result = select.collect(2);
-            assertThat(result)
+            assertThat(select.collect(4))
                     .containsExactlyInAnyOrder(
-                            Row.of(1, BigDecimal.valueOf(300, 2)),
-                            Row.of(2, BigDecimal.valueOf(400, 2)));
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, BigDecimal.valueOf(100, 2)),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 1, BigDecimal.valueOf(300, 2)),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, BigDecimal.valueOf(200, 2)),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 2, BigDecimal.valueOf(400, 2)));
+
             select.close();
             insert.close();
         }

Reply via email to