This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7be09a4222 [INLONG-11495][Sort] Add end-to-end test for Kafka connector v1.18 (#11554)
7be09a4222 is described below
commit 7be09a42226a251efcdd2db8f86c84715ddb8293
Author: Hengyuan <[email protected]>
AuthorDate: Mon Dec 2 12:48:05 2024 +0800
[INLONG-11495][Sort] Add end-to-end test for Kafka connector v1.18 (#11554)
---
.../sort-end-to-end-tests-v1.18/pom.xml | 46 +++-
.../sort/tests/Kafka2Elasticsearch7Test.java | 248 +++++++++++++++++++++
.../test/resources/env/kafka_test_kafka_init.txt | 1 +
.../resources/flinkSql/kafka_to_elasticsearch.sql | 24 ++
.../org.apache.flink.table.factories.Factory | 4 +-
5 files changed, 314 insertions(+), 9 deletions(-)
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml
index 78e19a16eb..850d5dddfd 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml
@@ -31,8 +31,8 @@
<properties>
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
<flink.version>1.18.1</flink.version>
- <elasticsearch.version>6.8.17</elasticsearch.version>
<flink.shaded.jackson.version>2.15.3-18.0</flink.shaded.jackson.version>
+ <kafka.clients.version>3.7.1</kafka.clients.version>
</properties>
<dependencies>
@@ -51,6 +51,10 @@
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ </dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
@@ -61,16 +65,11 @@
<artifactId>elasticsearch</artifactId>
<version>${testcontainers.version}</version>
</dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch7.version}</version>
</dependency>
<dependency>
@@ -142,6 +141,17 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.clients.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>co.elastic.clients</groupId>
+ <artifactId>elasticsearch-java</artifactId>
+ <version>${elasticsearch7.version}</version>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -158,6 +168,23 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-kafka-v1.18</artifactId>
+ <version>${project.version}</version>
+ <destFileName>sort-connector-kafka.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-elasticsearch7-v1.18</artifactId>
+ <version>${project.version}</version>
+ <destFileName>sort-connector-elasticsearch7.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+
</artifactItems>
</configuration>
<executions>
@@ -203,6 +230,11 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${plugin.surefire.version}</version>
+ <configuration>
+ <systemPropertyVariables>
+ <log4j.configurationFile>src/test/resources/log4j2-test.properties</log4j.configurationFile>
+ </systemPropertyVariables>
+ </configuration>
</plugin>
</plugins>
</build>
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java
new file mode 100644
index 0000000000..0cf94a0663
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests;
+
+import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
+import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
+import org.apache.inlong.sort.tests.utils.TestUtils;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class Kafka2Elasticsearch7Test extends FlinkContainerTestEnvJRE8 {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Kafka2Elasticsearch7Test.class);
+ public static final Logger KAFKA_LOG = LoggerFactory.getLogger(KafkaContainer.class);
+ public static final Logger ELASTICSEARCH_LOGGER = LoggerFactory.getLogger(ElasticsearchContainer.class);
+
+ private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar");
+ private static final Path elasticsearchJar = TestUtils.getResource("sort-connector-elasticsearch7.jar");
+
+ private static final int ELASTICSEARCH_DEFAULT_PORT = 9200;
+
+ private static final String FIRST_KAFKA_MESSAGE = "{\"message\":\"Hello From Kafka\"}";
+ private static final String SECOND_KAFKA_MESSAGE = "{\"message\":\"Goodbye From ElasticSearch\"}";
+
+ private static final String FIRST_EXPECTED_MESSAGE = "Hello From Kafka";
+ private static final String SECOND_EXPECTED_MESSAGE = "Goodbye From ElasticSearch";
+
+ private static final String sqlFile;
+
+ static {
+ try {
+ sqlFile = Paths
+ .get(Kafka2Elasticsearch7Test.class.getResource("/flinkSql/kafka_to_elasticsearch.sql").toURI())
+ .toString();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
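+ // Kafka broker (Confluent 6.2.1) with embedded ZooKeeper; other containers reach it via the "kafka" alias on the shared network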
+ @ClassRule
+ public static final KafkaContainer KAFKA =
+ new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("kafka")
+ .withEmbeddedZookeeper()
+ .withLogConsumer(new Slf4jLogConsumer(KAFKA_LOG));
+
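+ // Single-node Elasticsearch 7.17 container; the Flink job addresses it via the "elasticsearch" alias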
+ @ClassRule
+ public static final ElasticsearchContainer ELASTICSEARCH =
+ new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:7.17.13"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("elasticsearch")
+ .withLogConsumer(new Slf4jLogConsumer(ELASTICSEARCH_LOGGER));
+
+ @Before
+ public void setup() throws IOException {
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ initializeKafkaTopic("test-topic");
+ initializeElasticsearchIndex();
+ }
+
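+ // Renders the init script (resolving ${TOPIC} and ${ZOOKEEPER_PORT}) and executes it inside the Kafka container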
+ private void initializeKafkaTopic(String topic) {
+ String fileName = "kafka_test_kafka_init.txt";
+ int port = KafkaContainer.ZOOKEEPER_PORT;
+
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("TOPIC", topic);
+ properties.put("ZOOKEEPER_PORT", port);
+
+ try {
+ String createKafkaStatement = getCreateStatement(fileName, properties);
+ ExecResult result = KAFKA.execInContainer("bash", "-c", createKafkaStatement);
+ LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement, result.getStdout());
+ if (result.getExitCode() != 0) {
+ throw new RuntimeException("Init kafka topic failed. Exit
code:" + result.getExitCode());
+ }
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getCreateStatement(String fileName, Map<String, Object> properties) {
+ URL url = Objects.requireNonNull(Kafka2Elasticsearch7Test.class.getResource("/env/" + fileName));
+
+ try {
+ Path file = Paths.get(url.toURI());
+ return PlaceholderResolver.getDefaultResolver().resolveByMap(
+ new String(Files.readAllBytes(file), StandardCharsets.UTF_8),
+ properties);
+ } catch (IOException | URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
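+ // Creates the sink index up front via the Elasticsearch Java client, so the first polls do not fail with index_not_found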
+ private void initializeElasticsearchIndex() throws IOException {
+ RestClient restClient = RestClient.builder(
+ new HttpHost("localhost",
ELASTICSEARCH.getMappedPort(ELASTICSEARCH_DEFAULT_PORT), "http"))
+ .build();
+ RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
+ ElasticsearchClient client = new ElasticsearchClient(transport);
+
+ client.indices().create(c -> c.index("test-index"));
+ LOG.info("Created Elasticsearch index: test-index");
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (KAFKA != null) {
+ KAFKA.stop();
+ }
+ if (ELASTICSEARCH != null) {
+ ELASTICSEARCH.stop();
+ }
+ }
+
+ @Test
+ public void testKafkaToElasticsearch() throws Exception {
+ submitSQLJob(sqlFile, kafkaJar, elasticsearchJar);
+ waitUntilJobRunning(Duration.ofSeconds(10));
+
+ // Produce messages to Kafka
+ org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
+ new org.apache.kafka.clients.producer.KafkaProducer<>(getKafkaProducerConfig());
+ producer.send(
+ new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key1", FIRST_KAFKA_MESSAGE));
+ producer.send(
+ new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key2", SECOND_KAFKA_MESSAGE));
+ // Flush and close so both records are actually delivered before polling Elasticsearch
+ producer.flush();
+ producer.close();
+
+ // Query Elasticsearch to verify data is ingested
+ RestClient restClient = RestClient.builder(
+ new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200),
"http"))
+ .build();
+ RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
+ ElasticsearchClient client = new ElasticsearchClient(transport);
+
+ List<String> messages = new ArrayList<>();
+ int maxRetries = 10; // Maximum number of retries (10 seconds)
+ int retryCount = 0;
+
+ while (retryCount < maxRetries) {
+ co.elastic.clients.elasticsearch.core.SearchRequest searchRequest =
+ new co.elastic.clients.elasticsearch.core.SearchRequest.Builder()
+ .index("test-index")
+ .query(q -> q.matchAll(m -> m))
+ .build();
+
+ co.elastic.clients.elasticsearch.core.SearchResponse<Map> response =
+ client.search(searchRequest, Map.class);
+
+ // Extract `message` fields using Elasticsearch Java API
+ messages = response.hits().hits().stream()
+ .map(hit -> {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> source = hit.source();
+ if (source != null && source.containsKey("message")) {
+ return (String) source.get("message");
+ }
+ return null;
+ })
+ .filter(Objects::nonNull) // Remove null values
+ .collect(Collectors.toList());
+
+ if (!messages.isEmpty()) {
+ // Stop polling if data is found
+ break;
+ }
+
+ // Wait for 1 second before retrying
+ Thread.sleep(1000);
+ retryCount++;
+ }
+
+ if (messages.isEmpty()) {
+ throw new AssertionError("Elasticsearch validation failed: No
messages found after polling.");
+ }
+
+ LOG.info("Extracted messages from Elasticsearch: {}", messages);
+
+ // Create expected messages list
+ List<String> expectedMessages = new ArrayList<>();
+ expectedMessages.add(FIRST_EXPECTED_MESSAGE);
+ expectedMessages.add(SECOND_EXPECTED_MESSAGE);
+
+ // Validate messages against the expected messages
+ if (new HashSet<>(messages).equals(new HashSet<>(expectedMessages))) {
+ LOG.info("Elasticsearch contains all expected messages: {}",
expectedMessages);
+ } else {
+ throw new AssertionError(
+ String.format("Elasticsearch validation failed. Expected:
%s, Found: %s", expectedMessages,
+ messages));
+ }
+ }
+
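+ // getBootstrapServers() returns the host-mapped address, so the test producer connects from outside the Docker network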
+ private java.util.Properties getKafkaProducerConfig() {
+ java.util.Properties props = new java.util.Properties();
+ String bootstrapServers = KAFKA.getBootstrapServers();
+ props.put("bootstrap.servers", bootstrapServers);
+ props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ return props;
+ }
+}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/env/kafka_test_kafka_init.txt b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/env/kafka_test_kafka_init.txt
new file mode 100644
index 0000000000..b2f31d78fa
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/env/kafka_test_kafka_init.txt
@@ -0,0 +1 @@
+kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT}
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql
new file mode 100644
index 0000000000..77cdeb8cae
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql
@@ -0,0 +1,24 @@
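+-- Pipeline under test: JSON records read from Kafka by the kafka-inlong connector are mirrored into Elasticsearch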
+CREATE TABLE kafka_source (
+ `message` STRING
+) WITH (
+ 'connector' = 'kafka-inlong',
+ 'topic' = 'test-topic',
+ 'properties.bootstrap.servers' = 'kafka:9092',
+ 'properties.group.id' = 'flink-group',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'format' = 'json'
+);
+
+
+CREATE TABLE elasticsearch_sink (
+ `message` STRING
+) WITH (
+ 'connector' = 'elasticsearch7-inlong',
+ 'hosts' = 'http://elasticsearch:9200',
+ 'index' = 'test-index',
+ 'format' = 'json'
+);
+
+
+INSERT INTO elasticsearch_sink
+SELECT * FROM kafka_source;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 9b8bf8e042..5f3c21a9d4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,5 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
-org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
+org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory
+org.apache.inlong.sort.kafka.table.UpsertKafkaDynamicTableFactory