Spark 版本为 2.3.2.3.1.0.0-78(HDP 发行版),提交的代码如下:
def main(args: Array[String]): Unit = {
  // Build a local SparkSession (3 cores) with Kryo serialization enabled.
  val builder = SparkSession.builder
    .appName("Demo")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .master("local[3]")
  val session = builder.getOrCreate()

  // insert(session)
  update(session)
  query(session)
  // incrementalQueryPermalink(session)

  session.stop()
}
/**
 * Insert sample JSON records into a Hudi table under a local base path.
 *
 * @param spark the active SparkSession used to parse and write the data
 */
def insert(spark: SparkSession): Unit = {
  val tableName = "hudi_archive_test"
  val pathRoot = "/Users/tangxiuhong"
  val basePath = pathRoot + "/deltalake/hudi/"

  // Sample records; each string element is parsed as one JSON row.
  val inserts = List(
    """{"id" : 1, "name": "iteblog", "age" : 101, "ts" : 1, "dt" : "20191212"}""",
    """{"id" : 2, "name": "iteblog_hadoop", "age" : 102, "ts" : 1, "dt" : "20191213"}""",
    """{"id" : 3, "name": "hudi", "age" : 103, "ts" : 2, "dt" : "20191212"}""")
  // Alternative batch used for update tests (note the extra "addr" field):
  // val inserts = List(
  //   """{"id" : 4, "name": "iteblog", "age" : 102, "ts" : 2, "dt" : "20191212","addr" : "云南"}""",
  //   """{"id" : 5, "name": "iteblog_hadoop", "age" : 103, "ts" : 2, "dt" : "20191213","addr" : "浙江"}""",
  //   """{"id" : 6, "name": "hudi", "age" : 104, "ts" : 2, "dt" : "20191212","addr" : "云南"}""")

  val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  df.write.format("org.apache.hudi")
    // Pre-combine field: among records with the same key, the one with the
    // larger "ts" value wins on upsert.
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
    // Record key (primary key) column.
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
    // Multi-level partitioning requires the ComplexKeyGenerator.
    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
      "org.apache.hudi.keygen.ComplexKeyGenerator")
    // Multi-level partition columns.
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt,ts")
    // Allow the index to move a record when its partition path changes; for
    // the moved record to be found afterwards, a global index is required.
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    // Index type: one of HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM.
    .option(HoodieIndexConfig.INDEX_TYPE_PROP,
      HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    // Shuffle parallelism for insert/upsert (small values for local testing).
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    // Hudi table name.
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .mode(SaveMode.Append)
    .save(basePath)
}
运行以上代码时报上述错误,请问可能是什么原因呢?
[email protected]