This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bb9fd516ec [Fix][Connector-V2][Elasticsearch]Fix sink configuration
for DROP_DATA (#7124)
bb9fd516ec is described below
commit bb9fd516ec27fbdf2da49a327fafcb52a8250afd
Author: Wudadada <[email protected]>
AuthorDate: Tue Jul 9 21:49:28 2024 +0800
[Fix][Connector-V2][Elasticsearch]Fix sink configuration for DROP_DATA
(#7124)
---
.../catalog/ElasticSearchCatalog.java | 3 +-
.../elasticsearch/client/EsRestClient.java | 29 +++++++++
.../exception/ElasticsearchConnectorErrorCode.java | 3 +-
.../connector/elasticsearch/ElasticsearchIT.java | 69 ++++++++++++++++++----
4 files changed, 88 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index b1eb60e289..bbf594eb10 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -217,8 +217,7 @@ public class ElasticSearchCatalog implements Catalog {
@Override
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
- dropTable(tablePath, ignoreIfNotExists);
- createTable(tablePath, null, ignoreIfNotExists);
+ esRestClient.clearIndexData(tablePath.getTableName());
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 18c9b7c109..f80f20f673 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -480,6 +480,35 @@ public class EsRestClient {
}
}
+ public void clearIndexData(String indexName) {
+ String endpoint = String.format("/%s/_delete_by_query", indexName);
+ Request request = new Request("POST", endpoint);
+ String jsonString = "{ \"query\": { \"match_all\": {} } }";
+ request.setJsonEntity(jsonString);
+
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new ElasticsearchConnectorException(
+
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED,
+ "POST " + endpoint + " response null");
+ }
+ // todo: if the index doesn't exist, the response status code is 200?
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ return;
+ } else {
+ throw new ElasticsearchConnectorException(
+
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED,
+ String.format(
+ "POST %s response status code=%d",
+ endpoint,
response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED,
ex);
+ }
+ }
+
/**
* get es field name and type mapping relation
*
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index 67f01201dd..fe182868d4 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -28,7 +28,8 @@ public enum ElasticsearchConnectorErrorCode implements
SeaTunnelErrorCode {
LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"),
DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"),
CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index
failed"),
- ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the
elasticsearch field type");
+ ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the
elasticsearch field type"),
+ CLEAR_INDEX_DATA_FAILED("ELASTICSEARCH-09", "Clear elasticsearch index
data failed");
;
private final String code;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 3180f386b2..623dd9d221 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -447,35 +447,78 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
}
@Test
- public void testCatalog() {
+ public void testCatalog() throws InterruptedException,
JsonProcessingException {
Map<String, Object> configMap = new HashMap<>();
configMap.put("username", "elastic");
configMap.put("password", "elasticsearch");
- configMap.put("hosts", Arrays.asList("https://" +
container.getHttpHostAddress()));
+ configMap.put(
+ "hosts", Collections.singletonList("https://" +
container.getHttpHostAddress()));
configMap.put("index", "st_index3");
configMap.put("tls_verify_certificate", false);
configMap.put("tls_verify_hostname", false);
configMap.put("index_type", "st");
+
final ElasticSearchCatalog elasticSearchCatalog =
new ElasticSearchCatalog("Elasticsearch", "",
ReadonlyConfig.fromMap(configMap));
elasticSearchCatalog.open();
+
TablePath tablePath = TablePath.of("", "st_index3");
- // index exists
+
+ // Verify index does not exist initially
final boolean existsBefore =
elasticSearchCatalog.tableExists(tablePath);
- Assertions.assertFalse(existsBefore);
- // create index
+ Assertions.assertFalse(existsBefore, "Index should not exist
initially");
+
+ // Create index
elasticSearchCatalog.createTable(tablePath, null, false);
final boolean existsAfter =
elasticSearchCatalog.tableExists(tablePath);
- Assertions.assertTrue(existsAfter);
- // data exists?
- final boolean existsData =
elasticSearchCatalog.isExistsData(tablePath);
- Assertions.assertFalse(existsData);
- // truncate
+ Assertions.assertTrue(existsAfter, "Index should be created");
+
+ // Generate and add multiple records
+ List<String> data = generateTestData();
+ StringBuilder requestBody = new StringBuilder();
+ String indexHeader = "{\"index\":{\"_index\":\"st_index3\"}}\n";
+ for (String record : data) {
+ requestBody.append(indexHeader);
+ requestBody.append(record);
+ requestBody.append("\n");
+ }
+ esRestClient.bulk(requestBody.toString());
+ Thread.sleep(2000); // Wait for data to be indexed
+
+ // Verify data exists
+ List<String> sourceFields = Arrays.asList("field1", "field2");
+ Map<String, Object> query = new HashMap<>();
+ query.put("match_all", new HashMap<>());
+ ScrollResult scrollResult =
+ esRestClient.searchByScroll("st_index3", sourceFields, query,
"1m", 100);
+ Assertions.assertFalse(scrollResult.getDocs().isEmpty(), "Data should
exist in the index");
+
+ // Truncate the table
elasticSearchCatalog.truncateTable(tablePath, false);
- Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath));
- // drop
+ Thread.sleep(2000); // Wait for the deletion to become visible to search
+
+ // Verify data is deleted
+ scrollResult = esRestClient.searchByScroll("st_index3", sourceFields,
query, "1m", 100);
+ Assertions.assertTrue(
+ scrollResult.getDocs().isEmpty(), "Data should be deleted from
the index");
+
+ // Drop the table
elasticSearchCatalog.dropTable(tablePath, false);
- Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath));
+ Assertions.assertFalse(
+ elasticSearchCatalog.tableExists(tablePath), "Index should be
dropped");
+
elasticSearchCatalog.close();
}
+
+ private List<String> generateTestData() throws JsonProcessingException {
+ List<String> data = new ArrayList<>();
+ ObjectMapper objectMapper = new ObjectMapper();
+ for (int i = 0; i < 10; i++) {
+ Map<String, Object> record = new HashMap<>();
+ record.put("field1", "value" + i);
+ record.put("field2", i);
+ data.add(objectMapper.writeValueAsString(record));
+ }
+ return data;
+ }
}