Hello Mayu,

To query Hudi tables, you need to register them against the correct Hudi
input format (e.g. HoodieParquetInputFormat for copy-on-write tables). For
more information on querying Hudi tables, please read the following
documentation: https://hudi.apache.org/querying_data.html.
The duplicates are most likely happening because that input format is
absent; it is what selects the latest version of each file/record at query
time.
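
If the table is being queried through Hive, here is a minimal sketch of the
registration, run from a Spark session with Hive support enabled (this
assumes Hudi 0.5.x, a copy-on-write table, and the quickstart trip schema;
the table name, column list, and location are illustrative, not your exact
setup):

    // Requires the hudi-hadoop-mr bundle on the Hive/Spark classpath.
    // HoodieParquetInputFormat filters each query down to the latest
    // file slice, so an updated record appears exactly once.
    spark.sql(
      """CREATE EXTERNAL TABLE IF NOT EXISTS hudi_trips (
        |  _hoodie_commit_time string,
        |  uuid string,
        |  ts double,
        |  rider string,
        |  driver string,
        |  fare double)
        |PARTITIONED BY (partitionpath string)
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
        |STORED AS
        |  INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
        |LOCATION 'hdfs://172.16.44.28:8020/flink/hudi'""".stripMargin)

When querying through Spark directly, reading with
spark.read.format("org.apache.hudi") (as your program below already does)
applies the same file-slice selection; reading the parquet files directly
will surface both the old and the new version of a record.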

Thanks,
Nishith

On Tue, Dec 24, 2019 at 6:46 PM [email protected] <[email protected]> wrote:

> Hello!
> I want to update (upsert) a record in a Hudi table. It works when I run
> against a local dataset (such as file:///e:/hudi_cow_table), but when I run
> against HDFS (such as hdfs://172.16.44.28:8020/flink/hudi) the original
> record is not overwritten and a new record is generated instead. Why?
>
> My program:
>
> import java.util.Collections
>
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.{SparkConf, SparkContext}
>
> object HudiUpdate {
>     def main(args: Array[String]): Unit = {
>         import org.apache.hudi.DataSourceWriteOptions._
>         import org.apache.hudi.QuickstartUtils._
>         import org.apache.hudi.config.HoodieWriteConfig._
>         import org.apache.spark.sql.SaveMode._
>
>         import scala.collection.JavaConversions._
>
>         // Initialization
>         val conf = new SparkConf().setAppName("HudiTest")
>                 .setMaster("local")
>         conf.set("spark.serializer",
>                 "org.apache.spark.serializer.KryoSerializer") // use the Kryo serialization library
>         val sc = new SparkContext(conf)
>         val spark = new SQLContext(sc)
>
>         val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
> //        val updateUUID = args(0)
>
>         // Set the table name, base path, and data generator used to generate records for this guide.
>         val tableName = Constant.tableName
> //        val basePath = Constant.hdfsPath
>         val basePath = Constant.localPath
>
>         query(spark, basePath, updateUUID)
>
>         // Generate some new trip samples, load them into a DataFrame, and then write the DataFrame to the Hudi dataset, as shown below.
>         val record = "{\"ts\": 0.0, \"uuid\": \"" + updateUUID + "\", \"rider\": \"rider-213\", \"driver\": \"driver-213\", " +
>                 "\"begin_lat\": 0.4726905879569653, \"begin_lon\": 0.46157858450465483, \"end_lat\": 0.754803407008858, " +
>                 "\"end_lon\": 0.9671159942018241, \"fare\": 34.158284716382845, \"partitionpath\": \"americas/brazil/sao_paulo\"}"
>         println(record)
>         val upsert = Collections.singletonList(record);
>         //        println("insert:"+System.currentTimeMillis())
>         val df = spark.read.json(spark.sparkContext.parallelize(upsert, 1))
>
>         println("start:" + System.currentTimeMillis())
>         df.write.format("org.apache.hudi").
>                 options(getQuickstartWriteConfigs).
>                 option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>                 option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>                 option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>                 option(TABLE_NAME, tableName).
>                 mode(Append).
>                 save(basePath);
>         println("end:" + System.currentTimeMillis())
>
>         query(spark, basePath, updateUUID)
>
>         println("finish")
>     }
>
>     def query(spark: SQLContext, basePath: String, updateUUID: String) = {
>         val roViewDF = spark.
>                 read.
>                 format("org.apache.hudi").
>                 load(basePath + "/*/*/*/*")
>         roViewDF.registerTempTable("hudi_ro_table")
>         val df0 = spark.sql("select * from hudi_ro_table where uuid='" + updateUUID + "'")
>         //        df0.printSchema();
>         println(df0.rdd.collect().mkString(" "))
>     }
> }
>
>
>
> [email protected]
>
