This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2df0c1e4b [core] Introduce BatchTableCommit.truncateTable (#3037)
2df0c1e4b is described below

commit 2df0c1e4b33ec3e7c1d2ccc8a01fa816912423c0
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 18 19:05:48 2024 +0800

    [core] Introduce BatchTableCommit.truncateTable (#3037)
---
 .../paimon/benchmark/TableWriterBenchmark.java     | 56 +++++++++++-----------
 .../apache/paimon/operation/FileStoreCommit.java   |  2 +-
 .../paimon/operation/FileStoreCommitImpl.java      |  2 +-
 .../apache/paimon/table/sink/BatchTableCommit.java |  6 +++
 .../apache/paimon/table/sink/TableCommitImpl.java  | 13 ++++-
 .../apache/paimon/flink/sink/FlinkTableSink.java   | 10 +---
 .../SupportsRowLevelOperationFlinkTableSink.java   |  2 +-
 .../commands/DeleteFromPaimonTableCommand.scala    |  2 +-
 .../commands/PaimonTruncateTableCommand.scala      |  2 +-
 9 files changed, 52 insertions(+), 43 deletions(-)

diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
index 5ced4248b..04d1d7342 100644
--- 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
@@ -20,14 +20,13 @@ package org.apache.paimon.benchmark;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Benchmark for table writer. */
@@ -41,9 +40,9 @@ public class TableWriterBenchmark extends TableBenchmark {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * avro:            Best/Avg Time(ms)    Row Rate(M/s)      Per 
Row(ns)   Relative
+         * avro:            Best/Avg Time(ms)    Row Rate(K/s)      Per 
Row(ns)   Relative
          * 
---------------------------------------------------------------------------------
-         * avro_write        5847 / 7296              0.1          19489.5     
  1.0X
+         * avro_write        40309 / 41161             74.4          13436.3   
    1.0X
          */
     }
 
@@ -56,9 +55,9 @@ public class TableWriterBenchmark extends TableBenchmark {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * avro:            Best/Avg Time(ms)    Row Rate(M/s)      Per 
Row(ns)   Relative
+         * avro:            Best/Avg Time(ms)    Row Rate(K/s)      Per 
Row(ns)   Relative
          * 
---------------------------------------------------------------------------------
-         * avro_write        4701 / 5780              0.1          15669.6     
  1.0X
+         * avro_write        31817 / 32359             94.3          10605.6   
    1.0X
          */
     }
 
@@ -71,9 +70,9 @@ public class TableWriterBenchmark extends TableBenchmark {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * orc:            Best/Avg Time(ms)    Row Rate(M/s)      Per Row(ns) 
  Relative
+         * orc:            Best/Avg Time(ms)    Row Rate(K/s)      Per Row(ns) 
  Relative
          * 
---------------------------------------------------------------------------------
-         * orc_write        8448 / 9584              0.0          28160.1      
 1.0X
+         * orc_write        32751 / 33032             91.6          10917.0    
   1.0X
          */
     }
 
@@ -85,9 +84,9 @@ public class TableWriterBenchmark extends TableBenchmark {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * parquet:            Best/Avg Time(ms)    Row Rate(M/s)      Per 
Row(ns)   Relative
+         * parquet:            Best/Avg Time(ms)    Row Rate(K/s)      Per 
Row(ns)   Relative
          * 
---------------------------------------------------------------------------------
-         * parquet_write       10872 / 12566              0.0          36239.7 
      1.0X
+         * parquet_write       46279 / 46715             64.8          15426.3 
      1.0X
          */
     }
 
@@ -99,44 +98,45 @@ public class TableWriterBenchmark extends TableBenchmark {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * orc:               Best/Avg Time(ms)    Row Rate(M/s)      Per 
Row(ns)   Relative
+         * orc:               Best/Avg Time(ms)    Row Rate(K/s)      Per 
Row(ns)   Relative
          * 
---------------------------------------------------------------------------------
-         * orc_write           8690 / 9771              0.0          28968.0   
    1.0X
+         * orc_write           36489 / 36697             82.2          12163.1 
      1.0X
          */
     }
 
     public void innerTest(String name, Options options) throws Exception {
         options.set(CoreOptions.BUCKET, 1);
-        StreamWriteBuilder writeBuilder = createTable(options, 
"T").newStreamWriteBuilder();
-        StreamTableWrite write = writeBuilder.newWrite();
-        StreamTableCommit commit = writeBuilder.newCommit();
-        long valuesPerIteration = 300_000;
+        Table table = createTable(options, "T");
+        long valuesPerIteration = 3_000_000;
         Benchmark benchmark =
                 new Benchmark(name, valuesPerIteration)
                         .setNumWarmupIters(1)
                         .setOutputPerIteration(true);
         AtomicInteger writeCount = new AtomicInteger(0);
-        AtomicInteger commitIdentifier = new AtomicInteger(0);
         benchmark.addCase(
                 "write",
-                5,
+                3,
                 () -> {
+                    BatchWriteBuilder writeBuilder = 
table.newBatchWriteBuilder();
+                    BatchTableWrite write = writeBuilder.newWrite();
+                    BatchTableCommit commit = writeBuilder.newCommit();
                     for (int i = 0; i < valuesPerIteration; i++) {
                         try {
                             write.write(newRandomRow());
                             writeCount.incrementAndGet();
-                            if (writeCount.get() % 10_000 == 0) {
-                                List<CommitMessage> commitMessages =
-                                        write.prepareCommit(false, 
commitIdentifier.get());
-                                commit.commit(commitIdentifier.get(), 
commitMessages);
-                                commitIdentifier.incrementAndGet();
-                            }
                         } catch (Exception e) {
                             throw new RuntimeException(e);
                         }
                     }
+                    try {
+                        commit.commit(write.prepareCommit());
+                        writeBuilder.newCommit().truncateTable();
+                        write.close();
+                        commit.close();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
                 });
         benchmark.run();
-        write.close();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index fad4c2b70..7151f2512 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -78,7 +78,7 @@ public interface FileStoreCommit {
      */
     void dropPartitions(List<Map<String, String>> partitions, long 
commitIdentifier);
 
-    void purgeTable(long commitIdentifier);
+    void truncateTable(long commitIdentifier);
 
     /** Abort an unsuccessful commit. The data files will be deleted. */
     void abort(List<CommitMessage> commitMessages);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 46cfceb14..d3ce76b0a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -493,7 +493,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     }
 
     @Override
-    public void purgeTable(long commitIdentifier) {
+    public void truncateTable(long commitIdentifier) {
         tryOverwrite(
                 null,
                 Collections.emptyList(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
index 894aec3e5..f0c9b59e3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
@@ -50,4 +50,10 @@ public interface BatchTableCommit extends TableCommit {
      * @param commitMessages commit messages from table write
      */
     void commit(List<CommitMessage> commitMessages);
+
+    /**
+     * Truncate the table. Like a normal {@link #commit}, files are not deleted immediately; they
+     * are only logically deleted and will be deleted after the snapshot expires.
+     */
+    void truncateTable();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index ab01943bb..c76b750a1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -65,6 +65,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
+import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** An abstraction layer above {@link FileStoreCommit} to provide snapshot 
commit and expiration. */
@@ -159,9 +160,19 @@ public class TableCommitImpl implements InnerTableCommit {
 
     @Override
     public void commit(List<CommitMessage> commitMessages) {
+        checkCommitted();
+        commit(COMMIT_IDENTIFIER, commitMessages);
+    }
+
+    @Override
+    public void truncateTable() {
+        checkCommitted();
+        commit.truncateTable(COMMIT_IDENTIFIER);
+    }
+
+    private void checkCommitted() {
         checkState(!batchCommitted, "BatchTableCommit only support one-time 
committing.");
         batchCommitted = true;
-        commit(BatchWriteBuilder.COMMIT_IDENTIFIER, commitMessages);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 50bc45b75..b1211b0e7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -19,10 +19,7 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
 
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;
@@ -30,8 +27,6 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 
 import javax.annotation.Nullable;
 
-import java.util.UUID;
-
 /** Table sink to create sink. */
 public class FlinkTableSink extends SupportsRowLevelOperationFlinkTableSink
         implements SupportsTruncate {
@@ -46,9 +41,6 @@ public class FlinkTableSink extends 
SupportsRowLevelOperationFlinkTableSink
 
     @Override
     public void executeTruncation() {
-        FileStoreCommit commit =
-                ((FileStoreTable) 
table).store().newCommit(UUID.randomUUID().toString());
-        long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
-        commit.purgeTable(identifier);
+        table.newBatchWriteBuilder().newCommit().truncateTable();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index c45fd168f..0d2bd3962 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -165,7 +165,7 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
                 ((FileStoreTable) 
table).store().newCommit(UUID.randomUUID().toString());
         long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
         if (deletePredicate == null) {
-            commit.purgeTable(identifier);
+            commit.truncateTable(identifier);
             return Optional.empty();
         } else if (deleteIsDropPartition()) {
             
commit.dropPartitions(Collections.singletonList(deletePartitions()), 
identifier);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 3b8e801a2..ff3aa253c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -64,7 +64,7 @@ trait DeleteFromPaimonTableCommandBase extends 
PaimonLeafRunnableCommand with Pa
     if (forceDeleteByRows) {
       deleteRowsByCondition(sparkSession)
     } else if (deletePredicate.isEmpty) {
-      commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
     } else {
       val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
       if (deletePredicate.get.visit(visitor)) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
index 9ca27b631..e9125e3e6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
@@ -40,7 +40,7 @@ case class PaimonTruncateTableCommand(v2Table: SparkTable, 
partitionSpec: TableP
     val commit = table.store.newCommit(UUID.randomUUID.toString)
 
     if (partitionSpec.isEmpty) {
-      commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
     } else {
       commit.dropPartitions(
         Collections.singletonList(partitionSpec.asJava),

Reply via email to