bvaradar commented on issue #2592: URL: https://github.com/apache/hudi/issues/2592#issuecomment-785896778
I was unable to set up spark-2.3.0 in my environment. But with spark-2.4.4, this works fine, as shown below. Can you use a spark-2.4.x version? spark-2.3 seems too old, though. `21/02/25 05:14:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://ip-192-168-1-81.ec2.internal:4040 Spark context available as 'sc' (master = local[*], app id = local-1614258873363). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.4 /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251) Type in expressions to have them evaluated. Type :help for more information. scala> import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceWriteOptions scala> import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs scala> import org.apache.hudi.config.{HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig} import org.apache.hudi.config.{HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig} scala> import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.{SaveMode, SparkSession} scala> import org.apache.spark.sql.functions.{col, concat, lit} import org.apache.spark.sql.functions.{col, concat, lit} scala> scala> val inputDF = spark.read.format("csv").option("header", "true").load("file:///tmp/input.csv") inputDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: string ... 
12 more fields] scala> val formattedDF = inputDF.selectExpr("col_1", "cast(col_2 as integer) col_2", | "cast(col_3 as short) col_3", "col_4", "col_5", "cast(col_6 as byte) col_6", "cast(col_7 as decimal(9,2)) col_7", | "cast(col_8 as decimal(9,2)) col_8", "cast(col_9 as timestamp) col_9", "col_10", "cast(col_11 as timestamp) col_11", | "col_12", "cntry_cd", "cast(bus_dt as date) bus_dt") formattedDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: int ... 12 more fields] scala> formattedDF.printSchema() root |-- col_1: string (nullable = true) |-- col_2: integer (nullable = true) |-- col_3: short (nullable = true) |-- col_4: string (nullable = true) |-- col_5: string (nullable = true) |-- col_6: byte (nullable = true) |-- col_7: decimal(9,2) (nullable = true) |-- col_8: decimal(9,2) (nullable = true) |-- col_9: timestamp (nullable = true) |-- col_10: string (nullable = true) |-- col_11: timestamp (nullable = true) |-- col_12: string (nullable = true) |-- cntry_cd: string (nullable = true) |-- bus_dt: date (nullable = true) scala> formattedDF.show +--------------------+-----+-----+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+ | col_1|col_2|col_3|col_4|col_5|col_6|col_7|col_8| col_9| col_10| col_11| col_12|cntry_cd| bus_dt| +--------------------+-----+-----+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+ |7IN00716079317820...| 716| 3| AB| INR| null| 0.00| 1.00|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91| IN|2021-02-01| |7IN00716079317820...| 716| 2| AB| INR| null| 0.00| 1.00|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91| IN|2021-02-01| |7IN00716079317820...| 716| 1| AB| INR| null| 0.00| 1.00|2021-02-14 20:23:...|useridjsb91|2021-02-14 20:23:...|useridjsb91| IN|2021-02-01| |AU700716079381819...| 5700| 5| AB| INR| null| 0.00| 1.00|2021-02-14 20:06:...|useridjsb91|2021-02-14 
20:06:...|useridjsb91| IN|2021-02-02| |AU700716079381819...| 5700| 6| AB| INR| null| 4.00| 1.97|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| IN|2021-02-02| |AU700716079381819...| 5700| 4| AB| INR| null| 0.00| 1.00|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| AU|2021-02-01| |AU700716079381819...| 5700| 3| AB| INR| null| 0.00| 1.00|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| AU|2021-02-01| |AU700716079381819...| 5700| 1| AB| INR| null| 0.00| 1.00|2021-02-14 20:06:...|useridjsb91|2021-02-14 20:06:...|useridjsb91| AU|2021-02-01| +--------------------+-----+-----+-----+-----+-----+-----+-----+--------------------+-----------+--------------------+-----------+--------+----------+ scala> scala> val transformedDF = formattedDF.withColumn("partitionpath", concat(lit("cntry_cd="), col("cntry_cd"), lit("/bus_dt="), col("bus_dt"))) transformedDF: org.apache.spark.sql.DataFrame = [col_1: string, col_2: int ... 13 more fields] scala> val targetPath = "file:///tmp/output/" targetPath: String = file:///tmp/output/ scala> transformedDF.write.format("org.apache.hudi"). | options(getQuickstartWriteConfigs). | option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col_9"). | option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col_2,col_1,col_3"). | option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator"). | option("hoodie.upsert.shuffle.parallelism","2"). | option("hoodie.insert.shuffle.parallelism","2"). | option(HoodieWriteConfig.TABLE_NAME, "targetTableHudi"). | mode(SaveMode.Append). | save(targetPath) scala>` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: [email protected]
