FuYouJ opened a new issue, #7714:
URL: https://github.com/apache/seatunnel/issues/7714

   ### Search before asking
   
   - [X] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.
   
   
   ### Description
   
   Current SeaTunnel code in the Elasticsearch connector:
   ```java
        public String serializeRow(SeaTunnelRow row) {
           switch (row.getRowKind()) {
               case INSERT:
               case UPDATE_AFTER:
                   return serializeUpsert(row);
               case UPDATE_BEFORE:
               case DELETE:
                   return serializeDelete(row);
               default:
                   throw new ElasticsearchConnectorException(
                           CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                           "Unsupported write row kind: " + row.getRowKind());
           }
       }

        private String serializeUpsert(SeaTunnelRow row) {
           String key = keyExtractor.apply(row);
           Map<String, Object> document = toDocumentMap(row, seaTunnelRowType);
           String documentStr;
   
           try {
               documentStr = objectMapper.writeValueAsString(document);
           } catch (JsonProcessingException e) {
               throw CommonError.jsonOperationError(
                       "Elasticsearch", "document:" + document.toString(), e);
           }
   
           if (key != null) {
               Map<String, String> upsertMetadata = createMetadata(row, key);
               String upsertMetadataStr;
               try {
                upsertMetadataStr = objectMapper.writeValueAsString(upsertMetadata);
               } catch (JsonProcessingException e) {
                throw CommonError.jsonOperationError(
                        "Elasticsearch", "upsertMetadata:" + upsertMetadata.toString(), e);
               }
   
                /**
                 * format example:
                 * { "update" : {"_index" : "${your_index}", "_id" : "${your_document_id}"} }\n
                 * { "doc" : ${your_document_json}, "doc_as_upsert" : true }
                 */
               return new StringBuilder()
                       .append("{ \"update\" :")
                       .append(upsertMetadataStr)
                       .append(" }")
                       .append("\n")
                       .append("{ \"doc\" :")
                       .append(documentStr)
                       .append(", \"doc_as_upsert\" : true }")
                       .toString();
           }
   
           Map<String, String> indexMetadata = createMetadata(row);
           String indexMetadataStr;
           try {
            indexMetadataStr = objectMapper.writeValueAsString(indexMetadata);
           } catch (JsonProcessingException e) {
            throw CommonError.jsonOperationError(
                    "Elasticsearch", "indexMetadata:" + indexMetadata.toString(), e);
           }
   
            /**
             * format example:
             * { "index" : {"_index" : "${your_index}", "_id" : "${your_document_id}"} }\n
             * ${your_document_json}
             */
           return new StringBuilder()
                   .append("{ \"index\" :")
                   .append(indexMetadataStr)
                   .append(" }")
                   .append("\n")
                   .append(documentStr)
                   .toString();
       }
   ```
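   In the 2.3.8-dev code, both `INSERT` and `UPDATE_AFTER` rows are written as upserts (`update` with `doc_as_upsert: true`) whenever a document key is available. In the common insert-only scenario, a plain `index` request usually performs better than an upsert, so I think the cases where the `RowKind` is `INSERT` and where it is `UPDATE_AFTER` should be handled separately.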
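   A minimal sketch of the proposed split, assuming a document key is present: `INSERT` rows become bulk `index` actions, while `UPDATE_AFTER` rows keep the current `update` + `doc_as_upsert` form. `BulkActionSketch`, its local `RowKind` enum and the `serialize` signature are hypothetical stand-ins for illustration, not SeaTunnel APIs. The `delete` branch is also an assumption, since `serializeDelete` is not shown above.

   ```java
   import java.util.Objects;

   /**
    * Hypothetical sketch (not SeaTunnel code): INSERT -> bulk "index" action,
    * UPDATE_AFTER -> bulk "update" with doc_as_upsert, UPDATE_BEFORE/DELETE -> "delete".
    * RowKind is a local stand-in for SeaTunnel's RowKind enum.
    */
   final class BulkActionSketch {
       enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

       private BulkActionSketch() {}

       // Minimal JSON string escaping for index names and ids (assumes no control characters).
       static String quote(String s) {
           return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
       }

       static String metadata(String index, String id) {
           return "{\"_index\":" + quote(index) + ",\"_id\":" + quote(id) + "}";
       }

       /** Returns the bulk NDJSON line(s) for one row, without a trailing newline. */
       static String serialize(RowKind kind, String index, String id, String documentJson) {
           Objects.requireNonNull(id, "this sketch only covers rows that have a document key");
           switch (kind) {
               case INSERT:
                   // Plain index: the existing _source is not fetched and merged first.
                   return "{ \"index\" :" + metadata(index, id) + " }\n" + documentJson;
               case UPDATE_AFTER:
                   // Partial update that creates the document if it does not exist yet.
                   return "{ \"update\" :" + metadata(index, id) + " }\n"
                           + "{ \"doc\" :" + documentJson + ", \"doc_as_upsert\" : true }";
               case UPDATE_BEFORE:
               case DELETE:
                   // Delete actions carry no document line.
                   return "{ \"delete\" :" + metadata(index, id) + " }";
               default:
                   throw new UnsupportedOperationException("Unsupported write row kind: " + kind);
           }
       }
   }
   ```

   One point to check in review: an `index` action with an existing `_id` replaces the whole document, while `doc_as_upsert` merges the given fields into it, so the split is only equivalent when an `INSERT` row always carries the complete document.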
   
   I wrote a simple test to compare the performance of bulk `index` writes and bulk upserts into documents that do not exist yet:
   ```java
        public void compareUpsertVsIndexWriteIntoNonexistenceDocPerformance(TestContainer container)
                throws IOException {
            // Generate 100,000 records and insert them into the target indices
           int count = 100000;
           int upsertCount = 0;
           int insertCount = 0;
           String upsertIndex = "upsert_index";
           String insertIndex = "insert_index";
   
           createIndexForResourceNull(upsertIndex);
           createIndexForResourceNull(insertIndex);
            String indexHeaderFormat = "{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}";
            String upsertHeaderFormat = "{ \"update\" :{\"_index\":\"%s\",\"_id\":\"%s\"} }";
   
           long upsertStart = System.currentTimeMillis();
           int id = 1;
           while (upsertCount <= count) {
               List<String> testDataSet = generateTestDataSet1();
               StringBuilder requestBody = new StringBuilder();
               for (String row : testDataSet) {
                   String bulkItem =
                           new StringBuilder()
                                    .append(String.format(upsertHeaderFormat, upsertIndex, id++))
                                   .append("\n")
                                   .append("{ \"doc\" :")
                                   .append(row)
                                   .append(", \"doc_as_upsert\" : true }")
                                   .toString();
                   requestBody.append(bulkItem).append("\n");
               }
               esRestClient.bulk(requestBody.toString());
               upsertCount += testDataSet.size();
           }
           long upsertEnd = System.currentTimeMillis();
   
           long insertStart = System.currentTimeMillis();
           while (insertCount <= count) {
               List<String> testDataSet = generateTestDataSet1();
               StringBuilder requestBody = new StringBuilder();
               for (String row : testDataSet) {
                requestBody.append(String.format(indexHeaderFormat, insertIndex, id++))
                           .append("\n")
                           .append(row)
                           .append("\n");
               }
               esRestClient.bulk(requestBody.toString());
               insertCount += testDataSet.size();
           }
           long insertEnd = System.currentTimeMillis();
           long insertCost = (insertEnd - insertStart);
           long upsertCost = (upsertEnd - upsertStart);
            Assertions.assertTrue(insertCost <= upsertCost);
       }
   ```
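   One caveat when reading the result: the upsert phase always runs first and each phase is timed once, so JVM and cluster warm-up may be charged entirely to the upsert side. Below is a minimal sketch of a harness that alternates the order over several rounds, assuming the two bulk loops above are wrapped as `Runnable`s that each write to fresh ids (as the shared `id` counter already does). `AlternatingTimer` is a hypothetical helper, not part of SeaTunnel or the Elasticsearch client.

   ```java
   /**
    * Hypothetical helper: times two workloads over several rounds, alternating
    * which one runs first so warm-up effects are not charged to only one side.
    */
   final class AlternatingTimer {
       private AlternatingTimer() {}

       /** Returns {totalNanosA, totalNanosB} summed over the timed rounds. */
       static long[] compare(Runnable a, Runnable b, int warmupRounds, int rounds) {
           // Untimed warm-up of both workloads.
           for (int i = 0; i < warmupRounds; i++) {
               a.run();
               b.run();
           }
           long totalA = 0;
           long totalB = 0;
           for (int i = 0; i < rounds; i++) {
               if (i % 2 == 0) {
                   totalA += time(a);
                   totalB += time(b);
               } else {
                   totalB += time(b);
                   totalA += time(a);
               }
           }
           return new long[] {totalA, totalB};
       }

       // System.nanoTime is intended for measuring elapsed time, unlike currentTimeMillis.
       private static long time(Runnable r) {
           long start = System.nanoTime();
           r.run();
           return System.nanoTime() - start;
       }
   }
   ```

   For example, `compare(upsertLoop, indexLoop, 1, 6)` would run one untimed warm-up of each workload and then six timed rounds in alternating order. Comparing the two totals is less sensitive to run order than a single pair of measurements.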
   
![QQ_1726996350979](https://github.com/user-attachments/assets/29c8c067-075a-4bca-86b6-4b5e0adcb00c)
   
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

