This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 90fb473e9 [core] Fix partition table get index RowType error in IndexBootstrap (#3885)
90fb473e9 is described below

commit 90fb473e907c5f97b914c69e748026518a3cf3fa
Author: zhangdonghao <[email protected]>
AuthorDate: Tue Aug 6 09:36:42 2024 +0800

    [core] Fix partition table get index RowType error in IndexBootstrap (#3885)
---
 .../paimon/crosspartition/IndexBootstrap.java      |  1 +
 .../paimon/crosspartition/IndexBootstrapTest.java  | 44 +++++++++++++++-------
 .../flink/GlobalDynamicBucketTableITCase.java      | 32 ++++++++++++++++
 3 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index ec8244a2a..cc64d9549 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -142,6 +142,7 @@ public class IndexBootstrap implements Serializable {
                 new ArrayList<>(
                         schema.projectedLogicalRowType(
                                         Stream.concat(primaryKeys.stream(), partitionKeys.stream())
+                                                .distinct()
                                                 .collect(Collectors.toList()))
                                 .getFields());
         bootstrapFields.add(
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index bbb1abfd3..d9d8e1f69 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -27,10 +27,12 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.Test;
@@ -43,6 +45,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 
+import static org.apache.paimon.crosspartition.IndexBootstrap.BUCKET_FIELD;
 import static org.apache.paimon.crosspartition.IndexBootstrap.filterSplit;
 import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
@@ -53,20 +56,7 @@ public class IndexBootstrapTest extends TableTestBase {
 
     @Test
     public void testBoostrap() throws Exception {
-        Identifier identifier = identifier("T");
-        Options options = new Options();
-        options.set(CoreOptions.BUCKET, -1);
-        Schema schema =
-                Schema.newBuilder()
-                        .column("pt", DataTypes.INT())
-                        .column("col", DataTypes.INT())
-                        .column("pk", DataTypes.INT())
-                        .primaryKey("pk")
-                        .partitionKeys("pt")
-                        .options(options.toMap())
-                        .build();
-        catalog.createTable(identifier, schema, true);
-        Table table = catalog.getTable(identifier);
+        Table table = createTable();
 
         write(
                 table,
@@ -106,6 +96,32 @@ public class IndexBootstrapTest extends TableTestBase {
         Thread.sleep(1000);
     }
 
+    private Table createTable() throws Exception {
+        Identifier identifier = identifier("T");
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, -1);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("col", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .primaryKey("pk")
+                        .partitionKeys("pt")
+                        .options(options.toMap())
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        return catalog.getTable(identifier);
+    }
+
+    @Test
+    public void testBootstrapType() throws Exception {
+        FileStoreTable table = (FileStoreTable) createTable();
+        RowType indexRowType = IndexBootstrap.bootstrapType(table.schema());
+        assertThat(indexRowType.getFieldNames()).contains(BUCKET_FIELD);
+        assertThat(indexRowType.getFieldIndex(BUCKET_FIELD))
+                .isEqualTo(2); // the last field is bucket, which is not in table schema
+    }
+
     @Test
     public void testFilterSplit() {
         assertThat(filterSplit(newSplit(newFile(100), newFile(200)), 50, 230)).isTrue();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
index c908efc4e..aaf28b3de 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -18,12 +18,21 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.paimon.crosspartition.IndexBootstrap.BUCKET_FIELD;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for batch file store. */
@@ -161,4 +170,27 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
         sql("insert into large_t select * from src");
         assertThat(sql("select k, count(*) from large_t group by k having count(*) > 1")).isEmpty();
     }
+
+    @Test
+    public void testBootstrapType() throws Exception {
+        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(path)));
+        FileStoreTable t = (FileStoreTable) catalog.getTable(Identifier.create("default", "T"));
+        FileStoreTable partialUpdateT =
+                (FileStoreTable) catalog.getTable(Identifier.create("default", "partial_update_t"));
+        FileStoreTable firstRowT =
+                (FileStoreTable) catalog.getTable(Identifier.create("default", "first_row_t"));
+        assertThat(IndexBootstrap.bootstrapType(t.schema()).getFieldNames()).contains(BUCKET_FIELD);
+        assertThat(IndexBootstrap.bootstrapType(partialUpdateT.schema()).getFieldNames())
+                .contains(BUCKET_FIELD);
+        assertThat(IndexBootstrap.bootstrapType(firstRowT.schema()).getFieldNames())
+                .contains(BUCKET_FIELD);
+        assertThat(IndexBootstrap.bootstrapType(t.schema()).getFieldIndex(BUCKET_FIELD))
+                .isEqualTo(2);
+        assertThat(
+                        IndexBootstrap.bootstrapType(partialUpdateT.schema())
+                                .getFieldIndex(BUCKET_FIELD))
+                .isEqualTo(2);
+        assertThat(IndexBootstrap.bootstrapType(firstRowT.schema()).getFieldIndex(BUCKET_FIELD))
+                .isEqualTo(2);
+    }
 }

Reply via email to