[ 
https://issues.apache.org/jira/browse/HUDI-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-4290:
----------------------------
    Description: 
When querying a Hudi table through the Hive connector in Presto after a 
clustering action has completed on the table, the query result contains 
duplicate records.

Environment: Presto 0.274-SNAPSHOT (latest), Hudi 0.11

Steps to reproduce:

Write Hudi table with clustering
{code:java}
./bin/spark-shell  \
     --master yarn \
     --deploy-mode client \
     --driver-memory 8g \
     --executor-memory 8g \
     --num-executors 20 \
     --executor-cores 4 \
     --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.kryoserializer.buffer=256m \
     --conf spark.kryoserializer.buffer.max=1024m \
     --conf "spark.driver.defaultJavaOptions=-XX:+UseG1GC" \
     --conf "spark.executor.defaultJavaOptions=-XX:+UseG1GC" \
     --conf spark.ui.proxyBase="" \
     --conf 'spark.eventLog.enabled=true' \
     --conf 'spark.eventLog.dir=hdfs:///var/log/spark/apps' \
     --conf "spark.sql.hive.convertMetastoreParquet=false" \
     --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
     --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  {code}
 
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode

val srcPath = "s3a://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_clustered"
val tablePath = <>

val inputDF = spark.read.format("parquet").load(srcPath)

inputDF.write.
  format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
"product_category").
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
  option(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true").
  option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "0").
  option(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS, "43").
  option(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS, "100").
  option(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY, 
"star_rating,total_votes").
  option("hoodie.metadata.index.column.stats.enable", "true").
  option(BULK_INSERT_SORT_MODE.key(), "NONE").
  mode(SaveMode.Overwrite).
  save(tablePath) {code}
Query the table using Hive connector in Presto:
{code:java}
/presto-cli --catalog hudi --server localhost:9090

select count(review_id) from <table_name> where star_rating > 4 and total_votes 
> 10;{code}
The result is different from a Hudi table without clustering like below:
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.spark.sql.SaveMode
import org.apache.hudi.config.HoodieWriteConfig._

val srcPath = "s3a://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_no_clustering"
val tablePath = <>
val inputDF = spark.read.format("parquet").load(srcPath)

inputDF.write.format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
"product_category").
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
  option("hoodie.metadata.index.column.stats.enable", "true").
  option(BULK_INSERT_SORT_MODE.key(), "NONE").
  mode(SaveMode.Overwrite).
  save(tablePath) {code}
 

 

 

  was:
When querying the Hudi table using Hive connector in Presto after a cluster 
action is complete in the table, the query result contains duplicate records.

Environment: Presto 0.274-SNAPSHOT (latest), Hudi 0.11

Steps to reproduce:

Write Hudi table with clustering

 
{code:java}
./bin/spark-shell  \
     --master yarn \
     --deploy-mode client \
     --driver-memory 8g \
     --executor-memory 8g \
     --num-executors 20 \
     --executor-cores 4 \
     --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.kryoserializer.buffer=256m \
     --conf spark.kryoserializer.buffer.max=1024m \
     --conf "spark.driver.defaultJavaOptions=-XX:+UseG1GC" \
     --conf "spark.executor.defaultJavaOptions=-XX:+UseG1GC" \
     --conf spark.ui.proxyBase="" \
     --conf 'spark.eventLog.enabled=true' --conf 
'spark.eventLog.dir=hdfs:///var/log/spark/apps' \
     --conf "spark.sql.hive.convertMetastoreParquet=false" \
     --conf 
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
     --conf 
'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  {code}
 
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode

val srcPath = "s3a://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_clustered"
val tablePath = <>

val inputDF = spark.read.format("parquet").load(srcPath)

inputDF.write.
  format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
"product_category").
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
  option(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true").
  option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "0").
  option(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS, "43").
  option(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS, "100").
  option(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY, 
"star_rating,total_votes").
  option("hoodie.metadata.index.column.stats.enable", "true").
  option(BULK_INSERT_SORT_MODE.key(), "NONE").
  mode(SaveMode.Overwrite).
  save(tablePath) {code}
Query the table using Hive connector in Presto:

 

 
{code:java}
/presto-cli --catalog hudi --server localhost:9090

select count(review_id) from <table_name> where star_rating > 4 and total_votes 
> 10;{code}
The result is different from a Hudi table without clustering like below:

 

 
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.spark.sql.SaveMode
import org.apache.hudi.config.HoodieWriteConfig._

val srcPath = "s3a://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_no_clustering"
val tablePath = <>
val inputDF = 
spark.read.format("parquet").load(srcPath)inputDF.write.format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
"product_category").
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
  option("hoodie.metadata.index.column.stats.enable", "true").
  option(BULK_INSERT_SORT_MODE.key(), "NONE").
  mode(SaveMode.Overwrite).
  save(tablePath) {code}
 

 

 


> Hive connector in Presto returns duplicate records after clustering
> -------------------------------------------------------------------
>
>                 Key: HUDI-4290
>                 URL: https://issues.apache.org/jira/browse/HUDI-4290
>             Project: Apache Hudi
>          Issue Type: Task
>            Reporter: Ethan Guo
>            Priority: Blocker
>              Labels: Presto
>             Fix For: 0.12.0
>
>
> When querying the Hudi table using Hive connector in Presto after a cluster 
> action is complete in the table, the query result contains duplicate records.
> Environment: Presto 0.274-SNAPSHOT (latest), Hudi 0.11
> Steps to reproduce:
> Write Hudi table with clustering
> {code:java}
> ./bin/spark-shell  \
>      --master yarn \
>      --deploy-mode client \
>      --driver-memory 8g \
>      --executor-memory 8g \
>      --num-executors 20 \
>      --executor-cores 4 \
>      --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
>      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>      --conf spark.kryoserializer.buffer=256m \
>      --conf spark.kryoserializer.buffer.max=1024m \
>      --conf "spark.driver.defaultJavaOptions=-XX:+UseG1GC" \
>      --conf "spark.executor.defaultJavaOptions=-XX:+UseG1GC" \
>      --conf spark.ui.proxyBase="" \
>      --conf 'spark.eventLog.enabled=true' --conf 
> 'spark.eventLog.dir=hdfs:///var/log/spark/apps' \
>      --conf "spark.sql.hive.convertMetastoreParquet=false" \
>      --conf 
> 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
>      --conf 
> 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
>   {code}
>  
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.hudi.config.HoodieClusteringConfig
> import org.apache.hudi.HoodieDataSourceHelpers
> import org.apache.hudi.config.HoodieWriteConfig._
> import org.apache.spark.sql.SaveMode
> val srcPath = "s3a://amazon-reviews-pds/parquet/"
> val tableName = "amazon_reviews_clustered"
> val tablePath = <>
> val inputDF = spark.read.format("parquet").load(srcPath)
> inputDF.write.
>   format("hudi").
>   option(HoodieWriteConfig.TABLE_NAME, tableName).
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
>   option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
>   option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
> "product_category").
>   option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
>   option(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true").
>   option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "0").
>   option(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS, "43").
>   option(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS, "100").
>   option(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY, 
> "star_rating,total_votes").
>   option("hoodie.metadata.index.column.stats.enable", "true").
>   option(BULK_INSERT_SORT_MODE.key(), "NONE").
>   mode(SaveMode.Overwrite).
>   save(tablePath) {code}
> Query the table using Hive connector in Presto:
> {code:java}
> /presto-cli --catalog hudi --server localhost:9090
> select count(review_id) from <table_name> where star_rating > 4 and 
> total_votes > 10;{code}
> The result is different from a Hudi table without clustering like below:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.hudi.HoodieDataSourceHelpers
> import org.apache.spark.sql.SaveMode
> import org.apache.hudi.config.HoodieWriteConfig._
> val srcPath = "s3a://amazon-reviews-pds/parquet/"
> val tableName = "amazon_reviews_no_clustering"
> val tablePath = <>
> val inputDF = spark.read.format("parquet").load(srcPath)
> inputDF.write.format("hudi").
>   option(HoodieWriteConfig.TABLE_NAME, tableName).
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
>   option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
>   option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
> "product_category").
>   option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
>   option("hoodie.metadata.index.column.stats.enable", "true").
>   option(BULK_INSERT_SORT_MODE.key(), "NONE").
>   mode(SaveMode.Overwrite).
>   save(tablePath) {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to