goldzzz1 opened a new issue, #5885:
URL: https://github.com/apache/seatunnel/issues/5885

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   SeaTunnel in streaming mode fails with an error when writing from an Http source into Paimon.
   
   ### SeaTunnel Version
   
   2.3.2
   
   ### SeaTunnel Config
   
   ```conf
   env {
     job.mode = "STREAMING"
   }

   source {
     Http {
       url = "https://statistics-api.wildberries.ru/api/v1/supplier/orders"
       headers = {Authorization="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3NJRCI6ImJlYjUxMGU3LThiOTEtNGYyYi05YjdkLWZmMWQ1MzlmOWI3ZCJ9.ryRtg7TAlJ9VzhtoTq2XkhGGYI6N30AA0py3Lv0zoIo"}
       params = {dateFrom="2023-11-17"}
       json_field = {
         date = "$.[*].date"
         lastChangeDate = "$.[*].lastChangeDate"
         supplierArticle = "$.[*].supplierArticle"
         techSize = "$.[*].techSize"
         barcode = "$.[*].barcode"
         totalPrice = "$.[*].totalPrice"
         discountPercent = "$.[*].discountPercent"
         warehouseName = "$.[*].warehouseName"
         oblast = "$.[*].oblast"
         incomeID = "$.[*].incomeID"
         odid = "$.[*].odid"
         nmId = "$.[*].nmId"
         subject = "$.[*].subject"
         category = "$.[*].category"
         brand = "$.[*].brand"
         isCancel = "$.[*].isCancel"
         cancel_dt = "$.[*].cancel_dt"
         gNumber = "$.[*].gNumber"
         sticker = "$.[*].sticker"
         srid = "$.[*].srid"
         orderType = "$.[*].orderType"
       }
       schema = {
         fields {
           date = string
           lastChangeDate = string
           supplierArticle = string
           techSize = string
           barcode = string
           totalPrice = string
           discountPercent = string
           warehouseName = string
           oblast = string
           incomeID = string
           odid = string
           nmId = string
           subject = string
           category = string
           brand = string
           isCancel = boolean
           cancel_dt = string
           gNumber = string
           sticker = string
           srid = string
           orderType = string
           token = string
           source = string
         }
       }
       result_table_name = "fake"
     }
   }

   transform {
     Sql {
       source_table_name = "fake"
       result_table_name = "fake1"
       query = "SELECT odid,date,lastChangeDate,supplierArticle,techSize,barcode,totalPrice,discountPercent,warehouseName,oblast,incomeID,nmId,subject,category,brand,isCancel,cancel_dt,gNumber,sticker,srid,orderType,'ооо"13"' as token,'wb' as source from fake"
     }
   }

   sink {
     Paimon {
       warehouse = "hdfs://localhost:9000/path/to/warehouse"
       database = "default"
       table = "ods_orders"
     }
   }
   ```
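   
   As an aside, the raw double quotes inside the `query` string (`'ооо"13"'`) would normally terminate a HOCON double-quoted string early; if the parser complains about that, a triple-quoted HOCON string is a possible workaround (a sketch only, with the column list abbreviated here):
   
   ```conf
   # Sketch: HOCON triple quotes let the inner "13" stay unescaped.
   query = """SELECT odid, ..., 'ооо"13"' as token, 'wb' as source from fake"""
   ```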
   
   
   ### Running Command
   
   ```shell
   sh ./bin/seatunnel.sh --config ./config/v2.batch1.config
   ```
   
   
   ### Error Exception
   
   ```log
   Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
           at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:255)
           at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:251)
           at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:344)
           at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:188)
           at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.run(CheckpointErrorReportOperation.java:48)
           at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
           at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
           at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
           at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
           at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
           at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
           at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
           at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-01], ErrorDescription:[Paimon write commit failed] - Flink table store commit operation failed
           at org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter.commit(PaimonAggregatedCommitter.java:67)
           at org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask.restoreState(SinkAggregatedCommitterTask.java:266)
           at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:106)
           at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.RuntimeException: File deletion conflicts detected! Give up committing.
   
   Conflicts during commits are normal and this failure is intended to resolve the conflicts.
   Conflicts are mainly caused by the following scenarios:
   1. Multiple jobs are writing into the same partition at the same time, you can use https://paimon.apache.org/docs/master/maintenance/write-performance/#dedicated-compaction-job to support multiple writing.
   2. You're recovering from an old savepoint, or you're creating multiple jobs from a savepoint.
      The job will fail continuously in this scenario to protect metadata from corruption.
      You can either recover from the latest savepoint, or you can revert the table to the snapshot corresponding to the old savepoint.
   ```
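   
   For anyone hitting the same conflict, the dedicated-compaction-job remedy that the Paimon error text points to can be sketched roughly as below. This is a sketch under assumptions, not taken from this report: the action-jar path and version are placeholders, and whether the SeaTunnel 2.3.2 Paimon sink honors the `write-only` table option should be verified against the Paimon docs linked above.
   
   ```shell
   # Sketch only: make the SeaTunnel job a pure writer (no compaction) so
   # concurrent commits stop deleting each other's files, then run Paimon's
   # dedicated compaction job from Flink against the same table.
   
   # 1. Disable compaction on the writer side (Flink SQL, same catalog):
   #      ALTER TABLE ods_orders SET ('write-only' = 'true');
   
   # 2. Start a dedicated compaction job for the table
   #    (jar name/version and FLINK_HOME are placeholders):
   <FLINK_HOME>/bin/flink run \
       /path/to/paimon-flink-action-<version>.jar \
       compact \
       --warehouse hdfs://localhost:9000/path/to/warehouse \
       --database default \
       --table ods_orders
   ```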
   
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   1.8
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

