This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 6fb528f2 [FLINK-30710] Fix invalid field id for nested type in spark catalog
6fb528f2 is described below
commit 6fb528f234e6d33bc3797ec23e8a7b7206a7a063
Author: shammon <[email protected]>
AuthorDate: Tue Jan 17 15:51:40 2023 +0800
[FLINK-30710] Fix invalid field id for nested type in spark catalog
This closes #486
---
.../flink/table/store/spark/SparkTypeUtils.java | 7 ++++++-
.../flink/table/store/spark/SparkReadITCase.java | 19 +++++++++++++++++++
.../apache/flink/table/store/spark/SparkTypeTest.java | 7 ++++++-
3 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
index 7264c167..6dc51c2b 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
@@ -50,6 +50,7 @@ import org.apache.spark.sql.types.UserDefinedType;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
/** Utils for spark {@link DataType}. */
public class SparkTypeUtils {
@@ -191,6 +192,8 @@ public class SparkTypeUtils {
private static class SparkToFlinkTypeVisitor {
+ private final AtomicInteger currentIndex = new AtomicInteger(0);
+
static org.apache.flink.table.store.types.DataType visit(DataType type) {
return visit(type, new SparkToFlinkTypeVisitor());
}
@@ -238,7 +241,9 @@ public class SparkTypeUtils {
org.apache.flink.table.store.types.DataType fieldType =
fieldResults.get(i).copy(field.nullable());
String comment = field.getComment().getOrElse(() -> null);
- newFields.add(new DataField(i, field.name(), fieldType, comment));
+ newFields.add(
+ new DataField(
+ currentIndex.getAndIncrement(), field.name(), fieldType, comment));
}
return new RowType(newFields);
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 87e583f5..bab448f8 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -487,4 +487,23 @@ public class SparkReadITCase extends SparkReadTestBase {
.isEqualTo(
"[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]");
}
+
+ @Test
+ public void testCreateNestedField() {
+ spark.sql(
+ "CREATE TABLE tablestore.default.nested_table ( a INT, b STRUCT<b1: STRUCT<b11: INT, b12 INT>, b2 BIGINT>)");
+ assertThat(
+ spark.sql("SHOW CREATE TABLE tablestore.default.nested_table")
+ .collectAsList()
+ .toString())
+ .isEqualTo(
+ String.format(
+ "[[CREATE TABLE nested_table (\n"
+ + " `a` INT,\n"
+ + " `b` STRUCT<`b1`: STRUCT<`b11`: INT, `b12`: INT>, `b2`: BIGINT>)\n"
+ + "TBLPROPERTIES(\n"
+ + " 'path' = '%s')\n"
+ + "]]",
+ new Path(warehousePath, "default.db/nested_table")));
+ }
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
index 4ea93aaa..50ab8940 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.store.types.RowType;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static org.apache.flink.table.store.spark.SparkTypeUtils.fromFlinkRowType;
import static org.apache.flink.table.store.spark.SparkTypeUtils.toFlinkType;
import static org.assertj.core.api.Assertions.assertThat;
@@ -32,7 +34,10 @@ import static org.assertj.core.api.Assertions.assertThat;
public class SparkTypeTest {
public static final RowType ALL_TYPES =
- RowType.builder()
+ RowType.builder(
+ true,
+ new AtomicInteger(
+ 1)) // posX and posY have field id 0 and 1, here we start from 2
.field("id", DataTypes.INT().notNull())
.field("name", DataTypes.STRING()) /* optional by default */
.field("salary", DataTypes.DOUBLE().notNull())