This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d3edf1e58 [spark] Fix spark query with filter timestamp (#3730)
d3edf1e58 is described below
commit d3edf1e581d28cfec800a65c70326234b594037e
Author: xuzifu666 <[email protected]>
AuthorDate: Fri Jul 12 10:09:58 2024 +0800
[spark] Fix spark query with filter timestamp (#3730)
---
.../org/apache/paimon/predicate/PredicateBuilder.java | 15 ++++++++++++++-
.../org/apache/paimon/spark/SparkFilterConverterTest.java | 4 ++--
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index cf85cc3b4..71f02e9b6 100644
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -36,6 +36,7 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -282,8 +283,20 @@ public class PredicateBuilder {
int scale = decimalType.getScale();
return Decimal.fromBigDecimal((BigDecimal) o, precision,
scale);
case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
Timestamp timestamp;
+ if (o instanceof java.sql.Timestamp) {
+ timestamp =
Timestamp.fromSQLTimestamp((java.sql.Timestamp) o);
+ } else if (o instanceof Instant) {
+ Instant o1 = (Instant) o;
+ LocalDateTime dateTime =
o1.atZone(ZoneId.systemDefault()).toLocalDateTime();
+ timestamp = Timestamp.fromLocalDateTime(dateTime);
+ } else if (o instanceof LocalDateTime) {
+ timestamp = Timestamp.fromLocalDateTime((LocalDateTime) o);
+ } else {
+ throw new UnsupportedOperationException("Unsupported
object: " + o);
+ }
+ return timestamp;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
if (o instanceof java.sql.Timestamp) {
timestamp =
Timestamp.fromSQLTimestamp((java.sql.Timestamp) o);
} else if (o instanceof Instant) {
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
index 79725a181..3242c696c 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
@@ -48,7 +48,7 @@ import java.sql.Date;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
-import java.time.ZoneOffset;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -178,7 +178,7 @@ public class SparkFilterConverterTest {
java.sql.Timestamp timestamp = java.sql.Timestamp.valueOf("2018-10-18
00:00:57.907");
LocalDateTime localDateTime =
LocalDateTime.parse("2018-10-18T00:00:57.907");
- Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
+ Instant instant =
localDateTime.atZone(ZoneId.systemDefault()).toInstant();
Predicate instantExpression = converter.convert(GreaterThan.apply("x",
instant));
Predicate timestampExpression =
converter.convert(GreaterThan.apply("x", timestamp));