Hi @Vinoth,
Here are reproduce steps.
1, Build from latest source
mvn clean package -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true
2, Write Data
export SPARK_HOME=/work/BigData/install/spark/spark-2.3.3-bin-hadoop2.6
${SPARK_HOME}/bin/spark-shell --jars `ls
packaging/hudi-spark-bundle/target/hudi-spark-bundle-*.*.*-SNAPSHOT.jar` --conf
'spark.serializer=org.apache.spark.serializer.KryoSerializer'
import org.apache.spark.sql.SaveMode._
var datas = List("{ \"name\": \"kenken\", \"ts\": 1574297893836, \"age\": 12,
\"location\": \"latitude\"}")
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "10").
option("hoodie.upsert.shuffle.parallelism", "10").
option("hoodie.delete.shuffle.parallelism", "10").
option("hoodie.bulkinsert.shuffle.parallelism", "10").
option("hoodie.datasource.write.recordkey.field", "name").
option("hoodie.datasource.write.partitionpath.field", "location").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.table.name", "hudi_mor_table").
mode(Overwrite).
save("file:///tmp/hudi_mor_table")
3, Append Data
df.write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "10").
option("hoodie.upsert.shuffle.parallelism", "10").
option("hoodie.delete.shuffle.parallelism", "10").
option("hoodie.bulkinsert.shuffle.parallelism", "10").
option("hoodie.datasource.write.recordkey.field", "name").
option("hoodie.datasource.write.partitionpath.field", "location").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.keep.max.commits", "5").
option("hoodie.keep.min.commits", "4").
option("hoodie.cleaner.commits.retained", "3").
option("hoodie.table.name", "hudi_mor_table").
mode(Append).
save("file:///tmp/hudi_mor_table")
4, Repeat about six times Append Data operation(above), will get the stackstrace
19/12/24 13:34:09 ERROR HoodieCommitArchiveLog: Failed to archive commits,
.commit file: 20191224132942.clean.requested
java.io.IOException: Not an Avro data file
at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50)
at
org.apache.hudi.common.util.AvroUtils.deserializeAvroMetadata(AvroUtils.java:147)
at org.apache.hudi.common.util.CleanerUtils.getCleanerPlan(CleanerUtils.java:88)
at
org.apache.hudi.io.HoodieCommitArchiveLog.convertToAvroRecord(HoodieCommitArchiveLog.java:294)
at
org.apache.hudi.io.HoodieCommitArchiveLog.archive(HoodieCommitArchiveLog.java:253)
at
org.apache.hudi.io.HoodieCommitArchiveLog.archiveIfRequired(HoodieCommitArchiveLog.java:122)
at org.apache.hudi.HoodieWriteClient.commit(HoodieWriteClient.java:562)
at org.apache.hudi.HoodieWriteClient.commit(HoodieWriteClient.java:523)
at org.apache.hudi.HoodieWriteClient.commit(HoodieWriteClient.java:514)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:159)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
BWT, I'm not familiar with this logic for now. If you have any ideas, feel free
to take over it.
best,
lamber-ken
At 2019-12-24 13:01:50, "Vinoth Chandar" <[email protected]> wrote:
Could you give 0.5.0-incubating (last release) a shot in the meantime?
Lamberken, do you have steps to reproduce this issue. Love to get a JIRA filed
so we could fix before the next release.
On Mon, Dec 23, 2019 at 7:25 PM lamberken <[email protected]> wrote:
Hi @mayu1,
I guess you used the latest master branch, this bug seems happened after
HUDI-398 merged.
I met the same exception, and I am trying to fix it [1].
You can try to build source before that commit, then continue your test.
[1] https://issues.apache.org/jira/browse/HUDI-453
best,
lamber-ken
At 2019-12-24 11:11:41, "[email protected]" <[email protected]> wrote:
>hello!
>I want to modify the quickstart program for performance testing and generate a
>dataset of ten million rows. However, the program will report an error after
>running it multiple times.
>
>error:
>Exception in thread "main" org.apache.hudi.exception.HoodieCommitException:
>Failed to archive commits
>at
>org.apache.hudi.io.HoodieCommitArchiveLog.archive(HoodieCommitArchiveLog.java:266)
>at
>org.apache.hudi.io.HoodieCommitArchiveLog.archiveIfRequired(HoodieCommitArchiveLog.java:122)
>at org.apache.hudi.HoodieWriteClient.commit(HoodieWriteClient.java:562)
>at org.apache.hudi.HoodieWriteClient.commit(HoodieWriteClient.java:523)
>at org.apache.hudi.HoodieWriteClient.commit(HoodieWriteClient.java:514)
>at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:152)
>at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
>at
>org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
>at
>org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>at
>org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>at
>org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
>at
>org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>at
>org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>at
>org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>at
>org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>at
>org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>at
>org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>at
>org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>at
>org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>at
>org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>at
>org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>at
>org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>at HudiUpdate$.main(HudiUpdate.scala:38)
>at HudiUpdate.main(HudiUpdate.scala)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at
>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>at
>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>at
>org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
>at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
>at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
>at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>Caused by: java.io.IOException: Not an Avro data file
>at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50)
>at
>org.apache.hudi.common.util.AvroUtils.deserializeAvroMetadata(AvroUtils.java:147)
>at
>org.apache.hudi.common.util.CleanerUtils.getCleanerPlan(CleanerUtils.java:88)
>at
>org.apache.hudi.io.HoodieCommitArchiveLog.convertToAvroRecord(HoodieCommitArchiveLog.java:294)
>at
>org.apache.hudi.io.HoodieCommitArchiveLog.archive(HoodieCommitArchiveLog.java:253)
>... 41 more
>
>my program:
>import org.apache.spark.sql.SQLContext
>import org.apache.spark.{SparkConf, SparkContext}
>
>object HudiDataGen {
> def main(args: Array[String]): Unit = {
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.QuickstartUtils._
> import org.apache.hudi.config.HoodieWriteConfig._
> import org.apache.spark.sql.SaveMode._
>
> import scala.collection.JavaConversions._
>
> //初始化
> val conf = new SparkConf().setAppName("HudiTest")
> // .setMaster("local")
> conf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer") //使用Kryo序列化库
> val sc = new SparkContext(conf)
> val spark = new SQLContext(sc)
>
> //设置表名、基本路径和数据生成器来为本指南生成记录。
> val tableName = "hudi_cow_table"
> val basePath = "hdfs://172.16.44.28:8020/flink/hudi"
> // val basePath = "file:///e:/hudi_cow_table"
> val dataGen = new DataGenerator
>
> //生成一些新的行程样本,将其加载到DataFrame中,然后将DataFrame写入Hudi数据集中,如下所示。
> val inserts = convertToStringList(dataGen.generateInserts(1000000))
> // println("insert:"+System.currentTimeMillis())
> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 8))
> df.write.format("org.apache.hudi").
> options(getQuickstartWriteConfigs).
> option(PRECOMBINE_FIELD_OPT_KEY, "ts").
> option(RECORDKEY_FIELD_OPT_KEY, "uuid").
> option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> option(TABLE_NAME, tableName).
> mode(Append).
> save(basePath);
> println("finish")
> }
>}
>
>
>[email protected]