cdmikechen opened a new issue #2705:
URL: https://github.com/apache/hudi/issues/2705
**Describe the problem you faced**
I use the Spark operator on OpenShift 4.6 to receive Kafka data and insert it into a Hudi table. I use `hudi-utilities_2.12` (a Maven build for Scala 2.12 and Spark 3), and Debezium to read the MySQL binlog.
When Spark reads the Kafka data, it fails with the error shown in *Stacktrace* below. I don't know whether this is a bug in Hudi 0.7.0 with Spark 3, or whether Spark 3 has a problem with the Avro structure. The same program runs fine with Hudi 0.6.0 on Spark on YARN.
The Debezium Avro schema is:
```json
[{
"type": "record",
"name": "hoodie_source",
"namespace": "hoodie.source",
"fields": [{
"name": "before",
"type": [{
"type": "record",
"name": "before",
"namespace": "hoodie.source.hoodie_source",
"fields": [{
"name": "id",
"type": "int"
}, {
"name": "name",
"type": ["string", "null"]
}, {
"name": "type",
"type": ["string", "null"]
}, {
"name": "url",
"type": ["string", "null"]
}, {
"name": "user",
"type": ["string", "null"]
}, {
"name": "password",
"type": ["string", "null"]
}, {
"name": "create_time",
"type": ["string", "null"]
}, {
"name": "create_user",
"type": ["string", "null"]
}, {
"name": "update_time",
"type": ["string", "null"]
}, {
"name": "update_user",
"type": ["string", "null"]
}, {
"name": "del_flag",
"type": ["int", "null"]
}]
}, "null"]
}, {
"name": "after",
"type": [{
"type": "record",
"name": "after",
"namespace": "hoodie.source.hoodie_source",
"fields": [{
"name": "id",
"type": "int"
}, {
"name": "name",
"type": ["string", "null"]
}, {
"name": "type",
"type": ["string", "null"]
}, {
"name": "url",
"type": ["string", "null"]
}, {
"name": "user",
"type": ["string", "null"]
}, {
"name": "password",
"type": ["string", "null"]
}, {
"name": "create_time",
"type": ["string", "null"]
}, {
"name": "create_user",
"type": ["string", "null"]
}, {
"name": "update_time",
"type": ["string", "null"]
}, {
"name": "update_user",
"type": ["string", "null"]
}, {
"name": "del_flag",
"type": ["int", "null"]
}]
}, "null"]
}, {
"name": "source",
"type": {
"type": "record",
"name": "source",
"namespace": "hoodie.source.hoodie_source",
"fields": [{
"name": "version",
"type": "string"
}, {
"name": "connector",
"type": "string"
}, {
"name": "name",
"type": "string"
}, {
"name": "ts_ms",
"type": "long"
}, {
"name": "snapshot",
"type": ["string", "null"]
}, {
"name": "db",
"type": "string"
}, {
"name": "table",
"type": ["string", "null"]
}, {
"name": "server_id",
"type": "long"
}, {
"name": "gtid",
"type": ["string", "null"]
}, {
"name": "file",
"type": "string"
}, {
"name": "pos",
"type": "long"
}, {
"name": "row",
"type": "int"
}, {
"name": "thread",
"type": ["long", "null"]
}, {
"name": "query",
"type": ["string", "null"]
}]
}
}, {
"name": "op",
"type": "string"
}, {
"name": "ts_ms",
"type": ["long", "null"]
}, {
"name": "transaction",
"type": [{
"type": "record",
"name": "transaction",
"namespace": "hoodie.source.hoodie_source",
"fields": [{
"name": "id",
"type": "string"
}, {
"name": "total_order",
"type": "long"
}, {
"name": "data_collection_order",
"type": "long"
}]
}, "null"]
}]
}, {
"type": "record",
"name": "hoodie_source",
"namespace": "hoodie.source",
"fields": [{
"name": "before",
"type": [{
"type": "record",
"name": "before",
"namespace": "hoodie.source.hoodie_source",
"fields": [{
"name": "id",
"type": "int"
}, {
"name": "name",
"type": ["string", "null"]
}, {
"name": "type",
"type": ["string", "null"]
}, {
"name": "url",
"type": ["string", "null"]
}, {
"name": "user",
"type": ["string", "null"]
}, {
"name": "password",
"type": ["string", "null"]
}, {
"name": "create_time",
"type": ["string", "null"]
}, {
"name": "create_user",
"type": ["string", "null"]
}, {
"name": "update_time",
"type": ["string", "null"]
}, {
"name": "update_user",
"type": ["string", "null"]
}, {
"name": "del_flag",
"type": ["int", "null"]
}]
}, "null"]
}, {
"name": "after",
"type": [{
"type": "record",
"name": "after",
"namespace": "hoodie.source.hoodie_source",
"fields": [{
"name": "id",
"type": "int"
}, {
"name": "name",
"type": ["string", "null"]
}, {
"name": "type",
"type": ["string", "null"]
}, {
"name": "url",
"type": ["string", "null"]
}, {
"name": "user",
"type": ["string", "null"]
}, {
"name": "password",
"type": ["string", "null"]
}, {
"name": "create_time",
"type": ["string", "null"]
}, {
"name": "create_user",
"type": ["string", "null"]
}, {
"name": "update_time",
"type": ["string", "null"]
}, {
"name": "update_user",
"type": ["string", "null"]
}, {
"name": "del_flag",
"type": ["int", "null"]
}]
}, "null"]
}, {
"name": "source",
"type": {
"type": "record",
"name": "source",
"namespace": "hoodie.source.hoodie_source",
"fields": [{
"name": "version",
"type": "string"
}, {
"name": "connector",
"type": "string"
}, {
"name": "name",
"type": "string"
}, {
"name": "ts_ms",
"type": "long"
}, {
"name": "snapshot",
"type": ["string", "null"]
}, {
"name": "db",
"type": "string"
}, {
"name": "table",
"type": ["string", "null"]
}, {
"name": "server_id",
"type": "long"
}, {
"name": "gtid",
"type": ["string", "null"]
}, {
"name": "file",
"type": "string"
}, {
"name": "pos",
"type": "long"
}, {
"name": "row",
"type": "int"
}, {
"name": "thread",
"type": ["long", "null"]
}, {
"name": "query",
"type": ["string", "null"]
}]
}
}, {
"name": "op",
"type": "string"
}, {
"name": "ts_ms",
"type": ["long", "null"]
}, {
"name": "transaction",
"type": [{
"type": "record",
"name": "transaction",
"namespace": "hoodie.source.hoodie_source",
"fields": [{
"name": "id",
"type": "string"
}, {
"name": "total_order",
"type": "long"
}, {
"name": "data_collection_order",
"type": "long"
}]
}, "null"]
}]
}]
```
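For what it's worth, the schema parses as valid JSON, and every `before`/`after` field is a union with the record branch first and `"null"` second, which is Debezium's usual envelope layout and which Spark maps to a nullable struct. A minimal stdlib-only sketch that checks this ordering — the schema literal below is trimmed to just the parts being checked, so treat it as illustrative rather than the full registry response:

```python
import json

# Trimmed illustration of the Debezium envelope posted above: only the
# "before" field is reproduced; the real schema has many more fields.
schema_text = """
{
  "type": "record",
  "name": "hoodie_source",
  "namespace": "hoodie.source",
  "fields": [{
    "name": "before",
    "type": [{
      "type": "record",
      "name": "before",
      "namespace": "hoodie.source.hoodie_source",
      "fields": [{"name": "id", "type": "int"},
                 {"name": "name", "type": ["string", "null"]}]
    }, "null"]
  }]
}
"""

schema = json.loads(schema_text)

def union_nullability(field):
    """Return (is_union, null_position) for a field's type declaration."""
    t = field["type"]
    if isinstance(t, list):
        return True, t.index("null") if "null" in t else -1
    return False, -1

for field in schema["fields"]:
    is_union, null_pos = union_nullability(field)
    if is_union:
        # Debezium puts the non-null branch first; Spark treats either
        # ordering as a nullable struct, so the union ordering by itself
        # should not be what breaks the decode.
        print(field["name"], "is a nullable union, null at index", null_pos)
```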
**Environment Description**
* Hudi version : 0.7.0 ( scala 2.12 )
* Spark version : 3.0.2 ( jre11 or jre8 )
* Hive version : 3.1.2 ( on MR3 1.2 )
* Hadoop version : 3.1.1 (HDP-3.1.4.0)
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : yes ( OpenShift 4.6 and Spark Operator
v1beta2-1.2.1-3.0.0 )
**Stacktrace**
```log
21/03/22 10:10:35 WARN util.package: Truncated the string representation of
a plan since it was too large. This behavior can be adjusted by setting
'spark.sql.debug.maxToStringFields'.
21/03/22 10:10:35 ERROR executor.Executor: Exception in task 0.0 in stage
1.0 (TID 1)
java.lang.RuntimeException: Error while decoding:
java.lang.NegativeArraySizeException: -1255727808
createexternalrow(if (isnull(input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true])) null else createexternalrow(if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].id, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_us
er:string,del_flag:int>, true].name.toString, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].type.toString, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].url.toString, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:st
ring,update_user:string,del_flag:int>, true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].user.toString, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].password.toString, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:s
tring,update_time:string,update_user:string,del_flag:int>,
true].create_time.toString, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].create_user.toString, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].update_time.toString, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:str
ing,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].update_user.toString, if (input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[0,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].del_flag, StructField(id,IntegerType,false),
StructField(name,StringType,true), StructField(type,StringType,true),
StructField(url,StringType,true), StructField(user,StringType,true),
StructField(password,StringType,true),
StructField(create_time,StringType,true), StructField(crea
te_user,StringType,true), StructField(update_time,StringType,true),
StructField(update_user,StringType,true),
StructField(del_flag,IntegerType,true)), if (isnull(input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true])) null else createexternalrow(if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].id, if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1, struct<id:in
t,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].name.toString, if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].type.toString, if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].url.toString, if (input[1
,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].user.toString, if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].password.toString, if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullA
t) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].create_time.toString, if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].create_user.toString, if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:strin
g,del_flag:int>, true].update_time.toString, if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].update_user.toString, if (input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].isNullAt) null else input[1,
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
true].del_flag, StructField(id,IntegerType,false),
StructField(name,StringType,true), StructField(type,StringType,true),
StructField(url,StringType,true), S
tructField(user,StringType,true), StructField(password,StringType,true),
StructField(create_time,StringType,true),
StructField(create_user,StringType,true),
StructField(update_time,StringType,true),
StructField(update_user,StringType,true),
StructField(del_flag,IntegerType,true)), if (isnull(input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false])) null else createexternalrow(if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].version.toString, if (inpu
t[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].connector.toString, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].name.toString, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:stri
ng,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].ts_ms, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].snapshot.toString, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,threa
d:bigint,query:string>, false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].db.toString, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].table.toString, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2, struct<version:string,connec
tor:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].server_id, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].gtid.toString, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint
,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].file.toString, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].pos, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].row, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].thread, if (input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].isNullAt) null else input[2,
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
false].query.toString, StructField(version,StringType,false),
StructField(connector,StringType,false), StructField(name,StringTy
pe,false), StructField(ts_ms,LongType,false),
StructField(snapshot,StringType,true), StructField(db,StringType,false),
StructField(table,StringType,true), StructField(server_id,LongType,false),
StructField(gtid,StringType,true), StructField(file,StringType,false), ... 4
more fields), input[3, string, false].toString, input[4, bigint, true], if
(isnull(input[5,
struct<id:string,total_order:bigint,data_collection_order:bigint>, true])) null
else createexternalrow(if (input[5,
struct<id:string,total_order:bigint,data_collection_order:bigint>,
true].isNullAt) null else input[5,
struct<id:string,total_order:bigint,data_collection_order:bigint>,
true].id.toString, if (input[5,
struct<id:string,total_order:bigint,data_collection_order:bigint>,
true].isNullAt) null else input[5,
struct<id:string,total_order:bigint,data_collection_order:bigint>,
true].total_order, if (input[5,
struct<id:string,total_order:bigint,data_collection_order:bigint>,
true].isNullAt) null else input[5, struct<id:stri
ng,total_order:bigint,data_collection_order:bigint>,
true].data_collection_order, StructField(id,StringType,false),
StructField(total_order,LongType,false),
StructField(data_collection_order,LongType,false)),
StructField(before,StructType(StructField(id,IntegerType,false),
StructField(name,StringType,true), StructField(type,StringType,true),
StructField(url,StringType,true), StructField(user,StringType,true),
StructField(password,StringType,true),
StructField(create_time,StringType,true),
StructField(create_user,StringType,true),
StructField(update_time,StringType,true),
StructField(update_user,StringType,true),
StructField(del_flag,IntegerType,true)),true),
StructField(after,StructType(StructField(id,IntegerType,false),
StructField(name,StringType,true), StructField(type,StringType,true),
StructField(url,StringType,true), StructField(user,StringType,true),
StructField(password,StringType,true),
StructField(create_time,StringType,true),
StructField(create_user,StringType,true), Stru
ctField(update_time,StringType,true),
StructField(update_user,StringType,true),
StructField(del_flag,IntegerType,true)),true),
StructField(source,StructType(StructField(version,StringType,false),
StructField(connector,StringType,false), StructField(name,StringType,false),
StructField(ts_ms,LongType,false), StructField(snapshot,StringType,true),
StructField(db,StringType,false), StructField(table,StringType,true),
StructField(server_id,LongType,false), StructField(gtid,StringType,true),
StructField(file,StringType,false), StructField(pos,LongType,false),
StructField(row,IntegerType,false), StructField(thread,LongType,true),
StructField(query,StringType,true)),false), StructField(op,StringType,false),
StructField(ts_ms,LongType,true),
StructField(transaction,StructType(StructField(id,StringType,false),
StructField(total_order,LongType,false),
StructField(data_collection_order,LongType,false)),true))
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
at
org.apache.hudi.Spark3RowDeserializer.deserializeRow(Spark3RowDeserializer.scala:31)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$1(HoodieSparkUtils.scala:103)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1423)
at
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NegativeArraySizeException: -1255727808
at
org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:298)
at
org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1358)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_6$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_3_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
Source)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
... 31 more
21/03/22 10:10:35 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 2
```
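The root frame, `UTF8String.getBytes` throwing `NegativeArraySizeException`, means Spark read a string-length field out of an `UnsafeRow` at the wrong offset and interpreted payload bytes as a negative 32-bit size — the classic symptom of a binary-layout mismatch between the code that wrote the row and the code that reads it (for example, a `spark-avro` or Hudi bundle built against a different Spark version on the classpath). That diagnosis is an assumption on my part; as a toy Python illustration of how misread bytes become a negative length (the byte values are chosen only to reproduce the exact number from the trace):

```python
import struct

# Four bytes that are really part of a string's payload, not a length field.
misread = bytes.fromhex("B5271D40")

# Interpreting them as a signed big-endian 32-bit int - which is effectively
# what a deserializer does when its field offsets don't match the writer's
# row layout - yields the negative "array size" from the stack trace.
(length,) = struct.unpack(">i", misread)
print(length)  # -1255727808, the value in NegativeArraySizeException
```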
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]