Mayu,
There shouldn't be a difference between upserting 1 million records and 10 million records. This does seem to be a problem with the way the dataset is being queried. Could you please give that link a read, and if you still run into issues, we can help you.
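For reference, a minimal sketch of how you could check whether the duplicates come from the query path rather than the upsert itself (this is only a sketch, not your program; the basePath below is a placeholder, adjust it to your table):

    import org.apache.spark.sql.SparkSession

    object HudiDuplicateCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("HudiDuplicateCheck")
          .master("local[*]")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .getOrCreate()

        // Placeholder path; point this at the Hudi table on HDFS.
        val basePath = "hdfs://172.16.44.28:8020/flink/hudi"

        // Snapshot read through the Hudi datasource: only the latest file slice
        // per file group is exposed, so a correctly upserted key appears once.
        val snapshotDF = spark.read
          .format("org.apache.hudi")
          .load(basePath + "/*/*/*/*")
        snapshotDF.createOrReplaceTempView("hudi_snapshot")

        // Any rows returned here would mean duplicate record keys in the snapshot
        // view itself, i.e. a genuine upsert problem rather than a query problem.
        spark.sql(
          "select uuid, count(*) as cnt from hudi_snapshot group by uuid having count(*) > 1"
        ).show(false)

        spark.stop()
      }
    }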
-Nishith

On Tue, Dec 24, 2019 at 11:36 PM ma...@bonc.com.cn <ma...@bonc.com.cn> wrote:

> Thank you for answering my question.
> I found that this is not a problem with HDFS: the records are upserted
> correctly at 1 million records, but the problem appears at 10 million.
>
> ma...@bonc.com.cn
>
> From: nishith agarwal
> Date: 2019-12-25 15:19
> To: dev
> Subject: Re: How to upsert on HDFS
>
> Hello Mayu,
>
> To query Hudi tables, you need to register them against the correct
> HudiInputFormat. For more information on querying the Hudi table, please
> read the following documentation:
> https://hudi.apache.org/querying_data.html.
> The duplicates might be happening due to the absence of the input format,
> which chooses the right file/update to pick.
>
> Thanks,
> Nishith
>
> On Tue, Dec 24, 2019 at 6:46 PM ma...@bonc.com.cn <ma...@bonc.com.cn> wrote:
>
> > Hello!
> > I want to update (upsert) a record in Hudi. It works when I run against a
> > local dataset (such as file:///e:/hudi_cow_table), but when I run against
> > HDFS (such as hdfs://172.16.44.28:8020/flink/hudi) the original record is
> > not overwritten and a new record is generated. Why?
> >
> > My program:
> >
> > import java.util.Collections
> >
> > import org.apache.spark.sql.SQLContext
> > import org.apache.spark.{SparkConf, SparkContext}
> >
> > object HudiUpdate {
> >   def main(args: Array[String]): Unit = {
> >     import org.apache.hudi.DataSourceWriteOptions._
> >     import org.apache.hudi.QuickstartUtils._
> >     import org.apache.hudi.config.HoodieWriteConfig._
> >     import org.apache.spark.sql.SaveMode._
> >
> >     import scala.collection.JavaConversions._
> >
> >     // Initialization
> >     val conf = new SparkConf().setAppName("HudiTest")
> >       .setMaster("local")
> >     conf.set("spark.serializer",
> >       "org.apache.spark.serializer.KryoSerializer") // use the Kryo serialization library
> >     val sc = new SparkContext(conf)
> >     val spark = new SQLContext(sc)
> >
> >     val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
> >     // val updateUUID = args(0)
> >
> >     // Set the table name, base path and data generator used to generate records for this guide.
> >     val tableName = Constant.tableName
> >     // val basePath = Constant.hdfsPath
> >     val basePath = Constant.localPath
> >
> >     query(spark, basePath, updateUUID)
> >
> >     // Generate a new trip sample, load it into a DataFrame, and write the DataFrame to the Hudi dataset as shown below.
> >     val record = "{\"ts\": 0.0, \"uuid\": \"" + updateUUID + "\", " +
> >       "\"rider\": \"rider-213\", \"driver\": \"driver-213\", " +
> >       "\"begin_lat\": 0.4726905879569653, \"begin_lon\": 0.46157858450465483, " +
> >       "\"end_lat\": 0.754803407008858, \"end_lon\": 0.9671159942018241, " +
> >       "\"fare\": 34.158284716382845, \"partitionpath\": \"americas/brazil/sao_paulo\"}"
> >     println(record)
> >     val upsert = Collections.singletonList(record)
> >     // println("insert:" + System.currentTimeMillis())
> >     val df = spark.read.json(spark.sparkContext.parallelize(upsert, 1))
> >
> >     println("start:" + System.currentTimeMillis())
> >     df.write.format("org.apache.hudi").
> >       options(getQuickstartWriteConfigs).
> >       option(PRECOMBINE_FIELD_OPT_KEY, "ts").
> >       option(RECORDKEY_FIELD_OPT_KEY, "uuid").
> >       option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> >       option(TABLE_NAME, tableName).
> >       mode(Append).
> >       save(basePath)
> >     println("end:" + System.currentTimeMillis())
> >
> >     query(spark, basePath, updateUUID)
> >
> >     println("finish")
> >   }
> >
> >   def query(spark: SQLContext, basePath: String, updateUUID: String) = {
> >     val roViewDF = spark.
> >       read.
> >       format("org.apache.hudi").
> >       load(basePath + "/*/*/*/*")
> >     roViewDF.registerTempTable("hudi_ro_table")
> >     val df0 = spark.sql("select * from hudi_ro_table where uuid='" +
> >       updateUUID + "'")
> >     // df0.printSchema()
> >     println(df0.rdd.collect().mkString(" "))
> >   }
> > }
> >
> > ma...@bonc.com.cn
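For anyone hitting the same duplicates from Hive or Presto, a rough sketch of the HoodieParquetInputFormat registration Nishith refers to above. This is an assumption-laden example, not part of the original thread: the table name, column list and location are placeholders, and it assumes a Hive-enabled SparkSession with the matching Hudi bundle on the classpath.

    import org.apache.spark.sql.SparkSession

    object RegisterHudiInHive {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("RegisterHudiInHive")
          .enableHiveSupport() // assumes a reachable Hive metastore
          .getOrCreate()

        // Placeholder DDL: columns and location must match the actual table.
        // HoodieParquetInputFormat is what filters each file group down to its
        // latest file slice, which prevents "duplicate" rows at query time.
        spark.sql(
          """CREATE EXTERNAL TABLE IF NOT EXISTS hudi_trips_cow (
            |  _hoodie_commit_time string,
            |  uuid string,
            |  rider string,
            |  driver string,
            |  fare double,
            |  ts double
            |)
            |PARTITIONED BY (partitionpath string)
            |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
            |STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
            |LOCATION 'hdfs://172.16.44.28:8020/flink/hudi'""".stripMargin)

        spark.stop()
      }
    }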