[ https://issues.apache.org/jira/browse/HUDI-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

voon updated HUDI-6052:
-----------------------
    Description: 
h1. APPEND-ONLY MODE

 

When a *TIMESTAMP(6)* column is written by an *APPEND-ONLY* pipeline with 
inline clustering enabled, the clustering operator fails to read the Parquet 
files it rewrites and throws the error below:

 

 
{code:java}
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
    at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:295)
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:207)
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:162)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:301)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
    at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:307)
    at org.apache.hudi.sink.clustering.ClusteringOperator.processElement(ClusteringOperator.java:240)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:951)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:744)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/var/folders/p_/09zfm5sx3v14w97hhk4vqrn8s817xt/T/junit5996224223926304717/par2/3cc78c96-2823-46fb-ab8c-7106edd55fc7-0_1-4-0_20230410162304415.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
    ... 22 more
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.avro.AvroConverters$FieldLongConverter
    at org.apache.parquet.io.api.PrimitiveConverter.addBinary(PrimitiveConverter.java:70)
    at org.apache.parquet.column.impl.ColumnReaderBase$2$6.writeValue(ColumnReaderBase.java:390)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
    ... 25 more
Process finished with exit code 255
{code}
 

 

Sample code to trigger this:

 
{code:sql}
CREATE TABLE `src_table` (
  `id` INT,
  `userId` INT,
  `name` STRING,
  `timestamp_col` TIMESTAMP(6)
)
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50'
);

-- will write TIMESTAMP(6) type as INT96
CREATE TABLE `sink_table` 
(
  `id` INT,
  `userId` INT,
  `name` STRING,
  `timestamp_col` TIMESTAMP(6)
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://path/to/table/',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hive_sync.enable' = 'false',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'clustering.async.enabled' = 'true', -- enable inline clustering
  'clustering.schedule.enabled'= 'true', -- enable clustering schedule
  'clustering.delta_commits'='4', -- schedule clustering every 4 commits
  'hoodie.clustering.plan.strategy.small.file.limit'='104857600' -- only rewrite files smaller than 100MB
);

insert into sink_table
select 
  *
from src_table;
{code}
 

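After looking through the code, we realised that TIMESTAMP(6) columns are 
written to Parquet as INT96 when AppendWriteFunction is used. During 
clustering the file is read back through the Avro reader, which expects a long 
for this column. Parquet delivers INT96 values through addBinary, which 
FieldLongConverter does not implement, hence the 
UnsupportedOperationException above.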

 

Snippet extracted from *parquet-tools* to show the physical type in parquet:

 
{code:java}
############ Column(timestamp_col)[row group 0] ############
name: timestamp_col
path: timestamp_col
max_definition_level: 1
max_repetition_level: 0
physical_type: INT96
logical_type: None
converted_type (legacy): NONE
compression: GZIP (space_saved: 55%)
total_compressed_size: 1102
total_uncompressed_size: 2444 {code}
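
To make the mismatch concrete, the sketch below shows how a 12-byte INT96 timestamp is commonly laid out (nanoseconds of day followed by the Julian day, both little-endian) and how it maps to the epoch-microsecond long that an INT64 reader expects. It is an illustration only, not the Hudi or Flink writer code: the class name Int96TimestampSketch and its helper methods are hypothetical, and time-zone handling is ignored.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper for illustration; not part of Hudi, Flink, or Parquet.
final class Int96TimestampSketch {
    // Julian day number of 1970-01-01.
    static final long JULIAN_DAY_OF_EPOCH = 2_440_588L;
    static final long MICROS_PER_DAY = 86_400_000_000L;

    // Encodes epoch microseconds as 12 bytes:
    // 8 bytes nanoseconds-of-day, then 4 bytes Julian day, little-endian.
    static byte[] toInt96(long epochMicros) {
        long daysSinceEpoch = Math.floorDiv(epochMicros, MICROS_PER_DAY);
        long microsOfDay = Math.floorMod(epochMicros, MICROS_PER_DAY);
        return ByteBuffer.allocate(12)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(microsOfDay * 1_000L)
                .putInt((int) (daysSinceEpoch + JULIAN_DAY_OF_EPOCH))
                .array();
    }

    // Decodes the 12-byte layout back to epoch microseconds,
    // dropping any sub-microsecond precision.
    static long toEpochMicros(byte[] int96) {
        if (int96.length != 12) {
            throw new IllegalArgumentException("INT96 values are exactly 12 bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong();
        int julianDay = buf.getInt();
        return (julianDay - JULIAN_DAY_OF_EPOCH) * MICROS_PER_DAY + nanosOfDay / 1_000L;
    }

    public static void main(String[] args) {
        long micros = 1_681_114_984_415_000L; // an arbitrary instant in microseconds
        byte[] encoded = toInt96(micros);
        System.out.println("INT96 width in bytes: " + encoded.length);
        System.out.println("round trip ok: " + (toEpochMicros(encoded) == micros));
    }
}
```

Because the value is 12 bytes wide, Parquet hands it to the converter as a binary, which is why a converter that only accepts longs cannot consume it.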
 
h1. UPSERT MODE

However, if StreamWriteFunction is used, TIMESTAMP(6) columns are written to 
Parquet as INT64.

One can reproduce this with the code below (the only change is setting 
*write.operation* to {*}upsert{*}):

{code:sql}
CREATE TABLE `src_table` (
  `id` INT,
  `userId` INT,
  `name` STRING,
  `timestamp_col` TIMESTAMP(6)
)
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50'
);

-- will write TIMESTAMP(6) type as INT64
CREATE TABLE `sink_table` 
(
  `id` INT,
  `userId` INT,
  `name` STRING,
  `timestamp_col` TIMESTAMP(6)
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://path/to/table/',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hive_sync.enable' = 'false',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'clustering.async.enabled' = 'true', -- enable inline clustering
  'clustering.schedule.enabled'= 'true', -- enable clustering schedule
  'clustering.delta_commits'='4', -- schedule clustering every 4 commits
  'hoodie.clustering.plan.strategy.small.file.limit'='104857600' -- only rewrite files smaller than 100MB
);

insert into sink_table
select 
  *
from src_table;
{code}
 

 

Snippet extracted from *parquet-tools* to show the physical type in parquet:
{code:java}
############ Column(timestamp_col)[row group 0] ############
name: timestamp_col
path: timestamp_col
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)
converted_type (legacy): TIMESTAMP_MICROS
compression: GZIP (space_saved: 26%)
total_compressed_size: 1228
total_uncompressed_size: 1654 {code}
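
For comparison, the INT64 representation reported above (TIMESTAMP_MICROS) is the number of microseconds since 1970-01-01T00:00:00. A minimal sketch of that conversion follows. The class TimestampMicrosSketch and its methods are hypothetical names used for illustration, and the timestamp is treated as UTC, which may differ from how a given writer handles time zones.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper for illustration; not part of Hudi, Flink, or Parquet.
final class TimestampMicrosSketch {
    // Converts a timestamp (interpreted as UTC) to microseconds since the epoch.
    static long toEpochMicros(LocalDateTime ts) {
        long seconds = ts.toEpochSecond(ZoneOffset.UTC);
        long microsOfSecond = ts.getNano() / 1_000L;
        return Math.addExact(Math.multiplyExact(seconds, 1_000_000L), microsOfSecond);
    }

    // Converts microseconds since the epoch back to a UTC timestamp.
    static LocalDateTime fromEpochMicros(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        int nanos = (int) (Math.floorMod(micros, 1_000_000L) * 1_000L);
        return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2023, 4, 10, 8, 23, 4, 415_000_000);
        long micros = toEpochMicros(ts);
        System.out.println(micros + " -> " + fromEpochMicros(micros));
    }
}
```

A single 8-byte long carries the whole value, so a reader that expects a long for the column can consume it directly.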
 


> Standardise TIMESTAMP(6) format when writing to Parquet files
> -------------------------------------------------------------
>
>                 Key: HUDI-6052
>                 URL: https://issues.apache.org/jira/browse/HUDI-6052
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
