This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6742db80a [spark] Support timestamp_ntz for Spark 3.4+ (#3569)
6742db80a is described below
commit 6742db80a040e669069834a2c4263db85b6861cd
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jul 16 18:47:39 2024 +0800
[spark] Support timestamp_ntz for Spark 3.4+ (#3569)
---
docs/content/spark/quick-start.md | 15 +-
.../apache/paimon/spark/util/shim/TypeUtils.scala | 25 +++
.../apache/paimon/spark/util/shim/TypeUtils.scala | 25 +++
.../apache/paimon/spark/util/shim/TypeUtils.scala | 25 +++
.../org/apache/paimon/spark/SparkInternalRow.java | 7 +-
.../java/org/apache/paimon/spark/SparkRow.java | 19 +-
.../org/apache/paimon/spark/SparkTypeUtils.java | 16 +-
.../apache/paimon/spark/util/shim/TypeUtils.scala | 24 +++
.../apache/paimon/spark/SparkInternalRowTest.java | 57 ++++--
.../org/apache/paimon/spark/SparkTypeTest.java | 4 +-
.../apache/paimon/spark/PaimonSparkTestBase.scala | 13 ++
.../org/apache/paimon/spark/sql/DDLTestBase.scala | 227 +++++++++++++++++++++
12 files changed, 426 insertions(+), 31 deletions(-)
diff --git a/docs/content/spark/quick-start.md
b/docs/content/spark/quick-start.md
index 253ed0b2f..6687d2959 100644
--- a/docs/content/spark/quick-start.md
+++ b/docs/content/spark/quick-start.md
@@ -310,7 +310,12 @@ All Spark's data types are available in package
`org.apache.spark.sql.types`.
</tr>
<tr>
<td><code>TimestampType</code></td>
- <td><code>TimestampType</code>, <code>LocalZonedTimestamp</code></td>
+ <td><code>LocalZonedTimestamp</code></td>
+ <td>true</td>
+ </tr>
+ <tr>
+ <td><code>TimestampNTZType</code> (Spark 3.4+)</td>
+ <td><code>TimestampType</code></td>
<td>true</td>
</tr>
<tr>
@@ -325,3 +330,11 @@ All Spark's data types are available in package
`org.apache.spark.sql.types`.
</tr>
</tbody>
</table>
+
+{{< hint warning >}}
+Due to the previous design, in Spark 3.3 and below, Paimon maps both
Paimon's TimestampType and LocalZonedTimestamp to Spark's TimestampType, and
only handles TimestampType correctly.
+
+Therefore, when Spark 3.3 or below is used to read a Paimon table whose
LocalZonedTimestamp columns were written by another engine such as Flink, the
values returned for those LocalZonedTimestamp columns carry a time zone offset
that must be adjusted manually.
+
+When using Spark 3.4 and above, all timestamp types are handled correctly.
+{{< /hint >}}
\ No newline at end of file
diff --git
a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
new file mode 100644
index 000000000..dcd2d6889
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util.shim
+
+object TypeUtils {
+
+ // Since Spark 3.3 and below do not support timestamp ntz, treat Paimon
TimestampType as Spark TimestampType
+ def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = true
+}
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
new file mode 100644
index 000000000..dcd2d6889
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util.shim
+
+object TypeUtils {
+
+ // Since Spark 3.3 and below do not support timestamp ntz, treat Paimon
TimestampType as Spark TimestampType
+ def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = true
+}
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
new file mode 100644
index 000000000..dcd2d6889
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util.shim
+
+object TypeUtils {
+
+ // Since Spark 3.3 and below do not support timestamp ntz, treat Paimon
TimestampType as Spark TimestampType
+ def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = true
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
index 787a1329a..9dd6c7b68 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.spark.util.shim.TypeUtils;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataType;
@@ -283,7 +284,11 @@ public class SparkInternalRow extends
org.apache.spark.sql.catalyst.InternalRow
}
public static long fromPaimon(Timestamp timestamp) {
- return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp());
+ if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+ return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp());
+ } else {
+ return timestamp.toMicros();
+ }
}
public static ArrayData fromPaimon(InternalArray array, ArrayType
arrayType) {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index c650719b0..b343be247 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.spark.util.shim.TypeUtils;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DateType;
@@ -170,11 +171,21 @@ public class SparkRow implements InternalRow,
Serializable {
private static Timestamp toPaimonTimestamp(Object object) {
if (object instanceof java.sql.Timestamp) {
- return Timestamp.fromSQLTimestamp((java.sql.Timestamp) object);
+ java.sql.Timestamp ts = (java.sql.Timestamp) object;
+ if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+ return Timestamp.fromSQLTimestamp(ts);
+ } else {
+ return Timestamp.fromInstant(ts.toInstant());
+ }
} else if (object instanceof java.time.Instant) {
- LocalDateTime localDateTime =
- LocalDateTime.ofInstant((Instant) object,
ZoneId.systemDefault());
- return Timestamp.fromLocalDateTime(localDateTime);
+ Instant instant = (Instant) object;
+ if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+ LocalDateTime localDateTime =
+ LocalDateTime.ofInstant((Instant) object,
ZoneId.systemDefault());
+ return Timestamp.fromLocalDateTime(localDateTime);
+ } else {
+ return Timestamp.fromInstant(instant);
+ }
} else {
return Timestamp.fromLocalDateTime((LocalDateTime) object);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index 803cf5e54..08fb2de32 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.spark;
+import org.apache.paimon.spark.util.shim.TypeUtils;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
@@ -150,7 +151,11 @@ public class SparkTypeUtils {
@Override
public DataType visit(TimestampType timestampType) {
- return DataTypes.TimestampType;
+ if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+ return DataTypes.TimestampType;
+ } else {
+ return DataTypes.TimestampNTZType;
+ }
}
@Override
@@ -308,13 +313,20 @@ public class SparkTypeUtils {
} else if (atomic instanceof org.apache.spark.sql.types.DateType) {
return new DateType();
} else if (atomic instanceof
org.apache.spark.sql.types.TimestampType) {
- return new TimestampType();
+ if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) {
+ return new TimestampType();
+ } else {
+ return new LocalZonedTimestampType();
+ }
} else if (atomic instanceof
org.apache.spark.sql.types.DecimalType) {
return new DecimalType(
((org.apache.spark.sql.types.DecimalType)
atomic).precision(),
((org.apache.spark.sql.types.DecimalType)
atomic).scale());
} else if (atomic instanceof
org.apache.spark.sql.types.BinaryType) {
return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+ } else if (atomic instanceof
org.apache.spark.sql.types.TimestampNTZType) {
+ // Move TimestampNTZType to the end for compatibility with
spark3.3 and below
+ return new TimestampType();
}
throw new UnsupportedOperationException(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
new file mode 100644
index 000000000..8e70432bd
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util.shim
+
+object TypeUtils {
+
+ def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = false
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
index 14080cd8a..9af886d83 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
@@ -37,10 +37,12 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.AbstractMap;
import java.util.Map;
+import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.Function1;
+import scala.collection.JavaConverters;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.spark.SparkTypeTest.ALL_TYPES;
@@ -51,6 +53,8 @@ public class SparkInternalRowTest {
@Test
public void test() {
+ TimeZone tz = TimeZone.getDefault();
+ TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
InternalRow rowData =
GenericRow.of(
1,
@@ -77,6 +81,7 @@ public class SparkInternalRowTest {
23567222L,
"varbinary_v".getBytes(StandardCharsets.UTF_8),
Timestamp.fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")),
DateTimeUtils.toInternal(LocalDate.parse("2022-05-02")),
Decimal.fromBigDecimal(BigDecimal.valueOf(0.21), 2, 2),
Decimal.fromBigDecimal(BigDecimal.valueOf(65782123123.01), 38, 2),
@@ -93,32 +98,40 @@ public class SparkInternalRowTest {
sparkConverter.apply(new
SparkInternalRow(ALL_TYPES).replace(rowData));
String expected =
- "{"
- + "\"id\":1,"
- + "\"name\":\"jingsong\","
- + "\"char\":\"apache\","
- + "\"varchar\":\"paimon\","
- + "\"salary\":22.2,"
- +
"\"locations\":{\"key1\":{\"posX\":1.2,\"posY\":2.3},\"key2\":{\"posX\":2.4,\"posY\":3.5}},"
- + "\"strArray\":[\"v1\",\"v5\"],"
- + "\"intArray\":[10,30],"
- + "\"boolean\":true,"
- + "\"tinyint\":22,"
- + "\"smallint\":356,"
- + "\"bigint\":23567222,"
- + "\"bytes\":\"dmFyYmluYXJ5X3Y=\","
- + "\"timestamp\":\"2007-12-03 10:15:30\","
- + "\"date\":\"2022-05-02\","
- + "\"decimal\":0.21,"
- + "\"decimal2\":65782123123.01,"
- + "\"decimal3\":62123123.5"
- + "}";
- assertThat(sparkRow.json()).isEqualTo(expected);
+ "1,"
+ + "jingsong,"
+ + "apache,"
+ + "paimon,"
+ + "22.2,"
+ + "Map(key2 -> [2.4,3.5], key1 -> [1.2,2.3]),"
+ + "WrappedArray(v1, v5),"
+ + "WrappedArray(10, 30),"
+ + "true,"
+ + "22,"
+ + "356,"
+ + "23567222,"
+ + "[B@,"
+ + "2007-12-03 18:15:30.0,"
+ + "2007-12-03T10:15:30,"
+ + "2022-05-02,"
+ + "0.21,"
+ + "65782123123.01,"
+ + "62123123.5";
+ assertThat(sparkRowToString(sparkRow)).isEqualTo(expected);
SparkRow sparkRowData = new SparkRow(ALL_TYPES, sparkRow);
sparkRow =
(org.apache.spark.sql.Row)
sparkConverter.apply(new
SparkInternalRow(ALL_TYPES).replace(sparkRowData));
- assertThat(sparkRow.json()).isEqualTo(expected);
+ assertThat(sparkRowToString(sparkRow)).isEqualTo(expected);
+ TimeZone.setDefault(tz);
+ }
+
+ private String sparkRowToString(org.apache.spark.sql.Row row) {
+ return JavaConverters.seqAsJavaList(row.toSeq()).stream()
+ .map(Object::toString)
+ // Since the toString result of Spark's binary col is
unstable, replace it
+ .map(x -> x.startsWith("[B@") ? "[B@" : x)
+ .collect(Collectors.joining(","));
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index 84f509d76..fdc7558fd 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -65,7 +65,8 @@ public class SparkTypeTest {
.field("smallint", DataTypes.SMALLINT())
.field("bigint", DataTypes.BIGINT())
.field("bytes", DataTypes.BYTES())
- .field("timestamp", DataTypes.TIMESTAMP())
+ .field("timestamp",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .field("timestamp_ntz", DataTypes.TIMESTAMP())
.field("date", DataTypes.DATE())
.field("decimal", DataTypes.DECIMAL(2, 2))
.field("decimal2", DataTypes.DECIMAL(38, 2))
@@ -95,6 +96,7 @@ public class SparkTypeTest {
+ "StructField(bigint,LongType,true),"
+ "StructField(bytes,BinaryType,true),"
+ "StructField(timestamp,TimestampType,true),"
+ + "StructField(timestamp_ntz,TimestampNTZType,true),"
+ "StructField(date,DateType,true),"
+ "StructField(decimal,DecimalType(2,2),true),"
+ "StructField(decimal2,DecimalType(38,2),true),"
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index d431a754d..b8115132d 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -36,6 +36,7 @@ import org.scalactic.source.Position
import org.scalatest.Tag
import java.io.File
+import java.util.TimeZone
import scala.util.Random
@@ -101,6 +102,18 @@ class PaimonSparkTestBase
withTempDir(file1 => withTempDir(file2 => f(file1, file2)))
}
+ protected def withTimeZone(timeZone: String)(f: => Unit): Unit = {
+ withSQLConf("spark.sql.session.timeZone" -> timeZone) {
+ val originTimeZone = TimeZone.getDefault
+ try {
+ TimeZone.setDefault(TimeZone.getTimeZone(timeZone))
+ f
+ } finally {
+ TimeZone.setDefault(originTimeZone)
+ }
+ }
+ }
+
override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
println(testName)
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 80118a1b6..da4017104 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -18,11 +18,17 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.schema.Schema
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.DataTypes
import org.apache.spark.sql.Row
import org.junit.jupiter.api.Assertions
+import java.sql.Timestamp
+import java.time.LocalDateTime
+
abstract class DDLTestBase extends PaimonSparkTestBase {
import testImplicits._
@@ -181,4 +187,225 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}
}
+
+ test("Paimon DDL: create table with timestamp/timestamp_ntz") {
+ Seq("orc", "parquet", "avro").foreach {
+ format =>
+ Seq(true, false).foreach {
+ datetimeJava8APIEnabled =>
+ withSQLConf("spark.sql.datetime.java8API.enabled" ->
datetimeJava8APIEnabled.toString) {
+ withTimeZone("Asia/Shanghai") {
+ withTable("paimon_tbl") {
+ // Spark support create table with timestamp_ntz since 3.4
+ if (gteqSpark3_4) {
+ sql(s"""
+ |CREATE TABLE paimon_tbl (id int, binary BINARY, ts
timestamp, ts_ntz timestamp_ntz)
+ |USING paimon
+ |TBLPROPERTIES ('file.format'='$format')
+ |""".stripMargin)
+
+ sql(s"INSERT INTO paimon_tbl VALUES (1, binary('b'),
timestamp'2024-01-01 00:00:00', timestamp_ntz'2024-01-01 00:00:00')")
+ checkAnswer(
+ sql(s"SELECT ts, ts_ntz FROM paimon_tbl"),
+ Row(
+ if (datetimeJava8APIEnabled)
+ Timestamp.valueOf("2024-01-01 00:00:00").toInstant
+ else Timestamp.valueOf("2024-01-01 00:00:00"),
+ LocalDateTime.parse("2024-01-01T00:00:00")
+ )
+ )
+
+ // change time zone to UTC
+ withTimeZone("UTC") {
+ // todo: fix with orc
+ if (format != "orc")
+ checkAnswer(
+ sql(s"SELECT ts, ts_ntz FROM paimon_tbl"),
+ Row(
+ if (datetimeJava8APIEnabled)
+ Timestamp.valueOf("2023-12-31
16:00:00").toInstant
+ else Timestamp.valueOf("2023-12-31 16:00:00"),
+ LocalDateTime.parse("2024-01-01T00:00:00")
+ )
+ )
+ }
+ } else {
+ sql(s"""
+ |CREATE TABLE paimon_tbl (id int, binary BINARY, ts
timestamp)
+ |USING paimon
+ |TBLPROPERTIES ('file.format'='$format')
+ |""".stripMargin)
+
+ sql(s"INSERT INTO paimon_tbl VALUES (1, binary('b'),
timestamp'2024-01-01 00:00:00')")
+ checkAnswer(
+ sql(s"SELECT ts FROM paimon_tbl"),
+ Row(
+ if (datetimeJava8APIEnabled)
+ Timestamp.valueOf("2024-01-01 00:00:00").toInstant
+ else Timestamp.valueOf("2024-01-01 00:00:00"))
+ )
+
+ // For Spark 3.3 and below, time zone conversion is not
supported,
+ // see
TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
+ withTimeZone("UTC") {
+ // todo: fix with orc
+ if (format != "orc") {
+ checkAnswer(
+ sql(s"SELECT ts FROM paimon_tbl"),
+ Row(
+ if (datetimeJava8APIEnabled)
+ Timestamp.valueOf("2024-01-01
00:00:00").toInstant
+ else Timestamp.valueOf("2024-01-01 00:00:00"))
+ )
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("Paimon DDL: create table with timestamp/timestamp_ntz using table
API") {
+ val identifier = Identifier.create("test", "paimon_tbl")
+ try {
+ withTimeZone("Asia/Shanghai") {
+ val schema = Schema.newBuilder
+ .column("ts", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .column("ts_ntz", DataTypes.TIMESTAMP())
+ .build
+ catalog.createTable(identifier, schema, false)
+ sql(
+ s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 00:00:00',
timestamp_ntz'2024-01-01 00:00:00')")
+
+ // read by spark
+ checkAnswer(
+ sql(s"SELECT ts, ts_ntz FROM paimon_tbl"),
+ Row(
+ Timestamp.valueOf("2024-01-01 00:00:00"),
+ if (gteqSpark3_4) LocalDateTime.parse("2024-01-01T00:00:00")
+ else Timestamp.valueOf("2024-01-01 00:00:00")
+ )
+ )
+
+ // read by table api
+ // Due to previous design, read timestamp ltz type with spark 3.3 and
below will cause problems,
+ // skip testing it
+ if (gteqSpark3_4) {
+ val table = catalog.getTable(identifier)
+ val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
+ val splits = builder.newScan().plan().splits()
+ builder.newRead
+ .createReader(splits)
+ .forEachRemaining(
+ r => {
+ Assertions.assertEquals(
+ Timestamp.valueOf("2023-12-31 16:00:00"),
+ r.getTimestamp(0, 6).toSQLTimestamp)
+ Assertions.assertEquals(
+ Timestamp.valueOf("2024-01-01 00:00:00").toLocalDateTime,
+ r.getTimestamp(1, 6).toLocalDateTime)
+ })
+ }
+
+ // change time zone to UTC
+ withTimeZone("UTC") {
+ // read by spark
+ checkAnswer(
+ sql(s"SELECT ts, ts_ntz FROM paimon_tbl"),
+ Row(
+ // For Spark 3.3 and below, time zone conversion is not
supported,
+ // see TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
+ if (gteqSpark3_4) Timestamp.valueOf("2023-12-31 16:00:00")
+ else Timestamp.valueOf("2024-01-01 00:00:00"),
+ if (gteqSpark3_4) LocalDateTime.parse("2024-01-01T00:00:00")
+ else Timestamp.valueOf("2024-01-01 00:00:00")
+ )
+ )
+
+ // read by table api
+ // Due to previous design, read timestamp ltz type with spark 3.3
and below will cause problems,
+ // skip testing it
+ if (gteqSpark3_4) {
+ val table = catalog.getTable(identifier)
+ val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
+ val splits = builder.newScan().plan().splits()
+ builder.newRead
+ .createReader(splits)
+ .forEachRemaining(
+ r => {
+ Assertions.assertEquals(
+ Timestamp.valueOf("2023-12-31 16:00:00"),
+ r.getTimestamp(0, 6).toSQLTimestamp)
+ Assertions.assertEquals(
+ Timestamp.valueOf("2024-01-01 00:00:00").toLocalDateTime,
+ r.getTimestamp(1, 6).toLocalDateTime)
+ })
+ }
+ }
+ }
+ } finally {
+ catalog.dropTable(identifier, true)
+ }
+ }
+
+ test("Paimon DDL: select table with timestamp and timestamp_ntz with
filter") {
+ Seq(true, false).foreach {
+ datetimeJava8APIEnabled =>
+ withSQLConf("spark.sql.datetime.java8API.enabled" ->
datetimeJava8APIEnabled.toString) {
+ withTable("paimon_tbl") {
+ // Spark support create table with timestamp_ntz since 3.4
+ if (gteqSpark3_4) {
+ sql(s"""
+ |CREATE TABLE paimon_tbl (ts timestamp, ts_ntz
timestamp_ntz)
+ |USING paimon
+ |""".stripMargin)
+ sql(
+ s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01
00:00:00', timestamp_ntz'2024-01-01 00:00:00')")
+ sql(
+ s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-02
00:00:00', timestamp_ntz'2024-01-02 00:00:00')")
+ sql(
+ s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-03
00:00:00', timestamp_ntz'2024-01-03 00:00:00')")
+
+ checkAnswer(
+ sql(s"SELECT * FROM paimon_tbl where ts_ntz =
timestamp_ntz'2024-01-01 00:00:00'"),
+ Row(
+ if (datetimeJava8APIEnabled)
+ Timestamp.valueOf("2024-01-01 00:00:00").toInstant
+ else Timestamp.valueOf("2024-01-01 00:00:00"),
+ LocalDateTime.parse("2024-01-01T00:00:00")
+ )
+ )
+
+ checkAnswer(
+ sql(s"SELECT * FROM paimon_tbl where ts > timestamp'2024-01-02
00:00:00'"),
+ Row(
+ if (datetimeJava8APIEnabled)
+ Timestamp.valueOf("2024-01-03 00:00:00").toInstant
+ else Timestamp.valueOf("2024-01-03 00:00:00"),
+ LocalDateTime.parse("2024-01-03T00:00:00")
+ )
+ )
+ } else {
+ sql(s"""
+ |CREATE TABLE paimon_tbl (ts timestamp)
+ |USING paimon
+ |""".stripMargin)
+ sql(s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01
00:00:00')")
+ sql(s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-02
00:00:00')")
+ sql(s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-03
00:00:00')")
+
+ checkAnswer(
+ sql(s"SELECT * FROM paimon_tbl where ts = timestamp'2024-01-01
00:00:00'"),
+ Row(
+ if (datetimeJava8APIEnabled)
+ Timestamp.valueOf("2024-01-01 00:00:00").toInstant
+ else Timestamp.valueOf("2024-01-01 00:00:00"))
+ )
+ }
+ }
+ }
+ }
+ }
}