LYL41011 opened a new issue, #4410:
URL: https://github.com/apache/incubator-seatunnel/issues/4410

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   I tried to synchronize ClickHouse data to Hive using SeaTunnel. When I synchronized 100,000 rows, the task ran normally. When I increased the amount of data to 200,000 rows, the following error was thrown: **Caused by: java.io.UncheckedIOException: Failed to read column #10 of 601: big_loan_state String**
   
   After many attempts, I found that the job runs normally when the amount of data is small, but fails with this error when the amount of data is large.
   
   ### SeaTunnel Version
   
   2.3.0 release
   
   ### SeaTunnel Config
   
   ```conf
   env {
     execution.parallelism = 1
     job.mode = "BATCH"
   }
   
   source {
     Clickhouse {
       host = "10.220.171.9:8123"
       database = "dist"
       sql = "select * from dist.ics_ln_loan limit 200000"
       username = "default"
       password = "6lYaUiFi"
       result_table_name = "test"
     }
   }
   
   transform {
   
   }
   sink {
     # write the data to a Hive table
     Hive {
       kerberos.enable="N"
       table_name = "ods_tmp.test_seatunnel_sink"
       metastore_uri = "thrift://p71368v.hulk.bjmd.qihoo.net:9083"
       partition_by = ["partitions"]
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   bin/start-seatunnel-flink-connector-v2.sh --config ./config/flink.ck2hive.streaming.conf.template
   ```
   
   
   ### Error Exception
   
   ```log
   2023-03-24 10:36:03
   org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   Caused by: java.io.UncheckedIOException: Failed to read column #10 of 601: big_loan_state String
        at com.clickhouse.client.ClickHouseDataProcessor.nextRecord(ClickHouseDataProcessor.java:154)
        at com.clickhouse.client.ClickHouseDataProcessor.access$100(ClickHouseDataProcessor.java:21)
        at com.clickhouse.client.ClickHouseDataProcessor$RecordsIterator.next(ClickHouseDataProcessor.java:36)
        at com.clickhouse.client.ClickHouseDataProcessor$RecordsIterator.next(ClickHouseDataProcessor.java:22)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceReader.pollNext(ClickhouseSourceReader.java:76)
        at org.apache.seatunnel.translation.source.ParallelSource.run(ParallelSource.java:128)
        at org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction.run(BaseSeaTunnelSourceFunction.java:83)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
   Caused by: java.io.IOException: Reached end of input stream after reading 93 of 101 bytes
        at com.clickhouse.client.stream.AbstractByteArrayInputStream.readBytes(AbstractByteArrayInputStream.java:240)
        at com.clickhouse.client.data.ClickHouseRowBinaryProcessor$MappedFunctions.lambda$buildMappingsForDataTypes$64(ClickHouseRowBinaryProcessor.java:336)
        at com.clickhouse.client.data.ClickHouseRowBinaryProcessor$MappedFunctions.deserialize(ClickHouseRowBinaryProcessor.java:466)
        at com.clickhouse.client.data.ClickHouseRowBinaryProcessor.readAndFill(ClickHouseRowBinaryProcessor.java:509)
        at com.clickhouse.client.ClickHouseDataProcessor.nextRecord(ClickHouseDataProcessor.java:143)
        ... 12 more
   ```
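   The innermost cause, `Reached end of input stream after reading 93 of 101 bytes`, suggests that the response stream from ClickHouse ended in the middle of a `String` value. Assuming the RowBinary layout, in which each `String` is written as an unsigned LEB128 varint length followed by that many bytes, the following minimal sketch shows how a reader produces this kind of message when the length prefix announces 101 bytes but only 93 arrive. The class and method names are hypothetical; this is not the clickhouse-java implementation, and it does not explain why the stream was cut short.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.charset.StandardCharsets;

   // Hypothetical illustration of reading one RowBinary String value.
   public class RowBinaryStringSketch {
       // Reads an unsigned LEB128 varint (assumed length prefix for String values).
       static int readVarInt(InputStream in) throws IOException {
           int result = 0;
           int shift = 0;
           while (true) {
               int b = in.read();
               if (b < 0) {
                   throw new IOException("Reached end of input stream while reading varint");
               }
               result |= (b & 0x7F) << shift;
               if ((b & 0x80) == 0) {
                   return result;
               }
               shift += 7;
           }
       }

       // Reads the varint length, then exactly that many UTF-8 bytes.
       static String readString(InputStream in) throws IOException {
           int length = readVarInt(in);
           byte[] buf = new byte[length];
           int offset = 0;
           while (offset < length) {
               int n = in.read(buf, offset, length - offset);
               if (n < 0) {
                   // The stream ended before the announced number of bytes arrived.
                   throw new IOException("Reached end of input stream after reading "
                           + offset + " of " + length + " bytes");
               }
               offset += n;
           }
           return new String(buf, StandardCharsets.UTF_8);
       }

       // Decodes one value, returning the error message instead if the input is truncated.
       static String decode(byte[] data) {
           try {
               return readString(new ByteArrayInputStream(data));
           } catch (IOException e) {
               return e.getMessage();
           }
       }

       public static void main(String[] args) {
           // Length prefix says 101 bytes, but only 93 payload bytes follow.
           byte[] truncated = new byte[1 + 93];
           truncated[0] = 101; // 101 < 128, so it fits in a single varint byte
           System.out.println(decode(truncated));
       }
   }
   ```

   Running `main` prints `Reached end of input stream after reading 93 of 101 bytes`, the same shape of message as in the log: the length prefix was read successfully, but the stream ended before the value was complete.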
   
   
   ### Flink or Spark Version
   
   Flink 1.13.6 release
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   