wuchong commented on code in PR #2219: URL: https://github.com/apache/fluss/pull/2219#discussion_r2644994042
########## fluss-spark/fluss-spark-common/pom.xml: ########## @@ -0,0 +1,77 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.fluss</groupId> + <artifactId>fluss-spark</artifactId> + <version>0.9-SNAPSHOT</version> + </parent> + + <artifactId>fluss-spark-common</artifactId> Review Comment: Should we add a Scala version suffix to the artifact ID (also for `fluss-spark-3.4` and `fluss-spark-3.5` modules)? This would ensure that the published JARs automatically include the Scala version in their artifact names during Maven deployment, following standard Scala cross-build conventions. ########## pom.xml: ########## @@ -458,6 +464,20 @@ </build> </profile> + <profile> + <id>spark3</id> + <modules> + <module>fluss-spark/fluss-spark-3.5</module> + <module>fluss-spark/fluss-spark-3.4</module> + </modules> + <activation> + <activeByDefault>true</activeByDefault> + <property> + <name>spark3</name> + </property> + </activation> + </profile> Review Comment: I think we can enable these modues by default? So that the license checker pipeline can verify these modules as well. ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types.{ArrayType => FlussArrayType, DataField => FlussDataField, DataType => FlussDataType, MapType => FlussMapType, _} +import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => SparkDataType, MapType => SparkMapType, StructType, UserDefinedType} + +import scala.collection.JavaConverters._ + +object SparkToFlussTypeVisitor { + + def visit(dataType: SparkDataType): FlussDataType = { + dataType match { + case st: StructType => + visitStructType(st) + case mt: SparkMapType => + visitMapType(mt) + case at: SparkArrayType => + visitArrayType(at) + case _: UserDefinedType[_] => + throw new UnsupportedOperationException("User-defined type is not supported"); + case t => + visitPrimitiveType(t) + } + } + + private def visitStructType(st: StructType): RowType = { + val flussDataFields = st.fields.map { + field => + val flussDataType = visit(field.dataType) + new FlussDataField(field.name, flussDataType, field.getComment().orNull) + } + new RowType(flussDataFields.toList.asJava) + } + + private def visitMapType(mt: SparkMapType): FlussMapType = { + new FlussMapType(visit(mt.keyType), visit(mt.valueType).copy(mt.valueContainsNull)) + } + + private def visitArrayType(at: SparkArrayType): FlussArrayType = { + new FlussArrayType(at.containsNull, visit(at.elementType)) + } + + private def visitPrimitiveType(t: SparkDataType): FlussDataType = { + t match { + case _: org.apache.spark.sql.types.BooleanType => + new BooleanType() + case _: org.apache.spark.sql.types.ByteType => + new TinyIntType() + case _: org.apache.spark.sql.types.ShortType => + new SmallIntType() + case _: org.apache.spark.sql.types.IntegerType => + new IntType() + case _: org.apache.spark.sql.types.LongType => + new BigIntType() + case _: org.apache.spark.sql.types.FloatType => + new FloatType() + case _: org.apache.spark.sql.types.DoubleType => + new DoubleType() + case dt: org.apache.spark.sql.types.DecimalType => + new DecimalType(dt.precision, dt.scale) + case _: org.apache.spark.sql.types.BinaryType => + new BinaryType(BinaryType.MAX_LENGTH) Review Comment: use `new BytesType` ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.config.FlussConfigUtils +import org.apache.fluss.metadata.{Schema, TableDescriptor} +import org.apache.fluss.spark.SparkConnectorOptions._ +import org.apache.fluss.spark.types.{FlussDataTypeToSparkDataType, SparkToFlussTypeVisitor} +import org.apache.fluss.types.RowType +import org.apache.spark.sql.FlussIdentityTransform +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.types.StructType + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +object SparkConversions { + + def toFlussDataType(schema: StructType): RowType = + SparkToFlussTypeVisitor.visit(schema).asInstanceOf[RowType] + + def toSparkDataType(rowType: RowType): StructType = + FlussDataTypeToSparkDataType.visit(rowType).asInstanceOf[StructType] + + def toFlussTable( + sparkSchema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): TableDescriptor = { + val caseInsensitiveProps = CaseInsensitiveMap(properties.asScala.toMap) + + val tableDescriptorBuilder = TableDescriptor.builder() + val schemaBuilder = Schema.newBuilder().fromRowType(toFlussDataType(sparkSchema)) + + val partitionKey = toPartitionKeys(partitions) + tableDescriptorBuilder.partitionedBy(partitionKey: _*) + + val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) { + val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",") + schemaBuilder.primaryKey(pks: _*) + pks + } else { + Array.empty[String] + } + + if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) { + val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt + val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) { + caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",") + } else { + primaryKeys.filterNot(partitionKey.contains) + } + tableDescriptorBuilder.distributedBy(bucketNum, bucketKeys: _*) + } + + if (caseInsensitiveProps.contains(COMMENT.key)) { + tableDescriptorBuilder.comment(caseInsensitiveProps.get(COMMENT.key).get) + } + + val (tableProps, customProps) = + caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition { + case (key, _) => FlussConfigUtils.TABLE_OPTIONS.containsKey(key) Review Comment: `FlussConfigUtils.TABLE_OPTIONS` is a static set, however, the fluss table created by newer client version may carry additional table options. Therefore, it would be more robust to check whether the config key start with `table.` prefix. ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussDataTypeToSparkDataType.scala: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types._ +import org.apache.spark.sql.types.{DataType => SparkDataType, DataTypes => SparkDataTypes} + +import scala.collection.JavaConverters._ + +object FlussDataTypeToSparkDataType extends DataTypeVisitor[SparkDataType] { Review Comment: `FlussDataTypeToSparkDataType` -> `FlussToSparkTypeVisitor` to align with `SparkToFlussTypeVisitor`. ########## fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, TableDescriptor, TablePath} +import org.apache.fluss.types.{DataTypes, RowType} +import org.apache.spark.sql.Row +import org.assertj.core.api.Assertions.{assertThat, assertThatList} + +import scala.collection.JavaConverters._ + +class FlussCatalogTest extends FlussSparkTestBase { + + test("Catalog: namespaces") { + // Always a default database 'fluss'. + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) + + sql("CREATE DATABASE testdb COMMENT 'created by spark'") + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row("testdb") :: Nil) + + checkAnswer( + sql("DESC DATABASE testdb").filter("info_name != 'Owner'"), + Row("Catalog Name", "fluss_catalog") :: Row("Namespace Name", "testdb") :: Row( + "Comment", + "created by spark") :: Nil + ) + + sql("DROP DATABASE testdb") + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) + } + + test("Catalog: basic table") { + sql(s"CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string) COMMENT 'my test table'") + checkAnswer(sql("SHOW TABLES"), Row(DEFAULT_DATABASE, "test_tbl", false) :: Nil) + checkAnswer(sql("DESC test_tbl"), Row("id", "int", null) :: Row("name", "string", null) :: Nil) + + val testTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl")).get() + assertThat(testTable.getTablePath.getTableName).isEqualTo("test_tbl") + assertThat(testTable.getComment.orElse(null)).isEqualTo("my test table") + assertThat(testTable.getRowType).isEqualTo( + RowType.builder().field("id", DataTypes.INT()).field("name", DataTypes.STRING()).build()) + + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.test_pt_tbl (id int, name string, pt string) + |PARTITIONED BY (pt) + |TBLPROPERTIES("key" = "value") + |""".stripMargin) + + val testPartitionedTable = + admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_pt_tbl")).get() + assertThat(testPartitionedTable.getRowType).isEqualTo( + RowType + .builder() + .field("id", DataTypes.INT()) + .field("name", DataTypes.STRING()) + .field("pt", DataTypes.STRING()) + .build()) + assertThat(testPartitionedTable.getPartitionKeys.get(0)).isEqualTo("pt") + assertThat(testPartitionedTable.getCustomProperties.containsKey("key")).isEqualTo(true) Review Comment: What is the `key` to verify? ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussTable.scala: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.metadata.TableInfo +import org.apache.fluss.spark.catalog.{FlussTableInfo, SupportsFlussPartitionManagement} + +case class FlussTable(table: TableInfo) Review Comment: ditto. `SparkTable`? ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/FlussTableInfo.scala: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.catalog + +import org.apache.fluss.metadata.TableInfo +import org.apache.fluss.spark.SparkConversions +import org.apache.spark.sql.connector.catalog.{Table, TableCapability} +import org.apache.spark.sql.types.StructType + +import java.util + +import scala.collection.JavaConverters._ + +abstract class FlussTableInfo(tableInfo: TableInfo) extends Table { Review Comment: ditto, `AbstractSparkTable`? ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussCatalog.scala: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.exception.{DatabaseNotExistException, TableAlreadyExistException, TableNotExistException} +import org.apache.fluss.metadata.TablePath +import org.apache.fluss.spark.catalog.{SupportsFlussNamespaces, WithFlussAdmin} +import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.util +import java.util.concurrent.ExecutionException + +import scala.collection.JavaConverters._ + +class FlussCatalog extends TableCatalog with SupportsFlussNamespaces with WithFlussAdmin { Review Comment: How about naming it `SparkCatalog`? Since these catalog implementations reside in the Fluss repository alongside those for other engines (such as Flink and Trino), including the engine name in the class name would make it easier to identify and distinguish between them. ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussDataTypeToSparkDataType.scala: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types._ +import org.apache.spark.sql.types.{DataType => SparkDataType, DataTypes => SparkDataTypes} + +import scala.collection.JavaConverters._ + +object FlussDataTypeToSparkDataType extends DataTypeVisitor[SparkDataType] { + + override def visit(charType: CharType): SparkDataType = { + org.apache.spark.sql.types.CharType(charType.getLength) + } + + override def visit(stringType: StringType): SparkDataType = { + SparkDataTypes.StringType + } + + override def visit(booleanType: BooleanType): SparkDataType = { + SparkDataTypes.BooleanType + } + + override def visit(binaryType: BinaryType): SparkDataType = { + SparkDataTypes.BinaryType + } + + override def visit(bytesType: BytesType): SparkDataType = { + SparkDataTypes.BinaryType + } + + override def visit(decimalType: DecimalType): SparkDataType = { + org.apache.spark.sql.types.DecimalType(decimalType.getPrecision, decimalType.getScale) + } + + override def visit(tinyIntType: TinyIntType): SparkDataType = { + SparkDataTypes.ByteType + } + + override def visit(smallIntType: SmallIntType): SparkDataType = { + SparkDataTypes.ShortType + } + + override def visit(intType: IntType): SparkDataType = { + SparkDataTypes.IntegerType + } + + override def visit(bigIntType: BigIntType): SparkDataType = { + SparkDataTypes.LongType + } + + override def visit(floatType: FloatType): SparkDataType = { + SparkDataTypes.FloatType + } + + override def visit(doubleType: DoubleType): SparkDataType = { + SparkDataTypes.DoubleType + } + + override def visit(dateType: DateType): SparkDataType = { + SparkDataTypes.DateType + } + + override def visit(timeType: TimeType): SparkDataType = { + SparkDataTypes.IntegerType + } + + override def visit(timestampType: TimestampType): SparkDataType = { + SparkDataTypes.TimestampNTZType + } + + override def visit(localZonedTimestampType: LocalZonedTimestampType): SparkDataType = { + SparkDataTypes.TimestampType + } + + override def visit(arrayType: ArrayType): SparkDataType = { + SparkDataTypes.createArrayType(arrayType.getElementType.accept(this), arrayType.isNullable) + } + + override def visit(mapType: MapType): SparkDataType = { + SparkDataTypes.createMapType( + mapType.getKeyType.accept(this), + mapType.getValueType.accept(this), + mapType.isNullable Review Comment: `mapType.getValueType.isNullable` ########## fluss-spark/pom.xml: ########## @@ -0,0 +1,340 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.fluss</groupId> + <artifactId>fluss</artifactId> + <version>0.9-SNAPSHOT</version> + </parent> + + <artifactId>fluss-spark</artifactId> + <name>Fluss : Engine Spark :</name> + <packaging>pom</packaging> + + <properties> + <scala212.version>2.12.18</scala212.version> + <scala213.version>2.13.16</scala213.version> + </properties> + + <modules> + <module>fluss-spark-common</module> + <module>fluss-spark-ut</module> + </modules> + + <dependencies> + <dependency> + <groupId>org.apache.fluss</groupId> + <artifactId>fluss-client</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.fluss</groupId> + <artifactId>fluss-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <version>${scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <version>3.1.0</version> + <scope>test</scope> + </dependency> + </dependencies> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j2-impl</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 --> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j2-impl</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_${scala.binary.version}</artifactId> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j2-impl</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_${scala.binary.version}</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j2-impl</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </dependencyManagement> + + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>1.0.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile> + <inputEncoding>UTF-8</inputEncoding> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.0</version> + <configuration> + <source>${target.java.version}</source> + <target>${target.java.version}</target> + <useIncrementalCompilation>false</useIncrementalCompilation> + <compilerArgs> + <arg>-Xpkginfo:always</arg> + <arg>-Xlint:deprecation</arg> + </compilerArgs> + </configuration> + </plugin> + + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.2</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + <checkMultipleScalaVersions>false</checkMultipleScalaVersions> + <args> + <arg>-nobootcp</arg> + <arg>-target:jvm-${target.java.version}</arg> + </args> + </configuration> + </plugin> + + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>2.1.0</version> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:ReservedCodeCacheSize=128m ${extraJavaTestArgs} -Dio.netty.tryReflectionSetAccessible=true</argLine> + <filereports>PaimonTestSuite.txt</filereports> Review Comment: `FlussTestSuite.txt`? ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.catalog + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement +import org.apache.spark.sql.types.StructType + +import java.util + +trait SupportsFlussPartitionManagement extends FlussTableInfo with SupportsPartitionManagement { + + override def partitionSchema(): StructType = _partitionSchema + + override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = { + throw new UnsupportedOperationException("Creating partition is not supported") Review Comment: Will them be supported in follow-up PRs? It seems the original PR already supports this https://github.com/apache/fluss/pull/1636/files#diff-151d06dcc7dd4e7d0f9aae9be2f042e2588bb6d86820b57ec6fc2f8fef9f62cc ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types.{ArrayType => FlussArrayType, DataField => FlussDataField, DataType => FlussDataType, MapType => FlussMapType, _} +import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => SparkDataType, MapType => SparkMapType, StructType, UserDefinedType} + +import scala.collection.JavaConverters._ + +object SparkToFlussTypeVisitor { + + def visit(dataType: SparkDataType): FlussDataType = { + dataType match { + case st: StructType => + visitStructType(st) + case mt: SparkMapType => + visitMapType(mt) + case at: SparkArrayType => + visitArrayType(at) + case _: UserDefinedType[_] => + throw new UnsupportedOperationException("User-defined type is not supported"); + case t => + visitPrimitiveType(t) + } + } + + private def visitStructType(st: StructType): RowType = { + val flussDataFields = st.fields.map { + field => + val flussDataType = visit(field.dataType) + new FlussDataField(field.name, flussDataType, field.getComment().orNull) + } + new RowType(flussDataFields.toList.asJava) + } + + private def visitMapType(mt: SparkMapType): FlussMapType = { + new FlussMapType(visit(mt.keyType), visit(mt.valueType).copy(mt.valueContainsNull)) + } + + private def visitArrayType(at: SparkArrayType): FlussArrayType = { + new FlussArrayType(at.containsNull, visit(at.elementType)) + } + + private def visitPrimitiveType(t: SparkDataType): FlussDataType = { + t match { + case _: org.apache.spark.sql.types.BooleanType => + new BooleanType() + case _: org.apache.spark.sql.types.ByteType => + new TinyIntType() + case _: org.apache.spark.sql.types.ShortType => + new SmallIntType() + case _: org.apache.spark.sql.types.IntegerType => + new IntType() + case _: org.apache.spark.sql.types.LongType => + new BigIntType() + case _: org.apache.spark.sql.types.FloatType => + new FloatType() + case _: org.apache.spark.sql.types.DoubleType => + new DoubleType() + case dt: org.apache.spark.sql.types.DecimalType => + new DecimalType(dt.precision, dt.scale) + case _: org.apache.spark.sql.types.BinaryType => + new BinaryType(BinaryType.MAX_LENGTH) + case vct: org.apache.spark.sql.types.VarcharType => + new CharType(vct.length) + case ct: org.apache.spark.sql.types.CharType => + new CharType(ct.length) + case _: org.apache.spark.sql.types.StringType => + new StringType() + case _: org.apache.spark.sql.types.DateType => + new DateType() + case _: org.apache.spark.sql.types.TimestampType => + new LocalZonedTimestampType() + case _: org.apache.spark.sql.types.TimestampNTZType => + new TimestampType() Review Comment: ditto ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types.{ArrayType => FlussArrayType, DataField => FlussDataField, DataType => FlussDataType, MapType => FlussMapType, _} +import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => SparkDataType, MapType => SparkMapType, StructType, UserDefinedType} + +import scala.collection.JavaConverters._ + +object SparkToFlussTypeVisitor { + + def visit(dataType: SparkDataType): FlussDataType = { + dataType match { + case st: StructType => + visitStructType(st) + case mt: SparkMapType => + visitMapType(mt) + case at: SparkArrayType => + visitArrayType(at) + case _: UserDefinedType[_] => + throw new UnsupportedOperationException("User-defined type is not supported"); + case t => + visitPrimitiveType(t) + } + } + + private def visitStructType(st: StructType): RowType = { + val flussDataFields = st.fields.map { + field => + val flussDataType = visit(field.dataType) + new FlussDataField(field.name, flussDataType, field.getComment().orNull) + } + new RowType(flussDataFields.toList.asJava) + } + + private def visitMapType(mt: SparkMapType): FlussMapType = { + new FlussMapType(visit(mt.keyType), visit(mt.valueType).copy(mt.valueContainsNull)) + } + + private def visitArrayType(at: SparkArrayType): FlussArrayType = { + new FlussArrayType(at.containsNull, visit(at.elementType)) Review Comment: IIUC, `SparkArrayType.containsNull` means the array element can be null. However, the first boolean parameter of `new FlussArrayType` means the array instance can be null. If we want to set the element to nullable, we should ```java new FlussArrayType(visit(at.elementType).copy(at.containsNull)) ``` ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussDataTypeToSparkDataType.scala: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types._ +import org.apache.spark.sql.types.{DataType => SparkDataType, DataTypes => SparkDataTypes} + +import scala.collection.JavaConverters._ + +object FlussDataTypeToSparkDataType extends DataTypeVisitor[SparkDataType] { + + override def visit(charType: CharType): SparkDataType = { + org.apache.spark.sql.types.CharType(charType.getLength) + } + + override def visit(stringType: StringType): SparkDataType = { + SparkDataTypes.StringType + } + + override def visit(booleanType: BooleanType): SparkDataType = { + SparkDataTypes.BooleanType + } + + override def visit(binaryType: BinaryType): SparkDataType = { + SparkDataTypes.BinaryType + } + + override def visit(bytesType: BytesType): SparkDataType = { + SparkDataTypes.BinaryType + } + + override def visit(decimalType: DecimalType): SparkDataType = { + org.apache.spark.sql.types.DecimalType(decimalType.getPrecision, decimalType.getScale) + } + + override def visit(tinyIntType: TinyIntType): SparkDataType = { + SparkDataTypes.ByteType + } + + override def visit(smallIntType: SmallIntType): SparkDataType = { + SparkDataTypes.ShortType + } + + override def visit(intType: IntType): SparkDataType = { + SparkDataTypes.IntegerType + } + + override def visit(bigIntType: BigIntType): SparkDataType = { + SparkDataTypes.LongType + } + + override def visit(floatType: FloatType): SparkDataType = { + SparkDataTypes.FloatType + } + + override def visit(doubleType: DoubleType): SparkDataType = { + SparkDataTypes.DoubleType + } + + override def visit(dateType: DateType): SparkDataType = { + SparkDataTypes.DateType + } + + override def visit(timeType: TimeType): SparkDataType = { + SparkDataTypes.IntegerType + } + + override def visit(timestampType: TimestampType): SparkDataType = { + SparkDataTypes.TimestampNTZType + } + + override def visit(localZonedTimestampType: LocalZonedTimestampType): SparkDataType = { + SparkDataTypes.TimestampType + } + + override def visit(arrayType: ArrayType): SparkDataType = { + SparkDataTypes.createArrayType(arrayType.getElementType.accept(this), arrayType.isNullable) Review Comment: ditto, should be ```scala SparkDataTypes.createArrayType(elementType.accept(this), elementType.isNullable) ``` ########## .scalafmt.conf: ########## @@ -0,0 +1,70 @@ +runner.dialect = scala212 + +# Version is required to make sure IntelliJ picks the right version +version = 3.10.2 +preset = default + +# Max column +maxColumn = 100 + +# This parameter simply says the .stripMargin method was not redefined by the user to assign +# special meaning to indentation preceding the | character. Hence, that indentation can be modified. +assumeStandardLibraryStripMargin = true +align.stripMargin = true + +# Align settings +align.preset = none +align.closeParenSite = false +align.openParenCallSite = false +danglingParentheses.defnSite = false +danglingParentheses.callSite = false +danglingParentheses.ctrlSite = true +danglingParentheses.tupleSite = false +align.openParenCallSite = false +align.openParenDefnSite = false +align.openParenTupleSite = false + +# Newlines +newlines.alwaysBeforeElseAfterCurlyIf = false +newlines.beforeCurlyLambdaParams = multiline # Newline before lambda params +newlines.afterCurlyLambdaParams = squash # No newline after lambda params +newlines.inInterpolation = "avoid" +newlines.avoidInResultType = true +optIn.annotationNewlines = true + +# Scaladoc +docstrings.style = Asterisk # Javadoc style +docstrings.removeEmpty = true +docstrings.oneline = fold +docstrings.forceBlankLineBefore = true + +# Indentation +indent.extendSite = 2 # This makes sure extend is not indented as the ctor parameters + +# Rewrites +rewrite.rules = [AvoidInfix, Imports, RedundantBraces, SortModifiers] + +# Imports +rewrite.imports.sort = scalastyle +rewrite.imports.groups = [ + ["org.apache.paimon\\..*"], + ["org.apache.paimon.shade\\..*"], Review Comment: change to fluss package ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types.{ArrayType => FlussArrayType, DataField => FlussDataField, DataType => FlussDataType, MapType => FlussMapType, _} +import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => SparkDataType, MapType => SparkMapType, StructType, UserDefinedType} + +import scala.collection.JavaConverters._ + +object SparkToFlussTypeVisitor { + + def visit(dataType: SparkDataType): FlussDataType = { + dataType match { + case st: StructType => + visitStructType(st) + case mt: SparkMapType => + visitMapType(mt) + case at: SparkArrayType => + visitArrayType(at) + case _: UserDefinedType[_] => + throw new UnsupportedOperationException("User-defined type is not supported"); + case t => + visitPrimitiveType(t) + } + } + + private def visitStructType(st: StructType): RowType = { + val flussDataFields = st.fields.map { + field => + val flussDataType = visit(field.dataType) + new FlussDataField(field.name, flussDataType, field.getComment().orNull) + } + new RowType(flussDataFields.toList.asJava) + } + + private def visitMapType(mt: SparkMapType): FlussMapType = { + new FlussMapType(visit(mt.keyType), visit(mt.valueType).copy(mt.valueContainsNull)) + } + + private def visitArrayType(at: SparkArrayType): FlussArrayType = { + new FlussArrayType(at.containsNull, visit(at.elementType)) + } + + private def visitPrimitiveType(t: SparkDataType): FlussDataType = { + t match { + case _: org.apache.spark.sql.types.BooleanType => + new BooleanType() + case _: org.apache.spark.sql.types.ByteType => + new TinyIntType() + case _: org.apache.spark.sql.types.ShortType => + new SmallIntType() + case _: org.apache.spark.sql.types.IntegerType => + new IntType() + case _: org.apache.spark.sql.types.LongType => + new BigIntType() + case _: org.apache.spark.sql.types.FloatType => + new FloatType() + case _: org.apache.spark.sql.types.DoubleType => + new DoubleType() + case dt: org.apache.spark.sql.types.DecimalType => + new DecimalType(dt.precision, dt.scale) + case _: org.apache.spark.sql.types.BinaryType => + new BinaryType(BinaryType.MAX_LENGTH) + case vct: org.apache.spark.sql.types.VarcharType => + new CharType(vct.length) Review Comment: use `new StringType()` ########## fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types.{ArrayType => FlussArrayType, DataField => FlussDataField, DataType => FlussDataType, MapType => FlussMapType, _} +import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => SparkDataType, MapType => SparkMapType, StructType, UserDefinedType} + +import scala.collection.JavaConverters._ + +object SparkToFlussTypeVisitor { + + def visit(dataType: SparkDataType): FlussDataType = { + dataType match { + case st: StructType => + visitStructType(st) + case mt: SparkMapType => + visitMapType(mt) + case at: SparkArrayType => + visitArrayType(at) + case _: UserDefinedType[_] => + throw new UnsupportedOperationException("User-defined type is not supported"); + case t => + visitPrimitiveType(t) + } + } + + private def visitStructType(st: StructType): RowType = { + val flussDataFields = st.fields.map { + field => + val flussDataType = visit(field.dataType) + new FlussDataField(field.name, flussDataType, field.getComment().orNull) + } + new RowType(flussDataFields.toList.asJava) + } + + private def visitMapType(mt: SparkMapType): FlussMapType = { + new FlussMapType(visit(mt.keyType), visit(mt.valueType).copy(mt.valueContainsNull)) + } + + private def visitArrayType(at: SparkArrayType): FlussArrayType = { + new FlussArrayType(at.containsNull, visit(at.elementType)) + } + + private def visitPrimitiveType(t: SparkDataType): FlussDataType = { + t match { + case _: org.apache.spark.sql.types.BooleanType => + new BooleanType() + case _: org.apache.spark.sql.types.ByteType => + new TinyIntType() + case _: org.apache.spark.sql.types.ShortType => + new SmallIntType() + case _: org.apache.spark.sql.types.IntegerType => + new IntType() + case _: org.apache.spark.sql.types.LongType => + new BigIntType() + case _: org.apache.spark.sql.types.FloatType => + new FloatType() + case _: org.apache.spark.sql.types.DoubleType => + new DoubleType() + case dt: org.apache.spark.sql.types.DecimalType => + new DecimalType(dt.precision, dt.scale) + case _: org.apache.spark.sql.types.BinaryType => + new BinaryType(BinaryType.MAX_LENGTH) + case vct: org.apache.spark.sql.types.VarcharType => + new CharType(vct.length) + case ct: org.apache.spark.sql.types.CharType => + new CharType(ct.length) + case _: org.apache.spark.sql.types.StringType => + new StringType() + case _: org.apache.spark.sql.types.DateType => + new DateType() + case _: org.apache.spark.sql.types.TimestampType => + new LocalZonedTimestampType() Review Comment: Does spark only support precision=6 for timestamp? If yes, how about declaring the precision and add comment about this? ```java // spark only support 6 digits of precision return new LocalZonedTimestampType(6); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
