This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 815dc698a [flink] Fixed the issue where deletes could not be performed when the table is in dynamic bucket mode. (#3214)
815dc698a is described below

commit 815dc698aad955b7b178fb944d5036cda147e925
Author: sai <[email protected]>
AuthorDate: Tue Apr 16 10:50:20 2024 +0800

    [flink] Fixed the issue where deletes could not be performed when the table is in dynamic bucket mode. (#3214)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  8 +++-
 .../src/main/java/org/apache/paimon/fs/FileIO.java |  8 ++--
 .../org/apache/paimon/schema/SchemaValidation.java | 54 ++++++++++++----------
 .../org/apache/paimon/schema/TableSchemaTest.java  | 19 ++++++++
 .../apache/paimon/flink/sink/FlinkTableSink.java   |  7 ++-
 .../SupportsRowLevelOperationFlinkTableSink.java   |  4 +-
 .../paimon/flink/DynamicBucketTableITCase.java     |  6 ++-
 8 files changed, 75 insertions(+), 33 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 4c42bbfbb..901a3b66b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -36,7 +36,7 @@ under the License.
             <td><h5>bucket</h5></td>
             <td style="word-wrap: break-word;">-1</td>
             <td>Integer</td>
-            <td>Bucket number for file store.</td>
+            <td>Bucket number for file store.<br />It should either be equal 
to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket 
mode).</td>
         </tr>
         <tr>
             <td><h5>bucket-key</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 37bd0c217..327e6f231 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -79,7 +79,13 @@ public class CoreOptions implements Serializable {
             key("bucket")
                     .intType()
                     .defaultValue(-1)
-                    .withDescription("Bucket number for file store.");
+                    .withDescription(
+                            Description.builder()
+                                    .text("Bucket number for file store.")
+                                    .linebreak()
+                                    .text(
+                                            "It should either be equal to -1 
(dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).")
+                                    .build());
 
     @Immutable
     public static final ConfigOption<String> BUCKET_KEY =
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index ae8ed60eb..55aa9b825 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -337,9 +337,9 @@ public interface FileIO extends Serializable {
         List<IOException> ioExceptionList = new ArrayList<>();
 
         // load preferIO
-        FileIOLoader perferIOLoader = config.preferIO();
+        FileIOLoader preferIOLoader = config.preferIO();
         try {
-            loader = checkAccess(perferIOLoader, path, config);
+            loader = checkAccess(preferIOLoader, path, config);
         } catch (IOException ioException) {
             ioExceptionList.add(ioException);
         }
@@ -403,10 +403,10 @@ public interface FileIO extends Serializable {
         if (loader == null) {
             String fallbackMsg = "";
             String preferMsg = "";
-            if (perferIOLoader != null) {
+            if (preferIOLoader != null) {
                 preferMsg =
                         " "
-                                + perferIOLoader.getClass().getSimpleName()
+                                + preferIOLoader.getClass().getSimpleName()
                                 + " also cannot access this path.";
             }
             if (fallbackIO != null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 722e27de9..fab5f31fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -87,6 +87,8 @@ public class SchemaValidation {
 
         CoreOptions options = new CoreOptions(schema.options());
 
+        validateBucket(schema, options);
+
         validateDefaultValues(schema);
 
         validateStartupMode(options);
@@ -156,18 +158,6 @@ public class SchemaValidation {
                                             f, KEY_FIELD_PREFIX));
                         });
 
-        if (options.bucket() == -1 && options.toMap().get(BUCKET_KEY.key()) != 
null) {
-            throw new RuntimeException(
-                    "Cannot define 'bucket-key' in unaware or dynamic bucket 
mode.");
-        }
-
-        if (options.bucket() == -1
-                && schema.primaryKeys().isEmpty()
-                && options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != 
null) {
-            throw new RuntimeException(
-                    "AppendOnlyTable of unware or dynamic bucket does not 
support 'full-compaction.delta-commits'");
-        }
-
         if (schema.primaryKeys().isEmpty() && 
options.streamingReadOverwrite()) {
             throw new RuntimeException(
                     "Doesn't support streaming read the changes from overwrite 
when the primary keys are not defined.");
@@ -180,17 +170,7 @@ public class SchemaValidation {
             }
         }
 
-        if (schema.crossPartitionUpdate()) {
-            if (options.bucket() != -1) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "You should use dynamic bucket (bucket = -1) 
mode in cross partition update case "
-                                        + "(Primary key constraint %s not 
include all partition fields %s).",
-                                schema.primaryKeys(), schema.partitionKeys()));
-            }
-        }
-
-        if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
+        if (options.mergeEngine() == MergeEngine.FIRST_ROW) {
             if (options.changelogProducer() != ChangelogProducer.LOOKUP) {
                 throw new IllegalArgumentException(
                         "Only support 'lookup' changelog-producer on 
FIRST_MERGE merge engine");
@@ -522,7 +502,7 @@ public class SchemaValidation {
                                 field);
                     });
 
-            if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
+            if (options.mergeEngine() == MergeEngine.FIRST_ROW) {
                 throw new IllegalArgumentException(
                         "Do not support use sequence field on FIRST_MERGE 
merge engine.");
             }
@@ -536,4 +516,30 @@ public class SchemaValidation {
             }
         }
     }
+
+    private static void validateBucket(TableSchema schema, CoreOptions 
options) {
+        int bucket = options.bucket();
+        if (bucket == -1) {
+            if (options.toMap().get(BUCKET_KEY.key()) != null) {
+                throw new RuntimeException(
+                        "Cannot define 'bucket-key' in unaware or dynamic 
bucket mode.");
+            }
+
+            if (schema.primaryKeys().isEmpty()
+                    && 
options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null) {
+                throw new RuntimeException(
+                        "AppendOnlyTable of unware or dynamic bucket does not 
support 'full-compaction.delta-commits'");
+            }
+        } else if (bucket < 1) {
+            throw new RuntimeException("The number of buckets needs to be 
greater than 0.");
+        } else {
+            if (schema.crossPartitionUpdate()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "You should use dynamic bucket (bucket = -1) 
mode in cross partition update case "
+                                        + "(Primary key constraint %s not 
include all partition fields %s).",
+                                schema.primaryKeys(), schema.partitionKeys()));
+            }
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 0927c9e91..cf09d1ce4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -179,6 +179,25 @@ public class TableSchemaTest {
                 .hasMessageContaining("Field fake_col can not be found in 
table schema.");
     }
 
+    @Test
+    public void testBucket() {
+        List<DataField> fields =
+                Arrays.asList(
+                        new DataField(0, "f0", DataTypes.INT()),
+                        new DataField(1, "f1", DataTypes.INT()),
+                        new DataField(2, "f2", DataTypes.INT()));
+        List<String> partitionKeys = Collections.singletonList("f0");
+        List<String> primaryKeys = Collections.singletonList("f1");
+        Map<String, String> options = new HashMap<>();
+
+        TableSchema schema =
+                new TableSchema(1, fields, 10, partitionKeys, primaryKeys, 
options, "");
+
+        options.put(BUCKET.key(), "-2");
+        assertThatThrownBy(() -> validateTableSchema(schema))
+                .hasMessageContaining("The number of buckets needs to be 
greater than 0.");
+    }
+
     static RowType newRowType(boolean isNullable, int fieldId) {
         return new RowType(
                 isNullable,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index b1211b0e7..734441843 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
 
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;
@@ -41,6 +42,10 @@ public class FlinkTableSink extends 
SupportsRowLevelOperationFlinkTableSink
 
     @Override
     public void executeTruncation() {
-        table.newBatchWriteBuilder().newCommit().truncateTable();
+        try (BatchTableCommit batchTableCommit = 
table.newBatchWriteBuilder().newCommit()) {
+            batchTableCommit.truncateTable();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index c81804a23..95f33bac8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -58,6 +58,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
@@ -192,7 +193,8 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
     }
 
     private boolean canPushDownDeleteFilter() {
-        return deletePredicate == null || deleteIsDropPartition() || 
deleteInSingleNode();
+        return -1 != Options.fromMap(table.options()).get(BUCKET)
+                && (deletePredicate == null || deleteIsDropPartition() || 
deleteInSingleNode());
     }
 
     private boolean deleteIsDropPartition() {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index c4f928ef4..bb27851f1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -57,7 +57,7 @@ public class DynamicBucketTableITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testWriteRead() {
+    public void testWriteReadDelete() {
         sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), 
(1, 5, 5)");
         assertThat(sql("SELECT * FROM T"))
                 .containsExactlyInAnyOrder(
@@ -74,6 +74,10 @@ public class DynamicBucketTableITCase extends 
CatalogITCaseBase {
                         Row.of(1, 3, 33),
                         Row.of(1, 4, 4),
                         Row.of(1, 5, 5));
+        sql("DELETE FROM T WHERE pt = 1 AND pk = 3");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1, 11), Row.of(1, 2, 2), Row.of(1, 4, 4), 
Row.of(1, 5, 5));
 
         assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
                 .containsExactlyInAnyOrder(Row.of(0), Row.of(1));

Reply via email to