Hello!
I want to update (upsert) a record in a Hudi dataset. It works when I run
against a local dataset (such as file:///e:/hudi_cow_table), but when I run
against HDFS (such as hdfs://172.16.44.28:8020/flink/hudi) the original
record is not overwritten and a new record is generated instead. Why?
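The only change between the two runs is the base path; the Constant object
referenced in the program below looks roughly like this (the table name is a
placeholder, the paths are the ones quoted above):

object Constant {
  val tableName = "hudi_trips_cow" // placeholder table name
  val localPath = "file:///e:/hudi_cow_table" // upsert overwrites here
  val hdfsPath = "hdfs://172.16.44.28:8020/flink/hudi" // a new record appears here instead
}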
My program:

import java.util.Collections
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object HudiUpdate {
  def main(args: Array[String]): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import scala.collection.JavaConversions._
    // Initialization
    val conf = new SparkConf().setAppName("HudiTest")
      .setMaster("local")
    // Use the Kryo serialization library
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val spark = new SQLContext(sc)
    val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
    // val updateUUID = args(0)
    // Set the table name, base path, and data generator used to produce
    // records for this guide.
    val tableName = Constant.tableName
    // val basePath = Constant.hdfsPath
    val basePath = Constant.localPath
    query(spark, basePath, updateUUID)
    // Generate a new trip sample, load it into a DataFrame, and write the
    // DataFrame into the Hudi dataset, as shown below.
val record = "{\"ts\": 0.0, \"uuid\": \"" + updateUUID + "\",
\"rider\": \"rider-213\", \"driver\": \"driver-213\", " +
"\"begin_lat\": 0.4726905879569653, \"begin_lon\":
0.46157858450465483, \"end_lat\": 0.754803407008858, " +
"\"end_lon\": 0.9671159942018241, \"fare\": 34.158284716382845,
\"partitionpath\": \"americas/brazil/sao_paulo\"}"
println(record)
val upsert = Collections.singletonList(record);
// println("insert:"+System.currentTimeMillis())
val df = spark.read.json(spark.sparkContext.parallelize(upsert, 1))
println("start:" + System.currentTimeMillis())
    df.write.format("org.apache.hudi")
      .options(getQuickstartWriteConfigs)
      .option(PRECOMBINE_FIELD_OPT_KEY, "ts") // field used to pick the latest version of a record
      .option(RECORDKEY_FIELD_OPT_KEY, "uuid") // record key that should match for the upsert
      .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
      .option(TABLE_NAME, tableName)
      .mode(Append)
      .save(basePath)
println("end:" + System.currentTimeMillis())
query(spark, basePath, updateUUID)
println("finish")
}
  def query(spark: SQLContext, basePath: String, updateUUID: String) = {
    val roViewDF = spark.read
      .format("org.apache.hudi")
      .load(basePath + "/*/*/*/*")
    roViewDF.registerTempTable("hudi_ro_table")
    val df0 = spark.sql("select * from hudi_ro_table where uuid='" + updateUUID + "'")
    // df0.printSchema()
    println(df0.rdd.collect().mkString(" "))
  }
}
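If it helps, this is how I would check what Hudi actually stored (a sketch
assuming Hudi's standard metadata columns such as _hoodie_record_key; if both
copies of the row carried the same record key, I would have expected the
second write to replace the first):

import org.apache.spark.sql.SparkSession

object InspectDuplicates {
  def main(args: Array[String]): Unit = {
    val basePath = "hdfs://172.16.44.28:8020/flink/hudi"
    val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
    val spark = SparkSession.builder()
      .appName("InspectDuplicates")
      .master("local")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    // List every physical copy of this uuid together with the record key,
    // partition path, and file Hudi stored it under.
    spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*/*/*")
      .where("uuid = '" + updateUUID + "'")
      .select("_hoodie_commit_time", "_hoodie_record_key",
        "_hoodie_partition_path", "_hoodie_file_name")
      .show(false)
  }
}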