bobgalvao opened a new issue #1723:
URL: https://github.com/apache/hudi/issues/1723
Hi,
I'm having trouble using Apache Hudi with S3.
**Steps to reproduce the behavior:**
1. Produce messages to a Kafka topic (about 2,000 records per window on average).
2. Start the streaming job (sample code below).
3. Errors start to occur intermittently.
4. The streaming job has to be left consuming from Kafka for some time before the errors occur; there is no consistent pattern.
**Environment Description:**
AWS EMR: emr-5.29.0
Hudi version : 0.5.0-incubating
Spark version : 2.4.4
Hive version : 2.3.6
Hadoop version : 2.8.5
Storage : S3
Running on Docker? : No
The errors occur intermittently. With error 1 ("Unrecognized token 'Objavro' ..."), subsequent writes to the dataset become impossible. With errors 2 and 3 ("Could not find any data file written for commit..." / "Failed to read schema from data..."), the dataset recovers on the next execution, but the streaming or batch job still ends with an error.
Because of these problems on S3, I ran the same code against HDFS, where I had no inconsistency issues.
I have already enabled EMRFS consistent view, but the same errors occur. I also enabled "hoodie.consistency.check.enabled", as recommended for S3 storage. The problem seems to be related to S3 consistency.
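For reference, the consistency check is passed to the Hudi writer roughly like this (a minimal sketch; the table name and path below are placeholders for my real values):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Minimal sketch of enabling Hudi's S3 consistency check on the writer.
// Table name and S3 path are placeholders, not the real values.
def writeWithConsistencyCheck(df: DataFrame): Unit = {
  df.write
    .format("org.apache.hudi")
    .option("hoodie.table.name", "AWS_CASE")
    // Tells the writer to re-check the file listing until newly
    // written files become visible on S3
    .option("hoodie.consistency.check.enabled", "true")
    .mode(SaveMode.Append)
    .save("s3://bucket01/AWS_CASE")
}
```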
I often get the errors below:
**Error 1 (when this error occurs, it is no longer possible to use the Hudi dataset):**
```
20/05/21 17:49:36 ERROR JobScheduler: Error running job streaming job 1590083340000 ms.0
org.apache.hudi.hive.HoodieHiveSyncException: Failed to get dataset schema for AWS_CASE
	at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:414)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
	at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
	at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
	at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unable to read commit metadata
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:328)
	at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:428)
	at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
	... 48 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Objavro': was expecting ('true', 'false' or 'null')
	at [Source: Objavro.schema�{"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"fileId","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"partitionPath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"metrics","type":["null",{"type":"map","values":"double","avro.java.string":"String"}],"default":null}]}}],"default":null},{"name":"extraMetadata","type":["null",{"type":"map","values":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"}],"default":null}]}�
_v
:���[�gקGC�20200521174801�s3://bucket01/AWS_CASE/0/.cfe57b0e-1ac0-4650-960a-615cdba323f9-0_20200521174801.log.1_1-927-392109�s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_1-895-378588_20200521174801.parquetLcfe57b0e-1ac0-4650-960a-615cdba323f9-00
TOTAL_LOG_FILES�?TOTAL_IO_MB
TOTAL_IO_READ_MB(TOTAL_LOG_FILES_SIZE�@"TOTAL_IO_WRITE_MB&TOTAL_LOG_FILE_SIZE�@�
_v :���[�gקGC; line: 1, column: 11]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2462)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1621)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:689)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:129)
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:326)
	... 50 more
```
**Error 2:**
```
java.lang.IllegalArgumentException: Could not find any data file written for commit [20200520134502__deltacommit__COMPLETED], could not get schema for dataset s3://bucket01/AWS_CASE, CommitMetadata :HoodieCommitMetadata{partitionToWriteStats={}, compacted=false, extraMetadataMap={}}
	at org.apache.hudi.hive.HoodieHiveClient.lambda$null$10(HoodieHiveClient.java:393)
	at java.util.Optional.orElseThrow(Optional.java:290)
	at org.apache.hudi.hive.HoodieHiveClient.lambda$getDataSchema$11(HoodieHiveClient.java:391)
	at java.util.Optional.orElseGet(Optional.java:267)
	at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:387)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
	at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$.processRDD(Main.scala:109)
	at br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$$anonfun$main$1.apply(Main.scala:145)
	at br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$$anonfun$main$1.apply(Main.scala:135)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
**Error 3:**
```
20/05/21 17:20:33 ERROR JobScheduler: Error running job streaming job 1590081600000 ms.0
java.lang.IllegalArgumentException: Failed to read schema from data file s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet. File does not exist.
	at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromDataFile(HoodieHiveClient.java:456)
	at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:432)
	at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
	at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
	at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
	at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
20/05/21 17:20:33 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Failed to read schema from data file s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet. File does not exist.
java.lang.IllegalArgumentException: Failed to read schema from data file s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet. File does not exist.
```
**Hudi-cli information:**
```
hudi->connect --path s3://bucket01/AWS_CASE
6671 [Spring Shell] INFO org.apache.hudi.common.table.HoodieTableMetaClient - Loading HoodieTableMetaClient from s3://bucket01/AWS_CASE
7242 [Spring Shell] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11229 [Spring Shell] INFO org.apache.hudi.common.util.FSUtils - Hadoop Configuration: fs.defaultFS: [hdfs://ip-10-xx-x-xx.agi.aws.local:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, emrfs-site.xml], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@6a9763d6]
12778 [Spring Shell] INFO org.apache.hudi.common.table.HoodieTableConfig - Loading dataset properties from s3://bucket01/AWS_CASE/.hoodie/hoodie.properties
12795 [Spring Shell] INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem - Opening 's3://bucket01/AWS_CASE/.hoodie/hoodie.properties' for reading
12835 [Spring Shell] INFO org.apache.hudi.common.table.HoodieTableMetaClient - Finished Loading Table of type MERGE_ON_READ from s3://bucket01/AWS_CASE
Metadata for table AWS_CASE loaded

hudi:AWS_CASE->desc
35395 [Spring Shell] INFO org.apache.hudi.common.table.timeline.HoodieActiveTimeline - Loaded instants java.util.stream.ReferencePipeline$Head@1e3450cf
╔═════════════════════════════════╤════════════════════════════════════════════════╗
║ Property                        │ Value                                          ║
╠═════════════════════════════════╪════════════════════════════════════════════════╣
║ basePath                        │ s3://bucket01/AWS_CASE                         ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ metaPath                        │ s3://bucket01/AWS_CASE/.hoodie                 ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ fileSystem                      │ s3                                             ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ hoodie.table.name               │ AWS_CASE                                       ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ hoodie.compaction.payload.class │ org.apache.hudi.common.model.HoodieAvroPayload ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ hoodie.table.type               │ MERGE_ON_READ                                  ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ hoodie.archivelog.folder        │ archived                                       ║
╚═════════════════════════════════╧════════════════════════════════════════════════╝
```
**hudi:AWS_CASE->commits show**
```
69651 [Spring Shell] INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem - Opening 's3://bucket01/AWS_CASE/.hoodie/20200521174926.commit' for reading
Command failed java.lang.reflect.UndeclaredThrowableException
java.lang.reflect.UndeclaredThrowableException
	at org.springframework.util.ReflectionUtils.rethrowRuntimeException(ReflectionUtils.java:315)
	at org.springframework.util.ReflectionUtils.handleInvocationTargetException(ReflectionUtils.java:295)
	at org.springframework.util.ReflectionUtils.handleReflectionException(ReflectionUtils.java:279)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:219)
	at org.springframework.shell.core.SimpleExecutionStrategy.invoke(SimpleExecutionStrategy.java:68)
	at org.springframework.shell.core.SimpleExecutionStrategy.execute(SimpleExecutionStrategy.java:59)
	at org.springframework.shell.core.AbstractShell.executeCommand(AbstractShell.java:134)
	at org.springframework.shell.core.JLineShell.promptLoop(JLineShell.java:533)
	at org.springframework.shell.core.JLineShell.run(JLineShell.java:179)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unable to read commit metadata
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:328)
	at org.apache.hudi.cli.commands.CommitsCommand.showCommits(CommitsCommand.java:89)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
	... 6 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Objavro': was expecting ('true', 'false' or 'null')
	at [Source: Objavro.schema�{"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"fileId","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"partitionPath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"metrics","type":["null",{"type":"map","values":"double","avro.java.string":"String"}],"default":null}]}}],"default":null},{"name":"extraMetadata","type":["null",{"type":"map","values":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"}],"default":null}]}�
_v
:���[�gקGC�20200521174801�s3://bucket01/AWS_CASE/0/.cfe57b0e-1ac0-4650-960a-615cdba323f9-0_20200521174801.log.1_1-927-392109�s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_1-895-378588_20200521174801.parquetLcfe57b0e-1ac0-4650-960a-615cdba323f9-00
TOTAL_LOG_FILES�?TOTAL_IO_MB
TOTAL_IO_READ_MB(TOTAL_LOG_FILES_SIZE�@"TOTAL_IO_WRITE_MB&TOTAL_LOG_FILE_SIZE�@�
_v :���[�gקGC; line: 1, column: 11]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2462)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1621)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:689)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:129)
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:326)
	... 12 more
```
**Sample code used:**
```scala
package br.com.agi.bigdata.awscase

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val sparkConf: SparkConf = new SparkConf()
    .set("spark.sql.catalogImplementation", "hive")
    .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
    .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
    .set("spark.yarn.maxAppAttempts", "3")
    .set("spark.locality.wait", "1")
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.streaming.backpressure.initialRate", "1")
    .set("spark.streaming.receiver.maxRate", "1")
    .set("spark.streaming.receiver.initialRate", "1")
    .set("spark.sql.hive.convertMetastoreParquet", "false")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  val sc = new SparkContext(sparkConf)
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  def processRDD(rddRaw: RDD[ConsumerRecord[String, String]]): Unit = {
    import spark.implicits._

    // Take the target table's schema from Hive and drop the metadata columns.
    val hiveSchema = spark.sql("select * from datalake.aux.emr.schema1.table1 limit 1").schema
    val fieldsRemove = Array("current_ts", "table", "op_type", "op_ts",
      "pos", "primary_keys", "tokens", "date_partition")

    val jsonSchema = Seq[StructField](
      StructField("table", StringType),
      StructField("op_type", StringType),
      StructField("op_ts", StringType),
      StructField("current_ts", StringType),
      StructField("pos", StringType),
      StructField("primary_keys", ArrayType(StringType)),
      StructField("normalizedkey", StringType),
      StructField("after", StructType(hiveSchema.fields
        .filter(it => !fieldsRemove.contains(it.name))
        .map(it => StructField(it.name.toUpperCase, it.dataType, it.nullable, it.metadata))))
    )

    val df = spark.read
      .schema(StructType(jsonSchema))
      .json(rddRaw.map(_.value()))
      .withColumn("normalizedkey", concat(lit(col("current_ts")), lit(col("pos"))).cast(StringType))
      .select("table", "op_type", "op_ts", "current_ts", "pos",
        "primary_keys", "normalizedkey", "after.*")

    val hiveTableName = "AWS_CASE"
    val pks = df.select($"primary_keys").first().getSeq[String](0).map(col)

    df.withColumn("key", concat(pks: _*))
      .withColumn("partition_id", (pks(0) / 10000000).cast("Integer"))
      .write.format("org.apache.hudi")
      .option(HoodieWriteConfig.TABLE_NAME, hiveTableName)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "datalake")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "normalizedkey")
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hiveTableName)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
        classOf[MultiPartKeysValueExtractor].getName)
      .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition_id")
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "partition_id")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.xx.x.xxx:10000")
      .option("hoodie.consistency.check.enabled", "true")
      .option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS")
      .option("hoodie.keep.max.commits", 2)
      .option("hoodie.keep.min.commits", 1)
      .option("hoodie.parquet.compression.codec", "snappy")
      .option("hoodie.cleaner.commits.retained", 0)
      .option("hoodie.parquet.max.file.size", 1073741824)
      .option("hoodie.parquet.small.file.limit", 943718400)
      .mode(SaveMode.Append)
      .save("s3://bucket01/AWS_CASE")
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(sc, Minutes(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker01:9092,broker02:9092,broker03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "aws_case",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "auto.offset.reset" -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("TOPIC-01"), kafkaParams)
    )

    // Commit offsets back to Kafka only after the batch has been handled.
    stream.foreachRDD { rddRaw =>
      val offsetRanges = rddRaw.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rddRaw.isEmpty()) {
        processRDD(rddRaw)
      }
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```
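As a side note on the layout: the `partition_id` column buckets the first primary-key value into ranges of ten million, which is what the integer division by 10000000 implies (assuming, as the code does, that `pks(0)` is numeric). The same arithmetic in plain Scala:

```scala
// Plain-Scala sketch of the partition_id arithmetic used in processRDD.
// Keys 0..9999999 map to partition 0, 10000000..19999999 to partition 1, etc.
def partitionId(pk: Long): Int = (pk / 10000000L).toInt
```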
**Build.sbt:**

```scala
name := "aws-case-hudi"
version := "0.1"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.4.4" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.4",
  "org.apache.spark" %% "spark-sql" % "2.4.4",
  "org.apache.spark" %% "spark-core" % "2.4.4",
  "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating",
  "org.apache.httpcomponents" % "httpclient" % "4.5.12"
)
```
Best regards,
bobgalvao