ashishmgofficial edited a comment on issue #2149:
URL: https://github.com/apache/hudi/issues/2149#issuecomment-704918685
@bvaradar : Thanks for the code. I followed your instructions, but when I tried to
add the _is_hoodie_deleted column to the dataset using the following code for testing,
I'm getting the error below with the code mentioned in the post:
```
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1409)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
at
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1517)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1516)
at
org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
at
org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
at
org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:349)
at
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:238)
at
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:163)
at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
at
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:161)
at
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:466)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Error while decoding:
java.lang.NegativeArraySizeException
createexternalrow(input[0, bigint, true], input[1, string, false].toString,
input[2, string, false].toString, input[3, int, false], input[4, int, true],
input[5, string, true].toString, input[6, string, true].toString, input[7, int,
true], input[8, string, true].toString, input[9, bigint, false],
StructField(_ts_ms,LongType,true), StructField(_op,StringType,false),
StructField(_hoodie_is_deleted,StringType,false),
StructField(inc_id,IntegerType,false), StructField(year,IntegerType,true),
StructField(violation_desc,StringType,true),
StructField(violation_code,StringType,true),
StructField(case_individual_id,IntegerType,true),
StructField(flag,StringType,true), StructField(last_modified_ts,LongType,false))
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
at
org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
at
org.apache.hudi.AvroConversionUtils$$anonfun$1.apply(AvroConversionUtils.scala:44)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at
org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
at
org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
at
org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:297)
at
org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1226)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
Source)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:302)
... 28 more
```
Transformer
```java
public class DebeziumCustomTransformer implements Transformer {

  private static final Logger LOG =
      LogManager.getLogger(DebeziumCustomTransformer.class);

  /**
   * Flattens a Debezium change-event dataset into the shape Hudi expects:
   * upserts take their payload from {@code after.*}, deletes from
   * {@code before.*}, and each row is tagged with the Hudi delete marker.
   *
   * @param jsc          Spark context (unused here, required by the interface)
   * @param sparkSession active Spark session (unused here)
   * @param rowDataset   raw Debezium envelope rows ({@code op}, {@code ts_ms},
   *                     {@code before}, {@code after})
   * @param properties   deltastreamer properties (unused here)
   * @return flattened dataset with {@code _op}, {@code _ts_ms} and
   *         {@code _hoodie_is_deleted} metadata columns
   */
  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                            Dataset<Row> rowDataset,
                            TypedProperties properties) {
    // Filter BEFORE renaming so the predicate resolves against this dataset's
    // own "op" column instead of a column belonging to the source dataset.
    Dataset<Row> insertedOrUpdatedData = rowDataset
        .filter(rowDataset.col("op").notEqual("d"))
        .select("op", "ts_ms", "after.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        // Hudi's delete marker column must be named _hoodie_is_deleted (not
        // _is_hoodie_deleted) and must be a boolean, not the string "false".
        .withColumn("_hoodie_is_deleted", lit(false));

    Dataset<Row> deletedData = rowDataset
        .filter(rowDataset.col("op").equalTo("d"))
        .select("op", "ts_ms", "before.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        .withColumn("_hoodie_is_deleted", lit(true));

    return insertedOrUpdatedData.union(deletedData);
  }
}
```
SchemaProviderDebezium
```
public class DebeziumRegistryProvider extends SchemaRegistryProvider {
public DebeziumRegistryProvider(TypedProperties props,
JavaSparkContext jssc) {
super(props, jssc);
}
/**
* Debezium target schema is a nested structure with many metadata fields.
This will
* flatten the schema structure and only require necessary metadata
information
* @return
*/
@Override
public Schema getTargetSchema() {
Schema registrySchema = super.getTargetSchema();
Field dataField = registrySchema.getField("after");
Field tsField = registrySchema.getField("ts_ms");
Field opField = registrySchema.getField("op");
Field hoodieDeleteField = registrySchema.getField("op");
// Initialize with metadata columns
FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
.record("formatted_debezium_payload")
.fields()
.name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
.name("_" + opField.name()).type(opField.schema()).withDefault(null)
.name("_hoodie_is_deleted").type(hoodieDeleteField.schema()).withDefault(null);
// Add data columns to schema
dataField.schema()
.getTypes()
// "after" field is a union with data schema and null schema, so we
need to extract only the data schema portion
.get(dataField.schema().getIndexNamed(registrySchema.getNamespace()
+ ".Value"))
.getFields()
.forEach(field -> {
payloadFieldAssembler.name(field.name()).type(field.schema()).withDefault(null);
});
return payloadFieldAssembler.endRecord();
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]