bithw1 opened a new issue, #18071:
URL: https://github.com/apache/hudi/issues/18071

   ### Describe the problem you faced
   
   Hi,
   
   I am using Hudi 0.15.0 and Flink 1.17.1.
   
   When I run the code below in my development environment, I observe the following:
   
   1. Parquet files are generated continuously in the directory `D:\tmp\flink_20260202\hudi\test_mor_orders`.
   2. There are no `.log` files in that directory.
   3. There are a lot of `.deltacommit` files in the `.hoodie` folder.
   4. There are no `.commit` files in the `.hoodie` folder.
   5. There are no compaction files in the `.hoodie` folder.
   
   
   It looks like only base parquet files are generated continuously: no log files, no compaction, and no commits.
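   
   To make these observations easy to re-check, here is a minimal sketch that counts the files by kind in the table directory and in its `.hoodie` folder. `HudiFileTally` is a hypothetical helper, not part of Hudi, and the file-name patterns it matches (`.log.` inside log file names, `.compaction` inside compaction instant names) are assumptions about how the files are named. Only completed instants ending in `.deltacommit` or `.commit` are counted as such; `.requested` and `.inflight` files fall under "other".
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.stream.Stream;
   
   // Hypothetical helper (not part of Hudi): tallies files by kind.
   final class HudiFileTally {
   
       // Classify a file name; the patterns are assumptions, see the text above.
       static String kindOf(String name) {
           if (name.endsWith(".parquet")) return "base parquet";
           if (name.contains(".log.")) return "log";
           if (name.endsWith(".deltacommit")) return "deltacommit";
           if (name.endsWith(".commit")) return "commit";
           if (name.contains(".compaction")) return "compaction";
           return "other";
       }
   
       // Count the regular files directly inside one directory (no recursion).
       static Map<String, Long> tally(Path dir) throws IOException {
           Map<String, Long> counts = new TreeMap<>();
           if (!Files.isDirectory(dir)) {
               return counts;
           }
           try (Stream<Path> files = Files.list(dir)) {
               files.filter(Files::isRegularFile)
                    .map(p -> kindOf(p.getFileName().toString()))
                    .forEach(kind -> counts.merge(kind, 1L, Long::sum));
           }
           return counts;
       }
   
       public static void main(String[] args) throws IOException {
           Path tableDir = Paths.get(args.length > 0 ? args[0] : "/tmp/flink_20260202/hudi/test_mor_orders");
           System.out.println("table dir: " + tally(tableDir));
           System.out.println(".hoodie:   " + tally(tableDir.resolve(".hoodie")));
       }
   }
   ```
   
   The listing is deliberately not recursive, so anything stored in subfolders of `.hoodie` is left out of the counts.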
   
   
   I would like to ask whether there is a misconfiguration in my code.
   
   
   The generated `.hoodie/hoodie.properties` file:
   
   ```
   #Properties saved on 2026-02-02T07:18:38.614Z
   #Mon Feb 02 15:18:38 CST 2026
   hoodie.datasource.write.drop.partition.columns=false
   hoodie.table.type=MERGE_ON_READ
   hoodie.archivelog.folder=archived
   hoodie.table.cdc.enabled=false
   hoodie.compaction.payload.class=org.apache.hudi.common.model.EventTimeAvroPayload
   hoodie.timeline.layout.version=1
   hoodie.table.version=6
   hoodie.table.recordkey.fields=user_id
   hoodie.database.name=default_database
   hoodie.datasource.write.partitionpath.urlencode=false
   hoodie.table.name=test_mor_orders
   hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator
   hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
   hoodie.datasource.write.hive_style_partitioning=false
   hoodie.table.create.schema={"type"\:"record","name"\:"test_mor_orders_record","namespace"\:"hoodie.test_mor_orders","fields"\:[{"name"\:"user_id","type"\:"string"},{"name"\:"product","type"\:["null","string"],"default"\:null},{"name"\:"amount","type"\:["null","string"],"default"\:null}]}
   hoodie.table.checksum=2992539361
   ```
   
   
   The Java code:
   
   ```
   package org.example.java;
   
   import org.apache.flink.streaming.api.environment.CheckpointConfig;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
   import org.apache.hudi.common.model.WriteOperationType;
   import org.apache.hudi.configuration.FlinkOptions;
   
   
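   /**
    * Streams rows from a datagen source into a Hudi MERGE_ON_READ table
    * with compaction.delta_commits = 2, to check whether compaction runs.
    */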
   public class FlinkStreamingMORCompaction {
       public static void main(String[] args) {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           CheckpointConfig checkpointConfig = env.getCheckpointConfig();
           checkpointConfig.setCheckpointInterval(5000);
           checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
           StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
   
           tableEnv.executeSql("CREATE TABLE random_source ( \n" +
                   "  user_id INT,\n" +
                   "  product VARCHAR,\n" +
                   "  amount VARCHAR\n" +
                   "  ) WITH ( \n" +
                   "  'connector' = 'datagen',\n" +
                   "  'rows-per-second' = '1',           \n" +
                   "  'fields.user_id.kind' = 'sequence',   \n" +
                   "  'fields.user_id.start' = '1',         \n" +
                   "  'fields.user_id.end' = '10000',       \n" +
                   "  'fields.product.length' = '5',\n" +
                   "  'fields.amount.length' = '5'        \n" +
                   ")"
           );
   
           String tableName = "test_mor_orders";
           if (args.length > 0) {
               tableName = args[0];
           }
   
           String tablePath = "/tmp/flink_20260202/hudi/" + tableName;
           String hoodieTableDDL = Configurations.sql(tableName)
                   .field("user_id string")
                   .field("product string")
                   .field("amount string")
                   .option(FlinkOptions.PATH, tablePath)
                   .option(FlinkOptions.OPERATION, WriteOperationType.INSERT)
                   .option(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ")
                   .option("compaction.delta_commits", 2)
                   .option("index.type", "BUCKET")
                   .option("hoodie.bucket.index.num.buckets", 4)
                   .noPartition()
                   .pkField("user_id")
                   .end();
           tableEnv.executeSql(hoodieTableDDL);
   
           tableEnv.executeSql(String.format(
                   "insert into %s select cast(user_id as String), product, amount from random_source",
                   tableName));
       }
   }
   ```
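   
   The `Configurations` helper used above is not shown here. Assuming it emits an ordinary Flink `CREATE TABLE ... WITH (...)` statement for the `hudi` connector, and that `FlinkOptions.PATH`, `FlinkOptions.OPERATION` and `FlinkOptions.TABLE_TYPE` correspond to the keys `path`, `write.operation` and `table.type`, the resulting DDL should look roughly like the output of the sketch below. `HudiDdlSketch` and `buildDdl` are hypothetical names used only for illustration.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.StringJoiner;
   
   // Hypothetical stand-in for the Configurations.sql(...) helper (an assumption,
   // not the real helper): builds the Hudi table DDL as a plain string.
   final class HudiDdlSketch {
   
       static String buildDdl(String tableName, String tablePath) {
           // Options in the same order as in the original code.
           Map<String, String> options = new LinkedHashMap<>();
           options.put("connector", "hudi");
           options.put("path", tablePath);
           options.put("write.operation", "insert");
           options.put("table.type", "MERGE_ON_READ");
           options.put("compaction.delta_commits", "2");
           options.put("index.type", "BUCKET");
           options.put("hoodie.bucket.index.num.buckets", "4");
   
           // Render the WITH (...) clause, one quoted key/value pair per line.
           StringJoiner with = new StringJoiner(",\n", "WITH (\n", "\n)");
           options.forEach((k, v) -> with.add("  '" + k + "' = '" + v + "'"));
   
           return "CREATE TABLE " + tableName + " (\n"
                   + "  user_id STRING,\n"
                   + "  product STRING,\n"
                   + "  amount STRING,\n"
                   + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                   + ") " + with;
       }
   
       public static void main(String[] args) {
           System.out.println(buildDdl("test_mor_orders", "/tmp/flink_20260202/hudi/test_mor_orders"));
       }
   }
   ```
   
   Printing the statement this way makes it easier to compare each option against the Hudi Flink configuration reference.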
   
   ### To Reproduce
   
   1.
   2.
   3.
   4.
   
   
   ### Expected behavior
   
   1
   
   ### Environment Description
   
   * Hudi version: 0.15.0
   * Spark version:
   * Flink version: 1.17.1
   * Hive version:
   * Hadoop version:
   * Storage (HDFS/S3/GCS..):
   * Running on Docker? (yes/no):
   
   
   ### Additional context
   
   _No response_
   
   ### Stacktrace
   
   ```shell
   
   ```

