w398507661 opened a new issue, #4316:
URL: https://github.com/apache/incubator-seatunnel/issues/4316

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   SeaTunnel 2.3.0: importing data from PostgreSQL into Elasticsearch fails with an error.
   ES mapping:
   ```json
   {
     "seatunneltest_tst_20230213_1" : {
       "mappings" : {
         "acount_y" : {
           "properties" : {
             "id" : {
               "type" : "text",
               "fields" : {
                 "keyword" : {
                   "type" : "keyword",
                   "ignore_above" : 256
                 }
               }
             },
             "type" : {
               "type" : "text",
               "fields" : {
                 "keyword" : {
                   "type" : "keyword",
                   "ignore_above" : 256
                 }
               }
             }
           }
         }
       }
     }
   }
   ```
   
   ES settings:
   ```json
   {
     "seatunneltest_tst_20230213_1" : {
       "settings" : {
         "index" : {
           "refresh_interval" : "1s",
           "number_of_shards" : "5",
           "provided_name" : "seatunneltest_tst_20230213_1",
           "creation_date" : "1676280835561",
           "number_of_replicas" : "0",
           "uuid" : "dk1wGNFfTzWvG_QtIfHpIg",
           "version" : {
             "created" : "6070099"
           }
         }
       }
     }
   }
   ```
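
   As a side note, the `"created" : "6070099"` field above encodes the Elasticsearch version that created the index. A quick sketch to decode it (the major/minor/revision digit layout is an assumption based on Elasticsearch's internal version-id scheme):
   ```python
   # Decode an Elasticsearch internal version id such as 6070099.
   # Assumed layout: M_MM_NN_RR_BB collapsed into major*1000000 +
   # minor*10000 + revision*100 + build (99 marks a release build).
   def decode_es_version(version_id: int) -> str:
       major = version_id // 1000000
       minor = (version_id // 10000) % 100
       revision = (version_id // 100) % 100
       return f"{major}.{minor}.{revision}"

   print(decode_es_version(6070099))  # -> 6.7.0
   ```
   If that decoding is right, the target cluster is a 6.x release, which matters because document types were still mandatory in that major version.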
   
   
   ### SeaTunnel Version
   
   2.3.0
   
   ### SeaTunnel Config
   
   ```conf
   env {
     # seatunnel defined streaming batch duration in seconds
     #spark.app.name = "SeaTunnel"
     #spark.sql.catalogImplementation = "hive"
     spark.executor.instances = 10
     spark.executor.cores = 20
     spark.executor.memory = "40g"
   }
   
   source {
     jdbc {
       driver = "org.postgresql.Driver"
       url = "jdbc:postgresql://10.122.141.186:5432/biportaldb"
       user = "a_appconnect"
       password = "HhtDR#U5"
       table = "seatunnel_test"
       result_table_name = "seatunnel_test"
       #jdbc.SSL = "true"
       #jdbc.SSLKeyStorePath = "/data/ludp_jks/keystore.jks"
       query = "select * from seatunnel_test"
    }
   
   }
   
   transform {
     # split data by specific delimiter
   
   # you can also use other filter plugins, such as sql
     # sql {
     #   sql = "select * from accesslog where request_time > 1000"
     #} 
   }
   
   
   sink {
     # choose stdout output plugin to output data to console
     # Console {}
        elasticsearch {
                hosts = ["10.122.33.95:9201"]
                index = "seatunneltest_tst_20230213_1"
        }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   nohup ./bin/start-seatunnel-spark-connector-v2.sh  --master yarn 
--deploy-mode client --config config/spark.batch.pg.to.es.conf &
   ```
   
   
   ### Error Exception
   
   ```log
   2023-03-09 15:59:03 INFO  YarnScheduler:54 - Killing all running tasks in 
stage 0: Stage cancelled
   2023-03-09 15:59:03 INFO  DAGScheduler:54 - ResultStage 0 (save at 
SinkExecuteProcessor.java:85) failed in 12.895 s due to Job aborted due to 
stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost 
task 0.3 in stage 0.0 (TID 3, cbi238.cdh.com, executor 5): 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException:
 ErrorCode:[COMMON-08], ErrorDescription:[Sql operation failed, such as 
(execute,addBatch,close) etc...] - ElasticSearch execute batch statement error
           at 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.bulkEsWithRetry(ElasticsearchSinkWriter.java:122)
           at 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.write(ElasticsearchSinkWriter.java:92)
           at 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.write(ElasticsearchSinkWriter.java:50)
           at 
org.apache.seatunnel.translation.spark.sink.SparkDataWriter.write(SparkDataWriter.java:58)
           at 
org.apache.seatunnel.translation.spark.sink.SparkDataWriter.write(SparkDataWriter.java:37)
           at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
           at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
           at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
           at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
           at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
           at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:121)
           at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.RuntimeException: Execute given execution failed after 
retry 3 times
           at 
org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:68)
           at 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.bulkEsWithRetry(ElasticsearchSinkWriter.java:108)
           ... 18 more
   Caused by: 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException:
 ErrorCode:[ELASTICSEARCH-01], ErrorDescription:[Bulk es response error] - bulk 
es error,request boy={ "index" :{"_index":"seatunneltest_tst_20230213_1"}}
   ```
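
   A possible cause (an assumption, not confirmed by the report): Elasticsearch 6.x still requires a document type in every bulk index action, but the failing request line `{ "index" :{"_index":"seatunneltest_tst_20230213_1"}}` carries no `_type`. A minimal sketch of the two action-line shapes, using the 6.x mapping type `acount_y` from the mapping above and a made-up document:
   ```python
   import json

   index = "seatunneltest_tst_20230213_1"
   doc = {"id": "1", "type": "demo"}  # hypothetical row from seatunnel_test

   # Action line as it appears in the failing request: no _type field.
   action_sent = json.dumps({"index": {"_index": index}})

   # Action line a 6.x cluster expects: the mapping type must be named.
   action_expected = json.dumps({"index": {"_index": index, "_type": "acount_y"}})

   # A bulk body is NDJSON: one action line, then the document source,
   # with a trailing newline terminating the request.
   bulk_body = action_expected + "\n" + json.dumps(doc) + "\n"
   print(bulk_body)
   ```
   If this is the issue, the fix would be for the sink (or its configuration) to supply the index type when writing to pre-7.x clusters.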
   
   
   ### Flink or Spark Version
   
   spark 2.4.0
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

