dongsilun opened a new issue, #5193: URL: https://github.com/apache/seatunnel/issues/5193
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened Create a test batch job [hive -> doris]. It looks like there's a Null Pointer Exception at the DorisSinkWriter. `2023-08-01 17:08:30,489 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (myhost executor 1): java.lang.NullPointerException at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.createSerializer(DorisSinkWriter.java:271) at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.<init>(DorisSinkWriter.java:94) at org.apache.seatunnel.connectors.doris.sink.DorisSink.createWriter(DorisSink.java:103) at org.apache.seatunnel.translation.spark.sink.write.SeaTunnelSparkDataWriterFactory.createWriter(SeaTunnelSparkDataWriterFactory.java:49) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:407) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ` ### SeaTunnel Version 2.3.2 ### SeaTunnel Config ```conf env { execution.parallelism = 3 job.mode = "BATCH" } source { Hive { preSql = "select * from hive2doris" table_name = "test.hive2doris" metastore_uri = "thrift://xxxx:9083" hdfs_site_path = /home/op/software/hadoop-3.3.2/etc/hadoop/hdfs-site.xml } } sink { Doris { fenodes = "xxxx:8030" username = root password = "" table.identifier = "test.hive2doris" sink.enable-2pc = "true" sink.label-prefix = "test_row" doris.config = { columns="id,str1,str2,str3,str4,str5,str6,str7,str8,str9,str10,num1,num2,num3,num4,num5,num6,num7,num8,num9" } } } ``` ### Running Command ```shell $SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master yarn --config hive2doris.conf ``` ### Error Exception ```log java.lang.NullPointerException at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.createSerializer(DorisSinkWriter.java:271) at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.<init>(DorisSinkWriter.java:94) at org.apache.seatunnel.connectors.doris.sink.DorisSink.createWriter(DorisSink.java:103) at org.apache.seatunnel.translation.spark.sink.write.SeaTunnelSparkDataWriterFactory.createWriter(SeaTunnelSparkDataWriterFactory.java:49) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:407) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ... org.apache.spark.SparkException: Writing job aborted at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:613) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:386) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:330) at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:236) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:309) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:308) at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:236) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:311) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:125) at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:74) at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` ### Zeta or Flink or Spark Version Spark-3.2.2 ### Java or Scala Version jdk1.8 ### Screenshots  ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
