Thank you for answering my question. I found that this is not actually a problem with HDFS: the upsert works correctly when the table has 1 million records, but the problem appears once it has 10 million records.
[email protected]

From: nishith agarwal
Date: 2019-12-25 15:19
To: dev
Subject: Re: How to upsert on HDFS

Hello Mayu,

To query Hudi tables, you need to register them against the correct HudiInputFormat. For more information on querying the Hudi table, please read the following documentation: https://hudi.apache.org/querying_data.html. The duplicates might be happening due to the absence of the input format, which chooses the right file/update to pick.

Thanks,
Nishith

On Tue, Dec 24, 2019 at 6:46 PM [email protected] <[email protected]> wrote:

> Hello!
> I want to update (upsert) a record in Hudi. It works when I run against a
> local dataset (such as file:///e:/hudi_cow_table), but when I run on HDFS
> (such as hdfs://172.16.44.28:8020/flink/hudi) the original record is not
> overwritten and a new record is generated instead. Why?
>
> My program:
>
> import java.util.Collections
>
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.{SparkConf, SparkContext}
>
> object HudiUpdate {
>   def main(args: Array[String]): Unit = {
>     import org.apache.hudi.DataSourceWriteOptions._
>     import org.apache.hudi.QuickstartUtils._
>     import org.apache.hudi.config.HoodieWriteConfig._
>     import org.apache.spark.sql.SaveMode._
>
>     import scala.collection.JavaConversions._
>
>     // Initialize Spark
>     val conf = new SparkConf().setAppName("HudiTest")
>       .setMaster("local")
>     conf.set("spark.serializer",
>       "org.apache.spark.serializer.KryoSerializer") // use the Kryo serialization library
>     val sc = new SparkContext(conf)
>     val spark = new SQLContext(sc)
>
>     val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
>     // val updateUUID = args(0)
>
>     // Set the table name and base path used for the records in this guide.
>     val tableName = Constant.tableName
>     // val basePath = Constant.hdfsPath
>     val basePath = Constant.localPath
>
>     query(spark, basePath, updateUUID)
>
>     // Generate a new trip sample, load it into a DataFrame, and write the
>     // DataFrame to the Hudi dataset, as shown below.
>     val record = "{\"ts\": 0.0, \"uuid\": \"" + updateUUID + "\", " +
>       "\"rider\": \"rider-213\", \"driver\": \"driver-213\", " +
>       "\"begin_lat\": 0.4726905879569653, \"begin_lon\": 0.46157858450465483, " +
>       "\"end_lat\": 0.754803407008858, \"end_lon\": 0.9671159942018241, " +
>       "\"fare\": 34.158284716382845, \"partitionpath\": \"americas/brazil/sao_paulo\"}"
>     println(record)
>     val upsert = Collections.singletonList(record)
>     // println("insert:" + System.currentTimeMillis())
>     val df = spark.read.json(spark.sparkContext.parallelize(upsert, 1))
>
>     println("start:" + System.currentTimeMillis())
>     df.write.format("org.apache.hudi").
>       options(getQuickstartWriteConfigs).
>       option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>       option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>       option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>       option(TABLE_NAME, tableName).
>       mode(Append).
>       save(basePath)
>     println("end:" + System.currentTimeMillis())
>
>     query(spark, basePath, updateUUID)
>
>     println("finish")
>   }
>
>   def query(spark: SQLContext, basePath: String, updateUUID: String) = {
>     val roViewDF = spark.
>       read.
>       format("org.apache.hudi").
>       load(basePath + "/*/*/*/*")
>     roViewDF.registerTempTable("hudi_ro_table")
>     val df0 = spark.sql("select * from hudi_ro_table where uuid='" + updateUUID + "'")
>     // df0.printSchema()
>     println(df0.rdd.collect().mkString(" "))
>   }
> }
>
> [email protected]
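
For reference, a minimal Scala sketch of the input-format point above, assuming a copy-on-write table already written with the quickstart configs; the basePath below reuses the HDFS path from the original message purely as an illustration. Reading the raw parquet files bypasses Hudi and can surface every retained file version of an updated key, while reading through the Hudi datasource applies the Hudi input format semantics and returns only the latest version per record key.

import org.apache.spark.sql.SparkSession

object HudiReadCompare {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HudiReadCompare")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Illustrative path only; replace with your own table location.
    val basePath = "hdfs://172.16.44.28:8020/flink/hudi"

    // Raw parquet read: bypasses the Hudi input format, so every parquet file
    // version still retained under the base path is scanned, and an updated
    // record key can show up once per retained version (the apparent duplicates).
    val rawDF = spark.read.parquet(basePath + "/*/*/*/*.parquet")
    println("raw parquet rows:   " + rawDF.count())

    // Hudi datasource read: resolves the latest file slice per file group,
    // so each record key appears exactly once after an upsert.
    val hudiDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
    println("hudi snapshot rows: " + hudiDF.count())

    spark.stop()
  }
}

If both counts match, the older file versions may already have been cleaned; the comparison is only meant to show that duplicates seen outside the Hudi datasource or input format are usually retained older file versions rather than a failed upsert.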
