jeanleen opened a new issue, #6388:
URL: https://github.com/apache/seatunnel/issues/6388

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
The Kafka source cannot consume all historical data when running in batch mode.
   I have 10 million+ messages prepared for consumption in Kafka, but when consuming them in
batch mode, the job finished after reading only part of the data ("Total Read Count" of 53847 in the job statistics).
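To put the numbers in perspective, here is a quick back-of-the-envelope check (a hypothetical sketch, not SeaTunnel code): with 10 million+ messages in the topic, the reported read count of 53847 covers well under 1% of the backlog. The per-partition offsets below are invented for illustration; real values would come from the broker.

```python
# Hypothetical sketch: estimate the topic backlog from per-partition offsets
# and compare it with the read count the job reported. The offset numbers
# here are made up; in practice they would be fetched from the broker
# (e.g. via a Kafka consumer's beginning/end offset APIs).

def expected_record_count(beginning_offsets, end_offsets):
    """Sum of (end - beginning) offsets over all partitions."""
    return sum(end_offsets[p] - beginning_offsets[p] for p in end_offsets)

# Made-up offsets for a 3-partition topic holding ~10M messages in total.
begin = {0: 0, 1: 0, 2: 0}
end = {0: 3_400_000, 1: 3_300_000, 2: 3_300_000}

expected = expected_record_count(begin, end)
reported = 53_847  # "Total Read Count" from the job statistics

print(f"expected ~{expected}, reported {reported}, missing {expected - reported}")
```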
   
   ### SeaTunnel Version
   
2.3.3
   
   ### SeaTunnel Config
   
   ```conf
   "env" : {
           "parallelism" : 3,
           "job.mode" : "BATCH",
           "checkpoint.interval" : 30000,
           "job.name" : "oalu_kafka"
       },
       "source" : [
           {
               "schema" : {
                   "fields" : {
                       "Price" : "STRING",
                       "Aplus" : "STRING",
                       "AmazonChoice" : "STRING",
                       "Brand" : "STRING"
                   }
               },
               "consumer.group" : "amc",
               "commit_on_checkpoint" : true,
               "format" : "json",
               "topic" : "lurProduct",
               "bootstrap.servers" : "192.168.9.31:9092,192.168.9.81:9092",
               "plugin_name" : "Kafka",
               "kafka.config" : {
                   "client.id" : "client_1",
                   "max.poll.records" : "100000",
                   "auto.offset.reset" : "earliest",
                   "max.partition.fetch.bytes" : "52428800",
                   "session.timeout.ms" : "30000"
               }
           }
       ],
       "sink" : [
           {
               "fs.defaultFS" : "hdfs://hadoop0011:8020",
               "path" : "/user/hive/warehouse/stg.db/pdc_lu_product_data/",
               "bath_size" : 10000,
               "file_format_type" : "orc",
               "plugin_name" : "HdfsFile"
           }
       ]
   }
   ```
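One configuration detail that may be worth checking (an assumption on my part, not a confirmed diagnosis): the config only sets `auto.offset.reset` inside `kafka.config`, but the SeaTunnel Kafka source also has its own `start_mode` option (default `group_offsets`), which controls where consumption starts. If the consumer group `amc` already has committed offsets, the job would resume from them rather than read from the beginning. A sketch of forcing the source to start at the earliest offset:

```conf
"source" : [
    {
        # Assumption: start_mode is the SeaTunnel Kafka source option that
        # controls the starting offset (default group_offsets); setting it to
        # earliest rules out resuming from previously committed group offsets.
        "start_mode" : "earliest",
        "plugin_name" : "Kafka",
        # ... other options unchanged ...
    }
]
```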
   
   
   ### Running Command
   
   ```shell
   /usr/local/apache-seatunnel-2.3.3/bin/seatunnel.sh --config 
pdc_lu_kafka_bath.conf
   ```
   
   
   ### Error Exception
   
   ```log
   2024-02-26 17:44:47,539 INFO  
org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback 
configuration key 'plugin_name' instead of key 'factory'
   2024-02-26 17:44:47,540 INFO  
org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback 
configuration key 'plugin_name' instead of key 'factory'
   2024-02-26 17:44:47,543 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load 
SeaTunnelSink Plugin from /usr/local/apache-seatunnel-2.3.3/connectors/seatunnel
   2024-02-26 17:44:47,547 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery 
plugin jar: Kafka at: 
file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-kafka-2.3.3.jar
   2024-02-26 17:44:47,547 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery 
plugin jar: HdfsFile at: 
file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-file-hadoop-2.3.3.jar
   2024-02-26 17:44:47,552 INFO  
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start 
generating all sources.
   2024-02-26 17:44:47,554 INFO  
org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback 
configuration key 'plugin_name' instead of key 'factory'
   2024-02-26 17:44:47,570 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load 
SeaTunnelSource Plugin from 
/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel
   2024-02-26 17:44:47,574 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery 
plugin jar: Kafka at: 
file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-kafka-2.3.3.jar
   2024-02-26 17:44:47,578 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load plugin: 
PluginIdentifier{engineType='seatunnel', pluginType='source', 
pluginName='Kafka'} from classpath
   2024-02-26 17:44:47,598 INFO  
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start 
generating all transforms.
   2024-02-26 17:44:47,598 INFO  
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start 
generating all sinks.
   2024-02-26 17:44:47,599 INFO  
org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback 
configuration key 'plugin_name' instead of key 'factory'
   2024-02-26 17:44:47,601 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load 
SeaTunnelSink Plugin from /usr/local/apache-seatunnel-2.3.3/connectors/seatunnel
   2024-02-26 17:44:47,601 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery 
plugin jar: HdfsFile at: 
file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-file-hadoop-2.3.3.jar
   2024-02-26 17:44:47,603 INFO  
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load plugin: 
PluginIdentifier{engineType='seatunnel', pluginType='sink', 
pluginName='HdfsFile'} from classpath
   2024-02-26 17:44:47,702 INFO  
org.apache.seatunnel.engine.client.job.ClientJobProxy - Start submit job, job 
id: 814432701363781634, with plugin jar 
[file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-file-hadoop-2.3.3.jar,
 file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/ojdbc8-19.10.0.0.jar, 
file:/usr/local/apache-seatunnel-2.3.3/plugins/hive/lib/hive-exec-2.3.9.jar, 
file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/mysql-connector-java-8.0.32.jar,
 
file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-kafka-2.3.3.jar,
 file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/orai18n-19.10.0.0.jar, 
file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/hive-jdbc-3.1.3.jar]
   2024-02-26 17:44:47,768 INFO  
org.apache.seatunnel.engine.client.job.ClientJobProxy - Submit job finished, 
job id: 814432701363781634, job name: SeaTunnel
   2024-02-26 17:44:47,776 WARN  
org.apache.seatunnel.engine.client.job.JobMetricsRunner - Failed to get job 
metrics summary, it maybe first-run
   2024-02-26 17:44:55,561 INFO  
org.apache.seatunnel.engine.client.job.ClientJobProxy - Job 
(814432701363781634) end with state FINISHED
   2024-02-26 17:44:55,569 INFO  
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - 
   ***********************************************
              Job Statistic Information
   ***********************************************
   Start Time                : 2024-02-26 17:44:47
   End Time                  : 2024-02-26 17:44:55
   Total Time(s)             :                   8
   Total Read Count          :               53847
   Total Write Count         :               53847
   Total Failed Count        :                   0
   ***********************************************
   
   2024-02-26 17:44:55,570 INFO  com.hazelcast.core.LifecycleService - 
hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is 
SHUTTING_DOWN
   2024-02-26 17:44:55,577 INFO  
com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 
[imc_test] [5.1] Removed connection to endpoint: 
[hadoop002]:5801:da380055-6b10-4427-b326-ffb967a19d45, connection: 
ClientConnection{alive=false, connectionId=1, 
channel=NioChannel{/192.168.9.112:44263->hadoop002/192.168.9.113:5801}, 
remoteAddress=[hadoop002]:5801, lastReadTime=2024-02-26 17:44:55.564, 
lastWriteTime=2024-02-26 17:44:55.562, closedTime=2024-02-26 17:44:55.573, 
connected server version=5.1}
   2024-02-26 17:44:55,580 INFO  
com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 
[imc_test] [5.1] Removed connection to endpoint: 
[hadoop001]:5801:b42b8c7f-fd07-4297-b9f5-55ea8ca87ca4, connection: 
ClientConnection{alive=false, connectionId=2, 
channel=NioChannel{/192.168.9.112:36031->hadoop001/192.168.9.112:5801}, 
remoteAddress=[hadoop001]:5801, lastReadTime=2024-02-26 17:44:47.433, 
lastWriteTime=2024-02-26 17:44:47.433, closedTime=2024-02-26 17:44:55.578, 
connected server version=5.1}
   2024-02-26 17:44:55,580 INFO  com.hazelcast.core.LifecycleService - 
hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is 
CLIENT_DISCONNECTED
   2024-02-26 17:44:55,584 INFO  com.hazelcast.core.LifecycleService - 
hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is 
SHUTDOWN
   2024-02-26 17:44:55,584 INFO  
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - 
Closed SeaTunnel client......
   2024-02-26 17:44:55,584 INFO  
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - 
Closed metrics executor service ......
   2024-02-26 17:44:55,586 INFO  
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run 
shutdown hook because get close signal
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   Zeta
   
   ### Java or Scala Version
   
   java 1.8
   scala 2.12
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

