FuYouJ opened a new issue, #7714: URL: https://github.com/apache/seatunnel/issues/7714
### Search before asking

- [X] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

Current SeaTunnel code (Elasticsearch sink row serializer):

```java
public String serializeRow(SeaTunnelRow row) {
    switch (row.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            return serializeUpsert(row);
        case UPDATE_BEFORE:
        case DELETE:
            return serializeDelete(row);
        default:
            throw new ElasticsearchConnectorException(
                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                    "Unsupported write row kind: " + row.getRowKind());
    }
}

private String serializeUpsert(SeaTunnelRow row) {
    String key = keyExtractor.apply(row);
    Map<String, Object> document = toDocumentMap(row, seaTunnelRowType);
    String documentStr;

    try {
        documentStr = objectMapper.writeValueAsString(document);
    } catch (JsonProcessingException e) {
        throw CommonError.jsonOperationError(
                "Elasticsearch", "document:" + document.toString(), e);
    }

    if (key != null) {
        Map<String, String> upsertMetadata = createMetadata(row, key);
        String upsertMetadataStr;
        try {
            upsertMetadataStr = objectMapper.writeValueAsString(upsertMetadata);
        } catch (JsonProcessingException e) {
            throw CommonError.jsonOperationError(
                    "Elasticsearch", "upsertMetadata:" + upsertMetadata.toString(), e);
        }

        /**
         * format example: { "update" : {"_index" : "${your_index}", "_id" :
         * "${your_document_id}"} }\n { "doc" : ${your_document_json}, "doc_as_upsert" : true }
         */
        return new StringBuilder()
                .append("{ \"update\" :")
                .append(upsertMetadataStr)
                .append(" }")
                .append("\n")
                .append("{ \"doc\" :")
                .append(documentStr)
                .append(", \"doc_as_upsert\" : true }")
                .toString();
    }

    Map<String, String> indexMetadata = createMetadata(row);
    String indexMetadataStr;
    try {
        indexMetadataStr = objectMapper.writeValueAsString(indexMetadata);
    } catch (JsonProcessingException e) {
        throw CommonError.jsonOperationError(
                "Elasticsearch", "indexMetadata:" + indexMetadata.toString(), e);
    }

    /**
     * format example: { "index" : {"_index" : "${your_index}", "_id" : "${your_document_id}"}
     * }\n ${your_document_json}
     */
    return new StringBuilder()
            .append("{ \"index\" :")
            .append(indexMetadataStr)
            .append(" }")
            .append("\n")
            .append(documentStr)
            .toString();
}
```

When a primary key is configured (`key != null`), the 2.3.8 dev code serializes both `INSERT` and `UPDATE_AFTER` rows as upserts (`update` with `doc_as_upsert`). In the common insert-only scenario, a plain `index` operation usually performs better than an upsert, because an `update` must first fetch the existing document, merge the partial document into it, and then reindex the result. I think the cases where the row kind is `INSERT` and where it is `UPDATE_AFTER` should be handled separately.

I wrote a simple test to compare the performance of bulk `index` requests and bulk upserts when writing documents that do not exist yet:

```java
public void compareUpsertVsIndexWriteIntoNonexistenceDocPerformance(TestContainer container)
        throws IOException {
    // Write at least 100,000 documents into each of two empty indices
    int count = 100000;
    int upsertCount = 0;
    int insertCount = 0;
    String upsertIndex = "upsert_index";
    String insertIndex = "insert_index";
    createIndexForResourceNull(upsertIndex);
    createIndexForResourceNull(insertIndex);

    // Bulk action/metadata lines: plain index vs. update (used with doc_as_upsert)
    String indexHeaderFormat = "{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}";
    String upsertHeaderFormat = "{ \"update\" :{\"_index\":\"%s\",\"_id\":\"%s\"} }";

    // 1. Upsert phase: every _id is new, so each update falls through to doc_as_upsert
    long upsertStart = System.currentTimeMillis();
    int id = 1;
    while (upsertCount < count) {
        List<String> testDataSet = generateTestDataSet1();
        StringBuilder requestBody = new StringBuilder();
        for (String row : testDataSet) {
            requestBody
                    .append(String.format(upsertHeaderFormat, upsertIndex, id++))
                    .append("\n")
                    .append("{ \"doc\" :")
                    .append(row)
                    .append(", \"doc_as_upsert\" : true }")
                    .append("\n");
        }
        esRestClient.bulk(requestBody.toString());
        upsertCount += testDataSet.size();
    }
    long upsertEnd = System.currentTimeMillis();

    // 2. Index phase: plain index requests, also with fresh _ids
    long insertStart = System.currentTimeMillis();
    while (insertCount < count) {
        List<String> testDataSet = generateTestDataSet1();
        StringBuilder requestBody = new StringBuilder();
        for (String row : testDataSet) {
            requestBody
                    .append(String.format(indexHeaderFormat, insertIndex, id++))
                    .append("\n")
                    .append(row)
                    .append("\n");
        }
        esRestClient.bulk(requestBody.toString());
        insertCount += testDataSet.size();
    }
    long insertEnd = System.currentTimeMillis();

    long insertCost = insertEnd - insertStart;
    long upsertCost = upsertEnd - upsertStart;
    Assertions.assertTrue(insertCost <= upsertCost);
}
```

### Usage Scenario

_No response_

### Related issues

_No response_

### Are you willing to submit a PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
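A minimal sketch of the split proposed in the description, written as plain Java so it runs without SeaTunnel on the classpath. `RowKindBulkSketch`, its nested `RowKind` enum, and the `serialize`/`bulkBody` helpers are hypothetical stand-ins, not SeaTunnel APIs. The sketch assumes the document is already serialized to JSON and that a primary key is configured (the keyless branch of `serializeUpsert` already emits `index`). The action lines follow the Elasticsearch bulk format: `INSERT` becomes `index`, `UPDATE_AFTER` stays `update` with `doc_as_upsert`, and `UPDATE_BEFORE`/`DELETE` become `delete`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, self-contained sketch; none of these names are SeaTunnel APIs.
public class RowKindBulkSketch {

    // Stand-in for SeaTunnel's RowKind, limited to the kinds handled by serializeRow.
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Bulk action/metadata line, e.g. {"index":{"_index":"my_index","_id":"1"}}.
    // Assumes index and id contain no characters that need JSON escaping.
    private static String header(String action, String index, String id) {
        return String.format("{\"%s\":{\"_index\":\"%s\",\"_id\":\"%s\"}}", action, index, id);
    }

    // Proposed dispatch: INSERT -> index, UPDATE_AFTER -> update + doc_as_upsert,
    // UPDATE_BEFORE/DELETE -> delete (a delete action has no source line).
    public static String serialize(RowKind kind, String index, String id, String documentJson) {
        switch (kind) {
            case INSERT:
                // Plain index: the full document is written directly.
                return header("index", index, id) + "\n" + documentJson;
            case UPDATE_AFTER:
                // Partial update that creates the document if it does not exist yet.
                return header("update", index, id) + "\n"
                        + "{\"doc\":" + documentJson + ",\"doc_as_upsert\":true}";
            case UPDATE_BEFORE:
            case DELETE:
                return header("delete", index, id);
            default:
                throw new IllegalArgumentException("Unsupported write row kind: " + kind);
        }
    }

    // Joins serialized items into an NDJSON bulk body; the bulk API requires a trailing newline.
    public static String bulkBody(List<String> items) {
        StringBuilder sb = new StringBuilder();
        for (String item : items) {
            sb.append(item).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String body = bulkBody(Arrays.asList(
                serialize(RowKind.INSERT, "my_index", "1", "{\"name\":\"a\"}"),
                serialize(RowKind.UPDATE_AFTER, "my_index", "2", "{\"name\":\"b\"}")));
        System.out.print(body);
    }
}
```

One design point worth checking in a real PR: `index` replaces the whole document when the `_id` already exists, while `update` with `doc_as_upsert` merges fields into it. For an `INSERT` row that carries every column the end state should be the same, but it would differ if other writers add extra fields to the same documents.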
