Mayu,

There shouldn't be a difference between upserting 1 million and 10 million
records. This does seem to be a problem with the way the dataset is being
queried. Could you please give that link a read, and if you still run into
issues, we can help you.
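
If it helps to narrow things down, here is a minimal sketch of such a check
(only the base path and the uuid come from this thread; the app name, view
name, and everything else are illustrative). It reads the dataset back
through the Hudi datasource and looks for duplicate keys, and also uses the
Hudi metadata columns to show which commit and file produced the currently
visible version of a record.

// Sketch only: the base path and record key are taken from this thread,
// the rest is illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("HudiDuplicateCheck")
  .master("local[*]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

val basePath = "hdfs://172.16.44.28:8020/flink/hudi"

// Snapshot read through the Hudi datasource, which applies Hudi's file
// filtering so only the latest file slice of each file group is scanned.
val df = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
df.createOrReplaceTempView("hudi_trips")

// Duplicate keys that survive this snapshot read would indicate a genuine
// data problem; duplicates that appear only when reading the raw parquet
// files point at the query side instead.
spark.sql("select uuid, count(*) as cnt from hudi_trips group by uuid having count(*) > 1").show()

// For a single key, the metadata columns show the commit and the file that
// produced the currently visible version of the record.
spark.sql("select _hoodie_commit_time, _hoodie_file_name, uuid, ts " +
  "from hudi_trips where uuid = '079bd411-ca1e-4092-9523-0b694f9f41f2'").show(false)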

-Nishith

On Tue, Dec 24, 2019 at 11:36 PM ma...@bonc.com.cn <ma...@bonc.com.cn>
wrote:

> Thank you for answering my question.
> I found that this is not a problem with HDFS: the upsert works correctly
> with 1 million records, but the problem appears with 10 million.
>
>
>
> ma...@bonc.com.cn
>
> From: nishith agarwal
> Date: 2019-12-25 15:19
> To: dev
> Subject: Re: How to upsert on HDFS
> Hello Mayu,
>
> To query Hudi tables, you need to register them against the correct
> HudiInputFormat. For more information on querying the Hudi table, please
> read the following documentation:
> https://hudi.apache.org/querying_data.html.
> The duplicates might be happening due to the absence of the input format,
> which is what chooses the right file/version to read.
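>
> To make "register against the input format" concrete, a copy-on-write
> table can be exposed to Hive (or a Hive-enabled Spark session) roughly as
> in the sketch below. This is only illustrative: the table name, columns,
> and location are placeholders, not taken from your job.
>
> // Sketch only, assuming a Hive-enabled SparkSession; table name, columns,
> // and location are placeholders.
> import org.apache.spark.sql.SparkSession
>
> val hiveSpark = SparkSession.builder()
>   .appName("HudiHiveRegister")
>   .enableHiveSupport()
>   .getOrCreate()
>
> // HoodieParquetInputFormat filters each file group down to its latest
> // file slice, so queries see only one version of every record.
> hiveSpark.sql(
>   """CREATE EXTERNAL TABLE IF NOT EXISTS hudi_trips (
>     |  _hoodie_commit_time string, uuid string, rider string,
>     |  driver string, fare double, ts double)
>     |PARTITIONED BY (partitionpath string)
>     |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
>     |STORED AS
>     |  INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
>     |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
>     |LOCATION 'hdfs://172.16.44.28:8020/flink/hudi'""".stripMargin)
>
> Note that partitions still need to be added (for example via Hudi's Hive
> sync tool or ALTER TABLE ... ADD PARTITION) before such a table returns data.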
>
> Thanks,
> Nishith
>
> On Tue, Dec 24, 2019 at 6:46 PM ma...@bonc.com.cn <ma...@bonc.com.cn>
> wrote:
>
> > Hello!
> > I want to update (upsert) a record in a Hudi dataset. It works when I run
> > against a local dataset (such as file:///e:/hudi_cow_table), but when I run
> > against HDFS (such as hdfs://172.16.44.28:8020/flink/hudi) the original
> > record is not overwritten and a new record is generated. Why?
> >
> > My program:
> >
> > import java.util.Collections
> >
> > import org.apache.spark.sql.SQLContext
> > import org.apache.spark.{SparkConf, SparkContext}
> >
> > object HudiUpdate {
> >     def main(args: Array[String]): Unit = {
> >         import org.apache.hudi.DataSourceWriteOptions._
> >         import org.apache.hudi.QuickstartUtils._
> >         import org.apache.hudi.config.HoodieWriteConfig._
> >         import org.apache.spark.sql.SaveMode._
> >
> >         import scala.collection.JavaConversions._
> >
> >         // Initialization
> >         val conf = new SparkConf().setAppName("HudiTest")
> >                 .setMaster("local")
> >         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // use the Kryo serialization library
> >         val sc = new SparkContext(conf)
> >         val spark = new SQLContext(sc)
> >
> >         val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
> > //        val updateUUID = args(0)
> >
> >         // Set the table name, base path, and data generator used to generate records for this guide.
> >         val tableName = Constant.tableName
> > //        val basePath = Constant.hdfsPath
> >         val basePath = Constant.localPath
> >
> >         query(spark, basePath, updateUUID)
> >
> >         // Generate a new trip sample, load it into a DataFrame, and write the DataFrame to the Hudi dataset, as shown below.
> >         val record = "{\"ts\": 0.0, \"uuid\": \"" + updateUUID + "\",
> > \"rider\": \"rider-213\", \"driver\": \"driver-213\", " +
> >                 "\"begin_lat\": 0.4726905879569653, \"begin_lon\":
> > 0.46157858450465483, \"end_lat\": 0.754803407008858, " +
> >                 "\"end_lon\": 0.9671159942018241, \"fare\":
> > 34.158284716382845, \"partitionpath\": \"americas/brazil/sao_paulo\"}"
> >         println(record)
> >         val upsert = Collections.singletonList(record);
> >         //        println("insert:"+System.currentTimeMillis())
> >         val df = spark.read.json(spark.sparkContext.parallelize(upsert, 1))
> >
> >         println("start:" + System.currentTimeMillis())
> >         df.write.format("org.apache.hudi").
> >                 options(getQuickstartWriteConfigs).
> >                 option(PRECOMBINE_FIELD_OPT_KEY, "ts").
> >                 option(RECORDKEY_FIELD_OPT_KEY, "uuid").
> >                 option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> >                 option(TABLE_NAME, tableName).
> >                 mode(Append).
> >                 save(basePath);
> >         println("end:" + System.currentTimeMillis())
> >
> >         query(spark, basePath, updateUUID)
> >
> >         println("finish")
> >     }
> >
> >     def query(spark: SQLContext, basePath: String, updateUUID: String) = {
> >         val roViewDF = spark.
> >                 read.
> >                 format("org.apache.hudi").
> >                 load(basePath + "/*/*/*/*")
> >         roViewDF.registerTempTable("hudi_ro_table")
> >         val df0 = spark.sql("select * from hudi_ro_table where uuid='" + updateUUID + "'")
> >         //        df0.printSchema();
> >         println(df0.rdd.collect().mkString(" "))
> >     }
> > }
> >
> >
> >
> > ma...@bonc.com.cn
> >
>
