[
https://issues.apache.org/jira/browse/HUDI-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437695#comment-17437695
]
Alexey Kudinkin commented on HUDI-1711:
---------------------------------------
I hit a very similar issue recently (using Spark 3.1.1 and a Hudi 0.10 SNAPSHOT build):
{code:java}
21/11/03 01:19:49 WARN ConsumerConfig: The configuration
'hoodie.deltastreamer.source.kafka.value.deserializer.class' was supplied but
isn't a known config.
21/11/03 01:19:49 INFO AppInfoParser: Kafka version: 2.5.0
21/11/03 01:19:49 INFO AppInfoParser: Kafka commitId: 66563e712b0b9f84
21/11/03 01:19:49 INFO AppInfoParser: Kafka startTimeMs: 1635902389072
21/11/03 01:19:49 INFO KafkaConsumer: [Consumer
clientId=consumer-spark-executor-null-4, groupId=spark-executor-null]
Subscribed to partition(s): ghCfAvroCommitCommentEvent-0
21/11/03 01:19:49 INFO InternalKafkaConsumer: Initial fetch for
spark-executor-null ghCfAvroCommitCommentEvent-0 15844
21/11/03 01:19:49 INFO KafkaConsumer: [Consumer
clientId=consumer-spark-executor-null-4, groupId=spark-executor-null] Seeking
to offset 15844 for partition ghCfAvroCommitCommentEvent-0
21/11/03 01:19:49 INFO Metadata: [Consumer
clientId=consumer-spark-executor-null-4, groupId=spark-executor-null] Cluster
ID: lkc-nd6r3
21/11/03 01:19:49 ERROR Executor: Exception in task 0.3 in stage 1.0 (TID 4)
java.lang.RuntimeException: Error while decoding:
java.lang.NegativeArraySizeException: -1253566778
createexternalrow(input[0, string, false].toString, input[1, bigint, false],
input[2, string, false].toString, input[3, string, true].toString, if
(isnull(input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false])) null else createexternalrow(if (input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].isNullAt) null else input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].id, if (input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].isNullAt) null else input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].login.toString, if (input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].isNullAt) null else input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].gravatar_id.toString, if (input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].isNullAt) null else input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].url.toString, if (input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].isNullAt) null else input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].avatar_url.toString, if (input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].isNullAt) null else input[4,
struct<id:int,login:string,gravatar_id:string,url:string,avatar_url:string,display_login:string>,
false].display_login.toString, StructField(id,IntegerType,true),
StructField(login,StringType,true), StructField(gravatar_id,StringType,true),
StructField(url,StringType,true), StructField(avatar_url,StringType,true),
StructField(display_login,StringType,true)), if (isnull(input[5,
struct<id:int,name:string,url:string>, false])) null else createexternalrow(if
(input[5, struct<id:int,name:string,url:string>, false].isNullAt) null else
input[5, struct<id:int,name:string,url:string>, false].id, if (input[5,
struct<id:int,name:string,url:string>, false].isNullAt) null else input[5,
struct<id:int,name:string,url:string>, false].name.toString, if (input[5,
struct<id:int,name:string,url:string>, false].isNullAt) null else input[5,
struct<id:int,name:string,url:string>, false].url.toString,
StructField(id,IntegerType,true), StructField(name,StringType,true),
StructField(url,StringType,true)), input[6, string, true].toString, input[7,
string, true].toString, input[8, bigint, false], input[9, boolean, false],
StructField(date,StringType,false), StructField(timestamp,LongType,false),
StructField(id,StringType,false), StructField(type,StringType,true),
StructField(actor,StructType(StructField(id,IntegerType,true),
StructField(login,StringType,true), StructField(gravatar_id,StringType,true),
StructField(url,StringType,true), StructField(avatar_url,StringType,true),
StructField(display_login,StringType,true)),false),
StructField(repo,StructType(StructField(id,IntegerType,true),
StructField(name,StringType,true), StructField(url,StringType,true)),false),
StructField(payload,StringType,true), StructField(org,StringType,true),
StructField(created_at,LongType,false), StructField(public,BooleanType,false))
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:186)
at
org.apache.hudi.Spark3RowSerDe.deserializeRow(Spark3RowSerDe.scala:31)
at
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$1(HoodieSparkUtils.scala:150)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
at
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NegativeArraySizeException: -1253566778
at
org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:298)
at
org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1358)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_2_2$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
Source)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:182)
... 31 more
21/11/03 01:19:49 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
21/11/03 01:19:49 INFO MemoryStore: MemoryStore cleared
21/11/03 01:19:49 INFO BlockManager: BlockManager stopped
21/11/03 01:19:49 INFO ShutdownHookManager: Shutdown hook called
21/11/03 01:19:49 INFO ShutdownHookManager: Deleting directory
/var/data/spark-38b628d6-9535-4fd0-b7f3-9753db3956f7/spark-033453d3-7901-4d43-8b3b-b07e3cb8c6a6
21/11/03 01:19:49 INFO MetricsSystemImpl: Stopping s3a-file-system metrics
system...
21/11/03 01:19:49 INFO MetricsSystemImpl: s3a-file-system metrics system
stopped.
21/11/03 01:19:49 INFO MetricsSystemImpl: s3a-file-system metrics system
shutdown complete.
{code}
> Avro Schema Exception with Spark 3.0 in 0.7
> -------------------------------------------
>
> Key: HUDI-1711
> URL: https://issues.apache.org/jira/browse/HUDI-1711
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer
> Reporter: Balaji Varadarajan
> Assignee: sivabalan narayanan
> Priority: Major
> Labels: sev:critical, user-support-issues
>
> GH: [https://github.com/apache/hudi/issues/2705]
>
>
> {{21/03/22 10:10:35 WARN util.package: Truncated the string representation of
> a plan since it was too large. This behavior can be adjusted by setting
> 'spark.sql.debug.maxToStringFields'.
> 21/03/22 10:10:35 ERROR executor.Executor: Exception in task 0.0 in stage 1.0
> (TID 1)
> java.lang.RuntimeException: Error while decoding:
> java.lang.NegativeArraySizeException: -1255727808
> createexternalrow(if (isnull(input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true])) null else createexternalrow(if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].id, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].name.toString, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].type.toString, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].url.toString, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].user.toString, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].password.toString, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].create_time.toString, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].create_user.toString, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].update_time.toString, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].update_user.toString, if (input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[0,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].del_flag, StructField(id,IntegerType,false),
> StructField(name,StringType,true), StructField(type,StringType,true),
> StructField(url,StringType,true), StructField(user,StringType,true),
> StructField(password,StringType,true),
> StructField(create_time,StringType,true),
> StructField(create_user,StringType,true),
> StructField(update_time,StringType,true),
> StructField(update_user,StringType,true),
> StructField(del_flag,IntegerType,true)), if (isnull(input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true])) null else createexternalrow(if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].id, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].name.toString, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].type.toString, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].url.toString, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].user.toString, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].password.toString, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].create_time.toString, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].create_user.toString, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].update_time.toString, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].update_user.toString, if (input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].isNullAt) null else input[1,
> struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>,
> true].del_flag, StructField(id,IntegerType,false),
> StructField(name,StringType,true), StructField(type,StringType,true),
> StructField(url,StringType,true), StructField(user,StringType,true),
> StructField(password,StringType,true),
> StructField(create_time,StringType,true),
> StructField(create_user,StringType,true),
> StructField(update_time,StringType,true),
> StructField(update_user,StringType,true),
> StructField(del_flag,IntegerType,true)), if (isnull(input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false])) null else createexternalrow(if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].version.toString, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].connector.toString, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].name.toString, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].ts_ms, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].snapshot.toString, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].db.toString, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].table.toString, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].server_id, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].gtid.toString, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].file.toString, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].pos, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].row, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].thread, if (input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].isNullAt) null else input[2,
> struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>,
> false].query.toString, StructField(version,StringType,false),
> StructField(connector,StringType,false), StructField(name,StringType,false),
> StructField(ts_ms,LongType,false), StructField(snapshot,StringType,true),
> StructField(db,StringType,false), StructField(table,StringType,true),
> StructField(server_id,LongType,false), StructField(gtid,StringType,true),
> StructField(file,StringType,false), ... 4 more fields), input[3, string,
> false].toString, input[4, bigint, true], if (isnull(input[5,
> struct<id:string,total_order:bigint,data_collection_order:bigint>, true]))
> null else createexternalrow(if (input[5,
> struct<id:string,total_order:bigint,data_collection_order:bigint>,
> true].isNullAt) null else input[5,
> struct<id:string,total_order:bigint,data_collection_order:bigint>,
> true].id.toString, if (input[5,
> struct<id:string,total_order:bigint,data_collection_order:bigint>,
> true].isNullAt) null else input[5,
> struct<id:string,total_order:bigint,data_collection_order:bigint>,
> true].total_order, if (input[5,
> struct<id:string,total_order:bigint,data_collection_order:bigint>,
> true].isNullAt) null else input[5,
> struct<id:string,total_order:bigint,data_collection_order:bigint>,
> true].data_collection_order, StructField(id,StringType,false),
> StructField(total_order,LongType,false),
> StructField(data_collection_order,LongType,false)),
> StructField(before,StructType(StructField(id,IntegerType,false),
> StructField(name,StringType,true), StructField(type,StringType,true),
> StructField(url,StringType,true), StructField(user,StringType,true),
> StructField(password,StringType,true),
> StructField(create_time,StringType,true),
> StructField(create_user,StringType,true),
> StructField(update_time,StringType,true),
> StructField(update_user,StringType,true),
> StructField(del_flag,IntegerType,true)),true),
> StructField(after,StructType(StructField(id,IntegerType,false),
> StructField(name,StringType,true), StructField(type,StringType,true),
> StructField(url,StringType,true), StructField(user,StringType,true),
> StructField(password,StringType,true),
> StructField(create_time,StringType,true),
> StructField(create_user,StringType,true),
> StructField(update_time,StringType,true),
> StructField(update_user,StringType,true),
> StructField(del_flag,IntegerType,true)),true),
> StructField(source,StructType(StructField(version,StringType,false),
> StructField(connector,StringType,false), StructField(name,StringType,false),
> StructField(ts_ms,LongType,false), StructField(snapshot,StringType,true),
> StructField(db,StringType,false), StructField(table,StringType,true),
> StructField(server_id,LongType,false), StructField(gtid,StringType,true),
> StructField(file,StringType,false), StructField(pos,LongType,false),
> StructField(row,IntegerType,false), StructField(thread,LongType,true),
> StructField(query,StringType,true)),false), StructField(op,StringType,false),
> StructField(ts_ms,LongType,true),
> StructField(transaction,StructType(StructField(id,StringType,false),
> StructField(total_order,LongType,false),
> StructField(data_collection_order,LongType,false)),true))
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
> at
> org.apache.hudi.Spark3RowDeserializer.deserializeRow(Spark3RowDeserializer.scala:31)
> at
> org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$1(HoodieSparkUtils.scala:103)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
> at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
> at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
> at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
> at scala.collection.AbstractIterator.to(Iterator.scala:1429)
> at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
> at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
> at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
> at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
> at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1423)
> at
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:127)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.NegativeArraySizeException: -1255727808
> at
> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:298)
> at
> org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1358)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_6$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_3_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
> ... 31 more
> 21/03/22 10:10:35 INFO executor.CoarseGrainedExecutorBackend: Got assigned
> task 2}}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)