Hello Mayu,

To query Hudi tables, you need to register them against the correct Hudi input format (e.g. HoodieParquetInputFormat for copy-on-write tables). For more information on querying Hudi tables, please read the following documentation: https://hudi.apache.org/querying_data.html. The duplicates are likely happening because, without that input format, the query has no way to pick the right file/version of each record.
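For illustration, here is a minimal sketch of that registration done through a Hive-enabled SparkSession; the table name (hudi_trips), the column list, and the session variable name ("spark") are made up for the example, so adjust the schema and LOCATION to your dataset:

    // A minimal sketch, assuming a Hive-enabled SparkSession named "spark" and an
    // illustrative table name/schema. HoodieParquetInputFormat narrows each query
    // down to the latest file slice per record key, which is what removes the
    // duplicates across commits.
    spark.sql(
      """CREATE EXTERNAL TABLE IF NOT EXISTS hudi_trips (
        |  _hoodie_commit_time string,
        |  uuid string,
        |  rider string,
        |  driver string,
        |  fare double,
        |  ts double
        |) PARTITIONED BY (partitionpath string)
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
        |STORED AS
        |  INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
        |LOCATION 'hdfs://172.16.44.28:8020/flink/hudi'""".stripMargin)

In practice you would usually not hand-maintain this DDL: enabling Hive sync on the write path (Hudi ships a HiveSyncTool for this) registers the table and its partitions with the right input format for you. The docs linked above also note that, when querying such a Hive table through Spark SQL, spark.sql.hive.convertMetastoreParquet should be set to false so that Spark actually goes through the input format.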
Thanks,
Nishith

On Tue, Dec 24, 2019 at 6:46 PM [email protected] <[email protected]> wrote:

> Hello!
> I want to update (upsert) a Hudi record. It works when I run against a
> local dataset (such as file:///e:/hudi_cow_table), but when I run against
> HDFS (such as hdfs://172.16.44.28:8020/flink/hudi) the original record is
> not overwritten and a new record is generated instead. Why?
>
> My program:
>
> import java.util.Collections
>
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.{SparkConf, SparkContext}
>
> object HudiUpdate {
>   def main(args: Array[String]): Unit = {
>     import org.apache.hudi.DataSourceWriteOptions._
>     import org.apache.hudi.QuickstartUtils._
>     import org.apache.hudi.config.HoodieWriteConfig._
>     import org.apache.spark.sql.SaveMode._
>
>     import scala.collection.JavaConversions._
>
>     // Initialization
>     val conf = new SparkConf().setAppName("HudiTest")
>       .setMaster("local")
>     conf.set("spark.serializer",
>       "org.apache.spark.serializer.KryoSerializer") // use the Kryo serialization library
>     val sc = new SparkContext(conf)
>     val spark = new SQLContext(sc)
>
>     val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
>     // val updateUUID = args(0)
>
>     // Set the table name, base path and data generator used to generate records for this guide.
>     val tableName = Constant.tableName
>     // val basePath = Constant.hdfsPath
>     val basePath = Constant.localPath
>
>     query(spark, basePath, updateUUID)
>
>     // Generate a new trip sample, load it into a DataFrame, and write the DataFrame to the Hudi dataset, as shown below.
>     val record = "{\"ts\": 0.0, \"uuid\": \"" + updateUUID + "\", " +
>       "\"rider\": \"rider-213\", \"driver\": \"driver-213\", " +
>       "\"begin_lat\": 0.4726905879569653, \"begin_lon\": 0.46157858450465483, " +
>       "\"end_lat\": 0.754803407008858, \"end_lon\": 0.9671159942018241, " +
>       "\"fare\": 34.158284716382845, \"partitionpath\": \"americas/brazil/sao_paulo\"}"
>     println(record)
>     val upsert = Collections.singletonList(record)
>     // println("insert:" + System.currentTimeMillis())
>     val df = spark.read.json(spark.sparkContext.parallelize(upsert, 1))
>
>     println("start:" + System.currentTimeMillis())
>     df.write.format("org.apache.hudi").
>       options(getQuickstartWriteConfigs).
>       option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>       option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>       option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>       option(TABLE_NAME, tableName).
>       mode(Append).
>       save(basePath)
>     println("end:" + System.currentTimeMillis())
>
>     query(spark, basePath, updateUUID)
>
>     println("finish")
>   }
>
>   def query(spark: SQLContext, basePath: String, updateUUID: String) = {
>     val roViewDF = spark.
>       read.
>       format("org.apache.hudi").
>       load(basePath + "/*/*/*/*")
>     roViewDF.registerTempTable("hudi_ro_table")
>     val df0 = spark.sql("select * from hudi_ro_table where uuid='" + updateUUID + "'")
>     // df0.printSchema()
>     println(df0.rdd.collect().mkString(" "))
>   }
> }
>
> [email protected]
