This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new f3d08a837f [#7046] fix(spark-connector): Spark connector fails to parse timestamp data type (#7076)
f3d08a837f is described below
commit f3d08a837f53de2340de882240a80fe771c313b5
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue Apr 29 08:22:01 2025 +0800
[#7046] fix(spark-connector): Spark connector fails to parse timestamp data type (#7076)
### What changes were proposed in this pull request?
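Previously, the Spark connector failed to parse the timestamp data type for Hive catalogs. This change adds Hive-specific Spark type converters: `SparkHiveTypeConverter`, and `SparkHiveTypeConverter34` for Spark 3.4. They map Spark's `TimestampType` to Gravitino's timestamp without time zone and map every Gravitino timestamp type back to Spark's `TimestampType`. On Spark 3.4 they reject `TimestampNTZType`, because Hive does not support `timestamp_ntz`.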
### Why are the changes needed?
Fix: #7046
### Does this PR introduce _any_ user-facing change?
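Yes, for Hive catalogs. A Spark `timestamp` column is converted to a Gravitino timestamp without time zone. On Spark 3.4, a `timestamp_ntz` column is rejected with an `UnsupportedOperationException` that asks the user to use `timestamp` instead.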
### How was this patch tested?
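Added the integration test SparkHiveCatalogIT.testCreateTableWithTimestamp() and the unit test TestSparkHiveTypeConverter34.testUnsupportedTimestampType().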
---
.../spark/connector/hive/GravitinoHiveCatalog.java | 5 ++
.../connector/hive/SparkHiveTypeConverter.java | 68 ++++++++++++++++++++
.../integration/test/hive/SparkHiveCatalogIT.java | 17 +++++
.../hive/GravitinoHiveCatalogSpark34.java | 3 +-
.../connector/hive/SparkHiveTypeConverter34.java | 73 ++++++++++++++++++++++
.../connector/TestSparkHiveTypeConverter34.java | 48 ++++++++++++++
6 files changed, 212 insertions(+), 2 deletions(-)
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
index a95c50d0cb..b1df7ad4b4 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalog.java
@@ -72,4 +72,9 @@ public class GravitinoHiveCatalog extends BaseCatalog {
protected SparkTransformConverter getSparkTransformConverter() {
return new SparkTransformConverter(false);
}
+
+ @Override
+ protected SparkTypeConverter getSparkTypeConverter() {
+ return new SparkHiveTypeConverter();
+ }
}
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTypeConverter.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTypeConverter.java
new file mode 100644
index 0000000000..c2ae20f371
--- /dev/null
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTypeConverter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.hive;
+
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.TimestampType;
+
+public class SparkHiveTypeConverter extends SparkTypeConverter {
+
+ /**
+ * Converts Spark data types to Hive-compatible Gravitino types
+ *
+ * <p>Special handling for timestamp types:
+ *
+ * <ul>
+ * <li>{@link TimestampType} ➔ {@link
Types.TimestampType#withoutTimeZone()} (Hive limitation)
+ * <li>Other types follow base class conversion rules
+ * </ul>
+ *
+ * @param sparkType Spark data type to convert (non-null)
+ * @return Hive-compatible Gravitino type with timezone information removed
where applicable
+ * @throws UnsupportedOperationException For unsupported timestamp
conversions
+ * @see <a href="https://github.com/apache/gravitino/issues/7046">Issue
#7046</a>
+ */
+ @Override
+ public Type toGravitinoType(DataType sparkType) {
+ if (sparkType instanceof TimestampType) {
+ return Types.TimestampType.withoutTimeZone();
+ } else {
+ return super.toGravitinoType(sparkType);
+ }
+ }
+
+ /**
+ * Converts Gravitino types to Spark data types.
+ *
+ * <p>Note: All timestamp types convert to Spark's TimestampType.
+ */
+ @Override
+ public DataType toSparkType(Type gravitinoType) {
+ if (gravitinoType instanceof Types.TimestampType) {
+ return DataTypes.TimestampType;
+ } else {
+ return super.toSparkType(gravitinoType);
+ }
+ }
+}
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
index e8fd08e5be..64c286da39 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
@@ -453,4 +453,21 @@ public abstract class SparkHiveCatalogIT extends SparkCommonIT {
Assertions.assertEquals("gravitino_it_test", (String) row[1]);
Assertions.assertEquals("2", row[2]);
}
+
+ @Test
+ void testCreateTableWithTimestamp() {
+ String databaseName = "test_db_with_timestamp";
+ String tableName = "test_table_with_timestamp";
+ createDatabaseIfNotExists(databaseName, getProvider());
+ sql(String.format("USE %s", databaseName));
+ sql(String.format("CREATE TABLE %s (id int, ts timestamp)", tableName));
+ sql(String.format("describe extended %s", tableName));
+
+ SparkTableInfo tableInfo = getTableInfo(tableName);
+ List<SparkColumnInfo> expectedSparkInfo =
+ Arrays.asList(
+ SparkColumnInfo.of("id", DataTypes.IntegerType),
+ SparkColumnInfo.of("ts", DataTypes.TimestampType));
+ checkTableColumns(tableName, expectedSparkInfo, tableInfo);
+ }
}
diff --git a/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark34.java b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark34.java
index cdc6c7feed..54e9b87f5f 100644
--- a/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark34.java
+++ b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark34.java
@@ -21,12 +21,11 @@ package org.apache.gravitino.spark.connector.hive;
import org.apache.gravitino.spark.connector.SparkTableChangeConverter;
import org.apache.gravitino.spark.connector.SparkTableChangeConverter34;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
-import org.apache.gravitino.spark.connector.SparkTypeConverter34;
public class GravitinoHiveCatalogSpark34 extends GravitinoHiveCatalog {
@Override
protected SparkTypeConverter getSparkTypeConverter() {
- return new SparkTypeConverter34();
+ return new SparkHiveTypeConverter34();
}
@Override
diff --git a/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTypeConverter34.java b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTypeConverter34.java
new file mode 100644
index 0000000000..6141d6ab37
--- /dev/null
+++ b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTypeConverter34.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.hive;
+
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.spark.connector.SparkTypeConverter34;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.TimestampNTZType;
+import org.apache.spark.sql.types.TimestampType;
+
+public class SparkHiveTypeConverter34 extends SparkTypeConverter34 {
+ /**
+ * Converts Spark timestamp types to Hive-compatible Gravitino types
+ *
+ * <p>This implementation enforces consistent timestamp handling across
Spark versions:
+ *
+ * <ol>
+ * <li>Converts Spark's {@link TimestampType} (with timezone) to {@link
+ * Types.TimestampType#withoutTimeZone()}
+ * <li>Explicitly rejects {@link TimestampNTZType} as it's incompatible
with Hive's type system
+ * </ol>
+ *
+ * @param sparkType Spark data type to convert, must not be null
+ * @return Gravitino type with timezone information stripped when necessary
+ * @throws UnsupportedOperationException When encountering {@link
TimestampNTZType}
+ * @see <a href="https://github.com/apache/gravitino/issues/7046">Issue
#7046</a>
+ * @see SparkTypeConverter34#toGravitinoType(DataType) Base implementation
+ */
+ @Override
+ public Type toGravitinoType(DataType sparkType) {
+ if (sparkType instanceof TimestampType) {
+ return Types.TimestampType.withoutTimeZone();
+ } else if (sparkType instanceof TimestampNTZType) {
+ throw new UnsupportedOperationException(
+ "Hive does not support 'timestamp_ntz' (timestamp without time
zone), please use 'timestamp' instead.");
+ } else {
+ return super.toGravitinoType(sparkType);
+ }
+ }
+
+ /**
+ * Converts Gravitino types to Spark data types.
+ *
+ * <p>Note: All timestamp types convert to Spark's TimestampType.
+ */
+ @Override
+ public DataType toSparkType(Type gravitinoType) {
+ if (gravitinoType instanceof Types.TimestampType) {
+ return DataTypes.TimestampType;
+ } else {
+ return super.toSparkType(gravitinoType);
+ }
+ }
+}
diff --git a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/TestSparkHiveTypeConverter34.java b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/TestSparkHiveTypeConverter34.java
new file mode 100644
index 0000000000..e04a6f2371
--- /dev/null
+++ b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/TestSparkHiveTypeConverter34.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector;
+
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.spark.connector.hive.SparkHiveTypeConverter34;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class TestSparkHiveTypeConverter34 {
+ private SparkTypeConverter sparkHiveTypeConverter = new
SparkHiveTypeConverter34();
+
+ @Test
+ void testUnsupportedTimestampType() {
+ UnsupportedOperationException exception =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
sparkHiveTypeConverter.toGravitinoType(DataTypes.TimestampNTZType));
+ Assertions.assertTrue(exception.getMessage().contains("Hive does not
support 'timestamp_ntz'"));
+
+ Assertions.assertEquals(
+ Types.TimestampType.withoutTimeZone(),
+ sparkHiveTypeConverter.toGravitinoType(DataTypes.TimestampType));
+ Assertions.assertEquals(
+ DataTypes.TimestampType,
+
sparkHiveTypeConverter.toSparkType(Types.TimestampType.withoutTimeZone()));
+ }
+}