jeanleen opened a new issue, #6388:
URL: https://github.com/apache/seatunnel/issues/6388
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

The Kafka source cannot consume all historical data when running in batch mode.

I have more than 10 million messages in Kafka ready to be consumed, but when I consume them in batch mode the job finishes after reading only part of the data. In the run below, the job ended with state `FINISHED` after 8 seconds, having read only 53,847 records:

```log
2024-02-26 17:44:47,768 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Submit job finished, job id: 814432701363781634, job name: SeaTunnel
2024-02-26 17:44:47,776 WARN org.apache.seatunnel.engine.client.job.JobMetricsRunner - Failed to get job metrics summary, it maybe first-run
2024-02-26 17:44:55,561 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (814432701363781634) end with state FINISHED
2024-02-26 17:44:55,569 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - 
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-02-26 17:44:47
End Time                  : 2024-02-26 17:44:55
Total Time(s)             :                   8
Total Read Count          :               53847
Total Write Count         :               53847
Total Failed Count        :                   0
***********************************************
2024-02-26 17:44:55,570 INFO com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-02-26 17:44:55,577 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [imc_test] [5.1] Removed connection to endpoint: [hadoop002]:5801:da380055-6b10-4427-b326-ffb967a19d45, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/192.168.9.112:44263->hadoop002/192.168.9.113:5801}, remoteAddress=[hadoop002]:5801, lastReadTime=2024-02-26 17:44:55.564, lastWriteTime=2024-02-26 17:44:55.562, closedTime=2024-02-26 17:44:55.573, connected server version=5.1}
2024-02-26 17:44:55,580 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [imc_test] [5.1] Removed connection to endpoint: [hadoop001]:5801:b42b8c7f-fd07-4297-b9f5-55ea8ca87ca4, connection: ClientConnection{alive=false, connectionId=2, channel=NioChannel{/192.168.9.112:36031->hadoop001/192.168.9.112:5801}, remoteAddress=[hadoop001]:5801, lastReadTime=2024-02-26 17:44:47.433, lastWriteTime=2024-02-26 17:44:47.433, closedTime=2024-02-26 17:44:55.578, connected server version=5.1}
2024-02-26 17:44:55,580 INFO com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-02-26 17:44:55,584 INFO com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-02-26 17:44:55,584 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2024-02-26 17:44:55,584 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2024-02-26 17:44:55,586 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
```

### SeaTunnel Version

2.3.3

### SeaTunnel Config

```conf
{
  "env" : {
    "parallelism" : 3,
    "job.mode" : "BATCH",
    "checkpoint.interval" : 30000,
    "job.name" : "oalu_kafka"
  },
  "source" : [
    {
      "schema" : {
        "fields" : {
          "Price" : "STRING",
          "Aplus" : "STRING",
          "AmazonChoice" : "STRING",
          "Brand" : "STRING"
        }
      },
      "consumer.group" : "amc",
      "commit_on_checkpoint" : true,
      "format" : "json",
      "topic" : "lurProduct",
      "bootstrap.servers" : "192.168.9.31:9092,192.168.9.81:9092",
      "plugin_name" : "Kafka",
      "kafka.config" : {
        "client.id" : "client_1",
        "max.poll.records" : "100000",
        "auto.offset.reset" : "earliest",
        "max.partition.fetch.bytes" : "52428800",
        "session.timeout.ms" : "30000"
      }
    }
  ],
  "sink" : [
    {
      "fs.defaultFS" : "hdfs://hadoop0011:8020",
      "path" : "/user/hive/warehouse/stg.db/pdc_lu_product_data/",
      "bath_size" : 10000,
      "file_format_type" : "orc",
      "plugin_name" : "HdfsFile"
    }
  ]
}
```

### Running Command

```shell
/usr/local/apache-seatunnel-2.3.3/bin/seatunnel.sh --config pdc_lu_kafka_bath.conf
```

### Error Exception

```log
2024-02-26 17:44:47,539 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-02-26 17:44:47,540 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-02-26 17:44:47,543 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load SeaTunnelSink Plugin from /usr/local/apache-seatunnel-2.3.3/connectors/seatunnel
2024-02-26 17:44:47,547 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: Kafka at: file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-kafka-2.3.3.jar
2024-02-26 17:44:47,547 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: HdfsFile at: file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-file-hadoop-2.3.3.jar
2024-02-26 17:44:47,552 INFO org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start generating all sources.
2024-02-26 17:44:47,554 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-02-26 17:44:47,570 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load SeaTunnelSource Plugin from /usr/local/apache-seatunnel-2.3.3/connectors/seatunnel
2024-02-26 17:44:47,574 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: Kafka at: file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-kafka-2.3.3.jar
2024-02-26 17:44:47,578 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load plugin: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Kafka'} from classpath
2024-02-26 17:44:47,598 INFO org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start generating all transforms.
2024-02-26 17:44:47,598 INFO org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start generating all sinks.
2024-02-26 17:44:47,599 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-02-26 17:44:47,601 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load SeaTunnelSink Plugin from /usr/local/apache-seatunnel-2.3.3/connectors/seatunnel
2024-02-26 17:44:47,601 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: HdfsFile at: file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-file-hadoop-2.3.3.jar
2024-02-26 17:44:47,603 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load plugin: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='HdfsFile'} from classpath
2024-02-26 17:44:47,702 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Start submit job, job id: 814432701363781634, with plugin jar [file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-file-hadoop-2.3.3.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/ojdbc8-19.10.0.0.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/hive/lib/hive-exec-2.3.9.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/mysql-connector-java-8.0.32.jar, file:/usr/local/apache-seatunnel-2.3.3/connectors/seatunnel/connector-kafka-2.3.3.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/orai18n-19.10.0.0.jar, file:/usr/local/apache-seatunnel-2.3.3/plugins/jdbc/lib/hive-jdbc-3.1.3.jar]
2024-02-26 17:44:47,768 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Submit job finished, job id: 814432701363781634, job name: SeaTunnel
2024-02-26 17:44:47,776 WARN org.apache.seatunnel.engine.client.job.JobMetricsRunner - Failed to get job metrics summary, it maybe first-run
2024-02-26 17:44:55,561 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (814432701363781634) end with state FINISHED
2024-02-26 17:44:55,569 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - 
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-02-26 17:44:47
End Time                  : 2024-02-26 17:44:55
Total Time(s)             :                   8
Total Read Count          :               53847
Total Write Count         :               53847
Total Failed Count        :                   0
***********************************************
2024-02-26 17:44:55,570 INFO com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-02-26 17:44:55,577 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [imc_test] [5.1] Removed connection to endpoint: [hadoop002]:5801:da380055-6b10-4427-b326-ffb967a19d45, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/192.168.9.112:44263->hadoop002/192.168.9.113:5801}, remoteAddress=[hadoop002]:5801, lastReadTime=2024-02-26 17:44:55.564, lastWriteTime=2024-02-26 17:44:55.562, closedTime=2024-02-26 17:44:55.573, connected server version=5.1}
2024-02-26 17:44:55,580 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [imc_test] [5.1] Removed connection to endpoint: [hadoop001]:5801:b42b8c7f-fd07-4297-b9f5-55ea8ca87ca4, connection: ClientConnection{alive=false, connectionId=2, channel=NioChannel{/192.168.9.112:36031->hadoop001/192.168.9.112:5801}, remoteAddress=[hadoop001]:5801, lastReadTime=2024-02-26 17:44:47.433, lastWriteTime=2024-02-26 17:44:47.433, closedTime=2024-02-26 17:44:55.578, connected server version=5.1}
2024-02-26 17:44:55,580 INFO com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-02-26 17:44:55,584 INFO com.hazelcast.core.LifecycleService - hz.client_1 [imc_test] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-02-26 17:44:55,584 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2024-02-26 17:44:55,584 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2024-02-26 17:44:55,586 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
```

### Zeta or Flink or Spark Version

Zeta

### Java or Scala Version

Java 1.8, Scala 2.12

### Screenshots

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
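### Checking the expected record count

To put a number on how far short the batch job stopped, the 53,847 records it read can be compared with the number of offsets currently retained in `lurProduct`: per partition, end offset minus beginning offset, summed over all partitions. The following is a minimal sketch, assuming the offsets have already been fetched, for example with the Kafka Java client's `KafkaConsumer#beginningOffsets` and `KafkaConsumer#endOffsets` (both return a `Map<TopicPartition, Long>`). To stay self-contained, partitions here are keyed by a plain partition number; the class `ExpectedCount`, the method `expectedRecords`, and the offsets in `main` are hypothetical. The result is an upper bound, because log compaction and transaction markers can leave offsets that carry no consumable record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of SeaTunnel): estimates how many records a
// bounded read from the earliest offsets should return.
public class ExpectedCount {

    // Sums (end - beginning) over every partition present in `end`.
    // A Kafka end offset is the offset of the next record to be written, so
    // end - beginning is the number of offsets currently retained in a partition.
    public static long expectedRecords(Map<Integer, Long> beginning, Map<Integer, Long> end) {
        long total = 0L;
        for (Map.Entry<Integer, Long> entry : end.entrySet()) {
            Long start = beginning.get(entry.getKey());
            if (start == null) {
                throw new IllegalArgumentException(
                        "missing beginning offset for partition " + entry.getKey());
            }
            // Clamp at zero so inconsistent input cannot reduce the total.
            total += Math.max(0L, entry.getValue() - start);
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for a 3-partition topic; replace them with the
        // values the Kafka consumer returns for the real topic.
        Map<Integer, Long> beginning = new HashMap<>();
        Map<Integer, Long> end = new HashMap<>();
        beginning.put(0, 0L);
        end.put(0, 4_000_000L);
        beginning.put(1, 0L);
        end.put(1, 3_500_000L);
        beginning.put(2, 100L);
        end.put(2, 3_000_100L);

        long expected = expectedRecords(beginning, end);
        long read = 53_847L; // "Total Read Count" reported by the batch job
        System.out.println("expected <= " + expected + ", read = " + read);
    }
}
```

If the real offsets give a total in the millions while the job reports a read count of 53,847, that difference is the data the batch run did not consume.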
