LYL41011 opened a new issue, #4549:
URL: https://github.com/apache/seatunnel/issues/4549

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   I want to synchronize data from Kafka to Hive. When I start the job, the following error occurs:
   
   Caused by: java.lang.NullPointerException
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.lambda$buildSchemaWithRowType$0(AbstractWriteStrategy.java:129)
           at java.util.ArrayList.forEach(ArrayList.java:1259)
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.buildSchemaWithRowType(AbstractWriteStrategy.java:127)
   
   The Hive table was created with the following statement:
   CREATE TABLE `ods_tmp`.`prd_kafka_rta_toutiao_gl_topic`(
     `reqid` string COMMENT 'project used',
     `reqtime` string COMMENT 'report time',
     `media` string COMMENT 'media',
     `isping` string COMMENT 'whether the traffic is ping-test traffic',
     `flowtype` string COMMENT 'traffic type',
     `reqparams` string COMMENT 'parameters sent by the media',
     `imeimd5` string COMMENT 'MD5 hash of the device IMEI',
     `oaidmd5` string COMMENT 'MD5 hash of the device OAID',
     `idfamd5` string COMMENT 'MD5 hash of the device IDFA',
     `bucket` string COMMENT '',
     `results` string COMMENT 'bidding participation and identification status of each account, plus decision context',
     `exts` string COMMENT 'extension fields',
     `timeconsume` string COMMENT '',
     `system` string COMMENT '',
     `debuginfo` string COMMENT '',
     `ts` string COMMENT 'Yushu consumption timestamp')
   PARTITIONED BY (`pday` string, `phour` string)
   ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
   WITH SERDEPROPERTIES ('serialization.format' = '1')
   STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
   
   ### SeaTunnel Version
   
   2.3.1
   
   ### SeaTunnel Config
   
   ```conf
   env {
     execution.parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 20000
   
   }
   
   source {
   
     Kafka {
       result_table_name = "kafka_toutiao_gl_topic_source"
       schema = {
         fields {
           reqId = "string"
           reqTime = "string"
           media = "string"
           isPing = "string"
           flowType = "string"
           reqParams = "string"
           imeiMd5 = "string"
           oaidMd5 = "string"
           idfaMd5 = "string"
           bucket = "string"
           results = "string"
           exts = "string"
           timeConsume = "string"
           system = "string"
           debugInfo = "string"
         }
       }
       topic = "toutiao_gl_topic"
       bootstrap.servers = "10.220.193.248:39092"
       kafka.max.poll.records = 10000
       kafka.auto.offset.reset = earliest
       consumer.group = "yushu_dis_seatunnel"
     }
   
   }
   
   transform {
     Sql {
       source_table_name = "kafka_toutiao_gl_topic_source"
       result_table_name = "kafka_toutiao_gl_topic_result"
       query = "select IFNULL(reqId,''),IFNULL(reqTime,''),IFNULL(media,''),IFNULL(isPing,''),IFNULL(flowType,''),IFNULL(reqParams,''),IFNULL(imeiMd5,''),IFNULL(oaidMd5,''),IFNULL(idfaMd5,''),IFNULL(bucket,''),IFNULL(results,''),IFNULL(exts,''),IFNULL(timeConsume,''),IFNULL(system,''),IFNULL(debugInfo,''),current_timestamp as ts,SUBSTRING(REPLACE(reqTime,'-',''),1,8) AS pday,SUBSTRING(REPLACE(reqTime,'-',''),10,2) AS phour from kafka_toutiao_gl_topic_source"
     }
   }
   sink {
     Hive {
       source_table_name = "kafka_toutiao_gl_topic_result"
       table_name = "ods_tmp.prd_kafka_rta_toutiao_gl_topic"
       metastore_uri = "thrift://xxx:9083"
     }
   
   }
   ```
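A quick way to narrow this down is to check which Hive columns have no field of the same name, ignoring case, among the rows that reach the sink. The sketch below is a hypothetical standalone helper, not SeaTunnel code: the two name lists are copied by hand from the `CREATE TABLE` statement and the Kafka `schema` block above, the partition columns are left out, and case-insensitive matching is an assumption about how the Hive sink pairs table columns with row fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Hypothetical helper: lists Hive columns with no same-named field (case-insensitive). */
public class ColumnNameCheck {

    public static List<String> unmatchedColumns(List<String> hiveColumns, List<String> fieldNames) {
        // Lower-case every field name once so lookups ignore case.
        Set<String> lowered = new HashSet<>();
        for (String name : fieldNames) {
            lowered.add(name.toLowerCase(Locale.ROOT));
        }
        // Keep the Hive columns, in table order, that no field matches.
        List<String> missing = new ArrayList<>();
        for (String column : hiveColumns) {
            if (!lowered.contains(column.toLowerCase(Locale.ROOT))) {
                missing.add(column);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Data columns from the CREATE TABLE statement (partition columns left out).
        List<String> hive = Arrays.asList("reqid", "reqtime", "media", "isping", "flowtype",
                "reqparams", "imeimd5", "oaidmd5", "idfamd5", "bucket", "results", "exts",
                "timeconsume", "system", "debuginfo", "ts");
        // Field names declared in the Kafka source schema.
        List<String> kafka = Arrays.asList("reqId", "reqTime", "media", "isPing", "flowType",
                "reqParams", "imeiMd5", "oaidMd5", "idfaMd5", "bucket", "results", "exts",
                "timeConsume", "system", "debugInfo");
        // Prints [ts]: only the column that the SQL transform adds under an alias is missing.
        System.out.println(unmatchedColumns(hive, kafka));
    }
}
```

Against the raw Kafka schema only `ts` is unmatched, and the SQL transform supplies it through `current_timestamp as ts`. The remaining columns depend on the unaliased `IFNULL(...)` expressions keeping their original names; running the same check against the transform's actual output field names would show whether they do.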
   
   
   ### Running Command
   
   ```shell
   ../bin/start-seatunnel-flink-13-connector-v2.sh --config kafka2hive_toutiao_gl_topic -m yarn-cluster -ynm seatunnel
   ```
   
   
   ### Error Exception
   
   ```log
   [finloan@jds01 config]$ ../bin/start-seatunnel-flink-13-connector-v2.sh --config kafka2hive_toutiao_gl_topic -m yarn-cluster -ynm seatunnel
   Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -m yarn-cluster -ynm seatunnel -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /home/q/dis/apache-seatunnel-incubating-2.3.1/starter/seatunnel-flink-13-starter.jar --config kafka2hive_toutiao_gl_topic --name SeaTunnel
   Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
   Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
   2023-04-11 17:58:43,199 INFO  org.apache.hadoop.security.UserGroupInformation              [] - Login successful for user [email protected] using keytab file /home/q/dis/dis.keytab
   
   ------------------------------------------------------------
    The program finished with the following exception:
   
   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Flink job executed failed
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
           at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
           at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
           at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
           at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
           at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
           at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
           at java.security.AccessController.doPrivileged(Native Method)
           at javax.security.auth.Subject.doAs(Subject.java:422)
           at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
           at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
           at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
   Caused by: org.apache.seatunnel.core.starter.exception.CommandExecuteException: Flink job executed failed
           at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:63)
           at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
           at org.apache.seatunnel.core.starter.flink.SeaTunnelFlink.main(SeaTunnelFlink.java:34)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
           ... 11 more
   Caused by: java.lang.NullPointerException
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.lambda$buildSchemaWithRowType$0(AbstractWriteStrategy.java:129)
           at java.util.ArrayList.forEach(ArrayList.java:1259)
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.buildSchemaWithRowType(AbstractWriteStrategy.java:127)
           at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy.setSeaTunnelRowTypeInfo(TextWriteStrategy.java:68)
           at org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink.setTypeInfo(BaseFileSink.java:71)
           at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:110)
           at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:109)
           at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:61)
   
   **I changed the Hive table format from TEXTFILE to ORC and still get an error:**
   
   
   Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at SinkConversion$42.processElement(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        ... 49 more
   Caused by: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy.buildSchemaWithRowType(OrcWriteStrategy.java:195)
        at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy.getOrCreateWriter(OrcWriteStrategy.java:116)
        at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy.write(OrcWriteStrategy.java:74)
        at org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter.write(BaseFileSinkWriter.java:126)
        at org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter.write(BaseFileSinkWriter.java:43)
        at org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter.write(FlinkSinkWriter.java:65)
        at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.processElement(AbstractSinkWriterOperator.java:80)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        ... 55 more
   ```
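Both traces fail inside schema-building code, the first one in a lambda passed to `ArrayList.forEach`. One plausible mechanism, assumed here and not confirmed against the SeaTunnel 2.3.1 source, is that each Hive column is resolved to a field index through a name lookup that returns `null` for unknown names, and that boxed `null` is then used as an array index. The class, methods, and field names below (including `expr_0`) are hypothetical; only the shape of the failure is meant to match the report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical reproduction of the failure shape; not SeaTunnel's actual implementation. */
public class SchemaProjectionSketch {

    /** Maps each sink column to a field index by lower-cased name; unknown names map to null. */
    public static List<Integer> columnIndexes(List<String> sinkColumns, String[] fieldNames) {
        Map<String, Integer> byName = new HashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            byName.put(fieldNames[i].toLowerCase(Locale.ROOT), i);
        }
        List<Integer> indexes = new ArrayList<>();
        for (String column : sinkColumns) {
            indexes.add(byName.get(column.toLowerCase(Locale.ROOT))); // null when not found
        }
        return indexes;
    }

    /** Projects field types by index; a null Integer is auto-unboxed and throws NullPointerException. */
    public static List<String> projectTypes(String[] fieldTypes, List<Integer> indexes) {
        List<String> projected = new ArrayList<>();
        indexes.forEach(index -> projected.add(fieldTypes[index]));
        return projected;
    }

    public static void main(String[] args) {
        // Hypothetical transform output where the first field did not keep the name "reqid".
        String[] fieldNames = {"expr_0", "ts"};
        String[] fieldTypes = {"STRING", "TIMESTAMP"};
        List<Integer> indexes = columnIndexes(Arrays.asList("reqid", "ts"), fieldNames);
        System.out.println(indexes); // [null, 1]
        try {
            projectTypes(fieldTypes, indexes);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException inside the forEach lambda");
        }
    }
}
```

If this is the cause, aliasing every expression to its Hive column name (for example `IFNULL(reqId,'') AS reqid`) would be the first thing to try; that suggestion is untested.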
   
   
   ### Flink or Spark Version
   
   Flink 1.13.6
   
   ### Java or Scala Version
   
   1.8.0
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

