[
https://issues.apache.org/jira/browse/HUDI-3065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17471637#comment-17471637
]
Raymond Xu edited comment on HUDI-3065 at 1/10/22, 2:04 AM:
------------------------------------------------------------
After discussion with [~x1q1j1] [[email protected]], we think that auto
partition discovery behavior should be addressed separately. In the end state, we
should have a keygen or a flag to help users enable partition discovery. Without
the keygen or partition discovery flag, we respect the user's setting and take
partition paths as is, i.e., no partition auto discovery. Will close this as
won't fix; the next steps are recorded in the linked tickets. cc
[~shivnarayan]
was (Author: xushiyan):
After discussion with [~x1q1j1] [[email protected]], we think that auto
partition discovery behavior should be addressed separately. In the end state, we
should have a keygen or a flag to help users enable partition discovery. Without
the keygen or partition discovery flag, we respect the user's setting and take
partition paths as is, i.e., no partition auto discovery. Will close this as
won't fix; the next steps are recorded in the linked tickets. cc @
> spark auto partition discovery does not work from 0.9.0
> -------------------------------------------------------
>
> Key: HUDI-3065
> URL: https://issues.apache.org/jira/browse/HUDI-3065
> Project: Apache Hudi
> Issue Type: Bug
> Components: Spark Integration
> Reporter: sivabalan narayanan
> Assignee: Yann Byron
> Priority: Major
> Labels: core-flow-ds, sev:critical, spark
> Fix For: 0.10.1
>
>
> With 0.8.0, if the partition path is of the form "/partitionKey=partitionValue",
> Spark auto partition discovery kicks in, and the partition keys show up as
> explicit fields in Hudi's table schema.
> With 0.9.0, this no longer happens.
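To make the "/partitionKey=partitionValue" format concrete, here is a minimal sketch in plain Scala of how such a path breaks down into the key/value pairs that would surface as extra columns. `HiveStylePath` and `parse` are hypothetical names for illustration only; they are not part of Hudi or Spark, and real partition discovery does more than this (for example, value type inference).

```scala
// Hypothetical helper, for illustration only (not a Hudi or Spark API).
object HiveStylePath {
  // Splits a Hive-style partition path into (key, value) pairs.
  def parse(path: String): Seq[(String, String)] =
    path.split("/").toSeq
      .filter(_.nonEmpty) // drop empty segments caused by leading slashes
      .flatMap { segment =>
        segment.split("=", 2) match {
          case Array(key, value) => Some(key -> value) // "key=value" segment
          case _                 => None               // not Hive-style; ignored in this sketch
        }
      }
}
```

For a path such as "continent=americas/country=united_states/city=san_francisco", `HiveStylePath.parse` yields the three pairs for continent, country and city, which correspond to the three extra fields the report expects in the schema.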
> // launch spark shell with 0.8.0
> {code:scala}
> import org.apache.hudi.QuickstartUtils._
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> val tableName = "hudi_trips_cow"
> val basePath = "file:///tmp/hudi_trips_cow"
> val dataGen = new DataGenerator
> val inserts = convertToStringList(dataGen.generateInserts(10))
> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
> val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath",
> "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
> newDf.write.format("hudi").
> options(getQuickstartWriteConfigs).
> option(PRECOMBINE_FIELD_OPT_KEY, "ts").
> option(RECORDKEY_FIELD_OPT_KEY, "uuid").
> option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> option(TABLE_NAME, tableName).
> mode(Overwrite).save(basePath)
> val tripsSnapshotDF = spark.
> read.
> format("hudi").
> load(basePath)
> tripsSnapshotDF.printSchema
> {code}
> // output: note continent, country, city at the end.
> {code}
> root
> |-- _hoodie_commit_time: string (nullable = true)
> |-- _hoodie_commit_seqno: string (nullable = true)
> |-- _hoodie_record_key: string (nullable = true)
> |-- _hoodie_partition_path: string (nullable = true)
> |-- _hoodie_file_name: string (nullable = true)
> |-- begin_lat: double (nullable = true)
> |-- begin_lon: double (nullable = true)
> |-- driver: string (nullable = true)
> |-- end_lat: double (nullable = true)
> |-- end_lon: double (nullable = true)
> |-- fare: double (nullable = true)
> |-- partitionpath: string (nullable = true)
> |-- rider: string (nullable = true)
> |-- ts: long (nullable = true)
> |-- uuid: string (nullable = true)
> |-- continent: string (nullable = true)
> |-- country: string (nullable = true)
> |-- city: string (nullable = true)
> {code}
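For readers who want to check what the `regexp_replace` call in the 0.8.0 snippet does to each partition path, the same pattern and replacement can be tried without Spark. This is a sketch that assumes Spark's `regexp_replace` follows `java.util.regex` semantics for this pattern; `PartitionPathRewrite` and `toHiveStyle` are hypothetical names, and the sample path is assumed to resemble what the quickstart `DataGenerator` produces.

```scala
// Hypothetical helper, for illustration only (not a Hudi or Spark API).
object PartitionPathRewrite {
  // Same pattern and replacement strings as in the reproduction snippet.
  private val pattern = "(.*)(\\/){1}(.*)(\\/){1}"
  private val replacement = "continent=$1$2country=$3$4city="

  // Rewrites "a/b/c" into "continent=a/country=b/city=c".
  def toHiveStyle(path: String): String = path.replaceAll(pattern, replacement)
}
```

With an input of "americas/united_states/san_francisco", `toHiveStyle` returns "continent=americas/country=united_states/city=san_francisco", which is the Hive-style layout that partition discovery keys on.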
>
> Let's run this with 0.9.0.
> {code:scala}
> import org.apache.hudi.QuickstartUtils._
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> val tableName = "hudi_trips_cow"
> val basePath = "file:///tmp/hudi_trips_cow"
> val dataGen = new DataGenerator
> val inserts = convertToStringList(dataGen.generateInserts(10))
> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
> val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath",
> "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
> newDf.write.format("hudi").
> options(getQuickstartWriteConfigs).
> option(PRECOMBINE_FIELD_OPT_KEY, "ts").
> option(RECORDKEY_FIELD_OPT_KEY, "uuid").
> option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> option(TABLE_NAME, tableName).
> mode(Overwrite).save(basePath)
> val tripsSnapshotDF = spark.
> read.
> format("hudi").
> load(basePath)
> tripsSnapshotDF.printSchema
> {code}
> // output: continent, country, city are missing.
> {code}
> root
> |-- _hoodie_commit_time: string (nullable = true)
> |-- _hoodie_commit_seqno: string (nullable = true)
> |-- _hoodie_record_key: string (nullable = true)
> |-- _hoodie_partition_path: string (nullable = true)
> |-- _hoodie_file_name: string (nullable = true)
> |-- begin_lat: double (nullable = true)
> |-- begin_lon: double (nullable = true)
> |-- driver: string (nullable = true)
> |-- end_lat: double (nullable = true)
> |-- end_lon: double (nullable = true)
> |-- fare: double (nullable = true)
> |-- rider: string (nullable = true)
> |-- ts: long (nullable = true)
> |-- uuid: string (nullable = true)
> |-- partitionpath: string (nullable = true)
> {code}
> Ref issue: [https://github.com/apache/hudi/issues/3984]
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)