This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b8802e3a5 [flink][spark] Delete in flink should produce changelog no matter what (#2594)
b8802e3a5 is described below

commit b8802e3a5e31ed28abed935097387ec4422d6869
Author: YeJunHao <[email protected]>
AuthorDate: Sat May 4 20:54:10 2024 +0800

    [flink][spark] Delete in flink should produce changelog no matter what (#2594)
---
 .../shortcodes/generated/core_configuration.html   |  6 +++++
 .../main/java/org/apache/paimon/CoreOptions.java   | 13 ++++++++++
 .../SupportsRowLevelOperationFlinkTableSink.java   |  5 ++--
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 27 ++++++++++++++++++++
 .../commands/DeleteFromPaimonTableCommand.scala    | 10 +++++++-
 .../paimon/spark/sql/DeleteFromTableTest.scala     | 29 ++++++++++++++++++++++
 6 files changed, 87 insertions(+), 3 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 468da3f5a..bdc6371b3 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -176,6 +176,12 @@ under the License.
             <td>Duration</td>
             <td>The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.</td>
         </tr>
+        <tr>
+            <td><h5>delete.force-produce-changelog</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Force produce changelog in delete sql no matter what if changelog producer is not NONE.</td>
+        </tr>
         <tr>
             <td><h5>deletion-vectors.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index d883a0107..9c0d3bf3b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1104,6 +1104,14 @@ public class CoreOptions implements Serializable {
                             "Whether to enable deletion vectors mode. In this mode, index files containing deletion"
                                     + " vectors are generated when data is written, which marks the data for deletion."
                                     + " During read operations, by applying these index files, merging can be avoided.");
+
+    public static final ConfigOption<Boolean> DELETION_FORCE_PRODUCE_CHANGELOG =
+            key("delete.force-produce-changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Force produce changelog in delete sql no matter what if changelog producer is not NONE.");
+
     public static final ConfigOption<RangeStrategy> SORT_RANG_STRATEGY =
             key("sort-compaction.range-strategy")
                     .enumType(RangeStrategy.class)
@@ -1749,6 +1757,11 @@ public class CoreOptions implements Serializable {
         return options.get(FILE_INDEX_READ_ENABLED);
     }
 
+    public boolean deleteForceProduceChangelog() {
+        return options.get(DELETION_FORCE_PRODUCE_CHANGELOG)
+                && changelogProducer() != CoreOptions.ChangelogProducer.NONE;
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 95f33bac8..8c490fb9d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -58,7 +58,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
@@ -193,7 +192,9 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
     }
 
     private boolean canPushDownDeleteFilter() {
-        return -1 != Options.fromMap(table.options()).get(BUCKET)
+        CoreOptions coreOptions = CoreOptions.fromMap(table.options());
+        return -1 != coreOptions.bucket()
+                && !coreOptions.deleteForceProduceChangelog()
                 && (deletePredicate == null || deleteIsDropPartition() || deleteInSingleNode());
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index ac67e87dd..83b8c80fd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -28,6 +28,8 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -437,6 +439,31 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         iterator.close();
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"lookup", "input"})
+    public void testDeletePartitionWithChangelog(String producer) throws Exception {
+        sql(
+                "CREATE TABLE delete_table (pt INT, pk INT, v STRING, PRIMARY KEY(pt, pk) NOT ENFORCED) PARTITIONED BY (pt)   "
+                        + "WITH ('changelog-producer' = '"
+                        + producer
+                        + "', 'delete.force-produce-changelog'='true', 'bucket'='1')");
+        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM delete_table");
+
+        sql("INSERT INTO delete_table VALUES (1, 1, 'A'), (2, 2, 'B')");
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(
+                        Row.ofKind(RowKind.INSERT, 1, 1, "A"),
+                        Row.ofKind(RowKind.INSERT, 2, 2, "B"));
+        sql("DELETE FROM delete_table WHERE pt = 1");
+        assertThat(iterator.collect(1))
+                .containsExactlyInAnyOrder(Row.ofKind(RowKind.DELETE, 1, 1, "A"));
+        sql("INSERT INTO delete_table VALUES (1, 1, 'B')");
+
+        assertThat(iterator.collect(1))
+                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, "B"));
+        iterator.close();
+    }
+
     @Test
     public void testScanFromOldSchema() throws InterruptedException {
         sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)");
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 6ce042e41..cbd365f88 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -19,7 +19,10 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.options.Options
 import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.spark.{InsertInto, SparkTable}
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -74,7 +77,12 @@ case class DeleteFromPaimonTableCommand(
           ignoreFailure = true)
       }
 
-      if (otherCondition.isEmpty && partitionPredicate.nonEmpty) {
+      if (
+        otherCondition.isEmpty && partitionPredicate.nonEmpty && !table
+          .store()
+          .options()
+          .deleteForceProduceChangelog()
+      ) {
         val matchedPartitions =
           table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
         val rowDataPartitionComputer = new RowDataPartitionComputer(
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 7c76dd236..0117c5f95 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -318,6 +318,35 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
     val rows4 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
     assertThat(rows4.toString).isEqualTo("[]")
   }
+
+  test(s"test delete producer changelog") {
+    spark.sql(
+      s"""
+         |CREATE TABLE T (id INT, name STRING, dt STRING, hh STRING)
+         |TBLPROPERTIES ('primary-key' = 'id, dt, hh', 'merge-engine' = 'deduplicate', 'changelog-producer'='input', 'delete.force-produce-changelog'='true')
+         |PARTITIONED BY (dt, hh)
+         |""".stripMargin)
+
+    spark.sql(
+      "INSERT INTO T VALUES " +
+        "(1, 'a', '2023-10-01', '12')," +
+        "(2, 'b', '2023-10-01', '12')," +
+        "(3, 'c', '2023-10-02', '12')," +
+        "(4, 'd', '2023-10-02', '13')," +
+        "(5, 'e', '2023-10-02', '14')," +
+        "(6, 'f', '2023-10-02', '15')")
+
+    // delete isn't drop partition
+    spark.sql("DELETE FROM T WHERE name = 'a' and hh = '12'")
+    assertThat(spark.sql("SELECT * FROM `T$audit_log` WHERE rowkind='-D'").collectAsList().size())
+      .isEqualTo(1)
+
+    // delete is drop partition
+    spark.sql("DELETE FROM T WHERE hh = '12'")
+    assertThat(spark.sql("SELECT * FROM `T$audit_log` WHERE rowkind='-D'").collectAsList().size())
+      .isEqualTo(3)
+
+  }
 }
 
 class DeleteFromTableTest extends DeleteFromTableTestBase {}

Reply via email to