[
https://issues.apache.org/jira/browse/HUDI-3065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-3065:
--------------------------------------
Description:
with 0.8.0, if partition is of the format "/partitionKey=partitionValue",
Spark auto partition discovery will kick in. we can see explicit fields in
hudi's table schema.
But with 0.9.0, it does not happen.
// launch spark shell with 0.8.0
{code:java}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath",
"(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
newDf.write.format("hudi"). options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY,
"uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.printSchema {code}
//output : check for continent, country, city at the end.
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- partitionpath: string (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
|-- continent: string (nullable = true)
|-- country: string (nullable = true)
|-- city: string (nullable = true)
Let's run this with 0.9.0.
{code:java}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath",
"(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
newDf.write.format("hudi"). options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY,
"uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.printSchema {code}
//output: continent, country, city are missing.
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
|-- partitionpath: string (nullable = true)
Ref issue: [https://github.com/apache/hudi/issues/3984]
was:
with 0.8.0, if partition is of the format "/partitionKey=partitionValue",
Spark auto partition discovery will kick in. we can see explicit fields in
hudi's table schema.
But with 0.9.0, it does not happen.
// launch spark shell with 0.8.0
{code:java}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath",
"(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
newDf.write.format("hudi"). options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY,
"uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.printSchema {code}
//output
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- partitionpath: string (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
|-- continent: string (nullable = true)
|-- country: string (nullable = true)
|-- city: string (nullable = true)
Let's run this with 0.9.0.
{code:java}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath",
"(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
newDf.write.format("hudi"). options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY,
"uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.printSchema {code}
//output: continent, country, city are missing.
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
|-- partitionpath: string (nullable = true)
Ref issue: [https://github.com/apache/hudi/issues/3984]
> spark auto partition discovery does not work from 0.9.0
> -------------------------------------------------------
>
> Key: HUDI-3065
> URL: https://issues.apache.org/jira/browse/HUDI-3065
> Project: Apache Hudi
> Issue Type: Bug
> Components: Spark Integration
> Reporter: sivabalan narayanan
> Priority: Major
> Labels: sev:critical
>
> with 0.8.0, if partition is of the format "/partitionKey=partitionValue",
> Spark auto partition discovery will kick in. we can see explicit fields in
> hudi's table schema.
> But with 0.9.0, it does not happen.
> // launch spark shell with 0.8.0
> {code:java}
> import org.apache.hudi.QuickstartUtils._
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> val tableName = "hudi_trips_cow"
> val basePath = "file:///tmp/hudi_trips_cow"
> val dataGen = new DataGenerator
> val inserts = convertToStringList(dataGen.generateInserts(10))
> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
> val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath",
> "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
> newDf.write.format("hudi"). options(getQuickstartWriteConfigs).
> option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY,
> "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
> val tripsSnapshotDF = spark.
>   read.
>   format("hudi").
>   load(basePath)
> tripsSnapshotDF.printSchema {code}
> //output : check for continent, country, city at the end.
> |-- _hoodie_commit_time: string (nullable = true)
> |-- _hoodie_commit_seqno: string (nullable = true)
> |-- _hoodie_record_key: string (nullable = true)
> |-- _hoodie_partition_path: string (nullable = true)
> |-- _hoodie_file_name: string (nullable = true)
> |-- begin_lat: double (nullable = true)
> |-- begin_lon: double (nullable = true)
> |-- driver: string (nullable = true)
> |-- end_lat: double (nullable = true)
> |-- end_lon: double (nullable = true)
> |-- fare: double (nullable = true)
> |-- partitionpath: string (nullable = true)
> |-- rider: string (nullable = true)
> |-- ts: long (nullable = true)
> |-- uuid: string (nullable = true)
> |-- continent: string (nullable = true)
> |-- country: string (nullable = true)
> |-- city: string (nullable = true)
>
>
> Let's run this with 0.9.0.
> {code:java}
> import org.apache.hudi.QuickstartUtils._
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> val tableName = "hudi_trips_cow"
> val basePath = "file:///tmp/hudi_trips_cow"
> val dataGen = new DataGenerator
> val inserts = convertToStringList(dataGen.generateInserts(10))
> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
> val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath",
> "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
> newDf.write.format("hudi"). options(getQuickstartWriteConfigs).
> option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY,
> "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
> val tripsSnapshotDF = spark.
>   read.
>   format("hudi").
>   load(basePath)
> tripsSnapshotDF.printSchema {code}
> //output: continent, country, city are missing.
> root
> |-- _hoodie_commit_time: string (nullable = true)
> |-- _hoodie_commit_seqno: string (nullable = true)
> |-- _hoodie_record_key: string (nullable = true)
> |-- _hoodie_partition_path: string (nullable = true)
> |-- _hoodie_file_name: string (nullable = true)
> |-- begin_lat: double (nullable = true)
> |-- begin_lon: double (nullable = true)
> |-- driver: string (nullable = true)
> |-- end_lat: double (nullable = true)
> |-- end_lon: double (nullable = true)
> |-- fare: double (nullable = true)
> |-- rider: string (nullable = true)
> |-- ts: long (nullable = true)
> |-- uuid: string (nullable = true)
> |-- partitionpath: string (nullable = true)
>
> Ref issue: [https://github.com/apache/hudi/issues/3984]
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)