ashishmgofficial edited a comment on issue #2149:
URL: https://github.com/apache/hudi/issues/2149#issuecomment-704918685


@bvaradar: Thanks for the code. I followed your instructions and, for testing, tried to add the `_hoodie_is_deleted` column to the dataset using the code below, but I am getting the following error:
   
```
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1409)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1517)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1516)
        at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
        at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:349)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:238)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:163)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:161)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:466)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Error while decoding: java.lang.NegativeArraySizeException
createexternalrow(input[0, bigint, true], input[1, string, false].toString, input[2, string, false].toString, input[3, int, false], input[4, int, true], input[5, string, true].toString, input[6, string, true].toString, input[7, int, true], input[8, string, true].toString, input[9, bigint, false], StructField(_ts_ms,LongType,true), StructField(_op,StringType,false), StructField(_hoodie_is_deleted,StringType,false), StructField(inc_id,IntegerType,false), StructField(year,IntegerType,true), StructField(violation_desc,StringType,true), StructField(violation_code,StringType,true), StructField(case_individual_id,IntegerType,true), StructField(flag,StringType,true), StructField(last_modified_ts,LongType,false))
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
        at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
        at org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
        at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:297)
        at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1226)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:302)
        ... 28 more
```
   
Transformer:

```
import static org.apache.spark.sql.functions.lit;

import org.apache.hudi.common.config.TypedProperties; // org.apache.hudi.common.util in older Hudi releases
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DebeziumCustomTransformer implements Transformer {

  private static final Logger LOG = LogManager.getLogger(DebeziumCustomTransformer.class);

  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
      TypedProperties properties) {

    // Inserts/updates: filter on the original `op` column before it is renamed,
    // then flatten the `after` row image.
    Dataset<Row> insertedOrUpdatedData = rowDataset
        .filter(rowDataset.col("op").notEqual("d"))
        .select("op", "ts_ms", "after.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        .withColumn("_hoodie_is_deleted", lit("false")); // name matches the target schema below

    // Deletes: the row image lives in `before`; flag these records as deleted.
    Dataset<Row> deletedData = rowDataset
        .filter(rowDataset.col("op").equalTo("d"))
        .select("op", "ts_ms", "before.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        .withColumn("_hoodie_is_deleted", lit("true"));

    return insertedOrUpdatedData.union(deletedData);
  }
}
```
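
For local testing, a minimal harness along the following lines can exercise the transformer against a hand-built Debezium-style envelope. This is a sketch: the envelope columns and the local SparkSession setup are illustrative stand-ins, not part of the original job.

```
import java.util.Arrays;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class TransformerSmokeTest {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").appName("smoke").getOrCreate();

    // Debezium-style envelope: op, ts_ms, and before/after row images.
    StructType image = new StructType()
        .add("inc_id", DataTypes.IntegerType)
        .add("flag", DataTypes.StringType);
    StructType envelope = new StructType()
        .add("op", DataTypes.StringType)
        .add("ts_ms", DataTypes.LongType)
        .add("before", image)
        .add("after", image);

    Dataset<Row> input = spark.createDataFrame(Arrays.asList(
        RowFactory.create("c", 1L, null, RowFactory.create(1, "A")),  // insert
        RowFactory.create("d", 2L, RowFactory.create(1, "A"), null)), // delete
        envelope);

    // Properties are unused by this transformer, so null is fine here.
    Dataset<Row> out = new DebeziumCustomTransformer()
        .apply(new JavaSparkContext(spark.sparkContext()), spark, input, null);
    out.show(false);
  }
}
```

Running this should print two flattened rows (`_op`, `_ts_ms`, `inc_id`, `flag`, `_hoodie_is_deleted`), one with the delete flag set to "false" and one with "true".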
   
Schema provider (`DebeziumRegistryProvider`):

```
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.hudi.common.config.TypedProperties; // org.apache.hudi.common.util in older Hudi releases
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.spark.api.java.JavaSparkContext;

public class DebeziumRegistryProvider extends SchemaRegistryProvider {

  public DebeziumRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
  }

  /**
   * The Debezium target schema is a nested structure with many metadata fields.
   * This flattens that structure and keeps only the metadata that is needed.
   */
  @Override
  public Schema getTargetSchema() {
    Schema registrySchema = super.getTargetSchema();

    Field dataField = registrySchema.getField("after");
    Field tsField = registrySchema.getField("ts_ms");
    Field opField = registrySchema.getField("op");
    // Reuse the (string) schema of `op` for the delete-flag column.
    Field hoodieDeleteField = registrySchema.getField("op");

    // Initialize with metadata columns.
    FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
        .record("formatted_debezium_payload")
        .fields()
        .name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
        .name("_" + opField.name()).type(opField.schema()).withDefault(null)
        .name("_hoodie_is_deleted").type(hoodieDeleteField.schema()).withDefault(null);

    // Add the data columns. The `after` field is a union of the data schema and
    // the null schema, so extract only the data-schema branch.
    dataField.schema()
        .getTypes()
        .get(dataField.schema().getIndexNamed(registrySchema.getNamespace() + ".Value"))
        .getFields()
        .forEach(field ->
            payloadFieldAssembler.name(field.name()).type(field.schema()).withDefault(null));

    return payloadFieldAssembler.endRecord();
  }
}
```
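
As a side note on the union handling in `getTargetSchema`: Avro resolves a union branch by full name via `getIndexNamed`. Here is a tiny self-contained illustration; the record name and namespace are made-up stand-ins for the registry schema.

```
import org.apache.avro.Schema;

public class UnionLookupExample {
  public static void main(String[] args) {
    // A stand-in for the Debezium "Value" record inside the `after` field.
    Schema value = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"dbserver.inventory.tbl\","
        + "\"fields\":[{\"name\":\"inc_id\",\"type\":\"int\"}]}");

    // Debezium's `after` field is typically the union ["null", Value].
    Schema after = Schema.createUnion(Schema.create(Schema.Type.NULL), value);

    // getIndexNamed looks up a union branch by its full name; this is how the
    // schema provider picks the data record out of the union.
    int idx = after.getIndexNamed("dbserver.inventory.tbl.Value");
    System.out.println(after.getTypes().get(idx)); // prints the Value record schema
  }
}
```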

