gaoyan1998 opened a new issue, #336:
URL: https://github.com/apache/doris-flink-connector/issues/336

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### Version
   
   1.2.7
   
   ### What's Wrong?
   
   ```
   2024-03-11 08:59:49,243 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - doris_dws_log[3]: Writer (1/1)#0 
(42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) 
switched from CREATED to DEPLOYING.
   2024-03-11 08:59:49,244 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Loading JAR files for task doris_dws_log[3]: Writer (1/1)#0 
(42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) 
[DEPLOYING].
   2024-03-11 08:59:49,244 INFO  
org.apache.flink.runtime.io.network.partition.ResultPartition [] - Sort-merge 
partition 
83a2598d6bedaa33bba189576122d476#0@42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0
 initialized.
   2024-03-11 08:59:49,584 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
application-defined state backend: 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend@6e41178f
   2024-03-11 08:59:49,584 INFO  
org.apache.flink.runtime.state.StateBackendLoader            [] - State backend 
loader loads the state backend as BatchExecutionStateBackend
   2024-03-11 08:59:49,584 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
application defined checkpoint storage: 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionCheckpointStorage@338a690e
   2024-03-11 08:59:49,584 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - doris_dws_log[3]: Writer (1/1)#0 
(42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) 
switched from DEPLOYING to INITIALIZING.
   2024-03-11 08:59:49,850 INFO  org.apache.doris.flink.sink.writer.DorisWriter 
              [] - restore checkpointId 0
   2024-03-11 08:59:49,850 INFO  org.apache.doris.flink.sink.writer.DorisWriter 
              [] - labelPrefix scps_log_database_load2dws
   2024-03-11 08:59:49,855 INFO  org.apache.doris.flink.sink.writer.DorisWriter 
              [] - Send request to Doris FE 
'http://10.0.1.36:8030/api/backends?is_alive=true' with user 'admin'.
   2024-03-11 08:59:49,901 INFO  org.apache.doris.flink.sink.writer.DorisWriter 
              [] - Backend 
Info:{"backends":[{"ip":"10.0.1.46","http_port":8040,"is_alive":true},{"ip":"10.0.1.28","http_port":8040,"is_alive":true},{"ip":"10.0.1.22","http_port":8040,"is_alive":true}]}
   2024-03-11 08:59:49,988 INFO  
org.apache.doris.flink.sink.writer.RecordBuffer              [] - init 
RecordBuffer capacity 262144, count 3
   2024-03-11 08:59:49,989 INFO  
org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - abort for 
labelSuffix scps_log_database_load2dws_0. start chkId 1.
   2024-03-11 08:59:50,079 INFO  
org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - load Result {
   "TxnId": 53343092,
   "Label": "scps_log_database_load2dws_0_1",
   "TwoPhaseCommit": "true",
   "Status": "Success",
   "Message": "OK",
   "NumberTotalRows": 0,
   "NumberLoadedRows": 0,
   "NumberFilteredRows": 0,
   "NumberUnselectedRows": 0,
   "LoadBytes": 0,
   "LoadTimeMs": 16,
   "BeginTxnTimeMs": 0,
   "StreamLoadPutTimeMs": 6,
   "ReadDataTimeMs": 0,
   "WriteDataTimeMs": 7,
   "CommitAndPublishTimeMs": 0
   }
   2024-03-11 08:59:50,082 INFO  
org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - abort 
53343092 for check label scps_log_database_load2dws_0_1.
   2024-03-11 08:59:50,085 INFO  
org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - abort for 
labelSuffix scps_log_database_load2dws_0 finished
   2024-03-11 08:59:50,088 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - doris_dws_log[3]: Writer (1/1)#0 
(42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) 
switched from INITIALIZING to RUNNING.
   2024-03-11 08:59:50,090 INFO  
org.apache.doris.flink.sink.writer.RecordBuffer              [] - start buffer 
data, read queue size 0, write queue size 3
   2024-03-11 08:59:50,090 INFO  
org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - stream load 
started for scps_log_database_load2dws_0_1 on host 10.0.1.46:8040
   2024-03-11 08:59:50,091 INFO  
org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - start execute 
load
   2024-03-11 09:01:30,985 INFO  org.apache.http.impl.execchain.RetryExec       
              [] - I/O exception (java.net.SocketException) caught when 
processing request to {}->http://10.0.1.46:8040: Broken pipe (Write failed)
   2024-03-11 09:01:40,287 ERROR org.apache.doris.flink.sink.writer.DorisWriter 
              [] - stream load finished unexpectedly, interrupt worker thread! 
org.apache.http.client.ClientProtocolException
   2024-03-11 09:01:40,290 WARN  org.apache.flink.runtime.taskmanager.Task      
              [] - doris_dws_log[3]: Writer (1/1)#0 
(42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0) 
switched from RUNNING to FAILED with failure cause:
   java.lang.RuntimeException: java.lang.InterruptedException
   at 
org.apache.doris.flink.sink.writer.RecordStream.write(RecordStream.java:59) 
~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
   at 
org.apache.doris.flink.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:195)
 ~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
   at 
org.apache.doris.flink.sink.writer.DorisWriter.write(DorisWriter.java:143) 
~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
   at 
org.apache.flink.streaming.api.transformations.SinkV1Adapter$SinkWriterV1Adapter.write(SinkV1Adapter.java:136)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) 
~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
[flink-dist-1.17.2.jar:1.17.2]
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
[flink-dist-1.17.2.jar:1.17.2]
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[flink-dist-1.17.2.jar:1.17.2]
   at java.lang.Thread.run(Unknown Source) [?:?]
   Caused by: java.lang.InterruptedException
   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(Unknown
 Source) ~[?:?]
   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
 Source) ~[?:?]
   at java.util.concurrent.ArrayBlockingQueue.take(Unknown Source) ~[?:?]
   at 
org.apache.doris.flink.sink.writer.RecordBuffer.write(RecordBuffer.java:91) 
~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
   at 
org.apache.doris.flink.sink.writer.RecordStream.write(RecordStream.java:57) 
~[flink-doris-connector-1.17-1.4.0.jar:1.4.0]
   ... 17 more
   2024-03-11 09:01:40,294 INFO  org.apache.flink.runtime.taskmanager.Task      
              [] - Freeing task resources for doris_dws_log[3]: Writer (1/1)#0 
(42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0).
   2024-03-11 09:01:40,300 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state FAILED to JobManager for 
task doris_dws_log[3]: Writer (1/1)#0 
42a70c838a02977baf48478f21d3c85f_6d2677a0ecc3fd8df0b72ec675edf8f4_0_0.
   2024-03-11 09:01:40,342 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:0, state:ACTIVE, resource profile: 
ResourceProfile{cpuCores=0.01, taskHeapMemory=537.600mb (563714445 bytes), 
taskOffHeapMemory=0 bytes, managedMemory=634.880mb (665719939 bytes), 
networkMemory=158.720mb (166429984 bytes)}, allocationId: 
af4ef76e87f887d787b3b34db222e140, jobId: 108303d779c058df66183f711b014211).
   2024-03-11 09:01:40,345 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
108303d779c058df66183f711b014211 from job leader monitoring.
   2024-03-11 09:01:40,345 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
JobManager connection for job 108303d779c058df66183f711b014211.
   2024-03-11 09:02:14,326 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
ResourceManager connection f1cded2e81112d915b164504ca51796a.
   2024-03-11 09:02:14,327 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to 
ResourceManager 
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000).
   2024-03-11 09:02:14,331 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved 
ResourceManager address, beginning registration
   2024-03-11 09:02:14,337 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error 
occurred in TaskExecutor 
akka.tcp://[email protected]:6122/user/rpc/taskmanager_0.
   org.apache.flink.util.FlinkException: The TaskExecutor's registration at the 
ResourceManager 
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_*
 has been rejected: Rejected TaskExecutor registration at the ResourceManager 
because: The ResourceManager does not recognize this TaskExecutor.
   at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2442)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2386)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at 
org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269)
 ~[flink-dist-1.17.2.jar:1.17.2]
   at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) 
~[?:?]
   at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown 
Source) ~[?:?]
   at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) 
~[?:?]
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
 ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
 ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
 ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
 ~[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.actor.Actor.aroundReceive(Actor.scala:537) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.actor.Actor.aroundReceive$(Actor.scala:535) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.actor.ActorCell.invoke(ActorCell.scala:547) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.dispatch.Mailbox.run(Mailbox.scala:231) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
[flink-rpc-akka_c6ac3ba6-997e-4151-a8c3-e5b00451e5a5.jar:1.17.2]
   at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
   at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) 
[?:?]
   at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
   at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
   at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
   2024-03-11 09:02:14,338 ERROR 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error 
occurred while executing the TaskManager. Shutting it down...
   ```
   
   
   ### What You Expected?
   
   I have also tried versions 1.4 and 1.5; the result is the same.
   
   ### How to Reproduce?
   
   _No response_
   
   ### Anything Else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

