nsivabalan commented on issue #13013:
URL: https://github.com/apache/hudi/issues/13013#issuecomment-2743860521
For non-partitioned tables you can't use a custom key generator; also, the property you
are using for the key generator class is wrong.
Here is a script for multiple record keys on a non-partitioned table using the
complex key generator.
```
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._ // NOTE: deprecated; on Scala 2.13+ prefer scala.jdk.CollectionConverters
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

// Non-partitioned COW table keyed on a single record-key field ("uuid") via
// ComplexKeyGenerator. The partition-path field is deliberately set to the
// empty string so every record lands in the table root (no partitions).
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

// --- Bootstrap the table with 10 inserts (Overwrite re-creates it) ---
val inserts = convertToStringList(dataGen.generateInserts(10))
val insertDf = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
insertDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option("hoodie.datasource.write.partitionpath.field", "").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// --- Upsert: same key-gen configs must be repeated on every write ---
val updates = convertToStringList(dataGen.generateUpdates(10))
val updateDf = spark.read.json(spark.sparkContext.parallelize(updates, 2))
updateDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option("hoodie.datasource.write.partitionpath.field", "").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// Verify the upsert result. createOrReplaceTempView replaces the
// deprecated registerTempTable (removed in Spark 3.x).
val afterUpsertDf = spark.read.format("hudi").load(basePath)
afterUpsertDf.createOrReplaceTempView("tbl1")
spark.sql("select uuid, ts, fare from tbl1 order by 1,2").show(20, false)

// --- Delete two records: read them back, strip Hudi meta columns, write with operation=delete ---
val dfToDelete =
  spark.read.format("hudi").load(basePath).limit(2).drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")
dfToDelete.cache
dfToDelete.select("uuid").show(false)
dfToDelete.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option("hoodie.datasource.write.partitionpath.field", "").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
  option("hoodie.datasource.write.operation", "delete").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// Verify the deletes: the two uuids printed above should be gone.
val afterDeleteDf = spark.read.format("hudi").load(basePath)
afterDeleteDf.createOrReplaceTempView("tbl1")
spark.sql("select uuid, ts, fare from tbl1 order by 1,2").show(20, false)
```
For a single record key with partitions, you don't need to set any key generator
class at all.
```
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._ // NOTE: deprecated; on Scala 2.13+ prefer scala.jdk.CollectionConverters
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

// Partitioned COW table with a single record key ("uuid") and a single
// partition-path field ("partitionpath"). No key-generator class is set:
// Hudi's default (SimpleKeyGenerator) handles this case.
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

// --- Bootstrap the table with 10 inserts (Overwrite re-creates it) ---
val inserts = convertToStringList(dataGen.generateInserts(10))
val insertDf = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
insertDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// --- Upsert 10 updates against the same keys ---
val updates = convertToStringList(dataGen.generateUpdates(10))
val updateDf = spark.read.json(spark.sparkContext.parallelize(updates, 2))
updateDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// Verify the upsert result. createOrReplaceTempView replaces the
// deprecated registerTempTable (removed in Spark 3.x).
val afterUpsertDf = spark.read.format("hudi").load(basePath)
afterUpsertDf.createOrReplaceTempView("tbl1")
spark.sql("select partitionpath, uuid, ts, fare from tbl1 order by 1,2").show(20, false)

// --- Delete two records: read them back, strip Hudi meta columns, write with operation=delete ---
val dfToDelete =
  spark.read.format("hudi").load(basePath).limit(2).drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")
dfToDelete.cache
dfToDelete.select("partitionpath", "uuid").show(false)
dfToDelete.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.datasource.write.operation", "delete").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// Verify the deletes: the two (partitionpath, uuid) pairs printed above should be gone.
val afterDeleteDf = spark.read.format("hudi").load(basePath)
afterDeleteDf.createOrReplaceTempView("tbl2")
spark.sql("select partitionpath, uuid, ts, fare from tbl2 order by 1,2").show(20, false)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]