This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 767fbe8994 Use health checks & Awaitility for elasticsearch-rest
assertions with mutating and destrucive operations
767fbe8994 is described below
commit 767fbe8994f8bcb512215868ff73c8b8b0633126
Author: James Netherton <[email protected]>
AuthorDate: Fri Mar 13 14:55:04 2026 +0000
Use health checks & Awaitility for elasticsearch-rest assertions with
mutating and destrucive operations
---
.../client/it/ElasticsearchRestClientTest.java | 202 +++++++++++++++------
1 file changed, 148 insertions(+), 54 deletions(-)
diff --git
a/integration-tests/elasticsearch-rest-client/src/test/java/org/apache/camel/quarkus/component/elasticsearch/rest/client/it/ElasticsearchRestClientTest.java
b/integration-tests/elasticsearch-rest-client/src/test/java/org/apache/camel/quarkus/component/elasticsearch/rest/client/it/ElasticsearchRestClientTest.java
index 2151b98e56..cbd3710383 100644
---
a/integration-tests/elasticsearch-rest-client/src/test/java/org/apache/camel/quarkus/component/elasticsearch/rest/client/it/ElasticsearchRestClientTest.java
+++
b/integration-tests/elasticsearch-rest-client/src/test/java/org/apache/camel/quarkus/component/elasticsearch/rest/client/it/ElasticsearchRestClientTest.java
@@ -16,7 +16,14 @@
*/
package org.apache.camel.quarkus.component.elasticsearch.rest.client.it;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -28,7 +35,11 @@ import io.smallrye.certs.Format;
import io.smallrye.certs.junit5.Certificate;
import org.apache.camel.quarkus.test.support.certificate.TestCertificates;
import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.logging.Logger;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.Matchers.is;
@@ -40,6 +51,13 @@ import static org.hamcrest.Matchers.is;
@QuarkusTest
@QuarkusTestResource(ElasticsearchRestTestResource.class)
class ElasticsearchRestClientTest {
+ private static final Logger LOG =
Logger.getLogger(ElasticsearchRestClientTest.class);
+
+ @BeforeEach
+ public void beforeEach() throws ConditionTimeoutException {
+ // Ensure the Elasticsearch cluster is ready before each test
+ waitClusterReady();
+ }
@AfterEach
public void afterEach() {
@@ -81,15 +99,17 @@ class ElasticsearchRestClientTest {
.asString();
// Verify index exists
- RestAssured.given()
- .queryParam("indexName", indexName)
- .queryParam("indexId", indexId)
- .get("/elasticsearch-rest-client/get")
- .then()
- .statusCode(200)
- .body(
- "id", is(documentId),
- "value", is(documentValue));
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("indexName", indexName)
+ .queryParam("indexId", indexId)
+ .get("/elasticsearch-rest-client/get")
+ .then()
+ .statusCode(200)
+ .body(
+ "id", is(documentId),
+ "value", is(documentValue));
+ });
// Update indexed data
String updatedDocumentValue = documentValue + " Updated";
@@ -104,15 +124,17 @@ class ElasticsearchRestClientTest {
.statusCode(200);
// Verify updated data
- RestAssured.given()
- .queryParam("indexName", indexName)
- .queryParam("indexId", indexId)
- .get("/elasticsearch-rest-client/get")
- .then()
- .statusCode(200)
- .body(
- "id", is(documentId),
- "value", is(updatedDocumentValue));
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("indexName", indexName)
+ .queryParam("indexId", indexId)
+ .get("/elasticsearch-rest-client/get")
+ .then()
+ .statusCode(200)
+ .body(
+ "id", is(documentId),
+ "value", is(updatedDocumentValue));
+ });
// Delete indexed data
RestAssured.given()
@@ -123,12 +145,14 @@ class ElasticsearchRestClientTest {
.statusCode(204);
// Verify data deleted
- RestAssured.given()
- .queryParam("indexName", indexName)
- .queryParam("indexId", indexId)
- .get("/elasticsearch-rest-client/get")
- .then()
- .statusCode(404);
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("indexName", indexName)
+ .queryParam("indexId", indexId)
+ .get("/elasticsearch-rest-client/get")
+ .then()
+ .statusCode(404);
+ });
}
@Test
@@ -162,15 +186,17 @@ class ElasticsearchRestClientTest {
.asString();
// Verify index exists
- RestAssured.given()
- .queryParam("indexName", indexName)
- .queryParam("indexId", indexId)
- .get("/elasticsearch-rest-client/get")
- .then()
- .statusCode(200)
- .body(
- "id", is(documentId),
- "value", is(documentValue));
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("indexName", indexName)
+ .queryParam("indexId", indexId)
+ .get("/elasticsearch-rest-client/get")
+ .then()
+ .statusCode(200)
+ .body(
+ "id", is(documentId),
+ "value", is(documentValue));
+ });
// Delete indexed data
RestAssured.given()
@@ -181,12 +207,14 @@ class ElasticsearchRestClientTest {
.statusCode(204);
// Verify data deleted
- RestAssured.given()
- .queryParam("indexName", indexName)
- .queryParam("indexId", indexId)
- .get("/elasticsearch-rest-client/get")
- .then()
- .statusCode(404);
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("indexName", indexName)
+ .queryParam("indexId", indexId)
+ .get("/elasticsearch-rest-client/get")
+ .then()
+ .statusCode(404);
+ });
}
@Test
@@ -219,15 +247,17 @@ class ElasticsearchRestClientTest {
.asString();
// Verify index exists
- RestAssured.given()
- .queryParam("indexName", indexName)
- .queryParam("indexId", indexId)
- .get("/elasticsearch-rest-client/get")
- .then()
- .statusCode(200)
- .body(
- "id", is(documentId),
- "value", is(documentValue));
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("indexName", indexName)
+ .queryParam("indexId", indexId)
+ .get("/elasticsearch-rest-client/get")
+ .then()
+ .statusCode(200)
+ .body(
+ "id", is(documentId),
+ "value", is(documentValue));
+ });
// Delete indexed data
RestAssured.given()
@@ -238,12 +268,14 @@ class ElasticsearchRestClientTest {
.body(is("true"));
// Verify data deleted
- RestAssured.given()
- .queryParam("indexName", indexName)
- .queryParam("indexId", indexId)
- .get("/elasticsearch-rest-client/get")
- .then()
- .statusCode(404);
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("indexName", indexName)
+ .queryParam("indexId", indexId)
+ .get("/elasticsearch-rest-client/get")
+ .then()
+ .statusCode(404);
+ });
}
@Test
@@ -385,4 +417,66 @@ class ElasticsearchRestClientTest {
"value[0]", is(documentValueB));
});
}
+
+ /**
+ * Queries the Elasticsearch cluster health status and waits until it's
green or yellow.
+ * Retries with Awaitility until the cluster is ready.
+ *
+ * @throws ConditionTimeoutException if the request fails after all retries
+ */
+ private void waitClusterReady() throws ConditionTimeoutException {
+ String hostAddresses = ConfigProvider.getConfig()
+
.getValue("camel.component.elasticsearch-rest-client.host-addresses-list",
String.class);
+ String username = ConfigProvider.getConfig()
+ .getValue("camel.component.elasticsearch-rest-client.user",
String.class);
+ String password = ConfigProvider.getConfig()
+
.getValue("camel.component.elasticsearch-rest-client.password", String.class);
+
+ Awaitility.await()
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> {
+ try {
+ URL url = new
URL(String.format("http://%s/_cluster/health", hostAddresses));
+ HttpURLConnection connection = (HttpURLConnection)
url.openConnection();
+
+ // Set up Basic Authentication
+ String auth = String.format("%s:%s", username,
password);
+ String encodedAuth =
Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+ connection.setRequestProperty("Authorization", "Basic
" + encodedAuth);
+ connection.setRequestMethod("GET");
+ connection.setConnectTimeout(5000);
+ connection.setReadTimeout(5000);
+
+ int responseCode = connection.getResponseCode();
+ if (responseCode == HttpURLConnection.HTTP_OK) {
+ try (BufferedReader reader = new BufferedReader(
+ new
InputStreamReader(connection.getInputStream()))) {
+ StringBuilder response = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ response.append(line);
+ }
+ String healthJson = response.toString();
+
+ // Check if cluster status is green or yellow
+ if (healthJson.contains("\"status\":\"green\"")
+ ||
healthJson.contains("\"status\":\"yellow\"")) {
+ LOG.info("Cluster health is ready: " +
healthJson);
+ return healthJson;
+ } else {
+ LOG.info("Cluster not ready yet, current
status: " + healthJson);
+ return null;
+ }
+ }
+ } else {
+ LOG.info("Cluster health check returned code: " +
responseCode + ", retrying...");
+ return null;
+ }
+ } catch (Exception e) {
+ LOG.info("Failed to query cluster health: " +
e.getMessage() + ", retrying...");
+ return null;
+ }
+ }, Objects::nonNull);
+ }
}