This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a5c9111613 [core] Fix the timezone conversion for timestamp_ltz 
data_type in OrcFileFormat (#5082)
a5c9111613 is described below

commit a5c911161360d7cee6b3986bf6e860a0ec8a48fc
Author: xiangyu0xf <[email protected]>
AuthorDate: Mon Feb 17 12:15:35 2025 +0800

    [core] Fix the timezone conversion for timestamp_ltz data_type in 
OrcFileFormat (#5082)
---
 .../shortcodes/generated/orc_configuration.html    |   6 +
 .../java/org/apache/paimon/format/OrcOptions.java  |   7 +
 .../apache/paimon/format/orc/OrcFileFormat.java    |  12 +-
 .../apache/paimon/format/orc/OrcReaderFactory.java |  10 +-
 .../format/orc/filter/OrcSimpleStatsExtractor.java |  23 ++-
 .../format/orc/reader/AbstractOrcColumnVector.java |  10 +-
 .../orc/reader/OrcTimestampColumnVector.java       |  19 ++-
 .../format/orc/writer/FieldWriterFactory.java      |  26 ++-
 .../format/orc/writer/RowDataVectorizer.java       |  10 +-
 .../paimon/format/orc/OrcFormatReadWriteTest.java  | 182 ++++++++++++++++++++-
 .../paimon/format/orc/OrcReaderFactoryTest.java    |   1 +
 11 files changed, 286 insertions(+), 20 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/orc_configuration.html 
b/docs/layouts/shortcodes/generated/orc_configuration.html
index 390b26359e..fab33e5963 100644
--- a/docs/layouts/shortcodes/generated/orc_configuration.html
+++ b/docs/layouts/shortcodes/generated/orc_configuration.html
@@ -38,5 +38,11 @@ under the License.
             <td>Double</td>
             <td>If the number of distinct keys in a dictionary is greater than 
this fraction of the total number of non-null rows, turn off dictionary 
encoding in orc. Use 0 to always disable dictionary encoding. Use 1 to always 
use dictionary encoding.</td>
         </tr>
+        <tr>
+            <td><h5>orc.timestamp-ltz.legacy.type</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>This option is used to be compatible with paimon-orc's old behavior for the `timestamp_ltz` data type.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java 
b/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java
index dad2628679..e102543d6b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java
@@ -41,4 +41,11 @@ public class OrcOptions {
                                     + "fraction of the total number of 
non-null rows, turn off "
                                     + "dictionary encoding in orc. Use 0 to 
always disable dictionary encoding. "
                                     + "Use 1 to always use dictionary 
encoding.");
+
+    public static final ConfigOption<Boolean> ORC_TIMESTAMP_LTZ_LEGACY_TYPE =
+            key("orc.timestamp-ltz.legacy.type")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "This option is used to be compatible with paimon-orc's old behavior for the `timestamp_ltz` data type.");
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 9acea56ab3..be257aef4b 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -56,6 +56,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static 
org.apache.paimon.format.OrcOptions.ORC_TIMESTAMP_LTZ_LEGACY_TYPE;
 import static org.apache.paimon.types.DataTypeChecks.getFieldTypes;
 
 /** Orc {@link FileFormat}. */
@@ -70,6 +71,7 @@ public class OrcFileFormat extends FileFormat {
     private final int readBatchSize;
     private final int writeBatchSize;
     private final boolean deletionVectorsEnabled;
+    private final boolean legacyTimestampLtzType;
 
     public OrcFileFormat(FormatContext formatContext) {
         super(IDENTIFIER);
@@ -81,6 +83,7 @@ public class OrcFileFormat extends FileFormat {
         this.readBatchSize = formatContext.readBatchSize();
         this.writeBatchSize = formatContext.writeBatchSize();
         this.deletionVectorsEnabled = 
formatContext.options().get(DELETION_VECTORS_ENABLED);
+        this.legacyTimestampLtzType = 
formatContext.options().get(ORC_TIMESTAMP_LTZ_LEGACY_TYPE);
     }
 
     @VisibleForTesting
@@ -96,7 +99,8 @@ public class OrcFileFormat extends FileFormat {
     @Override
     public Optional<SimpleStatsExtractor> createStatsExtractor(
             RowType type, SimpleColStatsCollector.Factory[] statsCollectors) {
-        return Optional.of(new OrcSimpleStatsExtractor(type, statsCollectors));
+        return Optional.of(
+                new OrcSimpleStatsExtractor(type, statsCollectors, 
legacyTimestampLtzType));
     }
 
     @Override
@@ -116,7 +120,8 @@ public class OrcFileFormat extends FileFormat {
                 (RowType) refineDataType(projectedRowType),
                 orcPredicates,
                 readBatchSize,
-                deletionVectorsEnabled);
+                deletionVectorsEnabled,
+                legacyTimestampLtzType);
     }
 
     @Override
@@ -141,7 +146,8 @@ public class OrcFileFormat extends FileFormat {
         DataType[] orcTypes = getFieldTypes(refinedType).toArray(new 
DataType[0]);
 
         TypeDescription typeDescription = 
OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
-        Vectorizer<InternalRow> vectorizer = new 
RowDataVectorizer(typeDescription, orcTypes);
+        Vectorizer<InternalRow> vectorizer =
+                new RowDataVectorizer(typeDescription, orcTypes, 
legacyTimestampLtzType);
 
         return new OrcWriterFactory(vectorizer, orcProperties, writerConf, 
writeBatchSize);
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index fad4c7e36a..a94eaf2356 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -69,6 +69,7 @@ public class OrcReaderFactory implements FormatReaderFactory {
     protected final List<OrcFilters.Predicate> conjunctPredicates;
     protected final int batchSize;
     protected final boolean deletionVectorsEnabled;
+    protected final boolean legacyTimestampLtzType;
 
     /**
      * @param hadoopConfig the hadoop config for orc reader.
@@ -80,13 +81,15 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
             final RowType readType,
             final List<OrcFilters.Predicate> conjunctPredicates,
             final int batchSize,
-            final boolean deletionVectorsEnabled) {
+            final boolean deletionVectorsEnabled,
+            final boolean legacyTimestampLtzType) {
         this.hadoopConfig = checkNotNull(hadoopConfig);
         this.schema = convertToOrcSchema(readType);
         this.tableType = readType;
         this.conjunctPredicates = checkNotNull(conjunctPredicates);
         this.batchSize = batchSize;
         this.deletionVectorsEnabled = deletionVectorsEnabled;
+        this.legacyTimestampLtzType = legacyTimestampLtzType;
     }
 
     // ------------------------------------------------------------------------
@@ -131,7 +134,10 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
             DataType type = tableFieldTypes.get(i);
             vectors[i] =
                     createPaimonVector(
-                            orcBatch.cols[tableFieldNames.indexOf(name)], 
orcBatch, type);
+                            orcBatch.cols[tableFieldNames.indexOf(name)],
+                            orcBatch,
+                            type,
+                            legacyTimestampLtzType);
         }
         return new OrcReaderBatch(filePath, orcBatch, new 
VectorizedColumnBatch(vectors), recycler);
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
index c6772c4849..c0b9b6f59b 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
@@ -56,11 +56,15 @@ public class OrcSimpleStatsExtractor implements 
SimpleStatsExtractor {
 
     private final RowType rowType;
     private final SimpleColStatsCollector.Factory[] statsCollectors;
+    private final boolean legacyTimestampLtzType;
 
     public OrcSimpleStatsExtractor(
-            RowType rowType, SimpleColStatsCollector.Factory[] 
statsCollectors) {
+            RowType rowType,
+            SimpleColStatsCollector.Factory[] statsCollectors,
+            boolean legacyTimestampLtzType) {
         this.rowType = rowType;
         this.statsCollectors = statsCollectors;
+        this.legacyTimestampLtzType = legacyTimestampLtzType;
         Preconditions.checkArgument(
                 rowType.getFieldCount() == statsCollectors.length,
                 "The stats collector is not aligned to write schema.");
@@ -228,7 +232,6 @@ public class OrcSimpleStatsExtractor implements 
SimpleStatsExtractor {
                                 nullCount);
                 break;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 assertStatsClass(field, stats, 
TimestampColumnStatistics.class);
                 TimestampColumnStatistics timestampStats = 
(TimestampColumnStatistics) stats;
                 fieldStats =
@@ -237,6 +240,22 @@ public class OrcSimpleStatsExtractor implements 
SimpleStatsExtractor {
                                 
Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
                                 nullCount);
                 break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                assertStatsClass(field, stats, 
TimestampColumnStatistics.class);
+                TimestampColumnStatistics timestampLtzStats = 
(TimestampColumnStatistics) stats;
+                fieldStats =
+                        legacyTimestampLtzType
+                                ? new SimpleColStats(
+                                        
Timestamp.fromSQLTimestamp(timestampLtzStats.getMinimum()),
+                                        
Timestamp.fromSQLTimestamp(timestampLtzStats.getMaximum()),
+                                        nullCount)
+                                : new SimpleColStats(
+                                        Timestamp.fromInstant(
+                                                
timestampLtzStats.getMinimum().toInstant()),
+                                        Timestamp.fromInstant(
+                                                
timestampLtzStats.getMaximum().toInstant()),
+                                        nullCount);
+                break;
             default:
                 fieldStats = new SimpleColStats(null, null, nullCount);
         }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
index 0557a72230..93ae8a2aea 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
@@ -62,6 +62,14 @@ public abstract class AbstractOrcColumnVector
 
     public static org.apache.paimon.data.columnar.ColumnVector 
createPaimonVector(
             ColumnVector vector, VectorizedRowBatch orcBatch, DataType 
dataType) {
+        return createPaimonVector(vector, orcBatch, dataType, false);
+    }
+
+    public static org.apache.paimon.data.columnar.ColumnVector 
createPaimonVector(
+            ColumnVector vector,
+            VectorizedRowBatch orcBatch,
+            DataType dataType,
+            boolean legacyTimestampLtzType) {
         if (vector instanceof LongColumnVector) {
             if (dataType.getTypeRoot() == 
DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
                 return new OrcLegacyTimestampColumnVector((LongColumnVector) 
vector, orcBatch);
@@ -75,7 +83,7 @@ public abstract class AbstractOrcColumnVector
         } else if (vector instanceof DecimalColumnVector) {
             return new OrcDecimalColumnVector((DecimalColumnVector) vector, 
orcBatch);
         } else if (vector instanceof TimestampColumnVector) {
-            return new OrcTimestampColumnVector(vector, orcBatch);
+            return new OrcTimestampColumnVector(vector, orcBatch, dataType, 
legacyTimestampLtzType);
         } else if (vector instanceof ListColumnVector) {
             return new OrcArrayColumnVector(
                     (ListColumnVector) vector, orcBatch, (ArrayType) dataType);
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
index a6e71d6016..4a36619594 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.format.orc.reader;
 
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.utils.DateTimeUtils;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -31,17 +33,28 @@ import 
org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  */
 public class OrcTimestampColumnVector extends AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.TimestampColumnVector {
-
+    private final Boolean legacyTimestampLtzType;
+    private final DataType dataType;
     private final TimestampColumnVector vector;
 
-    public OrcTimestampColumnVector(ColumnVector vector, VectorizedRowBatch 
orcBatch) {
+    public OrcTimestampColumnVector(
+            ColumnVector vector,
+            VectorizedRowBatch orcBatch,
+            DataType dataType,
+            boolean legacyTimestampLtzType) {
         super(vector, orcBatch);
         this.vector = (TimestampColumnVector) vector;
+        this.dataType = dataType;
+        this.legacyTimestampLtzType = legacyTimestampLtzType;
     }
 
     @Override
     public Timestamp getTimestamp(int i, int precision) {
         i = rowMapper(i);
-        return DateTimeUtils.toInternal(vector.time[i], vector.nanos[i] % 
1_000_000);
+        if (dataType instanceof TimestampType || legacyTimestampLtzType) {
+            return DateTimeUtils.toInternal(vector.time[i], vector.nanos[i] % 
1_000_000);
+        } else {
+            return Timestamp.fromEpochMillis(vector.time[i], vector.nanos[i] % 
1_000_000);
+        }
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
index 26b008ab1c..61f54c7864 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.LocalZoneTimestamp;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BinaryType;
@@ -63,8 +64,6 @@ import java.util.stream.Collectors;
 /** Factory to create {@link FieldWriter}. */
 public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
 
-    public static final FieldWriterFactory WRITER_FACTORY = new 
FieldWriterFactory();
-
     private static final FieldWriter STRING_WRITER =
             (rowId, column, getters, columnId) -> {
                 BytesColumnVector vector = (BytesColumnVector) column;
@@ -108,6 +107,12 @@ public class FieldWriterFactory implements 
DataTypeVisitor<FieldWriter> {
             (rowId, column, getters, columnId) ->
                     ((DoubleColumnVector) column).vector[rowId] = 
getters.getDouble(columnId);
 
+    private final boolean legacyTimestampLtzType;
+
+    public FieldWriterFactory(boolean legacyTimestampLtzType) {
+        this.legacyTimestampLtzType = legacyTimestampLtzType;
+    }
+
     @Override
     public FieldWriter visit(CharType charType) {
         return STRING_WRITER;
@@ -186,9 +191,20 @@ public class FieldWriterFactory implements 
DataTypeVisitor<FieldWriter> {
     @Override
     public FieldWriter visit(LocalZonedTimestampType localZonedTimestampType) {
         return (rowId, column, getters, columnId) -> {
-            Timestamp timestamp =
-                    getters.getTimestamp(columnId, 
localZonedTimestampType.getPrecision())
-                            .toSQLTimestamp();
+            org.apache.paimon.data.Timestamp localTimestamp =
+                    getters.getTimestamp(columnId, 
localZonedTimestampType.getPrecision());
+            Timestamp timestamp;
+
+            if (legacyTimestampLtzType) {
+                timestamp = localTimestamp.toSQLTimestamp();
+            } else {
+                LocalZoneTimestamp localZoneTimestamp =
+                        LocalZoneTimestamp.fromEpochMillis(
+                                localTimestamp.getMillisecond(),
+                                localTimestamp.getNanoOfMillisecond());
+                timestamp = 
java.sql.Timestamp.from(localZoneTimestamp.toInstant());
+            }
+
             TimestampColumnVector vector = (TimestampColumnVector) column;
             vector.set(rowId, timestamp);
         };
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
index 46c936a026..47c448c17f 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
@@ -29,18 +29,22 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static 
org.apache.paimon.format.orc.writer.FieldWriterFactory.WRITER_FACTORY;
-
 /** A {@link Vectorizer} of {@link InternalRow} type element. */
 public class RowDataVectorizer extends Vectorizer<InternalRow> {
 
     private final List<FieldWriter> fieldWriters;
 
     public RowDataVectorizer(TypeDescription schema, DataType[] fieldTypes) {
+        this(schema, fieldTypes, false);
+    }
+
+    public RowDataVectorizer(
+            TypeDescription schema, DataType[] fieldTypes, boolean 
legacyTimestampLtzType) {
         super(schema);
+        FieldWriterFactory fieldWriterFactory = new 
FieldWriterFactory(legacyTimestampLtzType);
         this.fieldWriters =
                 Arrays.stream(fieldTypes)
-                        .map(t -> t.accept(WRITER_FACTORY))
+                        .map(t -> t.accept(fieldWriterFactory))
                         .collect(Collectors.toList());
     }
 
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
index 782d0fc116..c4c7500dbe 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
@@ -18,20 +18,200 @@
 
 package org.apache.paimon.format.orc;
 
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FileFormatFactory;
 import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.OrcOptions;
+import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** An orc {@link FormatReadWriteTest}. */
 public class OrcFormatReadWriteTest extends FormatReadWriteTest {
 
+    private final FileFormat legacyFormat =
+            new OrcFileFormat(
+                    new FileFormatFactory.FormatContext(
+                            new Options(
+                                    new HashMap<String, String>() {
+                                        {
+                                            put(
+                                                    
OrcOptions.ORC_TIMESTAMP_LTZ_LEGACY_TYPE.key(),
+                                                    "true");
+                                        }
+                                    }),
+                            1024,
+                            1024));
+
+    private final FileFormat newFormat =
+            new OrcFileFormat(
+                    new FileFormatFactory.FormatContext(
+                            new Options(
+                                    new HashMap<String, String>() {
+                                        {
+                                            put(
+                                                    
OrcOptions.ORC_TIMESTAMP_LTZ_LEGACY_TYPE.key(),
+                                                    "false");
+                                        }
+                                    }),
+                            1024,
+                            1024));
+
     protected OrcFormatReadWriteTest() {
         super("orc");
     }
 
+    @Test
+    public void testTimestampLTZWithLegacyWriteAndRead() throws IOException {
+        RowType rowType = 
DataTypes.ROW(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+        InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+        PositionOutputStream out = fileIO.newOutputStream(file, false);
+        FormatWriter writer = 
legacyFormat.createWriterFactory(rowType).create(out, "zstd");
+        Timestamp localTimestamp =
+                Timestamp.fromLocalDateTime(
+                        LocalDateTime.parse(
+                                "2024-12-12 10:10:10",
+                                DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss")));
+        GenericRow record = GenericRow.of(localTimestamp);
+        writer.addElement(record);
+        writer.close();
+        out.close();
+
+        RecordReader<InternalRow> reader =
+                legacyFormat
+                        .createReaderFactory(rowType)
+                        .createReader(
+                                new FormatReaderContext(fileIO, file, 
fileIO.getFileSize(file)));
+        List<InternalRow> result = new ArrayList<>();
+        reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+
+        assertThat(result).containsExactly(GenericRow.of(localTimestamp));
+    }
+
+    @Test
+    public void testTimestampLTZWithNewWriteAndRead() throws IOException {
+        RowType rowType = 
DataTypes.ROW(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+        InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+        PositionOutputStream out = fileIO.newOutputStream(file, false);
+        FormatWriter writer = 
newFormat.createWriterFactory(rowType).create(out, "zstd");
+        Timestamp localTimestamp =
+                Timestamp.fromLocalDateTime(
+                        LocalDateTime.parse(
+                                "2024-12-12 10:10:10",
+                                DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss")));
+        GenericRow record = GenericRow.of(localTimestamp);
+        writer.addElement(record);
+        writer.close();
+        out.close();
+
+        RecordReader<InternalRow> reader =
+                newFormat
+                        .createReaderFactory(rowType)
+                        .createReader(
+                                new FormatReaderContext(fileIO, file, 
fileIO.getFileSize(file)));
+        List<InternalRow> result = new ArrayList<>();
+        reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+
+        assertThat(result).containsExactly(GenericRow.of(localTimestamp));
+    }
+
+    @Test
+    public void testTimestampLTZWithNewWriteAndLegacyRead() throws IOException 
{
+        RowType rowType = 
DataTypes.ROW(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+        InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+        PositionOutputStream out = fileIO.newOutputStream(file, false);
+        FormatWriter writer = 
newFormat.createWriterFactory(rowType).create(out, "zstd");
+        Timestamp localTimestamp =
+                Timestamp.fromLocalDateTime(
+                        LocalDateTime.parse(
+                                "2024-12-12 10:10:10",
+                                DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss")));
+        GenericRow record = GenericRow.of(localTimestamp);
+        writer.addElement(record);
+        writer.close();
+        out.close();
+
+        RecordReader<InternalRow> reader =
+                legacyFormat
+                        .createReaderFactory(rowType)
+                        .createReader(
+                                new FormatReaderContext(fileIO, file, 
fileIO.getFileSize(file)));
+        List<InternalRow> result = new ArrayList<>();
+        reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+        Timestamp shiftedTimestamp =
+                Timestamp.fromEpochMillis(
+                        localTimestamp.getMillisecond()
+                                + 
TimeZone.getDefault().getOffset(localTimestamp.getMillisecond()),
+                        localTimestamp.getNanoOfMillisecond());
+
+        assertThat(result).containsExactly(GenericRow.of(shiftedTimestamp));
+    }
+
+    @Test
+    public void testTimestampLTZWithLegacyWriteAndNewRead() throws IOException 
{
+        RowType rowType = 
DataTypes.ROW(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+        InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+        PositionOutputStream out = fileIO.newOutputStream(file, false);
+        FormatWriter writer = 
legacyFormat.createWriterFactory(rowType).create(out, "zstd");
+        Timestamp localTimestamp =
+                Timestamp.fromLocalDateTime(
+                        LocalDateTime.parse(
+                                "2024-12-12 10:10:10",
+                                DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss")));
+        GenericRow record = GenericRow.of(localTimestamp);
+        writer.addElement(record);
+        writer.close();
+        out.close();
+
+        RecordReader<InternalRow> reader =
+                newFormat
+                        .createReaderFactory(rowType)
+                        .createReader(
+                                new FormatReaderContext(fileIO, file, 
fileIO.getFileSize(file)));
+        List<InternalRow> result = new ArrayList<>();
+        reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+        Timestamp shiftedTimestamp =
+                Timestamp.fromEpochMillis(
+                        localTimestamp.getMillisecond()
+                                - 
TimeZone.getDefault().getOffset(localTimestamp.getMillisecond()),
+                        localTimestamp.getNanoOfMillisecond());
+
+        assertThat(result).containsExactly(GenericRow.of(shiftedTimestamp));
+    }
+
     @Override
     protected FileFormat fileFormat() {
-        return new OrcFileFormat(new FileFormatFactory.FormatContext(new 
Options(), 1024, 1024));
+        return new OrcFileFormat(
+                new FileFormatFactory.FormatContext(
+                        new Options(
+                                new HashMap<String, String>() {
+                                    {
+                                        put(
+                                                
OrcOptions.ORC_TIMESTAMP_LTZ_LEGACY_TYPE.key(),
+                                                "false");
+                                    }
+                                }),
+                        1024,
+                        1024));
     }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index 63b391b44c..87f7c8839a 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -278,6 +278,7 @@ class OrcReaderFactoryTest {
                 Projection.of(selectedFields).project(formatType),
                 conjunctPredicates,
                 BATCH_SIZE,
+                false,
                 false);
     }
 

Reply via email to