This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2df0c1e4b [core] Introduce BatchTableCommit.truncateTable (#3037)
2df0c1e4b is described below
commit 2df0c1e4b33ec3e7c1d2ccc8a01fa816912423c0
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 18 19:05:48 2024 +0800
[core] Introduce BatchTableCommit.truncateTable (#3037)
---
.../paimon/benchmark/TableWriterBenchmark.java | 56 +++++++++++-----------
.../apache/paimon/operation/FileStoreCommit.java | 2 +-
.../paimon/operation/FileStoreCommitImpl.java | 2 +-
.../apache/paimon/table/sink/BatchTableCommit.java | 6 +++
.../apache/paimon/table/sink/TableCommitImpl.java | 13 ++++-
.../apache/paimon/flink/sink/FlinkTableSink.java | 10 +---
.../SupportsRowLevelOperationFlinkTableSink.java | 2 +-
.../commands/DeleteFromPaimonTableCommand.scala | 2 +-
.../commands/PaimonTruncateTableCommand.scala | 2 +-
9 files changed, 52 insertions(+), 43 deletions(-)
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
index 5ced4248b..04d1d7342 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
@@ -20,14 +20,13 @@ package org.apache.paimon.benchmark;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.junit.jupiter.api.Test;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/** Benchmark for table writer. */
@@ -41,9 +40,9 @@ public class TableWriterBenchmark extends TableBenchmark {
/*
* Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
* Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
- * avro: Best/Avg Time(ms) Row Rate(M/s) Per
Row(ns) Relative
+ * avro: Best/Avg Time(ms) Row Rate(K/s) Per
Row(ns) Relative
*
---------------------------------------------------------------------------------
- * avro_write 5847 / 7296 0.1 19489.5
1.0X
+ * avro_write 40309 / 41161 74.4 13436.3
1.0X
*/
}
@@ -56,9 +55,9 @@ public class TableWriterBenchmark extends TableBenchmark {
/*
* Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
* Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
- * avro: Best/Avg Time(ms) Row Rate(M/s) Per
Row(ns) Relative
+ * avro: Best/Avg Time(ms) Row Rate(K/s) Per
Row(ns) Relative
*
---------------------------------------------------------------------------------
- * avro_write 4701 / 5780 0.1 15669.6
1.0X
+ * avro_write 31817 / 32359 94.3 10605.6
1.0X
*/
}
@@ -71,9 +70,9 @@ public class TableWriterBenchmark extends TableBenchmark {
/*
* Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
* Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
- * orc: Best/Avg Time(ms) Row Rate(M/s) Per Row(ns)
Relative
+ * orc: Best/Avg Time(ms) Row Rate(K/s) Per Row(ns)
Relative
*
---------------------------------------------------------------------------------
- * orc_write 8448 / 9584 0.0 28160.1
1.0X
+ * orc_write 32751 / 33032 91.6 10917.0
1.0X
*/
}
@@ -85,9 +84,9 @@ public class TableWriterBenchmark extends TableBenchmark {
/*
* Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
* Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
- * parquet: Best/Avg Time(ms) Row Rate(M/s) Per
Row(ns) Relative
+ * parquet: Best/Avg Time(ms) Row Rate(K/s) Per
Row(ns) Relative
*
---------------------------------------------------------------------------------
- * parquet_write 10872 / 12566 0.0 36239.7
1.0X
+ * parquet_write 46279 / 46715 64.8 15426.3
1.0X
*/
}
@@ -99,44 +98,45 @@ public class TableWriterBenchmark extends TableBenchmark {
/*
* Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
* Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
- * orc: Best/Avg Time(ms) Row Rate(M/s) Per
Row(ns) Relative
+ * orc: Best/Avg Time(ms) Row Rate(K/s) Per
Row(ns) Relative
*
---------------------------------------------------------------------------------
- * orc_write 8690 / 9771 0.0 28968.0
1.0X
+ * orc_write 36489 / 36697 82.2 12163.1
1.0X
*/
}
public void innerTest(String name, Options options) throws Exception {
options.set(CoreOptions.BUCKET, 1);
- StreamWriteBuilder writeBuilder = createTable(options,
"T").newStreamWriteBuilder();
- StreamTableWrite write = writeBuilder.newWrite();
- StreamTableCommit commit = writeBuilder.newCommit();
- long valuesPerIteration = 300_000;
+ Table table = createTable(options, "T");
+ long valuesPerIteration = 3_000_000;
Benchmark benchmark =
new Benchmark(name, valuesPerIteration)
.setNumWarmupIters(1)
.setOutputPerIteration(true);
AtomicInteger writeCount = new AtomicInteger(0);
- AtomicInteger commitIdentifier = new AtomicInteger(0);
benchmark.addCase(
"write",
- 5,
+ 3,
() -> {
+ BatchWriteBuilder writeBuilder =
table.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit();
for (int i = 0; i < valuesPerIteration; i++) {
try {
write.write(newRandomRow());
writeCount.incrementAndGet();
- if (writeCount.get() % 10_000 == 0) {
- List<CommitMessage> commitMessages =
- write.prepareCommit(false,
commitIdentifier.get());
- commit.commit(commitIdentifier.get(),
commitMessages);
- commitIdentifier.incrementAndGet();
- }
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ try {
+ commit.commit(write.prepareCommit());
+ writeBuilder.newCommit().truncateTable();
+ write.close();
+ commit.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
});
benchmark.run();
- write.close();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index fad4c2b70..7151f2512 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -78,7 +78,7 @@ public interface FileStoreCommit {
*/
void dropPartitions(List<Map<String, String>> partitions, long
commitIdentifier);
- void purgeTable(long commitIdentifier);
+ void truncateTable(long commitIdentifier);
/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 46cfceb14..d3ce76b0a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -493,7 +493,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
@Override
- public void purgeTable(long commitIdentifier) {
+ public void truncateTable(long commitIdentifier) {
tryOverwrite(
null,
Collections.emptyList(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
index 894aec3e5..f0c9b59e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
@@ -50,4 +50,10 @@ public interface BatchTableCommit extends TableCommit {
* @param commitMessages commit messages from table write
*/
void commit(List<CommitMessage> commitMessages);
+
+ /**
+ * Truncate table, like normal {@link #commit}, files are not immediately
deleted, they are only
+ * logically deleted and will be deleted after the snapshot expires.
+ */
+ void truncateTable();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index ab01943bb..c76b750a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -65,6 +65,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
+import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
import static org.apache.paimon.utils.Preconditions.checkState;
/** An abstraction layer above {@link FileStoreCommit} to provide snapshot
commit and expiration. */
@@ -159,9 +160,19 @@ public class TableCommitImpl implements InnerTableCommit {
@Override
public void commit(List<CommitMessage> commitMessages) {
+ checkCommitted();
+ commit(COMMIT_IDENTIFIER, commitMessages);
+ }
+
+ @Override
+ public void truncateTable() {
+ checkCommitted();
+ commit.truncateTable(COMMIT_IDENTIFIER);
+ }
+
+ private void checkCommitted() {
checkState(!batchCommitted, "BatchTableCommit only support one-time
committing.");
batchCommitted = true;
- commit(BatchWriteBuilder.COMMIT_IDENTIFIER, commitMessages);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 50bc45b75..b1211b0e7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -19,10 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;
@@ -30,8 +27,6 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import javax.annotation.Nullable;
-import java.util.UUID;
-
/** Table sink to create sink. */
public class FlinkTableSink extends SupportsRowLevelOperationFlinkTableSink
implements SupportsTruncate {
@@ -46,9 +41,6 @@ public class FlinkTableSink extends SupportsRowLevelOperationFlinkTableSink
@Override
public void executeTruncation() {
- FileStoreCommit commit =
- ((FileStoreTable)
table).store().newCommit(UUID.randomUUID().toString());
- long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
- commit.purgeTable(identifier);
+ table.newBatchWriteBuilder().newCommit().truncateTable();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index c45fd168f..0d2bd3962 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -165,7 +165,7 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
((FileStoreTable)
table).store().newCommit(UUID.randomUUID().toString());
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
- commit.purgeTable(identifier);
+ commit.truncateTable(identifier);
return Optional.empty();
} else if (deleteIsDropPartition()) {
commit.dropPartitions(Collections.singletonList(deletePartitions()),
identifier);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 3b8e801a2..ff3aa253c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -64,7 +64,7 @@ trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with Pa
if (forceDeleteByRows) {
deleteRowsByCondition(sparkSession)
} else if (deletePredicate.isEmpty) {
- commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+ commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
} else {
val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
if (deletePredicate.get.visit(visitor)) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
index 9ca27b631..e9125e3e6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
@@ -40,7 +40,7 @@ case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec: TableP
val commit = table.store.newCommit(UUID.randomUUID.toString)
if (partitionSpec.isEmpty) {
- commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+ commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
} else {
commit.dropPartitions(
Collections.singletonList(partitionSpec.asJava),