This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9892c2ad19 [core] format table: refactor overwrite to both Flink and Spark (#6497)
9892c2ad19 is described below
commit 9892c2ad19838e1c68114531cc1370c6e29e01d3
Author: jerry <[email protected]>
AuthorDate: Sat Nov 1 12:53:49 2025 +0800
[core] format table: refactor overwrite to both Flink and Spark (#6497)
---
.../table/format/FormatBatchWriteBuilder.java | 49 +++++-
.../paimon/table/format/FormatTableCommit.java | 195 +++++++++++++++++++++
.../paimon/table/format/FormatTableWrite.java | 28 ---
.../org/apache/paimon/catalog/CatalogTestBase.java | 124 ++++++++++++-
.../flink/sink/FlinkFormatTableDataStreamSink.java | 45 +++--
.../paimon/flink/sink/FlinkFormatTableSink.java | 15 +-
.../paimon/flink/sink/FlinkTableSinkBase.java | 4 +-
.../paimon/flink/source/FormatTableITCase.java | 87 +++++++++
.../paimon/spark/format/PaimonFormatTable.scala | 127 ++++++--------
9 files changed, 551 insertions(+), 123 deletions(-)
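
For orientation, a minimal sketch (not part of the commit) of how the refactored overwrite path is driven through the core API, mirroring the new CatalogTestBase coverage in the diff below; the catalog and id handles and the (year, month) partitioning are assumed for illustration:

    // Illustrative sketch only; assumes an existing format table partitioned by (year, month).
    FormatTable table = (FormatTable) catalog.getTable(id);
    BatchWriteBuilder builder = table.newBatchWriteBuilder();

    Map<String, String> staticPartition = new HashMap<>();
    staticPartition.put("year", "2024");
    staticPartition.put("month", "10");

    try (BatchTableWrite write = builder.newWrite();
            BatchTableCommit commit = builder.withOverwrite(staticPartition).newCommit()) {
        write.write(GenericRow.of(10, 1000, 2024, 10));
        // FormatTableCommit deletes the existing data files under the static partition
        // before the newly written files are committed.
        commit.commit(write.prepareCommit());
    }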
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
index abb10f0535..78fef13ab6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -38,6 +39,8 @@ public class FormatBatchWriteBuilder implements BatchWriteBuilder {
private final FormatTable table;
protected final CoreOptions options;
+ private Map<String, String> staticPartition;
+ private boolean overwrite = false;
public FormatBatchWriteBuilder(FormatTable table) {
this.table = table;
@@ -71,11 +74,53 @@ public class FormatBatchWriteBuilder implements BatchWriteBuilder {
     @Override
     public BatchTableCommit newCommit() {
-        throw new UnsupportedOperationException("FormatTable does not support commit");
+        boolean formatTablePartitionOnlyValueInPath =
+                (new CoreOptions(table.options())).formatTablePartitionOnlyValueInPath();
+ return new FormatTableCommit(
+ table.location(),
+ table.partitionKeys(),
+ table.fileIO(),
+ formatTablePartitionOnlyValueInPath,
+ overwrite,
+ staticPartition);
}
@Override
     public BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticPartition) {
-        throw new UnsupportedOperationException("FormatTable does not support commit");
+ this.overwrite = true;
+ validateStaticPartition(staticPartition, table.partitionKeys());
+ this.staticPartition = staticPartition;
+ return this;
+ }
+
+ protected static void validateStaticPartition(
+ Map<String, String> staticPartition, List<String> partitionKeys) {
+ if (staticPartition != null && !staticPartition.isEmpty()) {
+ if (partitionKeys == null || partitionKeys.isEmpty()) {
+ throw new IllegalArgumentException(
+                        "Format table is not partitioned, static partition values are not allowed.");
+ }
+
+ boolean missingLeadingKey = false;
+ for (String partitionKey : partitionKeys) {
+ boolean contains = staticPartition.containsKey(partitionKey);
+ if (missingLeadingKey && contains) {
+ throw new IllegalArgumentException(
+ String.format(
+                                "Static partition column '%s' cannot be specified without its leading partition.",
+ partitionKey));
+ }
+ if (!contains) {
+ missingLeadingKey = true;
+ }
+ }
+
+ for (String key : staticPartition.keySet()) {
+ if (!partitionKeys.contains(key)) {
+ throw new IllegalArgumentException(
+                            String.format("Unknown static partition column '%s'.", key));
+ }
+ }
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
new file mode 100644
index 0000000000..54c8cac541
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommit;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringJoiner;
+
+import static org.apache.paimon.table.format.FormatBatchWriteBuilder.validateStaticPartition;
+
+/** Commit for Format Table. */
+public class FormatTableCommit implements BatchTableCommit {
+
+ private String location;
+ private final boolean formatTablePartitionOnlyValueInPath;
+ private FileIO fileIO;
+ private List<String> partitionKeys;
+ protected Map<String, String> staticPartitions;
+ protected boolean overwrite = false;
+
+ public FormatTableCommit(
+ String location,
+ List<String> partitionKeys,
+ FileIO fileIO,
+ boolean formatTablePartitionOnlyValueInPath,
+ boolean overwrite,
+ @Nullable Map<String, String> staticPartitions) {
+ this.location = location;
+ this.fileIO = fileIO;
+        this.formatTablePartitionOnlyValueInPath = formatTablePartitionOnlyValueInPath;
+ validateStaticPartition(staticPartitions, partitionKeys);
+ this.staticPartitions = staticPartitions;
+ this.overwrite = overwrite;
+ this.partitionKeys = partitionKeys;
+ }
+
+ @Override
+ public void commit(List<CommitMessage> commitMessages) {
+ try {
+            List<TwoPhaseOutputStream.Committer> committers = new ArrayList<>();
+            for (CommitMessage commitMessage : commitMessages) {
+                if (commitMessage instanceof TwoPhaseCommitMessage) {
+                    committers.add(((TwoPhaseCommitMessage) commitMessage).getCommitter());
+ } else {
+ throw new RuntimeException(
+ "Unsupported commit message type: "
+ + commitMessage.getClass().getName());
+ }
+ }
+            if (overwrite && staticPartitions != null && !staticPartitions.isEmpty()) {
+ Path partitionPath =
+ buildPartitionPath(
+ location,
+ staticPartitions,
+ formatTablePartitionOnlyValueInPath,
+ partitionKeys);
+ deletePreviousDataFile(partitionPath);
+ } else if (overwrite) {
+ Set<Path> partitionPaths = new HashSet<>();
+ for (TwoPhaseOutputStream.Committer c : committers) {
+ partitionPaths.add(c.targetFilePath().getParent());
+ }
+ for (Path p : partitionPaths) {
+ deletePreviousDataFile(p);
+ }
+ }
+ for (TwoPhaseOutputStream.Committer committer : committers) {
+ committer.commit(this.fileIO);
+ }
+ } catch (Exception e) {
+ this.abort(commitMessages);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static Path buildPartitionPath(
+ String location,
+ Map<String, String> partitionSpec,
+ boolean formatTablePartitionOnlyValueInPath,
+ List<String> partitionKeys) {
+ if (partitionSpec.isEmpty() || partitionKeys.isEmpty()) {
+            throw new IllegalArgumentException("partitionSpec or partitionKeys is empty.");
+ }
+ StringJoiner joiner = new StringJoiner("/");
+ for (int i = 0; i < partitionSpec.size(); i++) {
+ String key = partitionKeys.get(i);
+ if (partitionSpec.containsKey(key)) {
+ if (formatTablePartitionOnlyValueInPath) {
+ joiner.add(partitionSpec.get(key));
+ } else {
+ joiner.add(key + "=" + partitionSpec.get(key));
+ }
+ } else {
+                throw new RuntimeException("partitionSpec does not contain key: " + key);
+ }
+ }
+ return new Path(location, joiner.toString());
+ }
+
+ @Override
+ public void abort(List<CommitMessage> commitMessages) {
+ try {
+ for (CommitMessage commitMessage : commitMessages) {
+ if (commitMessage instanceof TwoPhaseCommitMessage) {
+ TwoPhaseCommitMessage twoPhaseCommitMessage =
+ (TwoPhaseCommitMessage) commitMessage;
+ twoPhaseCommitMessage.getCommitter().discard(this.fileIO);
+ } else {
+ throw new RuntimeException(
+ "Unsupported commit message type: "
+ + commitMessage.getClass().getName());
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+    private void deletePreviousDataFile(Path partitionPath) throws IOException {
+ if (fileIO.exists(partitionPath)) {
+ FileStatus[] files = fileIO.listFiles(partitionPath, true);
+ for (FileStatus file : files) {
+ if (FormatTableScan.isDataFileName(file.getPath().getName())) {
+ try {
+ fileIO.delete(file.getPath(), false);
+ } catch (FileNotFoundException ignore) {
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void truncateTable() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void truncatePartitions(List<Map<String, String>> partitionSpecs) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void updateStatistics(Statistics statistics) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void compactManifests() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TableCommit withMetricRegistry(MetricRegistry registry) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
index 08a940ba15..27b47b129c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
@@ -43,7 +42,6 @@ import java.util.stream.Collectors;
/** {@link TableWrite} implementation for format table. */
public class FormatTableWrite implements BatchTableWrite {
- private FileIO fileIO;
private RowType rowType;
private final FormatTableFileWriter write;
private final FormatTableRowPartitionKeyExtractor partitionKeyExtractor;
@@ -57,7 +55,6 @@ public class FormatTableWrite implements BatchTableWrite {
CoreOptions options,
RowType partitionType,
List<String> partitionKeys) {
- this.fileIO = fileIO;
this.rowType = rowType;
         this.write = new FormatTableFileWriter(fileIO, rowType, options, partitionType);
this.partitionKeyExtractor =
@@ -102,31 +99,6 @@ public class FormatTableWrite implements BatchTableWrite {
return write.prepareCommit();
}
- public void commit(List<CommitMessage> commitMessages) throws Exception {
-        applyCommitterAction(commitMessages, TwoPhaseOutputStream.Committer::commit);
-    }
-
-    public void discard(List<CommitMessage> commitMessages) throws Exception {
-        applyCommitterAction(commitMessages, TwoPhaseOutputStream.Committer::discard);
- }
-
- private interface CommitterAction {
-        void apply(TwoPhaseOutputStream.Committer committer, FileIO fileIO) throws Exception;
- }
-
-    private void applyCommitterAction(List<CommitMessage> commitMessages, CommitterAction action)
- throws Exception {
- for (CommitMessage commitMessage : commitMessages) {
- if (commitMessage instanceof TwoPhaseCommitMessage) {
-                TwoPhaseCommitMessage twoPhaseCommitMessage = (TwoPhaseCommitMessage) commitMessage;
-                action.apply(twoPhaseCommitMessage.getCommitter(), this.fileIO);
- } else {
- throw new RuntimeException(
-                        "Unsupported commit message type: " + commitMessage.getClass().getName());
- }
- }
- }
-
@Override
public void close() throws Exception {
write.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index ea3e2ebd86..4ddd6ab05f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -41,7 +41,6 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.format.FormatTableWrite;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -710,9 +709,126 @@ public abstract class CatalogTestBase {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+    public void testFormatTableOverwrite(boolean partitionPathOnlyValue) throws Exception {
+ if (!supportsFormatTable()) {
+ return;
+ }
+ String dbName = "format_overwrite_db";
+ catalog.createDatabase(dbName, true);
+
+ Identifier id = Identifier.create(dbName, "format_overwrite_table");
+ Schema nonPartitionedSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .options(getFormatTableOptions())
+ .option("file.format", "csv")
+                        .option("file.compression", HadoopCompressionType.GZIP.value())
+ .option(
+ "format-table.partition-path-only-value",
+ "" + partitionPathOnlyValue)
+ .build();
+ catalog.createTable(id, nonPartitionedSchema, true);
+ FormatTable nonPartitionedTable = (FormatTable) catalog.getTable(id);
+ BatchWriteBuilder nonPartitionedTableWriteBuilder =
+ nonPartitionedTable.newBatchWriteBuilder();
+        try (BatchTableWrite write = nonPartitionedTableWriteBuilder.newWrite();
+                BatchTableCommit commit = nonPartitionedTableWriteBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 10));
+ write.write(GenericRow.of(2, 20));
+ commit.commit(write.prepareCommit());
+ }
+
+        try (BatchTableWrite write = nonPartitionedTableWriteBuilder.newWrite();
+                BatchTableCommit commit =
+                        nonPartitionedTableWriteBuilder.withOverwrite().newCommit()) {
+ write.write(GenericRow.of(3, 30));
+ commit.commit(write.prepareCommit());
+ }
+
+        List<InternalRow> fullOverwriteRows = read(nonPartitionedTable, null, null, null, null);
+        assertThat(fullOverwriteRows).containsExactlyInAnyOrder(GenericRow.of(3, 30));
+ catalog.dropTable(id, true);
+
+        Identifier pid = Identifier.create(dbName, "format_overwrite_partitioned");
+ Schema partitionedSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .column("year", DataTypes.INT())
+ .column("month", DataTypes.INT())
+ .partitionKeys("year", "month")
+ .options(getFormatTableOptions())
+ .option("file.format", "csv")
+                        .option("file.compression", HadoopCompressionType.GZIP.value())
+ .option(
+ "format-table.partition-path-only-value",
+ "" + partitionPathOnlyValue)
+ .build();
+ catalog.createTable(pid, partitionedSchema, true);
+ FormatTable partitionedTable = (FormatTable) catalog.getTable(pid);
+        BatchWriteBuilder partitionedTableWriteBuilder = partitionedTable.newBatchWriteBuilder();
+        try (BatchTableWrite write = partitionedTableWriteBuilder.newWrite();
+                BatchTableCommit commit = partitionedTableWriteBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 100, 2024, 10));
+ write.write(GenericRow.of(2, 200, 2025, 10));
+ write.write(GenericRow.of(3, 300, 2025, 11));
+ commit.commit(write.prepareCommit());
+ }
+
+ Map<String, String> staticPartition = new HashMap<>();
+ staticPartition.put("year", "2024");
+ staticPartition.put("month", "10");
+ try (BatchTableWrite write = partitionedTableWriteBuilder.newWrite();
+ BatchTableCommit commit =
+                        partitionedTableWriteBuilder.withOverwrite(staticPartition).newCommit()) {
+ write.write(GenericRow.of(10, 1000, 2024, 10));
+ commit.commit(write.prepareCommit());
+ }
+
+        List<InternalRow> partitionOverwriteRows = read(partitionedTable, null, null, null, null);
+ assertThat(partitionOverwriteRows)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(10, 1000, 2024, 10),
+ GenericRow.of(2, 200, 2025, 10),
+ GenericRow.of(3, 300, 2025, 11));
+
+ staticPartition = new HashMap<>();
+ staticPartition.put("year", "2025");
+ try (BatchTableWrite write = partitionedTableWriteBuilder.newWrite();
+ BatchTableCommit commit =
+                        partitionedTableWriteBuilder.withOverwrite(staticPartition).newCommit()) {
+ write.write(GenericRow.of(10, 1000, 2025, 10));
+ commit.commit(write.prepareCommit());
+ }
+
+        partitionOverwriteRows = read(partitionedTable, null, null, null, null);
+ assertThat(partitionOverwriteRows)
+ .containsExactlyInAnyOrder(
+                        GenericRow.of(10, 1000, 2024, 10), GenericRow.of(10, 1000, 2025, 10));
+
+ try (BatchTableWrite write = partitionedTableWriteBuilder.newWrite()) {
+ write.write(GenericRow.of(10, 1000, 2025, 10));
+ assertThrows(
+ RuntimeException.class,
+ () -> {
+                        Map<String, String> staticOverwritePartition = new HashMap<>();
+ staticOverwritePartition.put("month", "10");
+ partitionedTableWriteBuilder
+ .withOverwrite(staticOverwritePartition)
+ .newCommit();
+ });
+ }
+ catalog.dropTable(pid, true);
+ }
+
private void writeAndCheckCommitFormatTable(
-            Table table, InternalRow[] datas, InternalRow dataWithDiffPartition) throws Exception {
-        try (FormatTableWrite write = (FormatTableWrite) table.newBatchWriteBuilder().newWrite()) {
+            FormatTable table, InternalRow[] datas, InternalRow dataWithDiffPartition)
+            throws Exception {
+        try (BatchTableWrite write = table.newBatchWriteBuilder().newWrite();
+                BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
for (InternalRow row : datas) {
write.write(row);
}
@@ -722,7 +838,7 @@ public abstract class CatalogTestBase {
List<CommitMessage> committers = write.prepareCommit();
List<InternalRow> readData = read(table, null, null, null, null);
assertThat(readData).isEmpty();
- write.commit(committers);
+ commit.commit(committers);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java
index d340af1d65..3d133a9cea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.format.FormatTableWrite;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
@@ -34,26 +35,37 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import java.util.List;
+import java.util.Map;
/** DataStream sink for format tables. */
public class FlinkFormatTableDataStreamSink {
private final FormatTable table;
+ private final boolean overwrite;
+ private final Map<String, String> staticPartitions;
- public FlinkFormatTableDataStreamSink(FormatTable table) {
+ public FlinkFormatTableDataStreamSink(
+            FormatTable table, boolean overwrite, Map<String, String> staticPartitions) {
this.table = table;
+ this.overwrite = overwrite;
+ this.staticPartitions = staticPartitions;
}
public DataStreamSink<?> sinkFrom(DataStream<RowData> dataStream) {
- return dataStream.sinkTo(new FormatTableSink(table));
+        return dataStream.sinkTo(new FormatTableSink(table, overwrite, staticPartitions));
}
private static class FormatTableSink implements Sink<RowData> {
private final FormatTable table;
+ private final boolean overwrite;
+ private final Map<String, String> staticPartitions;
- public FormatTableSink(FormatTable table) {
+ public FormatTableSink(
+                FormatTable table, boolean overwrite, Map<String, String> staticPartitions) {
this.table = table;
+ this.overwrite = overwrite;
+ this.staticPartitions = staticPartitions;
}
/**
@@ -61,7 +73,7 @@ public class FlinkFormatTableDataStreamSink {
* 2.0+.
*/
public SinkWriter<RowData> createWriter(InitContext context) {
- return new FormatTableSinkWriter(table);
+            return new FormatTableSinkWriter(table, overwrite, staticPartitions);
}
/**
@@ -69,18 +81,25 @@ public class FlinkFormatTableDataStreamSink {
* 1.18-.
*/
public SinkWriter<RowData> createWriter(WriterInitContext context) {
- return new FormatTableSinkWriter(table);
+            return new FormatTableSinkWriter(table, overwrite, staticPartitions);
}
/** Sink writer for format tables using Flink v2 API. */
        private static class FormatTableSinkWriter implements SinkWriter<RowData> {
- private transient FormatTableWrite tableWrite;
private transient BatchWriteBuilder writeBuilder;
+ private transient FormatTableWrite tableWrite;
+ private transient BatchTableCommit tableCommit;
- public FormatTableSinkWriter(FormatTable table) {
+ public FormatTableSinkWriter(
+                    FormatTable table, boolean overwrite, Map<String, String> staticPartitions) {
this.writeBuilder = table.newBatchWriteBuilder();
this.tableWrite = (FormatTableWrite) writeBuilder.newWrite();
+ if (overwrite) {
+                    this.tableCommit = writeBuilder.withOverwrite(staticPartitions).newCommit();
+ } else {
+ this.tableCommit = writeBuilder.newCommit();
+ }
}
@Override
@@ -99,16 +118,16 @@ public class FlinkFormatTableDataStreamSink {
@Override
public void close() throws Exception {
if (tableWrite != null) {
- List<CommitMessage> committers = null;
+ List<CommitMessage> commitMessages = null;
try {
// Prepare commit and commit the data
- committers = tableWrite.prepareCommit();
- if (!committers.isEmpty()) {
- tableWrite.commit(committers);
+ commitMessages = tableWrite.prepareCommit();
+ if (!commitMessages.isEmpty()) {
+ tableCommit.commit(commitMessages);
}
} catch (Exception e) {
- if (committers != null && !committers.isEmpty()) {
- tableWrite.discard(committers);
+                        if (commitMessages != null && !commitMessages.isEmpty()) {
+ tableCommit.abort(commitMessages);
}
throw new RuntimeException(e);
} finally {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
index 83c2f9072f..361323f016 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
@@ -24,6 +24,7 @@ import org.apache.paimon.table.FormatTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.factories.DynamicTableFactory;
@@ -31,12 +32,14 @@ import java.util.HashMap;
import java.util.Map;
/** Table sink for format tables. */
-public class FlinkFormatTableSink implements DynamicTableSink, SupportsPartitioning {
+public class FlinkFormatTableSink
+ implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
private final ObjectIdentifier tableIdentifier;
private final FormatTable table;
private final DynamicTableFactory.Context context;
private Map<String, String> staticPartitions = new HashMap<>();
+ protected boolean overwrite = false;
public FlinkFormatTableSink(
ObjectIdentifier tableIdentifier,
@@ -55,13 +58,16 @@ public class FlinkFormatTableSink implements DynamicTableSink, SupportsPartition
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return new PaimonDataStreamSinkProvider(
-                (dataStream) -> new FlinkFormatTableDataStreamSink(table).sinkFrom(dataStream));
+ (dataStream) ->
+                        new FlinkFormatTableDataStreamSink(table, overwrite, staticPartitions)
+ .sinkFrom(dataStream));
}
@Override
public DynamicTableSink copy() {
        FlinkFormatTableSink copied = new FlinkFormatTableSink(tableIdentifier, table, context);
copied.staticPartitions = new HashMap<>(staticPartitions);
+ copied.overwrite = overwrite;
return copied;
}
@@ -81,4 +87,9 @@ public class FlinkFormatTableSink implements DynamicTableSink, SupportsPartition
}
});
}
+
+ @Override
+ public void applyOverwrite(boolean overwrite) {
+ this.overwrite = overwrite;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index d7847f1889..79e449407a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -123,7 +123,9 @@ public abstract class FlinkTableSinkBase
FormatTable formatTable = (FormatTable) table;
return new PaimonDataStreamSinkProvider(
(dataStream) ->
-                            new FlinkFormatTableDataStreamSink(formatTable).sinkFrom(dataStream));
+ new FlinkFormatTableDataStreamSink(
+                                            formatTable, overwrite, staticPartitions)
+ .sinkFrom(dataStream));
}
LogSinkProvider logSinkProvider = null;
if (logStoreTableFactory != null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableITCase.java
index 8a086116bd..2e14093baf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableITCase.java
@@ -58,4 +58,91 @@ public class FormatTableITCase extends RESTCatalogITCaseBase {
Row.of(new BigDecimal(bigDecimalStr), 2));
sql("Drop TABLE %s", tableName);
}
+
+ @Test
+ public void testPartitionedTableInsertOverwrite() {
+
+ String ptTableName = "format_table_overwrite";
+ Identifier ptIdentifier = Identifier.create("default", ptTableName);
+ sql(
+                "CREATE TABLE %s (a DECIMAL(8, 3), b INT, c INT) PARTITIONED BY (c) WITH ('file.format'='parquet', 'type'='format-table')",
+ ptTableName);
+ RESTToken expiredDataToken =
+ new RESTToken(
+ ImmutableMap.of(
+                                "akId", "akId-expire", "akSecret", UUID.randomUUID().toString()),
+ System.currentTimeMillis() + 1000_000);
+ restCatalogServer.setDataToken(ptIdentifier, expiredDataToken);
+
+ String ptBigDecimalStr1 = "10.001";
+ String ptBigDecimalStr2 = "12.345";
+        Decimal ptDecimal1 = Decimal.fromBigDecimal(new BigDecimal(ptBigDecimalStr1), 8, 3);
+        Decimal ptDecimal2 = Decimal.fromBigDecimal(new BigDecimal(ptBigDecimalStr2), 8, 3);
+
+ sql(
+ "INSERT INTO %s PARTITION (c = 1) VALUES (%s, 10), (%s, 20)",
+ ptTableName, ptDecimal1, ptDecimal1);
+        sql("INSERT INTO %s PARTITION (c = 2) VALUES (%s, 30)", ptTableName, ptDecimal1);
+
+ assertThat(sql("SELECT a, b, c FROM %s", ptTableName))
+ .containsExactlyInAnyOrder(
+ Row.of(new BigDecimal(ptBigDecimalStr1), 10, 1),
+ Row.of(new BigDecimal(ptBigDecimalStr1), 20, 1),
+ Row.of(new BigDecimal(ptBigDecimalStr1), 30, 2));
+
+ sql(
+                "INSERT OVERWRITE %s PARTITION (c = 1) VALUES (%s, 100), (%s, 200)",
+ ptTableName, ptDecimal2, ptDecimal2);
+
+ assertThat(sql("SELECT a, b, c FROM %s", ptTableName))
+ .containsExactlyInAnyOrder(
+ Row.of(new BigDecimal(ptBigDecimalStr2), 100, 1),
+ Row.of(new BigDecimal(ptBigDecimalStr2), 200, 1),
+ Row.of(new BigDecimal(ptBigDecimalStr1), 30, 2));
+
+ sql(
+ "INSERT OVERWRITE %s VALUES (%s, 100, 1), (%s, 200, 2)",
+ ptTableName, ptDecimal1, ptDecimal2);
+
+ assertThat(sql("SELECT a, b, c FROM %s", ptTableName))
+ .containsExactlyInAnyOrder(
+ Row.of(new BigDecimal(ptBigDecimalStr1), 100, 1),
+ Row.of(new BigDecimal(ptBigDecimalStr2), 200, 2));
+
+ sql("Drop TABLE %s", ptTableName);
+ }
+
+ @Test
+ public void testUnPartitionedTableInsertOverwrite() {
+ String tableName = "format_table_overwrite_test";
+ String bigDecimalStr1 = "10.001";
+ String bigDecimalStr2 = "12.345";
+        Decimal decimal1 = Decimal.fromBigDecimal(new BigDecimal(bigDecimalStr1), 8, 3);
+        Decimal decimal2 = Decimal.fromBigDecimal(new BigDecimal(bigDecimalStr2), 8, 3);
+
+ Identifier identifier = Identifier.create("default", tableName);
+ sql(
+                "CREATE TABLE %s (a DECIMAL(8, 3), b INT, c INT) WITH ('file.format'='parquet', 'type'='format-table')",
+ tableName);
+ RESTToken expiredDataToken =
+ new RESTToken(
+ ImmutableMap.of(
+                                "akId", "akId-expire", "akSecret", UUID.randomUUID().toString()),
+ System.currentTimeMillis() + 1000_000);
+ restCatalogServer.setDataToken(identifier, expiredDataToken);
+
+        sql("INSERT INTO %s VALUES (%s, 1, 1), (%s, 2, 2)", tableName, decimal1, decimal1);
+ assertThat(sql("SELECT a, b FROM %s", tableName))
+ .containsExactlyInAnyOrder(
+ Row.of(new BigDecimal(bigDecimalStr1), 1),
+ Row.of(new BigDecimal(bigDecimalStr1), 2));
+
+        sql("INSERT OVERWRITE %s VALUES (%s, 3, 3), (%s, 4, 4)", tableName, decimal2, decimal2);
+ assertThat(sql("SELECT a, b FROM %s", tableName))
+ .containsExactlyInAnyOrder(
+ Row.of(new BigDecimal(bigDecimalStr2), 3),
+ Row.of(new BigDecimal(bigDecimalStr2), 4));
+
+ sql("Drop TABLE %s", tableName);
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index 421dc6206c..449346e456 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -18,13 +18,13 @@
package org.apache.paimon.spark.format
+import org.apache.paimon.CoreOptions
import org.apache.paimon.format.csv.CsvOptions
-import org.apache.paimon.fs.TwoPhaseOutputStream
 import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder, SparkInternalRowWrapper}
import org.apache.paimon.spark.write.BaseV2WriteBuilder
import org.apache.paimon.table.FormatTable
-import org.apache.paimon.table.format.TwoPhaseCommitMessage
-import org.apache.paimon.table.sink.BatchTableWrite
+import org.apache.paimon.table.format.{FormatTableCommit, TwoPhaseCommitMessage}
+import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessage}
import org.apache.paimon.types.RowType
import org.apache.spark.internal.Logging
@@ -37,7 +37,6 @@ import
org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import java.io.FileNotFoundException
import java.util
import scala.collection.JavaConverters._
@@ -57,7 +56,11 @@ case class PaimonFormatTable(table: FormatTable)
properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
}
if (FormatTable.Format.CSV == table.format) {
- properties.put("sep", properties.get(CsvOptions.FIELD_DELIMITER.key()))
+ properties.put(
+ "sep",
+ properties.getOrDefault(
+ CsvOptions.FIELD_DELIMITER.key(),
+ CsvOptions.FIELD_DELIMITER.defaultValue()))
}
properties
}
@@ -101,37 +104,28 @@ private case class FormatTableBatchWrite(
!(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
"Cannot overwrite dynamically and by filter both")
+ private val batchWriteBuilder = {
+ val builder = table.newBatchWriteBuilder()
+ if (overwriteDynamic) {
+ builder.withOverwrite()
+ } else {
+      overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava))
+ }
+ builder
+ }
+
   override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
- FormatTableWriterFactory(table, writeSchema)
+ FormatTableWriterFactory(batchWriteBuilder, writeSchema)
override def useCommitCoordinator(): Boolean = false
override def commit(messages: Array[WriterCommitMessage]): Unit = {
logInfo(s"Committing to FormatTable ${table.name()}")
-
- val committers = messages
- .collect {
- case taskCommit: FormatTableTaskCommit => taskCommit.committers()
- case other =>
-          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
- }
- .flatten
- .toSeq
-
+ val batchTableCommit = batchWriteBuilder.newCommit()
+ val commitMessages = getPaimonCommitMessages(messages)
try {
val start = System.currentTimeMillis()
- if (overwritePartitions.isDefined && overwritePartitions.get.nonEmpty) {
- val child = org.apache.paimon.partition.PartitionUtils
- .buildPartitionName(overwritePartitions.get.asJava)
-      val partitionPath = new org.apache.paimon.fs.Path(table.location(), child)
- deletePreviousDataFile(partitionPath)
-    } else if (overwritePartitions.isDefined && overwritePartitions.get.isEmpty) {
- committers
- .map(c => c.targetFilePath().getParent)
- .distinct
- .foreach(deletePreviousDataFile)
- }
- committers.foreach(c => c.commit(table.fileIO()))
+ batchTableCommit.commit(commitMessages)
logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
} catch {
case e: Exception =>
@@ -140,53 +134,38 @@ private case class FormatTableBatchWrite(
}
}
-  private def deletePreviousDataFile(partitionPath: org.apache.paimon.fs.Path): Unit = {
- if (table.fileIO().exists(partitionPath)) {
- val files = table.fileIO().listFiles(partitionPath, true)
- files
-        .filter(f => !f.getPath.getName.startsWith(".") && !f.getPath.getName.startsWith("_"))
- .foreach(
- f => {
- try {
- table.fileIO().deleteQuietly(f.getPath)
- } catch {
-              case _: FileNotFoundException => logInfo(s"File ${f.getPath} already deleted")
- case other => throw new RuntimeException(other)
- }
- })
- }
- }
-
override def abort(messages: Array[WriterCommitMessage]): Unit = {
logInfo(s"Aborting write to FormatTable ${table.name()}")
- val committers = messages.collect {
- case taskCommit: FormatTableTaskCommit => taskCommit.committers()
- }.flatten
-
- committers.foreach {
- committer =>
- try {
- committer.discard(table.fileIO())
- } catch {
-          case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
- }
- }
+ val batchTableCommit = batchWriteBuilder.newCommit()
+ val commitMessages = getPaimonCommitMessages(messages)
+ batchTableCommit.abort(commitMessages)
+ }
+
+ private def getPaimonCommitMessages(
+ messages: Array[WriterCommitMessage]): util.List[CommitMessage] = {
+ messages
+ .collect {
+ case taskCommit: FormatTableTaskCommit => taskCommit.commitMessages()
+ case other =>
+          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+ }
+ .flatten
+ .toList
+ .asJava
}
}
-private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
+private case class FormatTableWriterFactory(
+ batchWriteBuilder: BatchWriteBuilder,
+ writeSchema: StructType)
extends DataWriterFactory {
   override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
- val formatTableWrite = table.newBatchWriteBuilder().newWrite()
- new FormatTableDataWriter(table, formatTableWrite, writeSchema)
+ new FormatTableDataWriter(batchWriteBuilder, writeSchema)
}
}
-private class FormatTableDataWriter(
- table: FormatTable,
- formatTableWrite: BatchTableWrite,
- writeSchema: StructType)
+private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder, writeSchema: StructType)
extends DataWriter[InternalRow]
with Logging {
@@ -197,24 +176,26 @@ private class FormatTableDataWriter(
}
}
+ private val write: BatchTableWrite = batchWriteBuilder.newWrite()
+
override def write(record: InternalRow): Unit = {
val paimonRow = rowConverter.apply(record)
- formatTableWrite.write(paimonRow)
+ write.write(paimonRow)
}
override def commit(): WriterCommitMessage = {
try {
- val committers = formatTableWrite
+ val commitMessages = write
.prepareCommit()
.asScala
.map {
- case committer: TwoPhaseCommitMessage => committer.getCommitter
+ case commitMessage: TwoPhaseCommitMessage => commitMessage
case other =>
throw new IllegalArgumentException(
"Unsupported commit message type: " +
other.getClass.getSimpleName)
}
.toSeq
- FormatTableTaskCommit(committers)
+ FormatTableTaskCommit(commitMessages)
} finally {
close()
}
@@ -227,7 +208,7 @@ private class FormatTableDataWriter(
override def close(): Unit = {
try {
- formatTableWrite.close()
+ write.close()
} catch {
case e: Exception =>
logError("Error closing FormatTableDataWriter", e)
@@ -237,14 +218,14 @@ private class FormatTableDataWriter(
}
 /** Commit message container for FormatTable writes, holding committers that need to be executed. */
-class FormatTableTaskCommit private (private val _committers: Seq[TwoPhaseOutputStream.Committer])
+class FormatTableTaskCommit private (private val _commitMessages: Seq[CommitMessage])
extends WriterCommitMessage {
- def committers(): Seq[TwoPhaseOutputStream.Committer] = _committers
+ def commitMessages(): Seq[CommitMessage] = _commitMessages
}
object FormatTableTaskCommit {
-  def apply(committers: Seq[TwoPhaseOutputStream.Committer]): FormatTableTaskCommit = {
- new FormatTableTaskCommit(committers)
+ def apply(commitMessages: Seq[CommitMessage]): FormatTableTaskCommit = {
+ new FormatTableTaskCommit(commitMessages)
}
}