This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b3e13513ac [Improve][LocalFile] parquet use system timezone (#5605)
b3e13513ac is described below

commit b3e13513ac292589add64c4490e93c22c2d1d1cb
Author: Jarvis <[email protected]>
AuthorDate: Thu Oct 26 14:04:12 2023 +0800

    [Improve][LocalFile] parquet use system timezone (#5605)
    
    
    
    ---------
    
    Co-authored-by: Jia Fan <[email protected]>
---
 .../file/sink/writer/ParquetWriteStrategy.java     |  7 +++--
 .../file/source/reader/ParquetReadStrategy.java    |  2 +-
 .../file/writer/ParquetReadStrategyTest.java       | 33 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 79db210616..88ad368df8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -55,7 +55,7 @@ import lombok.NonNull;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
-import java.time.ZoneOffset;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -189,7 +189,10 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
             case DATE:
                 return data;
             case TIMESTAMP:
-                return ((LocalDateTime) 
data).toInstant(ZoneOffset.of("+8")).toEpochMilli();
+                return ((LocalDateTime) data)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant()
+                        .toEpochMilli();
             case BYTES:
                 return ByteBuffer.wrap((byte[]) data);
             case ROW:
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 424e181f8d..c6ff89bb5e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -203,7 +203,7 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                     return new Timestamp(timestamp).toLocalDateTime();
                 }
                 Instant instant = Instant.ofEpochMilli((long) field);
-                return LocalDateTime.ofInstant(instant, ZoneId.of("+8"));
+                return LocalDateTime.ofInstant(instant, 
ZoneId.systemDefault());
             case ROW:
                 SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
                 Object[] objects = new Object[rowType.getTotalFields()];
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 82e0bac741..0a3bee6282 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -32,8 +32,10 @@ import org.junit.jupiter.api.Test;
 import java.io.File;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.TimeZone;
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
 
@@ -70,6 +72,37 @@ public class ParquetReadStrategyTest {
         parquetReadStrategy.read(path, testCollector);
     }
 
+    @Test
+    public void testParquetReadUseSystemDefaultTimeZone() throws Exception {
+        URL resource = 
ParquetReadStrategyTest.class.getResource("/timestamp_as_int64.parquet");
+        Assertions.assertNotNull(resource);
+        String path = Paths.get(resource.toURI()).toString();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        System.out.println(seaTunnelRowTypeInfo);
+        int index = seaTunnelRowTypeInfo.indexOf("c_timestamp");
+        TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai");
+        TimeZone.setDefault(tz1);
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(path, testCollector);
+        LocalDateTime time1 = (LocalDateTime) 
testCollector.getRows().get(0).getField(index);
+
+        TimeZone tz2 = TimeZone.getTimeZone("UTC");
+        TimeZone.setDefault(tz2);
+        TestCollector testCollector2 = new TestCollector();
+        parquetReadStrategy.read(path, testCollector2);
+        LocalDateTime time2 = (LocalDateTime) 
testCollector2.getRows().get(0).getField(index);
+
+        Assertions.assertTrue(time1.isAfter(time2));
+        Assertions.assertEquals(
+                
time1.atZone(tz1.toZoneId()).withZoneSameInstant(tz2.toZoneId()).toLocalDateTime(),
+                time2);
+    }
+
     @Test
     public void testParquetReadProjection1() throws Exception {
         URL resource = 
ParquetReadStrategyTest.class.getResource("/timestamp_as_int96.parquet");

Reply via email to