qingyuan18 opened a new issue #5163:
URL: https://github.com/apache/hudi/issues/5163


   **Describe the problem you faced**
   
   - Hudi does not support compaction when Flink SQL inserts data directly into a MOR table.
   - When using the Flink SQL Hudi connector to insert bounded data into a MOR table, Hudi does not compact the Avro log files into Parquet, neither via the Hudi CLI nor via the Flink compaction utility.
   - This affects Trino/PrestoDB queries against the MOR RO table, since they cannot retrieve results while no Parquet file has been generated.
   - Compaction (online or offline) only works when Spark is used to initialize the Parquet files for the MOR table, or when Flink streaming is used to continuously ingest into the MOR table.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Set up the Flink Hudi integration env & libs (hudi-flink-bundle_2.12-0.10.1.jar, Flink 1.13.1, Hudi 0.10.1)
   2. Start Flink in YARN session mode:
   flink-yarn-session -jm 1024 -tm 4096 -s 2 \
   -D state.backend=rocksdb \
   -D state.checkpoint-storage=filesystem \
   -D state.checkpoints.dir=${checkpoints} \
   -D execution.checkpointing.interval=60000 \
   -D state.checkpoints.num-retained=5 \
   -D execution.checkpointing.mode=EXACTLY_ONCE \
   -D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
   -D state.backend.incremental=true \
   -D execution.checkpointing.max-concurrent-checkpoints=1 \
   -D rest.flamegraph.enabled=true \
   -d \
   -t /etc/hive/conf/hive-site.xml
   3. Start the Flink SQL client and create a Flink Hudi MOR table:
   /usr/lib/flink/bin/sql-client.sh -s application_1648449519844_0018
   CREATE TABLE t4(
     uuid VARCHAR(20),
     name VARCHAR(10),
     age INT,
     ts TIMESTAMP(3),
     `partition` VARCHAR(20)
   )
   PARTITIONED BY (`partition`)
   WITH (
     'connector' = 'hudi',
     'path' = 's3://emrfssampledata/flink-hudi/t4',
     'table.type' = 'MERGE_ON_READ',  -- if MERGE_ON_READ, hive query will not have output until the parquet file is generated
     'compaction.tasks' = '1',
     'compaction.async.enabled' = 'true',
     'compaction.trigger.strategy' = 'num_or_time',
     'compaction.delta_commits' = '1',
     'compaction.delta_seconds' = '60',
     'hive_sync.enable' = 'true',  -- required, to enable hive synchronization
     'hive_sync.mode' = 'HMS',
     'hive_sync.use_jdbc' = 'false',
     'hive_sync.username' = 'hadoop',
     'hive_sync.db' = 'flinkstreamdb',
     'hive_sync.table' = 't4',
     'hive_sync.partition_fields' = 'partition',
     'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
   );
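   For reference, the table above sets `compaction.trigger.strategy` to `num_or_time` with `compaction.delta_commits=1` and `compaction.delta_seconds=60`. The sketch below illustrates how these documented strategy values combine the two thresholds (an illustration only; the real logic lives inside Hudi's Flink sink):

```python
# Sketch of Hudi's compaction trigger strategies as documented for the Flink
# connector. Thresholds mirror the table options above; the function itself
# is illustrative, not Hudi code.

def should_schedule_compaction(strategy: str,
                               delta_commits: int,
                               elapsed_seconds: int,
                               commit_threshold: int = 1,
                               seconds_threshold: int = 60) -> bool:
    by_num = delta_commits >= commit_threshold    # compaction.delta_commits
    by_time = elapsed_seconds >= seconds_threshold  # compaction.delta_seconds
    if strategy == "num_commits":
        return by_num
    if strategy == "time_elapsed":
        return by_time
    if strategy == "num_and_time":
        return by_num and by_time
    if strategy == "num_or_time":
        return by_num or by_time
    raise ValueError(f"unknown strategy: {strategy}")

# With the options above, a single delta commit should already qualify:
print(should_schedule_compaction("num_or_time", delta_commits=1, elapsed_seconds=0))  # → True
```

   So with this configuration every delta commit should make the table eligible for compaction, which is why the missing Parquet output is surprising.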
   4. Insert into the MOR table directly with Flink SQL:
   INSERT INTO t4 VALUES
     ('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
     ('id103','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
     ('id105','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
     ('id406','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3');
   
   5. Use the Flink compactor or the Hudi CLI to compact the Avro logs into Parquet:
   ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.12-0.10.1.jar --path s3://emrfssampledata/flink-hudi/t4
   ./hudi-cli/hudi-cli.sh
   #connect --path s3://emrfssampledata/flink-hudi/t4
   #compaction run --parallelism 100 --sparkMemory 1g --retry 1 --compactionInstant 20210602101315 --hoodieConfigs 'hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy,hoodie.compaction.target.io21,hoodie.compact.inline.max.delta.commits=1' --propsFilePath s3://emrfssampledata/flink-hudi/t4/.hoodie/hoodie.properties --schemaFilePath s3://emrfssampledata/flink-hudi/t4/.hoodie/t4.json
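   When the compactor returns without writing Parquet, one thing worth checking is whether a compaction plan was ever scheduled on the timeline at all. A quick sketch for inspecting the timeline (assumes the `.hoodie` folder has been copied locally, since `os.listdir` cannot read S3; the filename pattern `<instant>.<action>[.<state>]` follows Hudi's timeline layout):

```python
import os

def timeline_summary(hoodie_dir: str) -> dict:
    """Count timeline instants by action. Completed instants have no state
    suffix (e.g. '20210602101315.deltacommit'); a scheduled-but-unexecuted
    compaction appears as '<instant>.compaction.requested'."""
    counts = {"deltacommit": 0, "compaction.requested": 0, "commit": 0}
    for name in os.listdir(hoodie_dir):
        for action in counts:
            if name.endswith("." + action):
                counts[action] += 1
    return counts
```

   If `compaction.requested` is 0 here, the offline compactor has no pending plan to execute, which would match the observed "no error, no Parquet" behavior.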
   
   6. Query the Hudi MOR RO view with Trino/PrestoDB:
   #presto-cli --catalog=hive
   #select * from t4_ro
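   The RO view reads only Parquet base files, which is why the query above returns nothing while the partition holds only Avro logs. A small illustration of telling the two file kinds apart by name (a sketch; the pattern follows Hudi's layout, where log files are hidden files with a `.log.` segment, e.g. `.<fileId>_<baseInstant>.log.<version>_<writeToken>`):

```python
def file_kind(filename: str) -> str:
    """Classify a Hudi data file by name: Parquet base files end in
    '.parquet'; Avro log files start with '.' and contain '.log.'."""
    if filename.endswith(".parquet"):
        return "base"
    if filename.startswith(".") and ".log." in filename:
        return "log"
    return "other"

# A MOR partition holding only delta logs has nothing for the RO view to read:
files = [".f1a2_20210602101315.log.1_0-1-0", ".hoodie_partition_metadata"]
print(any(file_kind(f) == "base" for f in files))  # → False
```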
   
   **Expected behavior**
   The Hudi MOR RO view dataset should be queryable, but no result is retrieved.
   
   **Environment Description**
   
   * Hudi version : 0.10.1
   
   * Spark version : 3.1.2
   
   * Flink version : 1.13.1
    
   * Hive version : 3.2
   
   * Hadoop version : 3.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Stacktrace**
   
   No error; the compaction operation returns without generating any Parquet files.
   
   **Expected behavior**
   
   Parquet files should be compacted from the Avro logs according to the Flink Hudi table properties (delta commits, trigger strategy, commit nums, etc.).
   
   
   
   

