abhisheksahani91 opened a new issue, #10138: URL: https://github.com/apache/hudi/issues/10138
**_Tips before filing an issue_** - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? - Join the mailing list to engage in conversations and get faster support at [email protected]. - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly. GLUE version: 4.0 HUDI Version: 0.12.1 HUDI Table Type: MOR I am maintaining a MOR table using a file-based schema in Delta Streamer. If I add a new field to the schema file and restart the glue job, post that compaction starts failing ERROR STACK TRACE: 23/11/17 21:57:24 ERROR AsyncCompactService: Compactor executor failed org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 63.0 failed 4 times, most recent failure: Lost task 5.3 in stage 63.0 (TID 449) (10.24.69.253 executor 9): org.apache.hudi.exception.HoodieException: Exception when reading log file at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352) at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192) at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110) at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103) at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324) at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:198) at org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$57154431$1(HoodieCompactor.java:138) at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376) at org.apache.spark.rdd.RDD.iterator(RDD.scala:327) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:138) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1517) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.avro.AvroTypeException: Found user_data, expecting user_data, missing required field newCol at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:240) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:207) at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:144) at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:382) at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:464) at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343) ... 29 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at scala.Option.foreach(Option.scala:407) ~[scala-library-2.12.15.jar:?] at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.withScope(RDD.scala:406) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.collect(RDD.scala:1020) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.table.action.compact.RunCompactionActionExecutor.execute(RunCompactionActionExecutor.java:93) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.table.HoodieSparkMergeOnReadTable.compact(HoodieSparkMergeOnReadTable.java:142) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.client.SparkRDDWriteClient.compact(SparkRDDWriteClient.java:342) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.client.BaseHoodieWriteClient.compact(BaseHoodieWriteClient.java:1062) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.client.HoodieSparkCompactor.compact(HoodieSparkCompactor.java:52) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.async.AsyncCompactService.lambda$null$0(AsyncCompactService.java:84) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382] Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:198) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$57154431$1(HoodieCompactor.java:138) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) ~[scala-library-2.12.15.jar:?] at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?] at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.iterator(RDD.scala:327) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1517) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] ... 3 more Caused by: org.apache.avro.AvroTypeException: Found user_data, expecting user_data, missing required field newCol at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:240) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0] at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.0.jar:1.11.0] at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:207) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:144) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:382) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:464) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324) ~[hudi-aws-bundle-0.12.1.jar:0.12.1] at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:198) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$57154431$1(HoodieCompactor.java:138) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1] at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) ~[scala-library-2.12.15.jar:?] at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?] at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.iterator(RDD.scala:327) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1517) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1] ... 3 more In the above error, I have added a new nullable field newCol, post that compaction starts failing Delta Streamer code: import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ import org.apache.spark.sql.SparkSession import org.apache.spark.api.java.JavaSparkContext import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer import org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator import org.apache.hudi.utilities.UtilHelpers object GlueApp { def main(sysArgs: Array[String]) { val args = GlueArgParser.getResolvedOptions( sysArgs, Seq("JOB_NAME","TARGET_BUCKET","CONFIG_BUCKET","KAFKA_BOOTSTRAP_SERVERS", "TARGET_TABLE", "SOURCE_TOPIC", "HOODIE_RECORDKEY_FIELD", "HOODIE_PRECOMBINE_FIELD","PARTITION_FIELD","TARGET_DATABASE").toArray) var config = Array( "--schemaprovider-class", "org.apache.hudi.utilities.schema.FilebasedSchemaProvider", "--source-class", "org.apache.hudi.utilities.sources.JsonKafkaSource", "--source-ordering-field", "ts_ux", "--target-base-path", "s3://"+ args("TARGET_BUCKET") + "/hudi_test_mor3/" + args("TARGET_TABLE") + "/", "--target-table", args("TARGET_TABLE"), "--table-type" , "MERGE_ON_READ", //"--table-type" , "COPY_ON_WRITE", "--enable-hive-sync", "--hoodie-conf", "hoodie.deltastreamer.schemaprovider.source.schema.file=s3://schema/artifacts/hudi/hudi-deltastreamer-glue/config/mongodb-userdata-prod-schema-new.avsc", "--hoodie-conf", "hoodie.deltastreamer.schemaprovider.target.schema.file=s3://schema/artifacts/hudi/hudi-deltastreamer-glue/config/mongodb-userdata-prod-schema-new.avsc", "--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic="+ args("SOURCE_TOPIC") + "", //"--hoodie-conf", "hoodie.datasource.hive_sync.table="+ args("TARGET_TABLE") + "", "--hoodie-conf", "hoodie.datasource.write.recordkey.field=" +args("HOODIE_RECORDKEY_FIELD") + "", "--hoodie-conf", "hoodie.datasource.write.precombine.field=" +args("HOODIE_PRECOMBINE_FIELD") + "", "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true", "--hoodie-conf", "hoodie.datasource.hive_sync.database="+ args("TARGET_DATABASE") + "", "--hoodie-conf", "hoodie.datasource.hive_sync.table="+ args("TARGET_TABLE") + "", "--hoodie-conf", "hoodie.datasource.write.operation=UPSERT", "--hoodie-conf", "hoodie.datasource.hive_sync.use_jdbc=false", "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor", "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator", "--hoodie-conf", "security.protocol=PLAINTEXT", "--hoodie-conf", "auto.offset.reset=latest", "--hoodie-conf", "bootstrap.servers=" + args("KAFKA_BOOTSTRAP_SERVERS"), "--hoodie-conf", "group.id=native-hudi-job", "--hoodie-conf", "hoodie.kafka.allow.commit.on.errors=true", "--hoodie-conf", "hoodie.write.allow_null_updates", "--hoodie-conf", "hoodie.index.type=SIMPLE", "--hoodie-conf", "hoodie.upsert.shuffle.parallelism=200", "--hoodie-conf", "hoodie.finalize.write.parallelism=400", "--hoodie-conf", "hoodie.markers.delete.parallelism=200", "--hoodie-conf", "hoodie.file.listing.parallelism=400", "--hoodie-conf", "hoodie.cleaner.parallelism=400", "--hoodie-conf", "hoodie.archive.delete.parallelism=200", "--hoodie-conf", "compaction.trigger.strategy=NUM_OR_TIME", "--hoodie-conf", "hoodie.compact.inline.trigger.strategy=NUM_OR_TIME", "--hoodie-conf", "compaction.schedule.enabled=true", "--hoodie-conf", "compaction.async.enabled=true", "--hoodie-conf", "compaction.delta_commits=5", "--hoodie-conf", "hoodie.compact.inline.max.delta.commits=5", "--hoodie-conf", "compaction.delta_seconds=600", "--hoodie-conf", "hoodie.compact.inline.max.delta.seconds=600", "--hoodie-conf", "hoodie.metrics.on=true", "--hoodie-conf", "hoodie.metrics.reporter.type=CLOUDWATCH", //"--hoodie-conf", "hoodie.precommit.validators=org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator", //"--hoodie-conf", "hoodie.precommit.validators.single.value.sql.queries=select count(*) from <TABLE_NAME> where updtdTm is null#0", "--hoodie-conf", "hoodie.deltastreamer.kafka.commit_on_errors=true", "--continuous" //"--commit-on-errors" ) val cfg = HoodieDeltaStreamer.getConfig(config) val additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg) val jssc = UtilHelpers.buildSparkContext("delta-streamer-test", "jes", additionalSparkConfigs) val spark = jssc.sc val glueContext: GlueContext = new GlueContext(spark) Job.init(args("JOB_NAME"), glueContext, args.asJava) try { new HoodieDeltaStreamer(cfg, jssc).sync(); } finally { jssc.stop(); } Job.commit() } } -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
