This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new 767fbe8994 Use health checks & Awaitility for elasticsearch-rest 
assertions with mutating and destrucive operations
767fbe8994 is described below

commit 767fbe8994f8bcb512215868ff73c8b8b0633126
Author: James Netherton <[email protected]>
AuthorDate: Fri Mar 13 14:55:04 2026 +0000

    Use health checks & Awaitility for elasticsearch-rest assertions with 
mutating and destrucive operations
---
 .../client/it/ElasticsearchRestClientTest.java     | 202 +++++++++++++++------
 1 file changed, 148 insertions(+), 54 deletions(-)

diff --git 
a/integration-tests/elasticsearch-rest-client/src/test/java/org/apache/camel/quarkus/component/elasticsearch/rest/client/it/ElasticsearchRestClientTest.java
 
b/integration-tests/elasticsearch-rest-client/src/test/java/org/apache/camel/quarkus/component/elasticsearch/rest/client/it/ElasticsearchRestClientTest.java
index 2151b98e56..cbd3710383 100644
--- 
a/integration-tests/elasticsearch-rest-client/src/test/java/org/apache/camel/quarkus/component/elasticsearch/rest/client/it/ElasticsearchRestClientTest.java
+++ 
b/integration-tests/elasticsearch-rest-client/src/test/java/org/apache/camel/quarkus/component/elasticsearch/rest/client/it/ElasticsearchRestClientTest.java
@@ -16,7 +16,14 @@
  */
 package org.apache.camel.quarkus.component.elasticsearch.rest.client.it;
 
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -28,7 +35,11 @@ import io.smallrye.certs.Format;
 import io.smallrye.certs.junit5.Certificate;
 import org.apache.camel.quarkus.test.support.certificate.TestCertificates;
 import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.logging.Logger;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.hamcrest.Matchers.is;
@@ -40,6 +51,13 @@ import static org.hamcrest.Matchers.is;
 @QuarkusTest
 @QuarkusTestResource(ElasticsearchRestTestResource.class)
 class ElasticsearchRestClientTest {
+    private static final Logger LOG = 
Logger.getLogger(ElasticsearchRestClientTest.class);
+
+    @BeforeEach
+    public void beforeEach() throws ConditionTimeoutException {
+        // Ensure the Elasticsearch cluster is ready before each test
+        waitClusterReady();
+    }
 
     @AfterEach
     public void afterEach() {
@@ -81,15 +99,17 @@ class ElasticsearchRestClientTest {
                 .asString();
 
         // Verify index exists
-        RestAssured.given()
-                .queryParam("indexName", indexName)
-                .queryParam("indexId", indexId)
-                .get("/elasticsearch-rest-client/get")
-                .then()
-                .statusCode(200)
-                .body(
-                        "id", is(documentId),
-                        "value", is(documentValue));
+        Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10, 
TimeUnit.SECONDS).untilAsserted(() -> {
+            RestAssured.given()
+                    .queryParam("indexName", indexName)
+                    .queryParam("indexId", indexId)
+                    .get("/elasticsearch-rest-client/get")
+                    .then()
+                    .statusCode(200)
+                    .body(
+                            "id", is(documentId),
+                            "value", is(documentValue));
+        });
 
         // Update indexed data
         String updatedDocumentValue = documentValue + " Updated";
@@ -104,15 +124,17 @@ class ElasticsearchRestClientTest {
                 .statusCode(200);
 
         // Verify updated data
-        RestAssured.given()
-                .queryParam("indexName", indexName)
-                .queryParam("indexId", indexId)
-                .get("/elasticsearch-rest-client/get")
-                .then()
-                .statusCode(200)
-                .body(
-                        "id", is(documentId),
-                        "value", is(updatedDocumentValue));
+        Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10, 
TimeUnit.SECONDS).untilAsserted(() -> {
+            RestAssured.given()
+                    .queryParam("indexName", indexName)
+                    .queryParam("indexId", indexId)
+                    .get("/elasticsearch-rest-client/get")
+                    .then()
+                    .statusCode(200)
+                    .body(
+                            "id", is(documentId),
+                            "value", is(updatedDocumentValue));
+        });
 
         // Delete indexed data
         RestAssured.given()
@@ -123,12 +145,14 @@ class ElasticsearchRestClientTest {
                 .statusCode(204);
 
         // Verify data deleted
-        RestAssured.given()
-                .queryParam("indexName", indexName)
-                .queryParam("indexId", indexId)
-                .get("/elasticsearch-rest-client/get")
-                .then()
-                .statusCode(404);
+        Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10, 
TimeUnit.SECONDS).untilAsserted(() -> {
+            RestAssured.given()
+                    .queryParam("indexName", indexName)
+                    .queryParam("indexId", indexId)
+                    .get("/elasticsearch-rest-client/get")
+                    .then()
+                    .statusCode(404);
+        });
     }
 
     @Test
@@ -162,15 +186,17 @@ class ElasticsearchRestClientTest {
                 .asString();
 
         // Verify index exists
-        RestAssured.given()
-                .queryParam("indexName", indexName)
-                .queryParam("indexId", indexId)
-                .get("/elasticsearch-rest-client/get")
-                .then()
-                .statusCode(200)
-                .body(
-                        "id", is(documentId),
-                        "value", is(documentValue));
+        Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10, 
TimeUnit.SECONDS).untilAsserted(() -> {
+            RestAssured.given()
+                    .queryParam("indexName", indexName)
+                    .queryParam("indexId", indexId)
+                    .get("/elasticsearch-rest-client/get")
+                    .then()
+                    .statusCode(200)
+                    .body(
+                            "id", is(documentId),
+                            "value", is(documentValue));
+        });
 
         // Delete indexed data
         RestAssured.given()
@@ -181,12 +207,14 @@ class ElasticsearchRestClientTest {
                 .statusCode(204);
 
         // Verify data deleted
-        RestAssured.given()
-                .queryParam("indexName", indexName)
-                .queryParam("indexId", indexId)
-                .get("/elasticsearch-rest-client/get")
-                .then()
-                .statusCode(404);
+        Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10, 
TimeUnit.SECONDS).untilAsserted(() -> {
+            RestAssured.given()
+                    .queryParam("indexName", indexName)
+                    .queryParam("indexId", indexId)
+                    .get("/elasticsearch-rest-client/get")
+                    .then()
+                    .statusCode(404);
+        });
     }
 
     @Test
@@ -219,15 +247,17 @@ class ElasticsearchRestClientTest {
                 .asString();
 
         // Verify index exists
-        RestAssured.given()
-                .queryParam("indexName", indexName)
-                .queryParam("indexId", indexId)
-                .get("/elasticsearch-rest-client/get")
-                .then()
-                .statusCode(200)
-                .body(
-                        "id", is(documentId),
-                        "value", is(documentValue));
+        Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10, 
TimeUnit.SECONDS).untilAsserted(() -> {
+            RestAssured.given()
+                    .queryParam("indexName", indexName)
+                    .queryParam("indexId", indexId)
+                    .get("/elasticsearch-rest-client/get")
+                    .then()
+                    .statusCode(200)
+                    .body(
+                            "id", is(documentId),
+                            "value", is(documentValue));
+        });
 
         // Delete indexed data
         RestAssured.given()
@@ -238,12 +268,14 @@ class ElasticsearchRestClientTest {
                 .body(is("true"));
 
         // Verify data deleted
-        RestAssured.given()
-                .queryParam("indexName", indexName)
-                .queryParam("indexId", indexId)
-                .get("/elasticsearch-rest-client/get")
-                .then()
-                .statusCode(404);
+        Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10, 
TimeUnit.SECONDS).untilAsserted(() -> {
+            RestAssured.given()
+                    .queryParam("indexName", indexName)
+                    .queryParam("indexId", indexId)
+                    .get("/elasticsearch-rest-client/get")
+                    .then()
+                    .statusCode(404);
+        });
     }
 
     @Test
@@ -385,4 +417,66 @@ class ElasticsearchRestClientTest {
                             "value[0]", is(documentValueB));
         });
     }
+
+    /**
+     * Queries the Elasticsearch cluster health status and waits until it's 
green or yellow.
+     * Retries with Awaitility until the cluster is ready.
+     *
+     * @throws ConditionTimeoutException if the request fails after all retries
+     */
+    private void waitClusterReady() throws ConditionTimeoutException {
+        String hostAddresses = ConfigProvider.getConfig()
+                
.getValue("camel.component.elasticsearch-rest-client.host-addresses-list", 
String.class);
+        String username = ConfigProvider.getConfig()
+                .getValue("camel.component.elasticsearch-rest-client.user", 
String.class);
+        String password = ConfigProvider.getConfig()
+                
.getValue("camel.component.elasticsearch-rest-client.password", String.class);
+
+        Awaitility.await()
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(10, TimeUnit.SECONDS)
+                .until(() -> {
+                    try {
+                        URL url = new 
URL(String.format("http://%s/_cluster/health";, hostAddresses));
+                        HttpURLConnection connection = (HttpURLConnection) 
url.openConnection();
+
+                        // Set up Basic Authentication
+                        String auth = String.format("%s:%s", username, 
password);
+                        String encodedAuth = 
Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+                        connection.setRequestProperty("Authorization", "Basic 
" + encodedAuth);
+                        connection.setRequestMethod("GET");
+                        connection.setConnectTimeout(5000);
+                        connection.setReadTimeout(5000);
+
+                        int responseCode = connection.getResponseCode();
+                        if (responseCode == HttpURLConnection.HTTP_OK) {
+                            try (BufferedReader reader = new BufferedReader(
+                                    new 
InputStreamReader(connection.getInputStream()))) {
+                                StringBuilder response = new StringBuilder();
+                                String line;
+                                while ((line = reader.readLine()) != null) {
+                                    response.append(line);
+                                }
+                                String healthJson = response.toString();
+
+                                // Check if cluster status is green or yellow
+                                if (healthJson.contains("\"status\":\"green\"")
+                                        || 
healthJson.contains("\"status\":\"yellow\"")) {
+                                    LOG.info("Cluster health is ready: " + 
healthJson);
+                                    return healthJson;
+                                } else {
+                                    LOG.info("Cluster not ready yet, current 
status: " + healthJson);
+                                    return null;
+                                }
+                            }
+                        } else {
+                            LOG.info("Cluster health check returned code: " + 
responseCode + ", retrying...");
+                            return null;
+                        }
+                    } catch (Exception e) {
+                        LOG.info("Failed to query cluster health: " + 
e.getMessage() + ", retrying...");
+                        return null;
+                    }
+                }, Objects::nonNull);
+    }
 }

Reply via email to