codejoyan commented on issue #2592:
URL: https://github.com/apache/hudi/issues/2592#issuecomment-785121095
The below succeeds. After removing only the decimal fields, I was able to save the data (a sketch of how the decimal casts might have looked follows the transcript).
```
scala> val inputDF = spark.read.format("csv").option("header", "true").load("hdfs://finstrnhw00ha/user/j0s0j7j/test_data.csv")
inputDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: string ... 12 more fields]
scala> val formattedDF = inputDF.selectExpr("col_1", "cast(col_2 as integer) col_2",
     |   "cast(col_3 as short) col_3", "col_4", "col_5", "cast(col_6 as byte) col_6",
     |   "cast(col_9 as timestamp) col_9", "col_10", "cast(col_11 as timestamp) col_11",
     |   "col_12", "cntry_cd", "cast(bus_dt as date) bus_dt")
formattedDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: int ... 10 more fields]
scala> val transformedDF = formattedDF.withColumn("partitionpath", concat(lit("cntry_cd="), col("cntry_cd"), lit("/bus_dt="), col("bus_dt")))
transformedDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: int ... 11 more fields]
scala> transformedDF.show
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+--------------------+
|               col_1|col_2|col_3|col_4|col_5|col_6|               col_9|     col_10|              col_11|     col_12|cntry_cd|    bus_dt|       partitionpath|
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+--------------------+
|7IN00716079317820...|  716|    3|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|cntry_cd=IN/bus_d...|
|7IN00716079317820...|  716|    2|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|cntry_cd=IN/bus_d...|
|7IN00716079317820...|  716|    1|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|cntry_cd=IN/bus_d...|
|AU700716079381819...| 5700|    5|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      IN|2021-02-02|cntry_cd=IN/bus_d...|
|AU700716079381819...| 5700|    6|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      IN|2021-02-02|cntry_cd=IN/bus_d...|
|AU700716079381819...| 5700|    4|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|cntry_cd=AU/bus_d...|
|AU700716079381819...| 5700|    3|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|cntry_cd=AU/bus_d...|
|AU700716079381819...| 5700|    1|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|cntry_cd=AU/bus_d...|
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+--------------------+
scala> transformedDF.write.format("org.apache.hudi").
     | options(getQuickstartWriteConfigs).
     | option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col_9").
     | option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col_2,col_1,col_3").
     | option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     | option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
     | option("hoodie.upsert.shuffle.parallelism", "2").
     | option("hoodie.insert.shuffle.parallelism", "2").
     | option(HoodieWriteConfig.TABLE_NAME, "targetTableHudi").
     | mode(SaveMode.Append).
     | save(targetPath)
scala> formattedDF.printSchema
root
|-- col_1: string (nullable = true)
|-- col_2: integer (nullable = true)
|-- col_3: short (nullable = true)
|-- col_4: string (nullable = true)
|-- col_5: string (nullable = true)
|-- col_6: byte (nullable = true)
|-- col_9: timestamp (nullable = true)
|-- col_10: string (nullable = true)
|-- col_11: timestamp (nullable = true)
|-- col_12: string (nullable = true)
|-- cntry_cd: string (nullable = true)
|-- bus_dt: date (nullable = true)
scala> formattedDF.show
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+
|               col_1|col_2|col_3|col_4|col_5|col_6|               col_9|     col_10|              col_11|     col_12|cntry_cd|    bus_dt|
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+
|7IN00716079317820...|  716|    3|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|
|7IN00716079317820...|  716|    2|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|
|7IN00716079317820...|  716|    1|   AB|  INR| null|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91|      IN|2021-02-01|
|AU700716079381819...| 5700|    5|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      IN|2021-02-02|
|AU700716079381819...| 5700|    6|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      IN|2021-02-02|
|AU700716079381819...| 5700|    4|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|
|AU700716079381819...| 5700|    3|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|
|AU700716079381819...| 5700|    1|   AB|  INR| null|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91|      AU|2021-02-01|
+--------------------+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+
scala> val testSnapshotDF = spark.read.format("org.apache.hudi").
     | load(targetPath + "/*/*/*")
21/02/24 14:36:27 WARN DefaultSource: Loading Base File Only View.
testSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 16 more fields]
scala> testSnapshotDF.createOrReplaceTempView("test_hudi_ro")
scala> spark.sql("select cntry_cd, bus_dt, count(1) from test_hudi_ro group by cntry_cd, bus_dt").show
+--------+----------+--------+
|cntry_cd| bus_dt|count(1)|
+--------+----------+--------+
| IN|2021-02-01| 3|
| AU|2021-02-01| 3|
| IN|2021-02-02| 2|
+--------+----------+--------+
```
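For reference, a minimal sketch of what the removed decimal casts might look like when adding them back. Note the gap between `col_6` and `col_9` in the `selectExpr` above; the names `col_7`/`col_8` and the `decimal(38,10)` precision/scale here are assumptions for illustration, not values from my actual run:

```
// Hypothetical sketch only: col_7/col_8 and decimal(38,10) are assumed,
// not taken from the original dataset.
val withDecimalsDF = inputDF.selectExpr(
  "col_1", "cast(col_2 as integer) col_2",
  "cast(col_3 as short) col_3", "col_4", "col_5", "cast(col_6 as byte) col_6",
  "cast(col_7 as decimal(38,10)) col_7", "cast(col_8 as decimal(38,10)) col_8",
  "cast(col_9 as timestamp) col_9", "col_10", "cast(col_11 as timestamp) col_11",
  "col_12", "cntry_cd", "cast(bus_dt as date) bus_dt")
```

Writing a DataFrame with the decimal columns included is the case that fails for me, which is why the transcript above drops them.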