[ https://issues.apache.org/jira/browse/HUDI-2911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sivabalan narayanan updated HUDI-2911:
--------------------------------------
Fix Version/s: 0.10.0
(was: 0.11.0)
> Writing non-partitioned table produces incorrect "hoodie.properties" file
> -------------------------------------------------------------------------
>
> Key: HUDI-2911
> URL: https://issues.apache.org/jira/browse/HUDI-2911
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Alexey Kudinkin
> Assignee: Alexey Kudinkin
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.10.0
>
>
> After ingesting a Hudi table with the following configuration, I'm still getting
> "hoodie.table.partition.fields=partitionpath" in the "hoodie.properties" file,
> which blocks this table from being read.
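>
> For reference, this is what the offending entry in ".hoodie/hoodie.properties"
> looks like (an illustrative excerpt; only this property is quoted from the
> report above, the rest of the file is omitted):
> {code}
> hoodie.table.partition.fields=partitionpath
> {code}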
>
>
> {code:java}
> scala> val readDf: DataFrame =
>      |   spark.read.option("hoodie.enable.data.skipping", "false").format("hudi").load(outputPath)
> java.lang.IllegalArgumentException: Cannot find column: 'partitionpath' in the schema[StructField(_hoodie_commit_time,StringType,true),StructField(_hoodie_commit_seqno,StringType,true),StructField(_hoodie_record_key,StringType,true),StructField(_hoodie_partition_path,StringType,true),StructField(_hoodie_file_name,StringType,true),StructField(marketplace,StringType,true),StructField(customer_id,StringType,true),StructField(review_id,StringType,true),StructField(product_id,StringType,true),StructField(product_parent,StringType,true),StructField(product_title,StringType,true),StructField(star_rating,IntegerType,true),StructField(helpful_votes,IntegerType,true),StructField(total_votes,IntegerType,true),StructField(vine,StringType,true),StructField(verified_purchase,StringType,true),StructField(review_headline,StringType,true),StructField(review_body,StringType,true),StructField(review_date,DateType,true),StructField(year,IntegerType,true),StructField(product_category,StringType,true)]
>   at org.apache.hudi.HoodieFileIndex.$anonfun$_partitionSchemaFromProperties$3(HoodieFileIndex.scala:118)
>   at scala.collection.MapLike.getOrElse(MapLike.scala:131)
>   at scala.collection.MapLike.getOrElse$(MapLike.scala:129)
>   at scala.collection.AbstractMap.getOrElse(Map.scala:63)
>   at org.apache.hudi.HoodieFileIndex.$anonfun$_partitionSchemaFromProperties$2(HoodieFileIndex.scala:117)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>   at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
>   at org.apache.hudi.HoodieFileIndex._partitionSchemaFromProperties$lzycompute(HoodieFileIndex.scala:116)
>   at org.apache.hudi.HoodieFileIndex._partitionSchemaFromProperties(HoodieFileIndex.scala:110)
>   at org.apache.hudi.HoodieFileIndex.getAllQueryPartitionPaths(HoodieFileIndex.scala:503)
>   at org.apache.hudi.HoodieFileIndex.loadPartitionPathFiles(HoodieFileIndex.scala:575)
>   at org.apache.hudi.HoodieFileIndex.refresh0(HoodieFileIndex.scala:360)
>   at org.apache.hudi.HoodieFileIndex.<init>(HoodieFileIndex.scala:157)
>   at org.apache.hudi.DefaultSource.getBaseFileOnlyView(DefaultSource.scala:199)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:119)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:69)
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
>   at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
>   at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
>   ... 56 elided
> scala> {code}
>
>
> Example table config:
> {code:java}
> val commonOpts =
>   Map(
>     "hoodie.compact.inline" -> "false",
>     "hoodie.bulk_insert.shuffle.parallelism" -> "10"
>   )
>
> spark.sparkContext.setLogLevel("DEBUG")
>
> ////////////////////////////////////////////////////////////////
> // Writing to Hudi
> ////////////////////////////////////////////////////////////////
>
> val fs = FSUtils.getFs(outputPath, spark.sparkContext.hadoopConfiguration)
>
> if (!fs.exists(new Path(outputPath))) {
>   val df = spark.read.parquet(inputPath)
>
>   df.write.format("hudi")
>     .option(DataSourceWriteOptions.TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
>     .option("hoodie.table.name", tableName)
>     .option(PRECOMBINE_FIELD.key(), "review_id")
>     .option(RECORDKEY_FIELD.key(), "review_id")
>     // .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "product_category")
>     .option("hoodie.clustering.inline", "true")
>     .option("hoodie.clustering.inline.max.commits", "1")
>     // NOTE: Small-file limit is intentionally kept _ABOVE_ the target max file-size
>     //       threshold for Clustering, to force re-clustering
>     .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(1024 * 1024 * 1024)) // 1 GB
>     .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(128 * 1024 * 1024)) // 128 MB
>     .option("hoodie.clustering.plan.strategy.max.num.groups", String.valueOf(4096))
>     .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
>     .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, layoutOptStrategy)
>     .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "product_id,customer_id")
>     .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
>     .option(BULK_INSERT_SORT_MODE.key(), "NONE")
>     .options(commonOpts)
>     .mode(ErrorIfExists)
>     .save(outputPath)
> }
> {code}
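>
> To confirm the bug, the generated table config can be dumped directly. A
> minimal sketch (it assumes the standard ".hoodie/hoodie.properties" location
> and reuses "fs"/"outputPath" from the snippet above):
> {code:java}
> // Load the table's properties file and print the partition-fields entry.
> // For a non-partitioned table this property is expected to be absent (null);
> // the bug manifests as "partitionpath" showing up here.
> val propsPath = new Path(outputPath + "/.hoodie/hoodie.properties")
> val in = fs.open(propsPath)
> try {
>   val props = new java.util.Properties()
>   props.load(in)
>   println(props.getProperty("hoodie.table.partition.fields"))
> } finally {
>   in.close()
> }
> {code}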
--
This message was sent by Atlassian Jira
(v8.20.1#820001)