nsivabalan commented on issue #13013:
URL: https://github.com/apache/hudi/issues/13013#issuecomment-2743860521

   for non partitioned, you can't use custom key gen. also, the property you 
are using for key gen class is wrong. 
   
   Here is the script for multiple record keys, non partitioned, using the complex 
key gen. 
   ``` 
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.common.model.HoodieRecord
   
   val tableName = "hudi_trips_cow"
   val basePath = "file:///tmp/hudi_trips_cow"
   val dataGen = new DataGenerator
   
   val inserts = convertToStringList(dataGen.generateInserts(10))
   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   df.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option("hoodie.datasource.write.partitionpath.field","").
     
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
     option(TABLE_NAME, tableName).
     mode(Overwrite).
     save(basePath)
   
   val updates = convertToStringList(dataGen.generateUpdates(10))
   val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
   df.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option("hoodie.datasource.write.partitionpath.field","").
     
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
     option(TABLE_NAME, tableName).
     mode(Append).
     save(basePath)
   
   
   val df = spark.read.format("hudi").load(basePath)
   df.registerTempTable("tbl1")
   
   spark.sql("select uuid, ts, fare from tbl1 order by 1,2 ").show(20, false)
   
   
   // deletes 
   
   val dfToDelete = 
spark.read.format("hudi").load(basePath).limit(2).drop("_hoodie_commit_time","_hoodie_commit_seqno","_hoodie_record_key","_hoodie_partition_path","_hoodie_file_name")
   
   dfToDelete.cache
   
   dfToDelete.select("uuid").show(false)
   
   
   dfToDelete.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option("hoodie.datasource.write.partitionpath.field","").
     
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
     option("hoodie.datasource.write.operation","delete").
     option(TABLE_NAME, tableName).
     mode(Append).
     save(basePath)
   
   
   val df = spark.read.format("hudi").load(basePath)
   df.registerTempTable("tbl1")
   
   spark.sql("select uuid, ts, fare from tbl1 order by 1,2 ").show(20, false)
   
   ``` 
   
   
   for a single record key w/ partitions, you don't need to set any key gen class 
at all. 
   ```
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.common.model.HoodieRecord
   
   val tableName = "hudi_trips_cow"
   val basePath = "file:///tmp/hudi_trips_cow"
   val dataGen = new DataGenerator
   
   val inserts = convertToStringList(dataGen.generateInserts(10))
   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   df.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     option(TABLE_NAME, tableName).
     mode(Overwrite).
     save(basePath)
   
   val updates = convertToStringList(dataGen.generateUpdates(10))
   val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
   df.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     option(TABLE_NAME, tableName).
     mode(Append).
     save(basePath)
   
   
   val df = spark.read.format("hudi").load(basePath)
   df.registerTempTable("tbl1")
   
   spark.sql("select partitionpath, uuid, ts, fare from tbl1 order by 1,2 
").show(20, false)
   
   
   // deletes 
   
   val dfToDelete = 
spark.read.format("hudi").load(basePath).limit(2).drop("_hoodie_commit_time","_hoodie_commit_seqno","_hoodie_record_key","_hoodie_partition_path","_hoodie_file_name")
   
   dfToDelete.cache
   
   dfToDelete.select("partitionpath","uuid").show(false)
   
   
   dfToDelete.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     option("hoodie.datasource.write.operation","delete").
     option(TABLE_NAME, tableName).
     mode(Append).
     save(basePath)
   
   
   
   val df = spark.read.format("hudi").load(basePath)
   df.registerTempTable("tbl2")
   
   spark.sql("select partitionpath, uuid, ts, fare from tbl2 order by 1,2 
").show(20, false)
   
   ```
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to