namuny opened a new issue, #6212:
URL: https://github.com/apache/hudi/issues/6212

   **Summary**
   During clustering, Hudi creates two parquet files with the same file group ID and identical content. One of the two files is later marked as a duplicate and deleted. I'm using inline clustering with a single writer, so there are no concurrency issues at play.
   
   
   **Details**
   ![Screen Shot 2022-07-26 at 8 59 27 
am](https://user-images.githubusercontent.com/12869068/180874304-48221e99-d2a5-4f4f-b40b-7c5536db9f08.png)
   ![Screen Shot 2022-07-26 at 8 59 17 
am](https://user-images.githubusercontent.com/12869068/180874316-f9e249b9-1c2e-41d7-96d1-4fb87bf3e9c9.png)
   
   The two Spark jobs above are triggered during inline clustering.
   
   ![Screen Shot 2022-07-26 at 9 00 59 
am](https://user-images.githubusercontent.com/12869068/180874446-98c0013f-95bc-4ccc-af3c-b84ed503aeec.png)
   ![Screen Shot 2022-07-26 at 9 00 55 
am](https://user-images.githubusercontent.com/12869068/180874448-818716c8-9d07-42ba-911b-ae7190d423bc.png)
   
   As can be seen above, both of these Spark jobs invoke the `MultipleSparkJobExecutionStrategy.performClustering` method, which ends up creating and storing two identical clustered files.
   
   When `HoodieTable.reconcileAgainstMarkers` runs, the newer of the two files is identified as a duplicate and deleted.
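   For context, the reconciliation step can be thought of as spotting file groups that received more than one base file for the same instant and deleting the extras. The sketch below is a simplified, hypothetical model of that idea (it is not Hudi's actual implementation); it only assumes Hudi's base-file naming convention `<fileId>_<writeToken>_<instantTime>.parquet`:
   
   ```python
   # Simplified model of duplicate reconciliation (NOT Hudi's actual code):
   # when two writes produce base files for the same file group and instant,
   # keep one and mark the rest for deletion.
   
   def split_base_file_name(name: str):
       """Split '<fileId>_<writeToken>_<instantTime>.parquet' into its parts.
       Hudi file IDs and write tokens use '-' internally, not '_', so
       splitting on '_' is safe for this illustration."""
       stem = name[: -len(".parquet")]
       file_id, write_token, instant_time = stem.split("_")
       return file_id, write_token, instant_time
   
   def find_duplicates(files):
       """Group base files by (fileId, instantTime); any group with more than
       one write token holds duplicates. Keep the lowest write token and
       return the rest as deletion candidates."""
       groups = {}
       for f in files:
           file_id, token, instant = split_base_file_name(f)
           groups.setdefault((file_id, instant), []).append((token, f))
       to_delete = []
       for members in groups.values():
           if len(members) > 1:
               members.sort()  # keep the first write token
               to_delete.extend(f for _, f in members[1:])
       return to_delete
   ```
   
   In the scenario reported here, the second `performClustering` run produces exactly such a second base file for an already-written file group, which is why reconciliation later deletes it.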
   
   **Expected behavior**
   
   Hudi should write the clustered file only once. Writing and then deleting a duplicate of the file unnecessarily increases the clustering duration.
   
   **Environment Description**
   
   * Hudi version : 0.11.0
   
   * Spark version : 3.1.2
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   I'm using Copy on Write and inline clustering. My write config is:
   
   ```scala
   .write
         .format(HUDI_WRITE_FORMAT)
         .option(TBL_NAME.key(), tableName)
         .option(TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
         .option(PARTITIONPATH_FIELD.key(), ...)
         .option(PRECOMBINE_FIELD.key(), ...)
         .option(COMBINE_BEFORE_INSERT.key(), "true")
         .option(KEYGENERATOR_CLASS_NAME.key(), CUSTOM_KEY_GENERATOR)
         .option(URL_ENCODE_PARTITIONING.key(), "true")
         .option(HIVE_SYNC_ENABLED.key(), "true")
         .option(HIVE_DATABASE.key(), ...)
         .option(HIVE_PARTITION_FIELDS.key(), ...)
         .option(HIVE_TABLE.key(), tableName)
         .option(HIVE_TABLE_PROPERTIES.key(), tableName)
         .option(HIVE_PARTITION_EXTRACTOR_CLASS.key(), 
MULTI_PART_KEYS_VALUE_EXTRACTOR)
         .option(HIVE_USE_JDBC.key(), "false")
         .option(HIVE_SUPPORT_TIMESTAMP_TYPE.key(), "true")
         .option(HIVE_STYLE_PARTITIONING.key(), "true")
         .option(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, 
INPUT_TIMESTAMP_TYPE)
         .option(KeyGeneratorOptions.Config.INPUT_TIME_UNIT, 
INPUT_TIMESTAMP_UNIT)
         .option(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, 
OUTPUT_TIMESTAMP_FORMAT)
         .option(OPERATION.key(), UPSERT_OPERATION_OPT_VAL)
         .option(INLINE_CLUSTERING.key(), "true")
         .option(INLINE_CLUSTERING_MAX_COMMITS.key(), "2")
         .option(PLAN_STRATEGY_SMALL_FILE_LIMIT.key(), "73400320") // 70MB
         .option(PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(), "209715200") // 
200MB
         .option(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), ...)
         .option(PARQUET_MAX_FILE_SIZE.key(), "104857600") // 100MB
         .option(PARQUET_SMALL_FILE_LIMIT.key(), "104857600") // 100MB
         .option(PARALLELISM_VALUE.key(), getParallelism().toString)
         .option(FILE_LISTING_PARALLELISM_VALUE.key(), 
getParallelism().toString)
         .option(FINALIZE_WRITE_PARALLELISM_VALUE.key(), 
getParallelism().toString)
         .option(DELETE_PARALLELISM_VALUE.key(), getParallelism().toString)
         .option(INSERT_PARALLELISM_VALUE.key(), getParallelism().toString)
         .option(ROLLBACK_PARALLELISM_VALUE.key(), getParallelism().toString)
         .option(UPSERT_PARALLELISM_VALUE.key(), getParallelism().toString)
         .option(INDEX_TYPE.key(), indexType)
         .option(SIMPLE_INDEX_PARALLELISM.key(), getParallelism().toString)
         .option(AUTO_CLEAN.key(), "true")
         .option(CLEANER_PARALLELISM_VALUE.key(), getParallelism().toString)
         .option(CLEANER_COMMITS_RETAINED.key(), "10")
         .option(PRESERVE_COMMIT_METADATA.key(), "true")
         .option(HoodieMetadataConfig.ENABLE.key(), "true")
         .option(META_SYNC_CONDITIONAL_SYNC.key(), "false")
         .option(ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT.key(), "true")
         .option(UPDATES_STRATEGY.key(), 
"org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
         .option(MARKERS_TYPE.key(), MarkerType.DIRECT.toString)
         .mode(SaveMode.Append)
   ```
   
   

