[ 
https://issues.apache.org/jira/browse/HUDI-8645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler closed HUDI-8645.
---------------------------------
    Resolution: Fixed

Clustering test code:
{code:java}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._

val tableBasePath = "s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test1/"

// Bootstrap the MOR table with a bulk_insert of round 0.
spark.read.format("parquet")
  .load("s3a://performance-benchmark-datasets-us-west-2/public_datasets/small_20r_100p/0/*.parquet")
  .write.format("hudi")
  .option(OPERATION_OPT_KEY, "bulk_insert")
  .option(TABLE_TYPE.key(), "MERGE_ON_READ")
  .option(TABLE_NAME, "hudi_test")
  .option(RECORDKEY_FIELD.key(), "key")
  .option(PRECOMBINE_FIELD_NAME.key(), "ts")
  .option("hoodie.clustering.plan.partition.filter.mode", "NONE")
  .option("hoodie.clustering.execution.strategy.class", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
  .option("hoodie.clustering.plan.strategy.sort.columns", "key,longField")
  .option("hoodie.parquet.small.file.limit", "0")
  .option("hoodie.bulkinsert.shuffle.parallelism", "105")
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "2")
  .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
  .option("hoodie.clustering.plan.strategy.small.file.limit", "99999999999999")
  .option("hoodie.compact.inline", "false")
  .option("hoodie.datasource.write.row.writer.enable", "true")
  .option("hoodie.clustering.max.parallelism", "105")
  .option("hoodie.clustering.plan.strategy.max.bytes.per.group", 100 * 1024 * 1024 * 1024L)
  .option("hoodie.clustering.plan.strategy.max.num.groups", "3")
  .mode(Overwrite)
  .save(tableBasePath)

// Upsert rounds 1 through 3 with the same clustering configs;
// inline clustering triggers every 2 commits.
for (i <- 1 to 3) {
  spark.read.format("parquet")
    .load("s3a://performance-benchmark-datasets-us-west-2/public_datasets/small_20r_100p/" + i + "/*.parquet")
    .write.format("hudi")
    .option(OPERATION_OPT_KEY, "upsert")
    .option(TABLE_TYPE.key(), "MERGE_ON_READ")
    .option(TABLE_NAME, "hudi_test")
    .option(RECORDKEY_FIELD.key(), "key")
    .option(PRECOMBINE_FIELD_NAME.key(), "ts")
    .option("hoodie.clustering.plan.partition.filter.mode", "NONE")
    .option("hoodie.clustering.execution.strategy.class", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
    .option("hoodie.clustering.plan.strategy.sort.columns", "key,longField")
    .option("hoodie.parquet.small.file.limit", "0")
    .option("hoodie.bulkinsert.shuffle.parallelism", "105")
    .option("hoodie.clustering.inline", "true")
    .option("hoodie.clustering.inline.max.commits", "2")
    .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
    .option("hoodie.clustering.plan.strategy.small.file.limit", "99999999999999")
    .option("hoodie.compact.inline", "false")
    .option("hoodie.datasource.write.row.writer.enable", "true")
    .option("hoodie.clustering.max.parallelism", "105")
    .option("hoodie.clustering.plan.strategy.max.bytes.per.group", 100 * 1024 * 1024 * 1024L)
    .option("hoodie.clustering.plan.strategy.max.num.groups", "3")
    .mode(Append)
    .save(tableBasePath)
}
{code}
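
A minimal sketch of one way to double-check that inline clustering actually fired: list the table's timeline and filter for clustering-related instants. This assumes the instant files carry "replacecommit" or "clustering" in their names and sit under .hoodie (possibly .hoodie/timeline, depending on the table version):
{code:java}
// Minimal sketch, assuming clustering instants appear on the timeline as
// replacecommit/clustering files; recursively list .hoodie and filter by name.
import org.apache.hadoop.fs.Path

val hoodieDir = new Path(tableBasePath, ".hoodie")
val fs = hoodieDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
val files = fs.listFiles(hoodieDir, true) // recursive, also covers a .hoodie/timeline layout
while (files.hasNext) {
  val name = files.next().getPath.getName
  if (name.contains("replacecommit") || name.contains("clustering")) println(name)
}
{code}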

Compaction test code:
{code:java}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._

val tableBasePath = "s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test2/"

// Bootstrap the MOR table with a bulk_insert of round 0.
spark.read.format("parquet")
  .load("s3a://performance-benchmark-datasets-us-west-2/public_datasets/small_20r_100p/0/*.parquet")
  .write.format("hudi")
  .option(OPERATION_OPT_KEY, "bulk_insert")
  .option(TABLE_TYPE.key(), "MERGE_ON_READ")
  .option(TABLE_NAME, "hudi_test")
  .option(RECORDKEY_FIELD.key(), "key")
  .option(PRECOMBINE_FIELD_NAME.key(), "ts")
  .option("hoodie.parquet.small.file.limit", "0")
  .option("hoodie.bulkinsert.shuffle.parallelism", "105")
  .option("hoodie.compact.inline", "true")
  .option("hoodie.compact.inline.max.delta.commits", "2")
  .option("hoodie.datasource.write.row.writer.enable", "true")
  .mode(Overwrite)
  .save(tableBasePath)

// Upsert rounds 1 through 3; inline compaction triggers every 2 delta commits.
for (i <- 1 to 3) {
  spark.read.format("parquet")
    .load("s3a://performance-benchmark-datasets-us-west-2/public_datasets/small_20r_100p/" + i + "/*.parquet")
    .write.format("hudi")
    .option(OPERATION_OPT_KEY, "upsert")
    .option(TABLE_TYPE.key(), "MERGE_ON_READ")
    .option(TABLE_NAME, "hudi_test")
    .option(RECORDKEY_FIELD.key(), "key")
    .option(PRECOMBINE_FIELD_NAME.key(), "ts")
    .option("hoodie.parquet.small.file.limit", "0")
    .option("hoodie.bulkinsert.shuffle.parallelism", "105")
    .option("hoodie.compact.inline", "true")
    .option("hoodie.compact.inline.max.delta.commits", "2")
    .option("hoodie.datasource.write.row.writer.enable", "true")
    .mode(Append)
    .save(tableBasePath)
}
{code}
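
A hedged way to check that inline compaction is keeping up is to compare a snapshot read of the compaction table against a read-optimized read: the read-optimized view scans base files only, so the two counts should line up once compaction has caught up (they can still diverge if log files written after the last compaction remain). A minimal sketch using the QUERY_TYPE constants from DataSourceReadOptions (already imported above):
{code:java}
// Minimal sketch: snapshot reads merge base + log files,
// read-optimized reads scan base files only.
val snapshotDf = spark.read.format("hudi")
  .option(QUERY_TYPE.key(), QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load(tableBasePath)
val readOptimizedDf = spark.read.format("hudi")
  .option(QUERY_TYPE.key(), QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(tableBasePath)
snapshotDf.count()
readOptimizedDf.count()
{code}
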
Test reading both tables:
{code:java}
val df1 = spark.read.format("hudi").load("s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test1/")
val df2 = spark.read.format("hudi").load("s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test2/")

df1.count()
df2.count()

// Both tables were fed the same rounds of data, so they should contain the same records.
val df1a = df1.select("key", "partition", "ts", "textField", "longField", "round")
val df2a = df2.select("key", "partition", "ts", "textField", "longField", "round")
df1a.except(df2a).count()
df2a.except(df1a).count()
{code}
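
The reads above exercise whatever reader is the default in this 1.0.0 build, which is expected to be the new file group reader. To make that explicit, the same snapshot read could be issued with the reader flag set; the key name hoodie.file.group.reader.enabled is an assumption here, a minimal sketch only:
{code:java}
// Minimal sketch, assuming hoodie.file.group.reader.enabled toggles the new
// file group reader in this build.
val fgReaderDf = spark.read.format("hudi")
  .option("hoodie.file.group.reader.enabled", "true")
  .load("s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test1/")
fgReaderDf.count()
{code}
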
Results:

Everything worked properly: both tables had the same number of records, and the resulting DataFrames were identical.

This was tested on an EMR cluster with 1 driver and 3 worker nodes; all 4 were m5.xlarge instances with 4 cores and 16 GB memory each.

 

> Test simple clustering and compaction with new fg reader on emr
> ---------------------------------------------------------------
>
>                 Key: HUDI-8645
>                 URL: https://issues.apache.org/jira/browse/HUDI-8645
>             Project: Apache Hudi
>          Issue Type: Test
>          Components: clustering, compaction, reader-core, spark, writer-core
>            Reporter: Jonathan Vexler
>            Assignee: Jonathan Vexler
>            Priority: Critical
>
> Test simple clustering and compaction on emr using the new fg reader


