aa8808021 opened a new issue, #6863: URL: https://github.com/apache/seatunnel/issues/6863
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

Error synchronizing data from multiple tables to Kafka from MSSQL

### SeaTunnel Version

2.3.5

### SeaTunnel Config

```conf
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  SqlServer-CDC {
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=test_01"
    username = "sa"
    password = "password"
    database-names = ["test_01"]
    table-names = ["test_01.dbo.table_a","test_01.dbo.table_b"]
    startup.mode="initial"
  }
}

transform {
}

sink {
  Kafka {
    bootstrap.servers = "xxxx"
    topic = "SYNC-TEST"
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
  }
}
```

### Running Command

```shell
./bin/seatunnel.sh --config ./config/test.config -e local
```

### Error Exception

```log
2024-05-16 21:08:04,646 ERROR [o.a.k.c.p.KafkaProducer ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - [Producer clientId=producer-SeaTunnel4023-1, transactionalId=SeaTunnel4023-1] Interrupted while joining ioThread
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[?:1.8.0_391]
    at java.lang.Thread.join(Thread.java:1265) ~[?:1.8.0_391]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1265) ~[?:?]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1242) ~[?:?]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1218) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.close(KafkaTransactionSender.java:119) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.close(KafkaSinkWriter.java:140) ~[?:?]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:158) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.5]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) [?:1.8.0_391]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_391]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_391]
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) [?:1.8.0_391]
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_391]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_391]
    at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1881) [?:1.8.0_391]
    at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2478) [?:1.8.0_391]
    at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_391]
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_391]
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_391]
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) [?:1.8.0_391]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) [?:1.8.0_391]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_391]
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) [?:1.8.0_391]
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) [?:1.8.0_391]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:720) [seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) [seatunnel-starter.jar:2.3.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_391]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_391]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_391]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_391]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_391]
2024-05-16 21:08:04,646 INFO [o.a.k.c.p.KafkaProducer ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - [Producer clientId=producer-SeaTunnel4023-1, transactionalId=SeaTunnel4023-1] Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.
2024-05-16 21:08:04,647 INFO [o.a.k.c.m.Metrics ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - Metrics scheduler closed
2024-05-16 21:08:04,647 INFO [o.a.k.c.m.Metrics ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:04,647 INFO [o.a.k.c.m.Metrics ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - Metrics reporters closed
2024-05-16 21:08:04,647 INFO [o.a.k.c.u.AppInfoParser ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - App info kafka.producer for producer-SeaTunnel4023-1 unregistered
2024-05-16 21:08:04,665 INFO [c.h.i.i.NodeExtension ] [main] - [localhost]:5801 [seatunnel-253591] [5.1] Destroying node NodeExtension.
2024-05-16 21:08:04,666 INFO [c.h.i.i.Node ] [main] - [localhost]:5801 [seatunnel-253591] [5.1] Hazelcast Shutdown is completed in 28 ms.
2024-05-16 21:08:04,666 INFO [c.h.c.LifecycleService ] [main] - [localhost]:5801 [seatunnel-253591] [5.1] [localhost]:5801 is SHUTDOWN
2024-05-16 21:08:04,666 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - Closed HazelcastInstance ......
2024-05-16 21:08:04,666 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel ] [main] - ===============================================================================
2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,
2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed
2024-05-16 21:08:04,667 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
    at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
    at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
    at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
    at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
    at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
    ... 13 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
    ... 2 more
2024-05-16 21:08:04,667 ERROR [o.a.s.c.s.SeaTunnel ] [main] - ===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
    at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
    at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
    at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
    at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
    at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
    ... 13 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
    ... 2 more
2024-05-16 21:08:09,127 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
2024-05-16 21:08:09,138 INFO [i.d.j.JdbcConnection ] [pool-26-thread-1] - Connection gracefully closed
2024-05-16 21:08:09,139 INFO [i.d.j.JdbcConnection ] [pool-27-thread-1] - Connection gracefully closed
2024-05-16 21:08:09,140 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-05-16 21:08:14,521 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
2024-05-16 21:08:14,525 INFO [i.d.j.JdbcConnection ] [pool-28-thread-1] - Connection gracefully closed
2024-05-16 21:08:14,527 INFO [i.d.j.JdbcConnection ] [pool-29-thread-1] - Connection gracefully closed
2024-05-16 21:08:14,527 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-05-16 21:08:18,585 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-5] - Metrics scheduler closed
2024-05-16 21:08:18,585 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-5] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:18,585 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-5] - Metrics reporters closed
2024-05-16 21:08:18,585 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-5] - App info kafka.producer for producer-SeaTunnel6694-1 unregistered
2024-05-16 21:08:18,597 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-3] - Metrics scheduler closed
2024-05-16 21:08:18,597 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-3] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:18,597 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-3] - Metrics reporters closed
2024-05-16 21:08:18,597 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-3] - App info kafka.producer for producer-SeaTunnel3841-1 unregistered
2024-05-16 21:08:29,196 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-4] - Metrics scheduler closed
2024-05-16 21:08:29,196 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-4] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:29,196 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-4] - Metrics reporters closed
2024-05-16 21:08:29,197 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-4] - App info kafka.producer for producer-SeaTunnel4066-1 unregistered
2024-05-16 21:08:29,197 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-2] - Metrics scheduler closed
2024-05-16 21:08:29,197 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-2] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:29,198 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-2] - Metrics reporters closed
2024-05-16 21:08:29,198 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-2] - App info kafka.producer for producer-SeaTunnel7334-1 unregistered
2024-05-16 21:08:34,594 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-6] - Metrics scheduler closed
2024-05-16 21:08:34,595 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-6] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:34,595 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-6] - Metrics reporters closed
2024-05-16 21:08:34,596 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-6] - App info kafka.producer for producer-SeaTunnel3209-1 unregistered
2024-05-16 21:08:34,595 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-1] - Metrics scheduler closed
2024-05-16 21:08:34,598 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-1] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:34,598 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-1] - Metrics reporters closed
2024-05-16 21:08:34,599 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-1] - App info kafka.producer for producer-SeaTunnel5301-1 unregistered
2024-05-16 21:08:34,604 INFO [s.c.s.s.c.ClientExecuteCommand] [ForkJoinPool.commonPool-worker-1] - run shutdown hook because get close signal
```

### Zeta or Flink or Spark Version

Zeta

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
