aa8808021 opened a new issue, #6863:
URL: https://github.com/apache/seatunnel/issues/6863

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   The job fails with an error when synchronizing data from multiple MSSQL tables to Kafka.
   
   ### SeaTunnel Version
   
   2.3.5
   
   ### SeaTunnel Config
   
   ```conf
   env {
     # You can set engine configuration here
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 5000
   }
   
   source {
     # This is an example source plugin **only for testing and demonstrating the source plugin feature**
     SqlServer-CDC {
       base-url = "jdbc:sqlserver://localhost:1433;databaseName=test_01"
       username = "sa"
       password = "password"
       database-names = ["test_01"]
       table-names = ["test_01.dbo.table_a","test_01.dbo.table_b"]
       startup.mode="initial"
   
     }
   }
   
   transform {
   }
   
   sink {
     Kafka {
       bootstrap.servers = "xxxx"
       topic = "SYNC-TEST"
   
       kafka.request.timeout.ms = 60000
       semantics = EXACTLY_ONCE
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   ./bin/seatunnel.sh --config ./config/test.config -e local
   ```
   
   
   ### Error Exception
   
   ```log
   2024-05-16 21:08:04,646 ERROR [o.a.k.c.p.KafkaProducer       ] 
[BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, 
taskGroupId=90000}] - [Producer clientId=producer-SeaTunnel4023-1, 
transactionalId=SeaTunnel4023-1] Interrupted while joining ioThread
   java.lang.InterruptedException: null
        at java.lang.Object.wait(Native Method) ~[?:1.8.0_391]
        at java.lang.Thread.join(Thread.java:1265) ~[?:1.8.0_391]
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1265) 
~[?:?]
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1242) 
~[?:?]
        at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1218) 
~[?:?]
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.close(KafkaTransactionSender.java:119)
 ~[?:?]
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.close(KafkaSinkWriter.java:140)
 ~[?:?]
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:158)
 ~[seatunnel-starter.jar:2.3.5]
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330)
 ~[seatunnel-starter.jar:2.3.5]
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) 
[?:1.8.0_391]
        at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) 
[?:1.8.0_391]
        at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
[?:1.8.0_391]
        at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) 
[?:1.8.0_391]
        at 
java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) 
[?:1.8.0_391]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
[?:1.8.0_391]
        at 
java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1881) 
[?:1.8.0_391]
        at 
java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2478) 
[?:1.8.0_391]
        at 
java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) 
[?:1.8.0_391]
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) 
[?:1.8.0_391]
        at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) 
[?:1.8.0_391]
        at 
java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) 
[?:1.8.0_391]
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
 [?:1.8.0_391]
        at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) 
[?:1.8.0_391]
        at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) 
[?:1.8.0_391]
        at 
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) 
[?:1.8.0_391]
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327)
 [seatunnel-starter.jar:2.3.5]
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:720)
 [seatunnel-starter.jar:2.3.5]
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
 [seatunnel-starter.jar:2.3.5]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_391]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_391]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_391]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_391]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_391]
   2024-05-16 21:08:04,646 INFO  [o.a.k.c.p.KafkaProducer       ] 
[BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, 
taskGroupId=90000}] - [Producer clientId=producer-SeaTunnel4023-1, 
transactionalId=SeaTunnel4023-1] Proceeding to force close the producer since 
pending requests could not be completed within timeout 9223372036854775807 ms.
   2024-05-16 21:08:04,647 INFO  [o.a.k.c.m.Metrics             ] 
[BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, 
taskGroupId=90000}] - Metrics scheduler closed
   2024-05-16 21:08:04,647 INFO  [o.a.k.c.m.Metrics             ] 
[BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, 
taskGroupId=90000}] - Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
   2024-05-16 21:08:04,647 INFO  [o.a.k.c.m.Metrics             ] 
[BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, 
taskGroupId=90000}] - Metrics reporters closed
   2024-05-16 21:08:04,647 INFO  [o.a.k.c.u.AppInfoParser       ] 
[BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, 
taskGroupId=90000}] - App info kafka.producer for producer-SeaTunnel4023-1 
unregistered
   2024-05-16 21:08:04,665 INFO  [c.h.i.i.NodeExtension         ] [main] - 
[localhost]:5801 [seatunnel-253591] [5.1] Destroying node NodeExtension.
   2024-05-16 21:08:04,666 INFO  [c.h.i.i.Node                  ] [main] - 
[localhost]:5801 [seatunnel-253591] [5.1] Hazelcast Shutdown is completed in 28 
ms.
   2024-05-16 21:08:04,666 INFO  [c.h.c.LifecycleService        ] [main] - 
[localhost]:5801 [seatunnel-253591] [5.1] [localhost]:5801 is SHUTDOWN
   2024-05-16 21:08:04,666 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - 
Closed HazelcastInstance ......
   2024-05-16 21:08:04,666 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - 
Closed metrics executor service ......
   2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -
   
   
===============================================================================
   
   
   2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
Fatal Error,
   
   2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
Please submit bug report in https://github.com/apache/seatunnel/issues
   
   2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
Reason:SeaTunnel job executed failed
   
   2024-05-16 21:08:04,667 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
Exception 
StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: 
SeaTunnel job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
com.hazelcast.nio.serialization.HazelcastSerializationException: 
java.lang.NegativeArraySizeException
        at 
com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
        at 
com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
        at 
com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
        at 
com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
        at 
org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.NegativeArraySizeException
        at 
org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
        at 
org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
        at 
org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
        at 
com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
        at 
com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
        ... 13 more
   
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
        ... 2 more
   
   2024-05-16 21:08:04,667 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -
   
===============================================================================
   
   
   
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
com.hazelcast.nio.serialization.HazelcastSerializationException: 
java.lang.NegativeArraySizeException
        at 
com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
        at 
com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
        at 
com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
        at 
com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
        at 
org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.NegativeArraySizeException
        at 
org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
        at 
org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
        at 
org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
        at 
com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
        at 
com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
        ... 13 more
   
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
        ... 2 more
   2024-05-16 21:08:09,127 INFO  [r.IncrementalSourceSplitReader] [Source Data 
Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, 
pipelineId=1, taskGroupId=30000}] - Close current fetcher 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
   2024-05-16 21:08:09,138 INFO  [i.d.j.JdbcConnection          ] 
[pool-26-thread-1] - Connection gracefully closed
   2024-05-16 21:08:09,139 INFO  [i.d.j.JdbcConnection          ] 
[pool-27-thread-1] - Connection gracefully closed
   2024-05-16 21:08:09,140 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data 
Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, 
pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
   2024-05-16 21:08:14,521 INFO  [r.IncrementalSourceSplitReader] [Source Data 
Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, 
pipelineId=1, taskGroupId=30000}] - Close current fetcher 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
   2024-05-16 21:08:14,525 INFO  [i.d.j.JdbcConnection          ] 
[pool-28-thread-1] - Connection gracefully closed
   2024-05-16 21:08:14,527 INFO  [i.d.j.JdbcConnection          ] 
[pool-29-thread-1] - Connection gracefully closed
   2024-05-16 21:08:14,527 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data 
Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, 
pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
   2024-05-16 21:08:18,585 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-5] - Metrics scheduler closed
   2024-05-16 21:08:18,585 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-5] - Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
   2024-05-16 21:08:18,585 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-5] - Metrics reporters closed
   2024-05-16 21:08:18,585 INFO  [o.a.k.c.u.AppInfoParser       ] 
[ForkJoinPool.commonPool-worker-5] - App info kafka.producer for 
producer-SeaTunnel6694-1 unregistered
   2024-05-16 21:08:18,597 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-3] - Metrics scheduler closed
   2024-05-16 21:08:18,597 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-3] - Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
   2024-05-16 21:08:18,597 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-3] - Metrics reporters closed
   2024-05-16 21:08:18,597 INFO  [o.a.k.c.u.AppInfoParser       ] 
[ForkJoinPool.commonPool-worker-3] - App info kafka.producer for 
producer-SeaTunnel3841-1 unregistered
   2024-05-16 21:08:29,196 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-4] - Metrics scheduler closed
   2024-05-16 21:08:29,196 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-4] - Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
   2024-05-16 21:08:29,196 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-4] - Metrics reporters closed
   2024-05-16 21:08:29,197 INFO  [o.a.k.c.u.AppInfoParser       ] 
[ForkJoinPool.commonPool-worker-4] - App info kafka.producer for 
producer-SeaTunnel4066-1 unregistered
   2024-05-16 21:08:29,197 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-2] - Metrics scheduler closed
   2024-05-16 21:08:29,197 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-2] - Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
   2024-05-16 21:08:29,198 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-2] - Metrics reporters closed
   2024-05-16 21:08:29,198 INFO  [o.a.k.c.u.AppInfoParser       ] 
[ForkJoinPool.commonPool-worker-2] - App info kafka.producer for 
producer-SeaTunnel7334-1 unregistered
   2024-05-16 21:08:34,594 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-6] - Metrics scheduler closed
   2024-05-16 21:08:34,595 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-6] - Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
   2024-05-16 21:08:34,595 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-6] - Metrics reporters closed
   2024-05-16 21:08:34,596 INFO  [o.a.k.c.u.AppInfoParser       ] 
[ForkJoinPool.commonPool-worker-6] - App info kafka.producer for 
producer-SeaTunnel3209-1 unregistered
   2024-05-16 21:08:34,595 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-1] - Metrics scheduler closed
   2024-05-16 21:08:34,598 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-1] - Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
   2024-05-16 21:08:34,598 INFO  [o.a.k.c.m.Metrics             ] 
[ForkJoinPool.commonPool-worker-1] - Metrics reporters closed
   2024-05-16 21:08:34,599 INFO  [o.a.k.c.u.AppInfoParser       ] 
[ForkJoinPool.commonPool-worker-1] - App info kafka.producer for 
producer-SeaTunnel5301-1 unregistered
   2024-05-16 21:08:34,604 INFO  [s.c.s.s.c.ClientExecuteCommand] 
[ForkJoinPool.commonPool-worker-1] - run shutdown hook because get close signal
   ```
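
Triage note on the trace above: the innermost frame is `SeaTunnelRow.<init>`, reached from `RecordSerializer.read`, and Java throws `NegativeArraySizeException` when an array is allocated with a negative length. The sketch below is not SeaTunnel code. It only illustrates one way a negative length can appear during deserialization, under the assumption (not confirmed by this report) that a field count is written in a type narrower than `int`, such as a single signed byte. The class `NegativeAritySketch`, the nested `Row` type, and the method `roundTripArityAsByte` are hypothetical names introduced for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NegativeAritySketch {

    // Hypothetical stand-in for a row type that sizes its field array from a count.
    static final class Row {
        final Object[] fields;

        Row(int arity) {
            // Throws NegativeArraySizeException when arity < 0.
            this.fields = new Object[arity];
        }
    }

    // Assumption: the count is serialized with writeByte and read back with readByte.
    static int roundTripArityAsByte(int arity) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeByte(arity); // only the low 8 bits are written
        }
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return in.readByte(); // read back as a signed byte
        }
    }

    public static void main(String[] args) throws IOException {
        int decoded = roundTripArityAsByte(200);
        System.out.println("decoded arity = " + decoded); // prints "decoded arity = -56"
        try {
            new Row(decoded);
        } catch (NegativeArraySizeException e) {
            System.out.println("caught " + e.getClass().getSimpleName());
        }
    }
}
```

If this assumption holds, counts up to 127 would round-trip unchanged and only wider rows would fail, so comparing the column counts of `table_a` and `table_b` against that limit would be a cheap first check. Other causes, such as a reader and writer disagreeing on the record layout, would produce the same exception.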
   
   
   ### Zeta or Flink or Spark Version
   
   Zeta
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


