This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 90fb473e9 [core] Fix partition table get index RowType error in IndexBootstrap (#3885)
90fb473e9 is described below
commit 90fb473e907c5f97b914c69e748026518a3cf3fa
Author: zhangdonghao <[email protected]>
AuthorDate: Tue Aug 6 09:36:42 2024 +0800
[core] Fix partition table get index RowType error in IndexBootstrap (#3885)
---
.../paimon/crosspartition/IndexBootstrap.java | 1 +
.../paimon/crosspartition/IndexBootstrapTest.java | 44 +++++++++++++++-------
.../flink/GlobalDynamicBucketTableITCase.java | 32 ++++++++++++++++
3 files changed, 63 insertions(+), 14 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index ec8244a2a..cc64d9549 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -142,6 +142,7 @@ public class IndexBootstrap implements Serializable {
new ArrayList<>(
schema.projectedLogicalRowType(
Stream.concat(primaryKeys.stream(), partitionKeys.stream())
+ .distinct()
.collect(Collectors.toList()))
.getFields());
bootstrapFields.add(
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index bbb1abfd3..d9d8e1f69 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -27,10 +27,12 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.Test;
@@ -43,6 +45,7 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
+import static org.apache.paimon.crosspartition.IndexBootstrap.BUCKET_FIELD;
import static org.apache.paimon.crosspartition.IndexBootstrap.filterSplit;
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
@@ -53,20 +56,7 @@ public class IndexBootstrapTest extends TableTestBase {
@Test
public void testBoostrap() throws Exception {
- Identifier identifier = identifier("T");
- Options options = new Options();
- options.set(CoreOptions.BUCKET, -1);
- Schema schema =
- Schema.newBuilder()
- .column("pt", DataTypes.INT())
- .column("col", DataTypes.INT())
- .column("pk", DataTypes.INT())
- .primaryKey("pk")
- .partitionKeys("pt")
- .options(options.toMap())
- .build();
- catalog.createTable(identifier, schema, true);
- Table table = catalog.getTable(identifier);
+ Table table = createTable();
write(
table,
@@ -106,6 +96,32 @@ public class IndexBootstrapTest extends TableTestBase {
Thread.sleep(1000);
}
+ private Table createTable() throws Exception {
+ Identifier identifier = identifier("T");
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, -1);
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("col", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .primaryKey("pk")
+ .partitionKeys("pt")
+ .options(options.toMap())
+ .build();
+ catalog.createTable(identifier, schema, true);
+ return catalog.getTable(identifier);
+ }
+
+ @Test
+ public void testBootstrapType() throws Exception {
+ FileStoreTable table = (FileStoreTable) createTable();
+ RowType indexRowType = IndexBootstrap.bootstrapType(table.schema());
+ assertThat(indexRowType.getFieldNames()).contains(BUCKET_FIELD);
+ assertThat(indexRowType.getFieldIndex(BUCKET_FIELD))
+ .isEqualTo(2); // the last field is bucket, which is not in table schema
+ }
+
@Test
public void testFilterSplit() {
assertThat(filterSplit(newSplit(newFile(100), newFile(200)), 50, 230)).isTrue();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
index c908efc4e..aaf28b3de 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -18,12 +18,21 @@
package org.apache.paimon.flink;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
+import static org.apache.paimon.crosspartition.IndexBootstrap.BUCKET_FIELD;
import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for batch file store. */
@@ -161,4 +170,27 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
sql("insert into large_t select * from src");
assertThat(sql("select k, count(*) from large_t group by k having count(*) > 1")).isEmpty();
}
+
+ @Test
+ public void testBootstrapType() throws Exception {
+ Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(path)));
+ FileStoreTable t = (FileStoreTable) catalog.getTable(Identifier.create("default", "T"));
+ FileStoreTable partialUpdateT =
+ (FileStoreTable) catalog.getTable(Identifier.create("default", "partial_update_t"));
+ FileStoreTable firstRowT =
+ (FileStoreTable) catalog.getTable(Identifier.create("default", "first_row_t"));
+ assertThat(IndexBootstrap.bootstrapType(t.schema()).getFieldNames()).contains(BUCKET_FIELD);
+ assertThat(IndexBootstrap.bootstrapType(partialUpdateT.schema()).getFieldNames())
+ .contains(BUCKET_FIELD);
+ assertThat(IndexBootstrap.bootstrapType(firstRowT.schema()).getFieldNames())
+ .contains(BUCKET_FIELD);
+ assertThat(IndexBootstrap.bootstrapType(t.schema()).getFieldIndex(BUCKET_FIELD))
+ .isEqualTo(2);
+ assertThat(
+ IndexBootstrap.bootstrapType(partialUpdateT.schema())
+ .getFieldIndex(BUCKET_FIELD))
+ .isEqualTo(2);
+ assertThat(IndexBootstrap.bootstrapType(firstRowT.schema()).getFieldIndex(BUCKET_FIELD))
+ .isEqualTo(2);
+ }
}