This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 815dc698a [flink] Fixed the issue where deletes could not be performed
when the table is in dynamic bucket mode. (#3214)
815dc698a is described below
commit 815dc698aad955b7b178fb944d5036cda147e925
Author: sai <[email protected]>
AuthorDate: Tue Apr 16 10:50:20 2024 +0800
[flink] Fixed the issue where deletes could not be performed when the table
is in dynamic bucket mode. (#3214)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 8 +++-
.../src/main/java/org/apache/paimon/fs/FileIO.java | 8 ++--
.../org/apache/paimon/schema/SchemaValidation.java | 54 ++++++++++++----------
.../org/apache/paimon/schema/TableSchemaTest.java | 19 ++++++++
.../apache/paimon/flink/sink/FlinkTableSink.java | 7 ++-
.../SupportsRowLevelOperationFlinkTableSink.java | 4 +-
.../paimon/flink/DynamicBucketTableITCase.java | 6 ++-
8 files changed, 75 insertions(+), 33 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 4c42bbfbb..901a3b66b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -36,7 +36,7 @@ under the License.
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
- <td>Bucket number for file store.</td>
+ <td>Bucket number for file store.<br />It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).</td>
</tr>
<tr>
<td><h5>bucket-key</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 37bd0c217..327e6f231 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -79,7 +79,13 @@ public class CoreOptions implements Serializable {
key("bucket")
.intType()
.defaultValue(-1)
- .withDescription("Bucket number for file store.");
+ .withDescription(
+ Description.builder()
+ .text("Bucket number for file store.")
+ .linebreak()
+ .text(
+ "It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).")
+ .build());
@Immutable
public static final ConfigOption<String> BUCKET_KEY =
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index ae8ed60eb..55aa9b825 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -337,9 +337,9 @@ public interface FileIO extends Serializable {
List<IOException> ioExceptionList = new ArrayList<>();
// load preferIO
- FileIOLoader perferIOLoader = config.preferIO();
+ FileIOLoader preferIOLoader = config.preferIO();
try {
- loader = checkAccess(perferIOLoader, path, config);
+ loader = checkAccess(preferIOLoader, path, config);
} catch (IOException ioException) {
ioExceptionList.add(ioException);
}
@@ -403,10 +403,10 @@ public interface FileIO extends Serializable {
if (loader == null) {
String fallbackMsg = "";
String preferMsg = "";
- if (perferIOLoader != null) {
+ if (preferIOLoader != null) {
preferMsg =
" "
- + perferIOLoader.getClass().getSimpleName()
+ + preferIOLoader.getClass().getSimpleName()
+ " also cannot access this path.";
}
if (fallbackIO != null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 722e27de9..fab5f31fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -87,6 +87,8 @@ public class SchemaValidation {
CoreOptions options = new CoreOptions(schema.options());
+ validateBucket(schema, options);
+
validateDefaultValues(schema);
validateStartupMode(options);
@@ -156,18 +158,6 @@ public class SchemaValidation {
f, KEY_FIELD_PREFIX));
});
- if (options.bucket() == -1 && options.toMap().get(BUCKET_KEY.key()) !=
null) {
- throw new RuntimeException(
- "Cannot define 'bucket-key' in unaware or dynamic bucket
mode.");
- }
-
- if (options.bucket() == -1
- && schema.primaryKeys().isEmpty()
- && options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) !=
null) {
- throw new RuntimeException(
- "AppendOnlyTable of unware or dynamic bucket does not
support 'full-compaction.delta-commits'");
- }
-
if (schema.primaryKeys().isEmpty() &&
options.streamingReadOverwrite()) {
throw new RuntimeException(
"Doesn't support streaming read the changes from overwrite
when the primary keys are not defined.");
@@ -180,17 +170,7 @@ public class SchemaValidation {
}
}
- if (schema.crossPartitionUpdate()) {
- if (options.bucket() != -1) {
- throw new IllegalArgumentException(
- String.format(
- "You should use dynamic bucket (bucket = -1)
mode in cross partition update case "
- + "(Primary key constraint %s not
include all partition fields %s).",
- schema.primaryKeys(), schema.partitionKeys()));
- }
- }
-
- if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
+ if (options.mergeEngine() == MergeEngine.FIRST_ROW) {
if (options.changelogProducer() != ChangelogProducer.LOOKUP) {
throw new IllegalArgumentException(
"Only support 'lookup' changelog-producer on
FIRST_MERGE merge engine");
@@ -522,7 +502,7 @@ public class SchemaValidation {
field);
});
- if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
+ if (options.mergeEngine() == MergeEngine.FIRST_ROW) {
throw new IllegalArgumentException(
"Do not support use sequence field on FIRST_MERGE
merge engine.");
}
@@ -536,4 +516,30 @@ public class SchemaValidation {
}
}
}
+
+ private static void validateBucket(TableSchema schema, CoreOptions
options) {
+ int bucket = options.bucket();
+ if (bucket == -1) {
+ if (options.toMap().get(BUCKET_KEY.key()) != null) {
+ throw new RuntimeException(
+ "Cannot define 'bucket-key' in unaware or dynamic
bucket mode.");
+ }
+
+ if (schema.primaryKeys().isEmpty()
+ &&
options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null) {
+ throw new RuntimeException(
+ "AppendOnlyTable of unware or dynamic bucket does not
support 'full-compaction.delta-commits'");
+ }
+ } else if (bucket < 1) {
+ throw new RuntimeException("The number of buckets needs to be greater than 0.");
+ } else {
+ if (schema.crossPartitionUpdate()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "You should use dynamic bucket (bucket = -1)
mode in cross partition update case "
+ + "(Primary key constraint %s not
include all partition fields %s).",
+ schema.primaryKeys(), schema.partitionKeys()));
+ }
+ }
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 0927c9e91..cf09d1ce4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -179,6 +179,25 @@ public class TableSchemaTest {
.hasMessageContaining("Field fake_col can not be found in
table schema.");
}
+ @Test
+ public void testBucket() {
+ List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", DataTypes.INT()));
+ List<String> partitionKeys = Collections.singletonList("f0");
+ List<String> primaryKeys = Collections.singletonList("f1");
+ Map<String, String> options = new HashMap<>();
+
+ TableSchema schema =
+ new TableSchema(1, fields, 10, partitionKeys, primaryKeys,
options, "");
+
+ options.put(BUCKET.key(), "-2");
+ assertThatThrownBy(() -> validateTableSchema(schema))
+ .hasMessageContaining("The number of buckets needs to be greater than 0.");
+ }
+
static RowType newRowType(boolean isNullable, int fieldId) {
return new RowType(
isNullable,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index b1211b0e7..734441843 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;
@@ -41,6 +42,10 @@ public class FlinkTableSink extends SupportsRowLevelOperationFlinkTableSink
@Override
public void executeTruncation() {
- table.newBatchWriteBuilder().newCommit().truncateTable();
+ try (BatchTableCommit batchTableCommit =
table.newBatchWriteBuilder().newCommit()) {
+ batchTableCommit.truncateTable();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index c81804a23..95f33bac8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -58,6 +58,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
@@ -192,7 +193,8 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
}
private boolean canPushDownDeleteFilter() {
- return deletePredicate == null || deleteIsDropPartition() ||
deleteInSingleNode();
+ return -1 != Options.fromMap(table.options()).get(BUCKET)
+ && (deletePredicate == null || deleteIsDropPartition() ||
deleteInSingleNode());
}
private boolean deleteIsDropPartition() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index c4f928ef4..bb27851f1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -57,7 +57,7 @@ public class DynamicBucketTableITCase extends CatalogITCaseBase {
}
@Test
- public void testWriteRead() {
+ public void testWriteReadDelete() {
sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4),
(1, 5, 5)");
assertThat(sql("SELECT * FROM T"))
.containsExactlyInAnyOrder(
@@ -74,6 +74,10 @@ public class DynamicBucketTableITCase extends CatalogITCaseBase {
Row.of(1, 3, 33),
Row.of(1, 4, 4),
Row.of(1, 5, 5));
+ sql("DELETE FROM T WHERE pt = 1 AND pk = 3");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 1, 11), Row.of(1, 2, 2), Row.of(1, 4, 4),
Row.of(1, 5, 5));
assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
.containsExactlyInAnyOrder(Row.of(0), Row.of(1));