liuyehcf opened a new issue, #5066:
URL: https://github.com/apache/paimon/issues/5066

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Paimon version
   
   For the timestamp with local time zone type, the local time should be normalized to 
UTC time and then written to the parquet/orc file.
   
   The ORC format processes this as expected, but Parquet does not.
   
   
   
   ### Compute Engine
   
   Just Paimon SDK itself.
   
   ### Minimal reproduce step
   
   the code(0.8.2):
   
   ```java
   package org.byconity.paimon.demo;
   
   import org.apache.commons.io.FileUtils;
   import org.apache.paimon.catalog.Catalog;
   import org.apache.paimon.catalog.CatalogContext;
   import org.apache.paimon.catalog.CatalogFactory;
   import org.apache.paimon.catalog.Identifier;
   import org.apache.paimon.data.GenericRow;
   import org.apache.paimon.data.InternalRow;
   import org.apache.paimon.data.Timestamp;
   import org.apache.paimon.fs.Path;
   import org.apache.paimon.options.CatalogOptions;
   import org.apache.paimon.options.Options;
   import org.apache.paimon.reader.RecordReader;
   import org.apache.paimon.schema.Schema;
   import org.apache.paimon.table.Table;
   import org.apache.paimon.table.sink.BatchTableCommit;
   import org.apache.paimon.table.sink.BatchTableWrite;
   import org.apache.paimon.table.sink.BatchWriteBuilder;
   import org.apache.paimon.table.sink.CommitMessage;
   import org.apache.paimon.table.source.ReadBuilder;
   import org.apache.paimon.table.source.Split;
   import org.apache.paimon.table.source.TableRead;
   import org.apache.paimon.types.DataTypes;
   
   import java.io.File;
   import java.time.LocalDateTime;
   import java.time.format.DateTimeFormatter;
   import java.util.List;
   
   public class ParquetWriteTest {
   
       private static void testTimestampWithFormat(String localPath, String 
fileFormat)
               throws Exception {
           Options options = new Options();
           options.set(CatalogOptions.METASTORE, "filesystem");
           options.set(CatalogOptions.WAREHOUSE, new 
Path(localPath).toUri().toString());
           CatalogContext context = CatalogContext.create(options);
           Catalog catalog = CatalogFactory.createCatalog(context);
           String dbName = "testDb" + "_" + fileFormat;
           String tblName = "testTbl";
           catalog.createDatabase(dbName, false);
           Schema.Builder schemaBuilder = Schema.newBuilder();
           schemaBuilder.column("col_local_zoned_timestamp",
                   DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
           schemaBuilder.option("file.format", fileFormat);
           Schema schema = schemaBuilder.build();
           Identifier tableId = Identifier.create(dbName, tblName);
           catalog.createTable(tableId, schema, false);
           Table table = catalog.getTable(tableId);
           BatchWriteBuilder writeBuilder = 
table.newBatchWriteBuilder().withOverwrite();
           try (BatchTableWrite write = writeBuilder.newWrite()) {
               LocalDateTime localDateTime = LocalDateTime.parse("2024-12-12 
10:10:10.000000",
                       DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSSSSS"));
               GenericRow record = 
GenericRow.of(Timestamp.fromLocalDateTime(localDateTime));
               write.write(record);
               List<CommitMessage> messages = write.prepareCommit();
               try (BatchTableCommit commit = writeBuilder.newCommit()) {
                   commit.commit(messages);
               }
           }
   
           ReadBuilder readBuilder = table.newReadBuilder();
           List<Split> splits = readBuilder.newScan().plan().splits();
           TableRead read = readBuilder.newRead();
   
           try (RecordReader<InternalRow> reader = read.createReader(splits)) {
               RecordReader.RecordIterator<InternalRow> batch;
               while ((batch = reader.readBatch()) != null) {
                   InternalRow row;
                   while ((row = batch.next()) != null) {
                       Timestamp timestamp = row.getTimestamp(0, 6);
                       System.out.println(timestamp);
                   }
                   batch.releaseBatch();
               }
           }
       }
   
       public static void main(String[] args) throws Exception {
           String localPath = "/tmp/paimon_warehouse";
           FileUtils.deleteDirectory(new File(localPath));
   
           testTimestampWithFormat(localPath, "orc");
           testTimestampWithFormat(localPath, "parquet");
       }
   }
   ```
   
   Also, if you check the ORC data file and the Parquet data file, they store 
different UTC timestamps.
   
   Here are two Python scripts for displaying the ORC and Parquet files:
   
   ```py
   import pyarrow.orc as orc
   import sys
   
   def read_orc_file(file_path):
       orc_file = orc.ORCFile(file_path)
       table = orc_file.read()
   
       print("Schema:")
       print(table.schema)
       print("\nData:")
       print(table.to_pandas())
   
   if __name__ == "__main__":
       if len(sys.argv) != 2:
           print("Usage: python read_orc.py <path_to_orc_file>")
           sys.exit(1)
   
       orc_file_path = sys.argv[1]
       read_orc_file(orc_file_path)
   ```
   
   
   ```py
   import pyarrow.parquet as pq
   import sys
   
   def read_parquet_file(file_path):
       parquet_file = pq.ParquetFile(file_path)
       table = parquet_file.read()
   
       print("Schema:")
       print(table.schema)
       print("\nData:")
       print(table.to_pandas())
   
   if __name__ == "__main__":
       if len(sys.argv) != 2:
           print("Usage: python read_parquet.py <path_to_parquet_file>")
           sys.exit(1)
   
       parquet_file_path = sys.argv[1]
       read_parquet_file(parquet_file_path)
   ```
   
   
   Results:
   
   ```
   python3 ~/Code/py/read_orc.py 
/tmp/paimon_warehouse/testDb_orc.db/testTbl/bucket-0/data-945a146d-9ee2-4f41-987f-0231c63b97e8-0.orc
   Schema:
   col_local_zoned_timestamp: timestamp[ns, tz=UTC]
   
   Data:
     col_local_zoned_timestamp
   0 2024-12-12 02:10:10+00:00
   ```
   
   ```
   python3 ~/Code/py/read_parquet.py 
/tmp/paimon_warehouse/testDb_parquet.db/testTbl/bucket-0/data-82e3abb6-98e9-4eb5-9a6f-126a96fc056c-0.parquet
   Schema:
   col_local_zoned_timestamp: timestamp[us, tz=UTC]
   
   Data:
     col_local_zoned_timestamp
   0 2024-12-12 10:10:10+00:00
   ```
   
   As you can see, Parquet did not normalize the local time to a UTC timestamp, 
but simply treated it as a UTC timestamp and wrote it into the file directly.
   
   ### What doesn't meet your expectations?
   
   The data stored in Parquet does not meet expectations.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to