nsivabalan commented on issue #6341:
URL: https://github.com/apache/hudi/issues/6341#issuecomment-1238781965
I reused our quick start guide and was able to get it working.
```
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
// spark-shell
// Insert 10 generated trip records. Note: despite the quickstart's "_cow" table/path names,
// the table is written as MERGE_ON_READ (see the table.type option below).
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.write.table.type","MERGE_ON_READ").
  mode(Overwrite).
  save(basePath)
// spark-shell
// Snapshot-read the table and inspect it.
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
// Multi-line queries use triple-quoted strings: an ordinary "..." literal cannot contain a newline.
spark.sql("""select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot
  where fare > 20.0""").show()
spark.sql("""select _hoodie_commit_time, _hoodie_record_key,
  _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot""").show()
spark.sql("""select distinct partitionpath, uuid, fare from
  hudi_trips_snapshot""").show(false)
// Pick two record keys from the table itself. DataGenerator produces random UUIDs, so keys
// copied from a previous run's output would not exist here and the delete would be a no-op.
// Collect them once so every later step refers to the same rows (a `limit` without ordering
// is not guaranteed to return the same rows each time it is evaluated).
val uuidsToDelete: Seq[String] =
  spark.sql("select uuid from hudi_trips_snapshot limit 2").collect().map(_.getString(0)).toSeq
// Guard against a vacuous pass of the final check if nothing was selected.
require(uuidsToDelete.nonEmpty, "no records found to delete")
val dfToDelete = tripsSnapshotDF.where(tripsSnapshotDF("uuid").isin(uuidsToDelete: _*))
dfToDelete.show()
// Issue the delete as a new commit on the same table.
dfToDelete.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.write.table.type","MERGE_ON_READ").
  mode(Append).
  save(basePath)
// Re-read the table after the delete commit. A fresh name avoids a duplicate-definition
// error when this script is run via :paste or compiled as a single unit.
val tripsSnapshotAfterDeleteDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotAfterDeleteDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("""select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot
  where fare > 20.0""").show()
spark.sql("""select _hoodie_commit_time, _hoodie_record_key,
  _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot""").show()
spark.sql("""select distinct partitionpath, uuid, fare from
  hudi_trips_snapshot""").show()
// The deleted keys should no longer be visible in the snapshot.
val deletedRowsDF = tripsSnapshotAfterDeleteDF.
  where(tripsSnapshotAfterDeleteDF("uuid").isin(uuidsToDelete: _*)).
  select("partitionpath", "uuid", "fare").
  distinct()
deletedRowsDF.show(false)
// Observed output (empty result, i.e. both records were deleted):
// +-------------+----+----+
// |partitionpath|uuid|fare|
// +-------------+----+----+
// +-------------+----+----+
assert(deletedRowsDF.count() == 0, s"records still present after delete: ${uuidsToDelete.mkString(", ")}")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]