[ 
https://issues.apache.org/jira/browse/HUDI-2911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated HUDI-2911:
-----------------------------
    Fix Version/s: 0.11.0
                       (was: 0.10.0)

> Writing non-partitioned table produces incorrect "hoodie.properties" file
> -------------------------------------------------------------------------
>
>                 Key: HUDI-2911
>                 URL: https://issues.apache.org/jira/browse/HUDI-2911
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Alexey Kudinkin
>            Assignee: Alexey Kudinkin
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.11.0
>
>
> After ingesting a Hudi table w/ the following configuration, I'm still getting 
> "hoodie.table.partition.fields=partitionpath" in the "hoodie.properties", 
> which blocks this table from being read.
>  
>  
> {code:java}
> scala> val readDf: DataFrame =
>      |   spark.read.option("hoodie.enable.data.skipping", 
> "false").format("hudi").load(outputPath)
> java.lang.IllegalArgumentException: Cannot find column: 'partitionpath' in 
> the 
> schema[StructField(_hoodie_commit_time,StringType,true),StructField(_hoodie_commit_seqno,StringType,true),StructField(_hoodie_record_key,StringType,true),StructField(_hoodie_partition_path,StringType,true),StructField(_hoodie_file_name,StringType,true),StructField(marketplace,StringType,true),StructField(customer_id,StringType,true),StructField(review_id,StringType,true),StructField(product_id,StringType,true),StructField(product_parent,StringType,true),StructField(product_title,StringType,true),StructField(star_rating,IntegerType,true),StructField(helpful_votes,IntegerType,true),StructField(total_votes,IntegerType,true),StructField(vine,StringType,true),StructField(verified_purchase,StringType,true),StructField(review_headline,StringType,true),StructField(review_body,StringType,true),StructField(review_date,DateType,true),StructField(year,IntegerType,true),StructField(product_category,StringType,true)]
>   at 
> org.apache.hudi.HoodieFileIndex.$anonfun$_partitionSchemaFromProperties$3(HoodieFileIndex.scala:118)
>   at scala.collection.MapLike.getOrElse(MapLike.scala:131)
>   at scala.collection.MapLike.getOrElse$(MapLike.scala:129)
>   at scala.collection.AbstractMap.getOrElse(Map.scala:63)
>   at 
> org.apache.hudi.HoodieFileIndex.$anonfun$_partitionSchemaFromProperties$2(HoodieFileIndex.scala:117)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>   at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
>   at 
> org.apache.hudi.HoodieFileIndex._partitionSchemaFromProperties$lzycompute(HoodieFileIndex.scala:116)
>   at 
> org.apache.hudi.HoodieFileIndex._partitionSchemaFromProperties(HoodieFileIndex.scala:110)
>   at 
> org.apache.hudi.HoodieFileIndex.getAllQueryPartitionPaths(HoodieFileIndex.scala:503)
>   at 
> org.apache.hudi.HoodieFileIndex.loadPartitionPathFiles(HoodieFileIndex.scala:575)
>   at org.apache.hudi.HoodieFileIndex.refresh0(HoodieFileIndex.scala:360)
>   at org.apache.hudi.HoodieFileIndex.<init>(HoodieFileIndex.scala:157)
>   at 
> org.apache.hudi.DefaultSource.getBaseFileOnlyView(DefaultSource.scala:199)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:119)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:69)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
>   at 
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
>   at 
> org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
>   ... 56 elided
> scala> {code}
>  
>  
> Example table config: 
> {code:java}
> val commonOpts =
>   Map(
>     "hoodie.compact.inline" -> "false",
>     "hoodie.bulk_insert.shuffle.parallelism" -> "10"
>   )
> spark.sparkContext.setLogLevel("DEBUG")
> ////////////////////////////////////////////////////////////////
> // Writing to Hudi
> ////////////////////////////////////////////////////////////////
> val fs = FSUtils.getFs(outputPath, spark.sparkContext.hadoopConfiguration)
> if (!fs.exists(new Path(outputPath))) {
>   val df = spark.read.parquet(inputPath)
>   df.write.format("hudi")
>     .option(DataSourceWriteOptions.TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
>     .option("hoodie.table.name", tableName)
>     .option(PRECOMBINE_FIELD.key(), "review_id")
>     .option(RECORDKEY_FIELD.key(), "review_id")
>     //.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), 
> "product_category")
>     .option("hoodie.clustering.inline", "true")
>     .option("hoodie.clustering.inline.max.commits", "1")
>     // NOTE: Small file limit is intentionally kept _ABOVE_ target file-size 
> max threshold for Clustering,
>     // to force re-clustering
>     .option("hoodie.clustering.plan.strategy.small.file.limit", 
> String.valueOf(1024 * 1024 * 1024)) // 1Gb
>     .option("hoodie.clustering.plan.strategy.target.file.max.bytes", 
> String.valueOf(128 * 1024 * 1024)) // 128Mb
>     .option("hoodie.clustering.plan.strategy.max.num.groups", 
> String.valueOf(4096))
>     .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
>     .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, 
> layoutOptStrategy)
>     .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, 
> "product_id,customer_id")
>     .option(DataSourceWriteOptions.OPERATION.key(), 
> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
>     .option(BULK_INSERT_SORT_MODE.key(), "NONE")
>     .options(commonOpts)
>     .mode(ErrorIfExists)
>     .save(outputPath)
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to