This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 177be640a3 [core][spark] Support add partial column by spark MERGE INTO clause for data-evolution table (#6091) 177be640a3 is described below commit 177be640a3fedf64083557f0401bca4cf1012547 Author: YeJunHao <41894543+leaves12...@users.noreply.github.com> AuthorDate: Thu Aug 21 14:20:24 2025 +0800 [core][spark] Support add partial column by spark MERGE INTO clause for data-evolution table (#6091) --- docs/content/append-table/row-tracking.md | 40 +++ .../paimon/operation/DataEvolutionSplitRead.java | 5 +- .../table/source/DataEvolutionSplitGenerator.java | 15 +- .../splitread/DataEvolutionSplitReadProvider.java | 18 +- .../paimon/table/system/RowLineageTable.java | 4 +- .../DataEvolutionSplitReadProviderTest.java | 4 + .../parquet/writer/ParquetRowDataBuilder.java | 6 +- .../spark/DataEvolutionSparkTableWrite.scala | 161 ++++++++++ .../org/apache/paimon/spark/SparkTableWrite.scala | 12 +- .../apache/paimon/spark/SparkTableWriteTrait.scala | 41 +++ .../catalyst/analysis/PaimonDeleteTable.scala | 4 + .../catalyst/analysis/PaimonMergeIntoBase.scala | 32 +- .../catalyst/analysis/PaimonUpdateTable.scala | 4 + .../spark/commands/DataEvolutionPaimonWriter.scala | 105 +++++++ .../MergeIntoPaimonDataEvolutionTable.scala | 335 +++++++++++++++++++++ .../spark/commands/MergeIntoPaimonTable.scala | 1 + .../paimon/spark/schema/PaimonMetadataColumn.scala | 7 +- .../apache/paimon/spark/util/ScanPlanHelper.scala | 17 +- .../paimon/spark/sql/RowLineageTestBase.scala | 115 +++++++ 19 files changed, 876 insertions(+), 50 deletions(-) diff --git a/docs/content/append-table/row-tracking.md b/docs/content/append-table/row-tracking.md index 5277653001..d8d4022155 100644 --- a/docs/content/append-table/row-tracking.md +++ b/docs/content/append-table/row-tracking.md @@ -129,3 +129,43 @@ You will get: - If we append records to row-tracking table in the first time, we don't actually write them to the data file, they are 
lazy assigned by committer. - If one row moved from one file to another file for **any reason**, the `_ROW_ID` column should be copied to the target file. The `_SEQUENCE_NUMBER` field should be set to `NULL` if the record is changed, otherwise, copy it too. - Whenever we read from a row-tracking table, we firstly read `_ROW_ID` and `_SEQUENCE_NUMBER` from the data file, then we read the value columns from the data file. If they found `NULL`, we read from `DataFileMeta` to fall back to the lazy assigned values. Anyway, it has no way to be `NULL`. + +# Data Evolution Mode + +## What is data evolution mode +Data Evolution Mode is a new feature for Paimon's append tables that revolutionizes how you handle schema evolution, particularly when adding new columns. +This mode allows you to update partial columns without rewriting entire data files. +Instead, it writes new column data to separate files and intelligently merges them with the original data during read operations. + + +## Key Features and Benefits +The data evolution mode offers significant advantages for your data lake architecture: + +* Efficient Partial Column Updates: With this mode, you can use Spark's MERGE INTO statement to update a subset of columns. This avoids the high I/O cost of rewriting the whole file, as only the updated columns are written. + +* Reduced File Rewrites: In scenarios with frequent schema changes, such as adding new columns, the traditional method requires constant file rewriting. Data evolution mode eliminates this overhead by appending new column data to dedicated files. This approach is much more efficient and reduces the burden on your storage system. + +* Optimized Read Performance: The new mode is designed for seamless data retrieval. During query execution, Paimon's engine efficiently combines the original data with the new column data, ensuring that read performance remains uncompromised. 
The merge process is highly optimized, so your queries run just as fast as they would on a single, consolidated file. + +## Enabling Data Evolution Mode +To enable data evolution, you must enable row-tracking and set the `data-evolution.enabled` property to `true` when creating an append table. This ensures that the table is ready for efficient schema evolution operations. +Using Spark SQL as an example: +```sql +CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true') +``` + +## Partially update columns +Currently, only Spark's 'MERGE INTO' statement is supported for updating partial columns. +```sql +MERGE INTO t +USING s +ON t.id = s.id +WHEN MATCHED THEN UPDATE SET t.b = s.b +WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) +``` +For rows in the target table `t` that match a record in the source table `s`, this statement updates only the `b` column; the `id` and `c` columns of those rows remain unchanged. Source records with no match in `t` are inserted as new rows with the specified values. 
+ +Note that: +* Data Evolution Table does not support 'Delete' statement yet +* Merge Into for Data Evolution Table does not support 'WHEN NOT MATCHED BY SOURCE' clause +* Only Spark version greater than 3.5.0 is supported for Data Evolution Table \ No newline at end of file diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index 46bfb1a68b..f4e11fb685 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -188,7 +188,10 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> { String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName()); long schemaId = file.schemaId(); TableSchema dataSchema = schemaManager.schema(schemaId).project(file.writeCols()); - int[] fieldIds = dataSchema.fields().stream().mapToInt(DataField::id).toArray(); + int[] fieldIds = + rowTypeWithRowLineage(dataSchema.logicalRowType()).getFields().stream() + .mapToInt(DataField::id) + .toArray(); List<DataField> readFields = new ArrayList<>(); for (int j = 0; j < readFieldIndex.length; j++) { for (int fieldId : fieldIds) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java index 22ea9b7f3d..39fa2d9ce7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java @@ -45,7 +45,7 @@ public class DataEvolutionSplitGenerator implements SplitGenerator { @Override public boolean alwaysRawConvertible() { - return true; + return false; } @Override @@ -57,8 +57,17 @@ public class DataEvolutionSplitGenerator implements SplitGenerator { 
file.stream().mapToLong(DataFileMeta::fileSize).sum(), openFileCost); return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream() - .flatMap(Collection::stream) - .map(SplitGroup::rawConvertibleGroup) + .map( + f -> { + boolean rawConvertible = f.stream().allMatch(file -> file.size() == 1); + List<DataFileMeta> groupFiles = + f.stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + return rawConvertible + ? SplitGroup.rawConvertibleGroup(groupFiles) + : SplitGroup.nonRawConvertibleGroup(groupFiles); + }) .collect(Collectors.toList()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java index bcf2e9fa1b..5f227b6a66 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java @@ -19,11 +19,14 @@ package org.apache.paimon.table.source.splitread; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.operation.DataEvolutionSplitRead; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.utils.LazyField; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.Supplier; /** A {@link SplitReadProvider} to create {@link DataEvolutionSplitRead}. 
*/ @@ -49,21 +52,20 @@ public class DataEvolutionSplitReadProvider implements SplitReadProvider { return false; } - Long firstRowId = null; + Set<Long> firstRowIds = new HashSet<>(); for (DataFileMeta file : files) { Long current = file.firstRowId(); - if (current == null) { + if (current == null + || !file.fileSource().isPresent() + || file.fileSource().get() != FileSource.APPEND) { return false; } - if (firstRowId == null) { - firstRowId = current; - } else if (!firstRowId.equals(current)) { - return false; - } + firstRowIds.add(current); } - return true; + // If all files have a distinct first row id, we don't need to merge fields + return firstRowIds.size() != files.size(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java index 6be13e8e7e..75e4319bc1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java @@ -30,7 +30,6 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; -import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.InnerTableRead; @@ -49,6 +48,7 @@ import java.util.Map; import java.util.Optional; import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER; +import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage; /** A {@link Table} for reading row id of table. 
*/ public class RowLineageTable implements DataTable, ReadonlyTable { @@ -101,7 +101,7 @@ public class RowLineageTable implements DataTable, ReadonlyTable { @Override public RowType rowType() { - return SpecialFields.rowTypeWithRowLineage(wrapped.rowType()); + return rowTypeWithRowLineage(wrapped.rowType()); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java index b53a33481a..7e44afe6fb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.source.splitread; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.operation.DataEvolutionSplitRead; import org.apache.paimon.table.source.DataSplit; @@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.Optional; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; @@ -121,6 +123,8 @@ public class DataEvolutionSplitReadProviderTest { DataFileMeta file1 = mock(DataFileMeta.class); DataFileMeta file2 = mock(DataFileMeta.class); + when(file1.fileSource()).thenReturn(Optional.of(FileSource.APPEND)); + when(file2.fileSource()).thenReturn(Optional.of(FileSource.APPEND)); when(file1.firstRowId()).thenReturn(100L); when(file2.firstRowId()).thenReturn(100L); when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2)); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java index 8f2e442045..b6a5ea361c 
100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java @@ -80,11 +80,7 @@ public class ParquetRowDataBuilder @Override public void write(InternalRow record) { - try { - this.writer.write(record); - } catch (Exception e) { - throw new RuntimeException(e); - } + this.writer.write(record); } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/DataEvolutionSparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/DataEvolutionSparkTableWrite.scala new file mode 100644 index 0000000000..3ee3c4a5b7 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/DataEvolutionSparkTableWrite.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark + +import org.apache.paimon.data.{BinaryRow, InternalRow} +import org.apache.paimon.disk.IOManager +import org.apache.paimon.io.{CompactIncrement, DataIncrement} +import org.apache.paimon.operation.{AbstractFileStoreWrite, AppendFileStoreWrite} +import org.apache.paimon.spark.util.SparkRowUtils +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer, TableWriteImpl} +import org.apache.paimon.types.RowType +import org.apache.paimon.utils.RecordWriter + +import org.apache.spark.sql.Row + +import java.util.Collections + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +case class DataEvolutionSparkTableWrite( + writeBuilder: BatchWriteBuilder, + writeType: RowType, + firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple2[BinaryRow, Long]]) + extends SparkTableWriteTrait { + + private var currentWriter: PerFileWriter = _ + private val ioManager: IOManager = SparkUtils.createIOManager + private val rowIdIndex = writeType.getFieldCount + private val firstRowIdIndex = rowIdIndex + 1 + private val commitMessageImpls = ListBuffer[CommitMessageImpl]() + + private val toPaimonRow = { + SparkRowUtils.toPaimonRow(writeType, -1) + } + + def write(row: Row): Unit = { + val firstRowId = row.getLong(firstRowIdIndex) + val rowId = row.getLong(rowIdIndex) + + if (currentWriter == null || !currentWriter.matchFirstRowId(firstRowId)) { + newCurrentWriter(firstRowId) + } + + currentWriter.write(toPaimonRow(row), rowId) + } + + def newCurrentWriter(firstRowId: Long): Unit = { + finishCurrentWriter() + val (partition, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null) + if (partition == null) { + throw new IllegalArgumentException( + s"First row ID $firstRowId not found in partition map. 
" + + s"Available first row IDs: ${firstRowIdToPartitionMap.keys.mkString(", ")}") + } + + val writer = writeBuilder + .newWrite() + .withWriteType(writeType) + .asInstanceOf[TableWriteImpl[InternalRow]] + .getWrite + .asInstanceOf[AbstractFileStoreWrite[InternalRow]] + .createWriter(partition, 0) + currentWriter = PerFileWriter(partition, firstRowId, writer, numRecords) + } + + def finishCurrentWriter(): Unit = { + if (currentWriter != null) { + commitMessageImpls.append(currentWriter.finish()) + } + currentWriter = null + } + + def write(row: Row, bucket: Int): Unit = { + throw new UnsupportedOperationException( + "DataEvolutionSparkTableWrite does not support writing with bucket.") + } + + def finish(): Iterator[Array[Byte]] = { + finishCurrentWriter() + var bytesWritten = 0L + var recordsWritten = 0L + val commitMessages = new ListBuffer[Array[Byte]]() + val serializer = new CommitMessageSerializer() + commitMessageImpls.foreach { + message: CommitMessageImpl => + message.newFilesIncrement().newFiles().asScala.foreach { + dataFileMeta => + bytesWritten += dataFileMeta.fileSize() + recordsWritten += dataFileMeta.rowCount() + } + commitMessages += serializer.serialize(message) + } + reportOutputMetrics(bytesWritten, recordsWritten) + commitMessages.iterator + } + + override def close(): Unit = { + ioManager.close() + } + + private case class PerFileWriter( + partition: BinaryRow, + firstRowId: Long, + recordWriter: RecordWriter[InternalRow], + numRecords: Long) { + + var numWritten = 0 + + def matchFirstRowId(firstRowId: Long): Boolean = { + this.firstRowId == firstRowId + } + + def write(row: InternalRow, rowId: Long): Unit = { + assert(rowId == firstRowId + numWritten, "Row ID does not match expected.") + numWritten += 1 + recordWriter.write(row) + } + + def finish(): CommitMessageImpl = { + try { + assert( + numRecords == numWritten, + s"Number of written records $numWritten does not match expected number $numRecords for first row ID $firstRowId.") + val 
result = recordWriter.prepareCommit(false) + val dataFiles = result.newFilesIncrement().newFiles() + assert(dataFiles.size() == 1, "This is a bug, PerFileWriter could only produce one file") + val dataFileMeta = dataFiles.get(0).assignFirstRowId(firstRowId) + new CommitMessageImpl( + partition, + 0, + null, + new DataIncrement( + java.util.Arrays.asList(dataFileMeta), + Collections.emptyList(), + Collections.emptyList()), + CompactIncrement.emptyIncrement()) + } finally { + recordWriter.close() + } + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala index 1cd524490d..1ac9aab47b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala @@ -34,7 +34,7 @@ case class SparkTableWrite( writeType: RowType, rowKindColIdx: Int = -1, writeRowLineage: Boolean = false) - extends AutoCloseable { + extends SparkTableWriteTrait { private val ioManager: IOManager = SparkUtils.createIOManager @@ -81,14 +81,4 @@ case class SparkTableWrite( write.close() ioManager.close() } - - private def reportOutputMetrics(bytesWritten: Long, recordsWritten: Long): Unit = { - val taskContext = TaskContext.get - if (taskContext != null) { - PaimonUtils.updateOutputMetrics( - taskContext.taskMetrics.outputMetrics, - bytesWritten, - recordsWritten) - } - } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWriteTrait.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWriteTrait.scala new file mode 100644 index 0000000000..f603d4c3ef --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWriteTrait.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark + +import org.apache.spark.TaskContext +import org.apache.spark.sql.{PaimonUtils, Row} + +trait SparkTableWriteTrait extends AutoCloseable { + + def write(row: Row): Unit + + def write(row: Row, bucket: Int) + + def finish(): Iterator[Array[Byte]] + + def reportOutputMetrics(bytesWritten: Long, recordsWritten: Long): Unit = { + val taskContext = TaskContext.get + if (taskContext != null) { + PaimonUtils.updateOutputMetrics( + taskContext.taskMetrics.outputMetrics, + bytesWritten, + recordsWritten) + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala index 578b4e4dce..4a895a517d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala @@ -38,6 +38,10 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper { table.getTable match { case paimonTable: FileStoreTable => val 
relation = PaimonRelation.getPaimonRelation(d.table) + if (paimonTable.coreOptions().dataEvolutionEnabled()) { + throw new RuntimeException( + "Delete operation is not supported when data evolution is enabled yet.") + } DeleteFromPaimonTableCommand(relation, paimonTable, condition) case _ => diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala index bc6c98df89..654ae24daa 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark.catalyst.analysis import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper -import org.apache.paimon.spark.commands.MergeIntoPaimonTable +import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression} @@ -67,15 +67,27 @@ trait PaimonMergeIntoBase merge.notMatchedActions.map(checkAndAlignActionAssignment(_, targetOutput)) val alignedNotMatchedBySourceActions = resolveNotMatchedBySourceActions(merge, targetOutput) - MergeIntoPaimonTable( - v2Table, - merge.targetTable, - merge.sourceTable, - merge.mergeCondition, - alignedMatchedActions, - alignedNotMatchedActions, - alignedNotMatchedBySourceActions - ) + if (v2Table.coreOptions.dataEvolutionEnabled()) { + MergeIntoPaimonDataEvolutionTable( + v2Table, + merge.targetTable, + merge.sourceTable, + merge.mergeCondition, + alignedMatchedActions, + alignedNotMatchedActions, + alignedNotMatchedBySourceActions + ) + } 
else { + MergeIntoPaimonTable( + v2Table, + merge.targetTable, + merge.sourceTable, + merge.mergeCondition, + alignedMatchedActions, + alignedNotMatchedActions, + alignedNotMatchedBySourceActions + ) + } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala index ad3912ddb7..63d19379cc 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala @@ -47,6 +47,10 @@ object PaimonUpdateTable } val relation = PaimonRelation.getPaimonRelation(u.table) + if (paimonTable.coreOptions().dataEvolutionEnabled()) { + throw new RuntimeException( + "Update operation is not supported when data evolution is enabled yet.") + } UpdatePaimonTableCommand( relation, paimonTable, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala new file mode 100644 index 0000000000..d1c47ce4df --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.commands + +import org.apache.paimon.CoreOptions +import org.apache.paimon.data.BinaryRow +import org.apache.paimon.spark.DataEvolutionSparkTableWrite +import org.apache.paimon.spark.commands.DataEvolutionPaimonWriter.{deserializeCommitMessage, dynamicOp} +import org.apache.paimon.spark.write.WriteHelper +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.sink._ + +import org.apache.spark.sql._ + +import java.io.IOException +import java.util.Collections + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteHelper { + + private lazy val firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple2[BinaryRow, Long]] = + initPartitionMap() + override val table: FileStoreTable = paimonTable.copy(dynamicOp) + + @transient private lazy val serializer = new CommitMessageSerializer + + private def initPartitionMap(): mutable.HashMap[Long, Tuple2[BinaryRow, Long]] = { + val firstRowIdToPartitionMap = new mutable.HashMap[Long, Tuple2[BinaryRow, Long]] + table + .store() + .newScan() + .readFileIterator() + .forEachRemaining( + k => + firstRowIdToPartitionMap + .put(k.file().firstRowId(), Tuple2.apply(k.partition(), k.file().rowCount()))) + firstRowIdToPartitionMap + } + + def writePartialFields(data: DataFrame, columnNames: Seq[String]): Seq[CommitMessage] = { + val sparkSession = data.sparkSession + import sparkSession.implicits._ + assert(data.columns.length == columnNames.size + 2) + val writeType 
= table.rowType().project(columnNames.asJava) + + val written = + data.mapPartitions { + iter => + { + val write = DataEvolutionSparkTableWrite( + table.newBatchWriteBuilder(), + writeType, + firstRowIdToPartitionMap) + try { + iter.foreach(row => write.write(row)) + write.finish() + } finally { + write.close() + } + } + } + + written + .collect() + .map(deserializeCommitMessage(serializer, _)) + .toSeq + } +} + +object DataEvolutionPaimonWriter { + final private val dynamicOp = + Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G") + def apply(table: FileStoreTable): DataEvolutionPaimonWriter = { + new DataEvolutionPaimonWriter(table) + } + + private def deserializeCommitMessage( + serializer: CommitMessageSerializer, + bytes: Array[Byte]): CommitMessage = { + try { + serializer.deserialize(serializer.getVersion, bytes) + } catch { + case e: IOException => + throw new RuntimeException("Failed to deserialize CommitMessage's object", e) + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala new file mode 100644 index 0000000000..fe47ce45c3 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.commands + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.catalyst.analysis.PaimonRelation +import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn +import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand +import org.apache.paimon.spark.schema.PaimonMetadataColumn +import org.apache.paimon.spark.util.ScanPlanHelper.createNewPlan +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.sink.CommitMessage +import org.apache.paimon.table.source.DataSplit + +import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.spark.sql.PaimonUtils._ +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} +import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ +import scala.collection.Searching.{search, Found, InsertionPoint} +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +/** Command for Merge Into for Data Evolution paimom table. 
*/ +case class MergeIntoPaimonDataEvolutionTable( + v2Table: SparkTable, + targetTable: LogicalPlan, + sourceTable: LogicalPlan, + matchedCondition: Expression, + matchedActions: Seq[MergeAction], + notMatchedActions: Seq[MergeAction], + notMatchedBySourceActions: Seq[MergeAction]) + extends PaimonLeafRunnableCommand + with WithFileStoreTable { + + private lazy val partialColumnWriter = DataEvolutionPaimonWriter(table) + private lazy val writer = PaimonSparkWriter(table) + + assert( + notMatchedBySourceActions.isEmpty, + "notMatchedBySourceActions is not supported in MergeIntoPaimonDataEvolutionTable.") + assert( + matchedActions.forall(x => x.isInstanceOf[UpdateAction]), + "Only SET clause is supported in MergeIntoPaimonDataEvolutionTable for SQL: WHEN MATCHED.") + assert( + notMatchedActions.forall(x => x.isInstanceOf[InsertAction]), + "Only INSERT clause is supported in MergeIntoPaimonDataEvolutionTable for SQL: WHEN NOT MATCHED." + ) + + import MergeIntoPaimonDataEvolutionTable._ + + override val table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable] + private val firstRowIds: Seq[Long] = table + .newSnapshotReader() + .withManifestEntryFilter(entry => entry.file().firstRowId() != null) + .read() + .splits() + .asScala + .map(_.asInstanceOf[DataSplit]) + .flatMap(split => split.dataFiles().asScala.map(s => s.firstRowId().asInstanceOf[Long])) + .distinct + .sorted + .toSeq + + lazy val targetRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(targetTable) + lazy val sourceRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(sourceTable) + + lazy val tableSchema: StructType = v2Table.schema + + override def run(sparkSession: SparkSession): Seq[Row] = { + // Avoid that more than one source rows match the same target row. 
+ val commitMessages = invokeMergeInto(sparkSession) + writer.commit(commitMessages.toSeq) + Seq.empty[Row] + } + + private def invokeMergeInto(sparkSession: SparkSession): Seq[CommitMessage] = { + // step 1: find the related data split, make it target file plan + val dataSplits: Seq[DataSplit] = targetRelatedSplits(sparkSession) + val touchedFileTargetRelation = + createNewPlan(dataSplits.toSeq, targetRelation) + + // step 2: invoke update action + val updateCommit = + if (matchedActions.nonEmpty) + updateActionInvoke(sparkSession, touchedFileTargetRelation) + else Nil + + // step 3: invoke insert action + val insertCommit = + if (notMatchedActions.nonEmpty) + insertActionInvoke(sparkSession, touchedFileTargetRelation) + else Nil + + updateCommit ++ insertCommit + } + + private def targetRelatedSplits(sparkSession: SparkSession): Seq[DataSplit] = { + val targetDss = createDataset( + sparkSession, + targetRelation + ) + val sourceDss = createDataset(sparkSession, sourceRelation) + + val firstRowIdsTouched = mutable.Set.empty[Long] + + firstRowIdsTouched ++= findRelatedFirstRowIds( + targetDss.alias("_left").join(sourceDss, toColumn(matchedCondition), "inner"), + sparkSession, + "_left." 
+ ROW_ID_NAME) + + table + .newSnapshotReader() + .withManifestEntryFilter( + entry => + entry.file().firstRowId() != null && firstRowIdsTouched.contains( + entry.file().firstRowId())) + .read() + .splits() + .asScala + .map(_.asInstanceOf[DataSplit]) + .toSeq + } + + private def updateActionInvoke( + sparkSession: SparkSession, + touchedFileTargetRelation: DataSourceV2Relation): Seq[CommitMessage] = { + val mergeFields = extractFields(matchedCondition) + val allFields = mutable.SortedSet.empty[AttributeReference]( + (o1, o2) => { + o1.toString().compareTo(o2.toString()) + }) ++ mergeFields + val updateColumns = mutable.Set[AttributeReference]() + for (action <- matchedActions) { + action match { + case updateAction: UpdateAction => + for (assignment <- updateAction.assignments) { + if (!assignment.key.equals(assignment.value)) { + val key = assignment.key.asInstanceOf[AttributeReference] + updateColumns ++= Seq(key) + } + } + } + } + + val updateColumnsSorted = updateColumns.toSeq.sortBy( + s => targetTable.output.map(x => x.toString()).indexOf(s.toString())) + + val assignments = redundantColumns.map(column => Assignment(column, column)) + val output = updateColumnsSorted ++ redundantColumns + val realUpdateActions = matchedActions + .map(s => s.asInstanceOf[UpdateAction]) + .map( + update => + UpdateAction.apply( + update.condition, + update.assignments.filter( + a => + updateColumnsSorted.contains( + a.key.asInstanceOf[AttributeReference])) ++ assignments)) + + for (action <- realUpdateActions) { + allFields ++= action.references.flatMap(r => extractFields(r)).seq + } + + val allReadFieldsOnTarget = allFields.filter( + field => + targetTable.output.exists( + attr => attr.toString().equals(field.toString()))) ++ redundantColumns + val allReadFieldsOnSource = allFields.filter( + field => sourceTable.output.exists(attr => attr.toString().equals(field.toString()))) + + val targetReadPlan = + touchedFileTargetRelation.copy(targetRelation.table, 
allReadFieldsOnTarget.toSeq) + val targetTableProjExprs = targetReadPlan.output :+ Alias(TrueLiteral, ROW_FROM_TARGET)() + val targetTableProj = Project(targetTableProjExprs, targetReadPlan) + + val sourceReadPlan = sourceRelation.copy(sourceRelation.table, allReadFieldsOnSource.toSeq) + val sourceTableProjExprs = sourceReadPlan.output :+ Alias(TrueLiteral, ROW_FROM_SOURCE)() + val sourceTableProj = Project(sourceTableProjExprs, sourceReadPlan) + + val joinPlan = + Join(targetTableProj, sourceTableProj, LeftOuter, Some(matchedCondition), JoinHint.NONE) + + val rowFromSourceAttr = attribute(ROW_FROM_SOURCE, joinPlan) + val rowFromTargetAttr = attribute(ROW_FROM_TARGET, joinPlan) + + val mergeRows = MergeRows( + isSourceRowPresent = rowFromSourceAttr, + isTargetRowPresent = rowFromTargetAttr, + matchedInstructions = realUpdateActions + .map( + action => { + Keep(action.condition.getOrElse(TrueLiteral), action.assignments.map(a => a.value)) + }) ++ Seq(Keep(TrueLiteral, output)), + notMatchedInstructions = Nil, + notMatchedBySourceInstructions = Seq(Keep(TrueLiteral, output)).toSeq, + checkCardinality = false, + output = output, + child = joinPlan + ) + + val firstRowIdsFinal = firstRowIds + val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId)) + val firstRowIdColumn = firstRowIdUdf(col(ROW_ID_NAME)) + val toWrite = + createDataset(sparkSession, mergeRows).withColumn(FIRST_ROW_ID_NAME, firstRowIdColumn) + assert(toWrite.schema.fields.length == updateColumnsSorted.size + 2) + val sortedDs = toWrite + .repartitionByRange(firstRowIdColumn) + .sortWithinPartitions(FIRST_ROW_ID_NAME, ROW_ID_NAME) + partialColumnWriter.writePartialFields(sortedDs, updateColumnsSorted.map(_.name)) + } + + private def insertActionInvoke( + sparkSession: SparkSession, + touchedFileTargetRelation: DataSourceV2Relation): Seq[CommitMessage] = { + val mergeFields = extractFields(matchedCondition) + val allReadFieldsOnTarget = + mergeFields.filter(field => 
targetTable.output.exists(attr => attr.equals(field))) + + val targetReadPlan = + touchedFileTargetRelation.copy(targetRelation.table, allReadFieldsOnTarget.toSeq) + + val joinPlan = + Join(sourceRelation, targetReadPlan, LeftAnti, Some(matchedCondition), JoinHint.NONE) + + // merge rows as there are multiple not matched actions + val mergeRows = MergeRows( + isSourceRowPresent = TrueLiteral, + isTargetRowPresent = FalseLiteral, + matchedInstructions = Nil, + notMatchedInstructions = notMatchedActions.map { + case insertAction: InsertAction => + Keep( + insertAction.condition.getOrElse(TrueLiteral), + insertAction.assignments.map(a => a.value)) + }.toSeq, + notMatchedBySourceInstructions = Nil, + checkCardinality = false, + output = targetTable.output, + child = joinPlan + ) + + val toWrite = createDataset(sparkSession, mergeRows) + writer.write(toWrite) + } + + private def findRelatedFirstRowIds( + dataset: Dataset[Row], + sparkSession: SparkSession, + identifier: String): Array[Long] = { + import sparkSession.implicits._ + val firstRowIdsFinal = firstRowIds + val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId)) + dataset + .select(firstRowIdUdf(col(identifier))) + .distinct() + .as[Long] + .collect() + } + + private def extractFields(expression: Expression): Seq[AttributeReference] = { + val fields = new ListBuffer[AttributeReference]() + + def traverse(expr: Expression): Unit = { + expr match { + case attr: AttributeReference => + fields += attr + case other => + other.children.foreach(traverse) + } + } + + traverse(expression) + fields.distinct.toSeq + } + + private def attribute(name: String, plan: LogicalPlan) = + plan.output.find(attr => resolver(name, attr.name)).get + +} + +object MergeIntoPaimonDataEvolutionTable { + final private val ROW_FROM_SOURCE = "__row_from_source" + final private val ROW_FROM_TARGET = "__row_from_target" + final private val ROW_ID_NAME = "_ROW_ID" + final private val FIRST_ROW_ID_NAME = 
"_FIRST_ROW_ID"; + final private val redundantColumns = + Seq(PaimonMetadataColumn.ROW_ID.toAttribute) + + def floorBinarySearch(sortedSeq: Seq[Long], value: Long): Long = { + val indexed = sortedSeq.toIndexedSeq + + if (indexed.isEmpty) { + throw new IllegalArgumentException("The input sorted sequence is empty.") + } + + indexed.search(value) match { + case Found(foundIndex) => indexed(foundIndex) + case InsertionPoint(insertionIndex) => { + if (insertionIndex == 0) { + throw new IllegalArgumentException( + s"Value $value is less than the first element in the sorted sequence.") + } else { + indexed(insertionIndex - 1) + } + } + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index 3a8a301734..3697b4c695 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -424,4 +424,5 @@ object MergeIntoPaimonTable { .map(notDeletedInternalRow => fromRow(outputProj(notDeletedInternalRow))) } } + } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala index 4bf6b6c743..0acb25e58b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala @@ -22,6 +22,7 @@ import org.apache.paimon.spark.SparkTypeUtils import org.apache.paimon.table.SpecialFields import org.apache.paimon.types.DataField +import org.apache.spark.sql.Column import 
org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.connector.catalog.MetadataColumn import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructField, StructType} @@ -48,8 +49,8 @@ object PaimonMetadataColumn { val FILE_PATH_COLUMN = "__paimon_file_path" val PARTITION_COLUMN = "__paimon_partition" val BUCKET_COLUMN = "__paimon_bucket" - val ROW_ID_COLUMN = SpecialFields.ROW_ID.name() - val SEQUENCE_NUMBER_COLUMN = SpecialFields.SEQUENCE_NUMBER.name() + val ROW_ID_COLUMN: String = SpecialFields.ROW_ID.name() + val SEQUENCE_NUMBER_COLUMN: String = SpecialFields.SEQUENCE_NUMBER.name() val DV_META_COLUMNS: Seq[String] = Seq(FILE_PATH_COLUMN, ROW_INDEX_COLUMN) val ROW_LINEAGE_META_COLUMNS: Seq[String] = Seq(ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN) @@ -79,8 +80,6 @@ object PaimonMetadataColumn { def dvMetaCols: Seq[PaimonMetadataColumn] = Seq(FILE_PATH, ROW_INDEX) - def rowLineageMetaCols: Seq[PaimonMetadataColumn] = Seq(ROW_ID, SEQUENCE_NUMBER) - def get(metadataColumn: String, partitionType: StructType): PaimonMetadataColumn = { metadataColumn match { case ROW_INDEX_COLUMN => ROW_INDEX diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala index 5f8e260254..4fa553e5b7 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala @@ -38,17 +38,22 @@ trait ScanPlanHelper extends SQLConfHelper { dataSplits: Seq[DataSplit], relation: DataSourceV2Relation, condition: Option[Expression] = None): LogicalPlan = { - val newRelation = relation.table match { + val newRelation = createNewPlan(dataSplits, relation) + condition match { + case Some(c) if c != TrueLiteral => Filter(c, newRelation) + case _ => newRelation 
+ } + } + + def createNewPlan( + dataSplits: Seq[DataSplit], + relation: DataSourceV2Relation): DataSourceV2Relation = { + relation.table match { case sparkTable @ SparkTable(table: InnerTable) => val knownSplitsTable = KnownSplitsTable.create(table, dataSplits.toArray) relation.copy(table = sparkTable.copy(table = knownSplitsTable)) case _ => throw new RuntimeException() } - - condition match { - case Some(c) if c != TrueLiteral => Filter(c, newRelation) - case _ => newRelation - } } def selectWithDvMeta(data: DataFrame): DataFrame = { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala index 1753cfb654..970c517113 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala @@ -198,6 +198,121 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase { } } + test("Data Evolution: insert into table with data-evolution") { + if (gteqSpark3_5) { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 11, 2, 2), Row(2, 2, 2, 0, 1), Row(3, 3, 3, 1, 1)) + ) + } + } + } + + test("Data Evolution: merge into table with data-evolution") { + if (gteqSpark3_5) { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 
11), (2, 22)") + + sql( + "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.b = s.b + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2)) + ) + } + } + } + + test("Data Evolution: merge into table with data-evolution complex") { + if (gteqSpark3_5) { + withTable("source", "target") { + sql("CREATE TABLE source (a INT, b INT, c STRING)") + sql( + "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500, 'c55'), (7, 700, 'c77'), (9, 900, 'c99')") + + sql( + "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql( + "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')") + + sql(s""" + |MERGE INTO target + |USING source + |ON target.a = source.a + |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b + target.b + |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c = source.c + |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b * 1.1, c) + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"), + Seq( + Row(1, 10, "c1", 0, 2), + Row(2, 20, "c2", 1, 2), + Row(3, 300, "c33", 2, 2), + Row(4, 40, "c4", 3, 2), + Row(5, 550, "c5", 4, 2), + Row(7, 700, "c77", 5, 2), + Row(9, 990, "c99", 6, 2)) + ) + } + } + } + + test("Data Evolution: update table throws exception") { + if (gteqSpark3_5) { + withTable("t") { + sql( + "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 
'data-evolution.enabled' = 'true')") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + assert( + intercept[RuntimeException] { + sql("UPDATE t SET b = 22") + }.getMessage + .contains("Update operation is not supported when data evolution is enabled yet.")) + } + } + } + + test("Data Evolution: delete table throws exception") { + if (gteqSpark3_5) { + withTable("t") { + sql( + "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + assert( + intercept[RuntimeException] { + sql("DELETE FROM t WHERE id = 2") + }.getMessage + .contains("Delete operation is not supported when data evolution is enabled yet.")) + } + } + } + test("Row Lineage: merge into table not matched by source") { if (gteqSpark3_4) { withTable("source", "target") {