This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b3e13513ac [Improve][LocalFile] parquet use system timezone (#5605)
b3e13513ac is described below
commit b3e13513ac292589add64c4490e93c22c2d1d1cb
Author: Jarvis <[email protected]>
AuthorDate: Thu Oct 26 14:04:12 2023 +0800
[Improve][LocalFile] parquet use system timezone (#5605)
---------
Co-authored-by: Jia Fan <[email protected]>
---
.../file/sink/writer/ParquetWriteStrategy.java | 7 +++--
.../file/source/reader/ParquetReadStrategy.java | 2 +-
.../file/writer/ParquetReadStrategyTest.java | 33 ++++++++++++++++++++++
3 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 79db210616..88ad368df8 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -55,7 +55,7 @@ import lombok.NonNull;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
-import java.time.ZoneOffset;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -189,7 +189,10 @@ public class ParquetWriteStrategy extends AbstractWriteStrategy {
case DATE:
return data;
case TIMESTAMP:
- return ((LocalDateTime) data).toInstant(ZoneOffset.of("+8")).toEpochMilli();
+ return ((LocalDateTime) data)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
case BYTES:
return ByteBuffer.wrap((byte[]) data);
case ROW:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 424e181f8d..c6ff89bb5e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -203,7 +203,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
return new Timestamp(timestamp).toLocalDateTime();
}
Instant instant = Instant.ofEpochMilli((long) field);
- return LocalDateTime.ofInstant(instant, ZoneId.of("+8"));
+ return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
case ROW:
SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
Object[] objects = new Object[rowType.getTotalFields()];
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 82e0bac741..0a3bee6282 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -32,8 +32,10 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.net.URL;
import java.nio.file.Paths;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
+import java.util.TimeZone;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
@@ -70,6 +72,37 @@ public class ParquetReadStrategyTest {
parquetReadStrategy.read(path, testCollector);
}
+ @Test
+ public void testParquetReadUseSystemDefaultTimeZone() throws Exception {
+ URL resource = ParquetReadStrategyTest.class.getResource("/timestamp_as_int64.parquet");
+ Assertions.assertNotNull(resource);
+ String path = Paths.get(resource.toURI()).toString();
+ ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+ SeaTunnelRowType seaTunnelRowTypeInfo =
+ parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
+ Assertions.assertNotNull(seaTunnelRowTypeInfo);
+ System.out.println(seaTunnelRowTypeInfo);
+ int index = seaTunnelRowTypeInfo.indexOf("c_timestamp");
+ TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai");
+ TimeZone.setDefault(tz1);
+ TestCollector testCollector = new TestCollector();
+ parquetReadStrategy.read(path, testCollector);
+ LocalDateTime time1 = (LocalDateTime) testCollector.getRows().get(0).getField(index);
+
+ TimeZone tz2 = TimeZone.getTimeZone("UTC");
+ TimeZone.setDefault(tz2);
+ TestCollector testCollector2 = new TestCollector();
+ parquetReadStrategy.read(path, testCollector2);
+ LocalDateTime time2 = (LocalDateTime) testCollector2.getRows().get(0).getField(index);
+
+ Assertions.assertTrue(time1.isAfter(time2));
+ Assertions.assertEquals(
+ time1.atZone(tz1.toZoneId()).withZoneSameInstant(tz2.toZoneId()).toLocalDateTime(),
+ time2);
+ }
+
@Test
public void testParquetReadProjection1() throws Exception {
URL resource =
ParquetReadStrategyTest.class.getResource("/timestamp_as_int96.parquet");