This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6742db80a [spark] Support timestamp_ntz for Spark 3.4+ (#3569)
6742db80a is described below

commit 6742db80a040e669069834a2c4263db85b6861cd
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jul 16 18:47:39 2024 +0800

    [spark] Support timestamp_ntz for Spark 3.4+ (#3569)
---
 docs/content/spark/quick-start.md                  |  15 +-
 .../apache/paimon/spark/util/shim/TypeUtils.scala  |  25 +++
 .../apache/paimon/spark/util/shim/TypeUtils.scala  |  25 +++
 .../apache/paimon/spark/util/shim/TypeUtils.scala  |  25 +++
 .../org/apache/paimon/spark/SparkInternalRow.java  |   7 +-
 .../java/org/apache/paimon/spark/SparkRow.java     |  19 +-
 .../org/apache/paimon/spark/SparkTypeUtils.java    |  16 +-
 .../apache/paimon/spark/util/shim/TypeUtils.scala  |  24 +++
 .../apache/paimon/spark/SparkInternalRowTest.java  |  57 ++++--
 .../org/apache/paimon/spark/SparkTypeTest.java     |   4 +-
 .../apache/paimon/spark/PaimonSparkTestBase.scala  |  13 ++
 .../org/apache/paimon/spark/sql/DDLTestBase.scala  | 227 +++++++++++++++++++++
 12 files changed, 426 insertions(+), 31 deletions(-)

diff --git a/docs/content/spark/quick-start.md 
b/docs/content/spark/quick-start.md
index 253ed0b2f..6687d2959 100644
--- a/docs/content/spark/quick-start.md
+++ b/docs/content/spark/quick-start.md
@@ -310,7 +310,12 @@ All Spark's data types are available in package 
`org.apache.spark.sql.types`.
     </tr>
     <tr>
       <td><code>TimestampType</code></td>
-      <td><code>TimestampType</code>, <code>LocalZonedTimestamp</code></td>
+      <td><code>LocalZonedTimestamp</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>TimestampNTZType(Spark3.4+)</code></td>
+      <td><code>TimestampType</code></td>
       <td>true</td>
     </tr>
     <tr>
@@ -325,3 +330,11 @@ All Spark's data types are available in package 
`org.apache.spark.sql.types`.
     </tr>
     </tbody>
 </table>
+
+{{< hint warning >}}
+Due to the previous design, in Spark3.3 and below, Paimon maps both 
Paimon's TimestampType and LocalZonedTimestamp to Spark's TimestampType, and 
only handles TimestampType correctly.
+
+Therefore, when Spark3.3 or below reads a Paimon table with a 
LocalZonedTimestamp column written by another engine, such as Flink, the query 
result for that column will have a time zone offset, which needs to 
be adjusted manually.
+
+When using Spark3.4 and above, all timestamp types can be parsed correctly.
+{{< /hint >}}
\ No newline at end of file
diff --git 
a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
 
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
new file mode 100644
index 000000000..dcd2d6889
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util.shim
+
+object TypeUtils {
+
+  // Since Spark 3.3 and below do not support timestamp ntz, treat Paimon 
TimestampType as Spark TimestampType
+  def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = true
+}
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
new file mode 100644
index 000000000..dcd2d6889
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util.shim
+
+object TypeUtils {
+
+  // Since Spark 3.3 and below do not support timestamp ntz, treat Paimon 
TimestampType as Spark TimestampType
+  def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = true
+}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
new file mode 100644
index 000000000..dcd2d6889
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util.shim
+
+object TypeUtils {
+
+  // Since Spark 3.3 and below do not support timestamp ntz, treat Paimon 
TimestampType as Spark TimestampType
+  def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = true
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
index 787a1329a..9dd6c7b68 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.spark.util.shim.TypeUtils;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataType;
@@ -283,7 +284,11 @@ public class SparkInternalRow extends 
org.apache.spark.sql.catalyst.InternalRow
     }
 
     public static long fromPaimon(Timestamp timestamp) {
-        return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp());
+        if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+            return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp());
+        } else {
+            return timestamp.toMicros();
+        }
     }
 
     public static ArrayData fromPaimon(InternalArray array, ArrayType 
arrayType) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index c650719b0..b343be247 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.spark.util.shim.TypeUtils;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DateType;
@@ -170,11 +171,21 @@ public class SparkRow implements InternalRow, 
Serializable {
 
     private static Timestamp toPaimonTimestamp(Object object) {
         if (object instanceof java.sql.Timestamp) {
-            return Timestamp.fromSQLTimestamp((java.sql.Timestamp) object);
+            java.sql.Timestamp ts = (java.sql.Timestamp) object;
+            if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+                return Timestamp.fromSQLTimestamp(ts);
+            } else {
+                return Timestamp.fromInstant(ts.toInstant());
+            }
         } else if (object instanceof java.time.Instant) {
-            LocalDateTime localDateTime =
-                    LocalDateTime.ofInstant((Instant) object, 
ZoneId.systemDefault());
-            return Timestamp.fromLocalDateTime(localDateTime);
+            Instant instant = (Instant) object;
+            if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+                LocalDateTime localDateTime =
+                        LocalDateTime.ofInstant((Instant) object, 
ZoneId.systemDefault());
+                return Timestamp.fromLocalDateTime(localDateTime);
+            } else {
+                return Timestamp.fromInstant(instant);
+            }
         } else {
             return Timestamp.fromLocalDateTime((LocalDateTime) object);
         }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index 803cf5e54..08fb2de32 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.spark.util.shim.TypeUtils;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BinaryType;
@@ -150,7 +151,11 @@ public class SparkTypeUtils {
 
         @Override
         public DataType visit(TimestampType timestampType) {
-            return DataTypes.TimestampType;
+            if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+                return DataTypes.TimestampType;
+            } else {
+                return DataTypes.TimestampNTZType;
+            }
         }
 
         @Override
@@ -308,13 +313,20 @@ public class SparkTypeUtils {
             } else if (atomic instanceof org.apache.spark.sql.types.DateType) {
                 return new DateType();
             } else if (atomic instanceof 
org.apache.spark.sql.types.TimestampType) {
-                return new TimestampType();
+                if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+                    return new TimestampType();
+                } else {
+                    return new LocalZonedTimestampType();
+                }
             } else if (atomic instanceof 
org.apache.spark.sql.types.DecimalType) {
                 return new DecimalType(
                         ((org.apache.spark.sql.types.DecimalType) 
atomic).precision(),
                         ((org.apache.spark.sql.types.DecimalType) 
atomic).scale());
             } else if (atomic instanceof 
org.apache.spark.sql.types.BinaryType) {
                 return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+            } else if (atomic instanceof 
org.apache.spark.sql.types.TimestampNTZType) {
+                // Move TimestampNTZType to the end for compatibility with 
spark3.3 and below
+                return new TimestampType();
             }
 
             throw new UnsupportedOperationException(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
new file mode 100644
index 000000000..8e70432bd
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util.shim
+
+object TypeUtils {
+
+  def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = false
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
index 14080cd8a..9af886d83 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
@@ -37,10 +37,12 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.AbstractMap;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import scala.Function1;
+import scala.collection.JavaConverters;
 
 import static org.apache.paimon.data.BinaryString.fromString;
 import static org.apache.paimon.spark.SparkTypeTest.ALL_TYPES;
@@ -51,6 +53,8 @@ public class SparkInternalRowTest {
 
     @Test
     public void test() {
+        TimeZone tz = TimeZone.getDefault();
+        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
         InternalRow rowData =
                 GenericRow.of(
                         1,
@@ -77,6 +81,7 @@ public class SparkInternalRowTest {
                         23567222L,
                         "varbinary_v".getBytes(StandardCharsets.UTF_8),
                         
Timestamp.fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")),
                         
DateTimeUtils.toInternal(LocalDate.parse("2022-05-02")),
                         Decimal.fromBigDecimal(BigDecimal.valueOf(0.21), 2, 2),
                         
Decimal.fromBigDecimal(BigDecimal.valueOf(65782123123.01), 38, 2),
@@ -93,32 +98,40 @@ public class SparkInternalRowTest {
                         sparkConverter.apply(new 
SparkInternalRow(ALL_TYPES).replace(rowData));
 
         String expected =
-                "{"
-                        + "\"id\":1,"
-                        + "\"name\":\"jingsong\","
-                        + "\"char\":\"apache\","
-                        + "\"varchar\":\"paimon\","
-                        + "\"salary\":22.2,"
-                        + 
"\"locations\":{\"key1\":{\"posX\":1.2,\"posY\":2.3},\"key2\":{\"posX\":2.4,\"posY\":3.5}},"
-                        + "\"strArray\":[\"v1\",\"v5\"],"
-                        + "\"intArray\":[10,30],"
-                        + "\"boolean\":true,"
-                        + "\"tinyint\":22,"
-                        + "\"smallint\":356,"
-                        + "\"bigint\":23567222,"
-                        + "\"bytes\":\"dmFyYmluYXJ5X3Y=\","
-                        + "\"timestamp\":\"2007-12-03 10:15:30\","
-                        + "\"date\":\"2022-05-02\","
-                        + "\"decimal\":0.21,"
-                        + "\"decimal2\":65782123123.01,"
-                        + "\"decimal3\":62123123.5"
-                        + "}";
-        assertThat(sparkRow.json()).isEqualTo(expected);
+                "1,"
+                        + "jingsong,"
+                        + "apache,"
+                        + "paimon,"
+                        + "22.2,"
+                        + "Map(key2 -> [2.4,3.5], key1 -> [1.2,2.3]),"
+                        + "WrappedArray(v1, v5),"
+                        + "WrappedArray(10, 30),"
+                        + "true,"
+                        + "22,"
+                        + "356,"
+                        + "23567222,"
+                        + "[B@,"
+                        + "2007-12-03 18:15:30.0,"
+                        + "2007-12-03T10:15:30,"
+                        + "2022-05-02,"
+                        + "0.21,"
+                        + "65782123123.01,"
+                        + "62123123.5";
+        assertThat(sparkRowToString(sparkRow)).isEqualTo(expected);
 
         SparkRow sparkRowData = new SparkRow(ALL_TYPES, sparkRow);
         sparkRow =
                 (org.apache.spark.sql.Row)
                         sparkConverter.apply(new 
SparkInternalRow(ALL_TYPES).replace(sparkRowData));
-        assertThat(sparkRow.json()).isEqualTo(expected);
+        assertThat(sparkRowToString(sparkRow)).isEqualTo(expected);
+        TimeZone.setDefault(tz);
+    }
+
+    private String sparkRowToString(org.apache.spark.sql.Row row) {
+        return JavaConverters.seqAsJavaList(row.toSeq()).stream()
+                .map(Object::toString)
+                // Since the toString result of Spark's binary col is 
unstable, replace it
+                .map(x -> x.startsWith("[B@") ? "[B@" : x)
+                .collect(Collectors.joining(","));
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index 84f509d76..fdc7558fd 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -65,7 +65,8 @@ public class SparkTypeTest {
                     .field("smallint", DataTypes.SMALLINT())
                     .field("bigint", DataTypes.BIGINT())
                     .field("bytes", DataTypes.BYTES())
-                    .field("timestamp", DataTypes.TIMESTAMP())
+                    .field("timestamp", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                    .field("timestamp_ntz", DataTypes.TIMESTAMP())
                     .field("date", DataTypes.DATE())
                     .field("decimal", DataTypes.DECIMAL(2, 2))
                     .field("decimal2", DataTypes.DECIMAL(38, 2))
@@ -95,6 +96,7 @@ public class SparkTypeTest {
                         + "StructField(bigint,LongType,true),"
                         + "StructField(bytes,BinaryType,true),"
                         + "StructField(timestamp,TimestampType,true),"
+                        + "StructField(timestamp_ntz,TimestampNTZType,true),"
                         + "StructField(date,DateType,true),"
                         + "StructField(decimal,DecimalType(2,2),true),"
                         + "StructField(decimal2,DecimalType(38,2),true),"
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index d431a754d..b8115132d 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -36,6 +36,7 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import java.io.File
+import java.util.TimeZone
 
 import scala.util.Random
 
@@ -101,6 +102,18 @@ class PaimonSparkTestBase
     withTempDir(file1 => withTempDir(file2 => f(file1, file2)))
   }
 
+  protected def withTimeZone(timeZone: String)(f: => Unit): Unit = {
+    withSQLConf("spark.sql.session.timeZone" -> timeZone) {
+      val originTimeZone = TimeZone.getDefault
+      try {
+        TimeZone.setDefault(TimeZone.getTimeZone(timeZone))
+        f
+      } finally {
+        TimeZone.setDefault(originTimeZone)
+      }
+    }
+  }
+
   override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
       pos: Position): Unit = {
     println(testName)
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 80118a1b6..da4017104 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -18,11 +18,17 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.schema.Schema
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.DataTypes
 
 import org.apache.spark.sql.Row
 import org.junit.jupiter.api.Assertions
 
+import java.sql.Timestamp
+import java.time.LocalDateTime
+
 abstract class DDLTestBase extends PaimonSparkTestBase {
 
   import testImplicits._
@@ -181,4 +187,225 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
         }
       }
   }
+
+  test("Paimon DDL: create table with timestamp/timestamp_ntz") {
+    Seq("orc", "parquet", "avro").foreach {
+      format =>
+        Seq(true, false).foreach {
+          datetimeJava8APIEnabled =>
+            withSQLConf("spark.sql.datetime.java8API.enabled" -> 
datetimeJava8APIEnabled.toString) {
+              withTimeZone("Asia/Shanghai") {
+                withTable("paimon_tbl") {
+                  // Spark support create table with timestamp_ntz since 3.4
+                  if (gteqSpark3_4) {
+                    sql(s"""
+                           |CREATE TABLE paimon_tbl (id int, binary BINARY, ts 
timestamp, ts_ntz timestamp_ntz)
+                           |USING paimon
+                           |TBLPROPERTIES ('file.format'='$format')
+                           |""".stripMargin)
+
+                    sql(s"INSERT INTO paimon_tbl VALUES (1, binary('b'), 
timestamp'2024-01-01 00:00:00', timestamp_ntz'2024-01-01 00:00:00')")
+                    checkAnswer(
+                      sql(s"SELECT ts, ts_ntz FROM paimon_tbl"),
+                      Row(
+                        if (datetimeJava8APIEnabled)
+                          Timestamp.valueOf("2024-01-01 00:00:00").toInstant
+                        else Timestamp.valueOf("2024-01-01 00:00:00"),
+                        LocalDateTime.parse("2024-01-01T00:00:00")
+                      )
+                    )
+
+                    // change time zone to UTC
+                    withTimeZone("UTC") {
+                      // todo: fix with orc
+                      if (format != "orc")
+                        checkAnswer(
+                          sql(s"SELECT ts, ts_ntz FROM paimon_tbl"),
+                          Row(
+                            if (datetimeJava8APIEnabled)
+                              Timestamp.valueOf("2023-12-31 
16:00:00").toInstant
+                            else Timestamp.valueOf("2023-12-31 16:00:00"),
+                            LocalDateTime.parse("2024-01-01T00:00:00")
+                          )
+                        )
+                    }
+                  } else {
+                    sql(s"""
+                           |CREATE TABLE paimon_tbl (id int, binary BINARY, ts 
timestamp)
+                           |USING paimon
+                           |TBLPROPERTIES ('file.format'='$format')
+                           |""".stripMargin)
+
+                    sql(s"INSERT INTO paimon_tbl VALUES (1, binary('b'), 
timestamp'2024-01-01 00:00:00')")
+                    checkAnswer(
+                      sql(s"SELECT ts FROM paimon_tbl"),
+                      Row(
+                        if (datetimeJava8APIEnabled)
+                          Timestamp.valueOf("2024-01-01 00:00:00").toInstant
+                        else Timestamp.valueOf("2024-01-01 00:00:00"))
+                    )
+
+                    // For Spark 3.3 and below, time zone conversion is not 
supported,
+                    // see 
TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
+                    withTimeZone("UTC") {
+                      // todo: fix with orc
+                      if (format != "orc") {
+                        checkAnswer(
+                          sql(s"SELECT ts FROM paimon_tbl"),
+                          Row(
+                            if (datetimeJava8APIEnabled)
+                              Timestamp.valueOf("2024-01-01 
00:00:00").toInstant
+                            else Timestamp.valueOf("2024-01-01 00:00:00"))
+                        )
+                      }
+                    }
+                  }
+                }
+              }
+            }
+        }
+    }
+  }
+
+  test("Paimon DDL: create table with timestamp/timestamp_ntz using table 
API") {
+    val identifier = Identifier.create("test", "paimon_tbl")
+    try {
+      withTimeZone("Asia/Shanghai") {
+        val schema = Schema.newBuilder
+          .column("ts", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+          .column("ts_ntz", DataTypes.TIMESTAMP())
+          .build
+        catalog.createTable(identifier, schema, false)
+        sql(
+          s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 00:00:00', 
timestamp_ntz'2024-01-01 00:00:00')")
+
+        // read by spark
+        checkAnswer(
+          sql(s"SELECT ts, ts_ntz FROM paimon_tbl"),
+          Row(
+            Timestamp.valueOf("2024-01-01 00:00:00"),
+            if (gteqSpark3_4) LocalDateTime.parse("2024-01-01T00:00:00")
+            else Timestamp.valueOf("2024-01-01 00:00:00")
+          )
+        )
+
+        // read by table api
+        // Due to previous design, read timestamp ltz type with spark 3.3 and 
below will cause problems,
+        // skip testing it
+        if (gteqSpark3_4) {
+          val table = catalog.getTable(identifier)
+          val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
+          val splits = builder.newScan().plan().splits()
+          builder.newRead
+            .createReader(splits)
+            .forEachRemaining(
+              r => {
+                Assertions.assertEquals(
+                  Timestamp.valueOf("2023-12-31 16:00:00"),
+                  r.getTimestamp(0, 6).toSQLTimestamp)
+                Assertions.assertEquals(
+                  Timestamp.valueOf("2024-01-01 00:00:00").toLocalDateTime,
+                  r.getTimestamp(1, 6).toLocalDateTime)
+              })
+        }
+
+        // change time zone to UTC
+        withTimeZone("UTC") {
+          // read by spark
+          checkAnswer(
+            sql(s"SELECT ts, ts_ntz FROM paimon_tbl"),
+            Row(
+              // For Spark 3.3 and below, time zone conversion is not 
supported,
+              // see TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
+              if (gteqSpark3_4) Timestamp.valueOf("2023-12-31 16:00:00")
+              else Timestamp.valueOf("2024-01-01 00:00:00"),
+              if (gteqSpark3_4) LocalDateTime.parse("2024-01-01T00:00:00")
+              else Timestamp.valueOf("2024-01-01 00:00:00")
+            )
+          )
+
+          // read by table api
+          // Due to previous design, read timestamp ltz type with spark 3.3 
and below will cause problems,
+          // skip testing it
+          if (gteqSpark3_4) {
+            val table = catalog.getTable(identifier)
+            val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
+            val splits = builder.newScan().plan().splits()
+            builder.newRead
+              .createReader(splits)
+              .forEachRemaining(
+                r => {
+                  Assertions.assertEquals(
+                    Timestamp.valueOf("2023-12-31 16:00:00"),
+                    r.getTimestamp(0, 6).toSQLTimestamp)
+                  Assertions.assertEquals(
+                    Timestamp.valueOf("2024-01-01 00:00:00").toLocalDateTime,
+                    r.getTimestamp(1, 6).toLocalDateTime)
+                })
+          }
+        }
+      }
+    } finally {
+      catalog.dropTable(identifier, true)
+    }
+  }
+
+  test("Paimon DDL: select table with timestamp and timestamp_ntz with 
filter") {
+    Seq(true, false).foreach {
+      datetimeJava8APIEnabled =>
+        withSQLConf("spark.sql.datetime.java8API.enabled" -> 
datetimeJava8APIEnabled.toString) {
+          withTable("paimon_tbl") {
+            // Spark support create table with timestamp_ntz since 3.4
+            if (gteqSpark3_4) {
+              sql(s"""
+                     |CREATE TABLE paimon_tbl (ts timestamp, ts_ntz 
timestamp_ntz)
+                     |USING paimon
+                     |""".stripMargin)
+              sql(
+                s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 
00:00:00', timestamp_ntz'2024-01-01 00:00:00')")
+              sql(
+                s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-02 
00:00:00', timestamp_ntz'2024-01-02 00:00:00')")
+              sql(
+                s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-03 
00:00:00', timestamp_ntz'2024-01-03 00:00:00')")
+
+              checkAnswer(
+                sql(s"SELECT * FROM paimon_tbl where ts_ntz = 
timestamp_ntz'2024-01-01 00:00:00'"),
+                Row(
+                  if (datetimeJava8APIEnabled)
+                    Timestamp.valueOf("2024-01-01 00:00:00").toInstant
+                  else Timestamp.valueOf("2024-01-01 00:00:00"),
+                  LocalDateTime.parse("2024-01-01T00:00:00")
+                )
+              )
+
+              checkAnswer(
+                sql(s"SELECT * FROM paimon_tbl where ts > timestamp'2024-01-02 
00:00:00'"),
+                Row(
+                  if (datetimeJava8APIEnabled)
+                    Timestamp.valueOf("2024-01-03 00:00:00").toInstant
+                  else Timestamp.valueOf("2024-01-03 00:00:00"),
+                  LocalDateTime.parse("2024-01-03T00:00:00")
+                )
+              )
+            } else {
+              sql(s"""
+                     |CREATE TABLE paimon_tbl (ts timestamp)
+                     |USING paimon
+                     |""".stripMargin)
+              sql(s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 
00:00:00')")
+              sql(s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-02 
00:00:00')")
+              sql(s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-03 
00:00:00')")
+
+              checkAnswer(
+                sql(s"SELECT * FROM paimon_tbl where ts = timestamp'2024-01-01 
00:00:00'"),
+                Row(
+                  if (datetimeJava8APIEnabled)
+                    Timestamp.valueOf("2024-01-01 00:00:00").toInstant
+                  else Timestamp.valueOf("2024-01-01 00:00:00"))
+              )
+            }
+          }
+        }
+    }
+  }
 }

Reply via email to