[
https://issues.apache.org/jira/browse/HUDI-8645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jonathan Vexler closed HUDI-8645.
---------------------------------
Resolution: Fixed
Clustering test code:
{code:java}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._

val tableBasePath = "s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test1/"

// Initial bulk insert into a MOR table with inline clustering enabled
spark.read.format("parquet")
  .load("s3a://performance-benchmark-datasets-us-west-2/public_datasets/small_20r_100p/0/*.parquet")
  .write.format("hudi")
  .option(OPERATION_OPT_KEY, "bulk_insert")
  .option(TABLE_TYPE.key(), "MERGE_ON_READ")
  .option(TABLE_NAME, "hudi_test")
  .option(RECORDKEY_FIELD.key(), "key")
  .option(PRECOMBINE_FIELD_NAME.key(), "ts")
  .option("hoodie.clustering.plan.partition.filter.mode", "NONE")
  .option("hoodie.clustering.execution.strategy.class", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
  .option("hoodie.clustering.plan.strategy.sort.columns", "key,longField")
  .option("hoodie.parquet.small.file.limit", "0")
  .option("hoodie.bulkinsert.shuffle.parallelism", "105")
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "2")
  .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
  .option("hoodie.clustering.plan.strategy.small.file.limit", "99999999999999")
  .option("hoodie.compact.inline", "false")
  .option("hoodie.datasource.write.row.writer.enable", "true")
  .option("hoodie.clustering.max.parallelism", "105")
  .option("hoodie.clustering.plan.strategy.max.bytes.per.group", 100 * 1024 * 1024 * 1024L)
  .option("hoodie.clustering.plan.strategy.max.num.groups", "3")
  .mode(Overwrite)
  .save(tableBasePath)

// Three rounds of upserts so that inline clustering triggers every 2 commits
for (i <- 1 to 3) {
  val df = spark.read.format("parquet")
    .load("s3a://performance-benchmark-datasets-us-west-2/public_datasets/small_20r_100p/" + i + "/*.parquet")
  df.write.format("hudi")
    .option(OPERATION_OPT_KEY, "upsert")
    .option(TABLE_TYPE.key(), "MERGE_ON_READ")
    .option(TABLE_NAME, "hudi_test")
    .option(RECORDKEY_FIELD.key(), "key")
    .option(PRECOMBINE_FIELD_NAME.key(), "ts")
    .option("hoodie.clustering.plan.partition.filter.mode", "NONE")
    .option("hoodie.clustering.execution.strategy.class", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
    .option("hoodie.clustering.plan.strategy.sort.columns", "key,longField")
    .option("hoodie.parquet.small.file.limit", "0")
    .option("hoodie.bulkinsert.shuffle.parallelism", "105")
    .option("hoodie.clustering.inline", "true")
    .option("hoodie.clustering.inline.max.commits", "2")
    .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
    .option("hoodie.clustering.plan.strategy.small.file.limit", "99999999999999")
    .option("hoodie.compact.inline", "false")
    .option("hoodie.datasource.write.row.writer.enable", "true")
    .option("hoodie.clustering.max.parallelism", "105")
    .option("hoodie.clustering.plan.strategy.max.bytes.per.group", 100 * 1024 * 1024 * 1024L)
    .option("hoodie.clustering.plan.strategy.max.num.groups", "3")
    .mode(Append)
    .save(tableBasePath)
}
{code}
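A quick way to confirm that inline clustering actually produced clustering instants is to list the timeline files under the table's .hoodie folder. The snippet below is a sketch and was not part of the original test; it assumes clustering instants show up as "replacecommit"/"clustering" actions, and lists recursively because the timeline layout differs between Hudi 0.x (.hoodie/) and 1.x (.hoodie/timeline/).
{code:java}
// Hedged sketch (not part of the original test run): print clustering-related
// timeline files so one can eyeball whether inline clustering actually ran.
import org.apache.hadoop.fs.Path

val fs = new Path(tableBasePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
// recursive listing covers both .hoodie/ (0.x) and .hoodie/timeline/ (1.x) layouts
val it = fs.listFiles(new Path(tableBasePath + ".hoodie"), true)
while (it.hasNext) {
  val name = it.next().getPath.getName
  // clustering appears as replacecommit (and, in newer releases, clustering) actions
  if (name.contains("replacecommit") || name.contains("clustering")) println(name)
}
{code}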
Compaction test code:
{code:java}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._

val tableBasePath = "s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test2/"

// Initial bulk insert into a MOR table with inline compaction enabled
spark.read.format("parquet")
  .load("s3a://performance-benchmark-datasets-us-west-2/public_datasets/small_20r_100p/0/*.parquet")
  .write.format("hudi")
  .option(OPERATION_OPT_KEY, "bulk_insert")
  .option(TABLE_TYPE.key(), "MERGE_ON_READ")
  .option(TABLE_NAME, "hudi_test")
  .option(RECORDKEY_FIELD.key(), "key")
  .option(PRECOMBINE_FIELD_NAME.key(), "ts")
  .option("hoodie.parquet.small.file.limit", "0")
  .option("hoodie.bulkinsert.shuffle.parallelism", "105")
  .option("hoodie.compact.inline", "true")
  .option("hoodie.compact.inline.max.delta.commits", "2")
  .option("hoodie.datasource.write.row.writer.enable", "true")
  .mode(Overwrite)
  .save(tableBasePath)

// Three rounds of upserts so that inline compaction triggers every 2 delta commits
for (i <- 1 to 3) {
  val df = spark.read.format("parquet")
    .load("s3a://performance-benchmark-datasets-us-west-2/public_datasets/small_20r_100p/" + i + "/*.parquet")
  df.write.format("hudi")
    .option(OPERATION_OPT_KEY, "upsert")
    .option(TABLE_TYPE.key(), "MERGE_ON_READ")
    .option(TABLE_NAME, "hudi_test")
    .option(RECORDKEY_FIELD.key(), "key")
    .option(PRECOMBINE_FIELD_NAME.key(), "ts")
    .option("hoodie.parquet.small.file.limit", "0")
    .option("hoodie.bulkinsert.shuffle.parallelism", "105")
    .option("hoodie.compact.inline", "true")
    .option("hoodie.compact.inline.max.delta.commits", "2")
    .option("hoodie.datasource.write.row.writer.enable", "true")
    .mode(Append)
    .save(tableBasePath)
}
{code}
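The same kind of timeline check can confirm that compaction ran. On a MOR table, regular writes land as "deltacommit" instants while completed compactions land as "commit" instants, so listing those files shows whether compaction fired. This sketch was not part of the original test.
{code:java}
// Hedged sketch (not part of the original test run): print compaction-related
// timeline files for the compaction test table.
import org.apache.hadoop.fs.Path

val fs = new Path(tableBasePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
// recursive listing covers both .hoodie/ (0.x) and .hoodie/timeline/ (1.x) layouts
val it = fs.listFiles(new Path(tableBasePath + ".hoodie"), true)
while (it.hasNext) {
  val name = it.next().getPath.getName
  // completed compactions are ".commit" instants; plan files contain "compaction"
  if ((name.contains(".commit") && !name.contains("deltacommit")) || name.contains("compaction")) println(name)
}
{code}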
Test reading both tables:
{code:java}
val df1 = spark.read.format("hudi").load("s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test1/")
val df2 = spark.read.format("hudi").load("s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test2/")

df1.count()
df2.count()

// Compare the data columns of the two tables; both except() counts should be 0
val df1a = df1.select("key", "partition", "ts", "textField", "longField", "round")
val df2a = df2.select("key", "partition", "ts", "textField", "longField", "round")

df1a.except(df2a).count()
df2a.except(df1a).count()
{code}
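Since this ticket is specifically about the new file group reader, an extra check that could be bolted on is re-reading one of the tables with the reader toggled off and comparing it against the default read. This is a sketch and was not part of the original test; it assumes the toggle is the "hoodie.file.group.reader.enabled" read option.
{code:java}
// Hedged sketch: compare a read with the new file group reader explicitly
// disabled against the default (enabled) read of the same table.
// Assumes the toggle is the "hoodie.file.group.reader.enabled" option.
val test1Path = "s3a://performance-benchmark-datasets-us-west-2/jars/release_testing_jars/1.0.0-dec3-jon/tables/test1/"

val fgReaderOn  = spark.read.format("hudi").option("hoodie.file.group.reader.enabled", "true").load(test1Path)
val fgReaderOff = spark.read.format("hudi").option("hoodie.file.group.reader.enabled", "false").load(test1Path)

// Both read paths should return identical data
assert(fgReaderOn.count() == fgReaderOff.count())
assert(fgReaderOn.except(fgReaderOff).count() == 0)
{code}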
Results:
Everything worked properly: both tables ended up with the same number of records, and the resulting DataFrames were identical.
This was tested on an EMR cluster with 1 driver node and 3 worker nodes; all 4 instances were m5.xlarge with 4 cores and 16 GB of memory each.
> Test simple clustering and compaction with new fg reader on emr
> ---------------------------------------------------------------
>
> Key: HUDI-8645
> URL: https://issues.apache.org/jira/browse/HUDI-8645
> Project: Apache Hudi
> Issue Type: Test
> Components: clustering, compaction, reader-core, spark, writer-core
> Reporter: Jonathan Vexler
> Assignee: Jonathan Vexler
> Priority: Critical
>
> Test simple clustering and compaction on emr using the new fg reader
--
This message was sent by Atlassian Jira
(v8.20.10#820010)