This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit ac14dba9fba67452cd28405b3f59d79d7440fe98
Author: wuzexian <[email protected]>
AuthorDate: Fri Jul 12 16:59:18 2024 +0800

    [FLINK-35894][pipeline-connector][es] Introduce Elasticsearch Sink Connector for Flink CDC Pipeline
---
 .../pom.xml                                        | 222 +++++++++++
 .../config/ElasticsearchSinkOptions.java           |  95 +++++
 .../elasticsearch/serializer/ColumnType.java       |  46 +++
 .../serializer/ElasticsearchEventSerializer.java   | 268 ++++++++++++++
 .../serializer/ElasticsearchRowConverter.java      | 205 +++++++++++
 .../elasticsearch/sink/ElasticsearchDataSink.java  |  81 ++++
 .../sink/ElasticsearchDataSinkFactory.java         | 116 ++++++
 .../sink/ElasticsearchDataSinkOptions.java         | 104 ++++++
 .../sink/ElasticsearchMetadataApplier.java         |  45 +++
 .../elasticsearch/v2/Elasticsearch8AsyncSink.java  | 105 ++++++
 .../v2/Elasticsearch8AsyncSinkBuilder.java         | 258 +++++++++++++
 .../v2/Elasticsearch8AsyncSinkSerializer.java      |  46 +++
 .../v2/Elasticsearch8AsyncWriter.java              | 217 +++++++++++
 .../connectors/elasticsearch/v2/NetworkConfig.java | 123 +++++++
 .../cdc/connectors/elasticsearch/v2/Operation.java |  52 +++
 .../elasticsearch/v2/OperationSerializer.java      |  50 +++
 .../org.apache.flink.cdc.common.factories.Factory  |  16 +
 .../sink/ElasticsearchDataSinkFactoryTest.java     | 157 ++++++++
 .../sink/ElasticsearchDataSinkITCaseTest.java      | 410 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  25 ++
 .../src/test/resources/testcontainers.properties   |   1 +
 .../flink-cdc-pipeline-connectors/pom.xml          |   1 +
 22 files changed, 2643 insertions(+)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
new file mode 100644
index 000000000..157a260c3
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
@@ -0,0 +1,222 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-pipeline-connectors</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-pipeline-connector-elasticsearch</artifactId>
+    <packaging>jar</packaging>
+
+    <name>flink-cdc-pipeline-connector-elasticsearch</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <elasticsearch.version>8.12.1</elasticsearch.version>
+        <flink.version>1.18.0</flink.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <jackson.version>2.13.2</jackson.version>
+        <surefire.module.config>--add-opens=java.base/java.util=ALL-UNNAMED</surefire.module.config>
+        <testcontainers.version>1.16.0</testcontainers.version>
+        <httpclient.version>4.5.13</httpclient.version>
+        <junit.jupiter.version>5.7.1</junit.jupiter.version>
+        <assertj.version>3.18.1</assertj.version>
+        <junit.version>4.13.2</junit.version>
+        <slf4j.version>1.7.32</slf4j.version>
+        <junit.platform.version>1.10.2</junit.platform.version>
+        <paimon.version>0.7.0-incubating</paimon.version>
+        <hadoop.version>2.8.5</hadoop.version>
+        <hive.version>2.3.9</hive.version>
+        <mockito.version>3.4.6</mockito.version>
+        <jakarta.json.version>2.0.2</jakarta.json.version>
+    </properties>
+
+    <dependencies>
+        <!-- Flink Dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Jackson Databind -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <!-- Testcontainers -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Elasticsearch Clients -->
+        <dependency>
+            <groupId>co.elastic.clients</groupId>
+            <artifactId>elasticsearch-java</artifactId>
+            <version>${elasticsearch.version}</version>
+        </dependency>
+
+        <!-- SLF4J API -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+
+        <!-- Testing -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>${junit.jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <version>${junit.jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Architecture Tests -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-architecture-tests-test</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-architecture-tests-production</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-launcher</artifactId>
+            <version>${junit.platform.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-composer</artifactId>
+            <version>${revision}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>jakarta.json</groupId>
+                <artifactId>jakarta.json-api</artifactId>
+                <version>${jakarta.json.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
new file mode 100644
index 000000000..17ec76fd4
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.config;
+
+import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
+
+import org.apache.http.HttpHost;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable holder for the options of the Elasticsearch data sink: the batch, buffer and
+ * record-size limits plus the {@link NetworkConfig} used to reach the cluster.
+ */
+public class ElasticsearchSinkOptions implements Serializable {
+
+    // Pinned explicitly so the serialized form does not depend on a compiler-generated UID.
+    private static final long serialVersionUID = 1L;
+
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+    private final long maxBatchSizeInBytes;
+    private final long maxTimeInBufferMS;
+    private final long maxRecordSizeInBytes;
+    private final NetworkConfig networkConfig;
+
+    /**
+     * Creates the options holder.
+     *
+     * @param maxBatchSize the maximum batch size
+     * @param maxInFlightRequests the maximum number of in-flight requests
+     * @param maxBufferedRequests the maximum number of buffered requests
+     * @param maxBatchSizeInBytes the maximum batch size in bytes
+     * @param maxTimeInBufferMS the maximum time in buffer in milliseconds
+     * @param maxRecordSizeInBytes the maximum record size in bytes
+     * @param networkConfig the network configuration; {@link #getHosts()} delegates to it
+     */
+    public ElasticsearchSinkOptions(
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            NetworkConfig networkConfig) {
+        this.maxBatchSize = maxBatchSize;
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxBufferedRequests = maxBufferedRequests;
+        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
+        this.maxTimeInBufferMS = maxTimeInBufferMS;
+        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+        this.networkConfig = networkConfig;
+    }
+
+    /** @return the maximum batch size */
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    /** @return the maximum number of in-flight requests */
+    public int getMaxInFlightRequests() {
+        return maxInFlightRequests;
+    }
+
+    /** @return the maximum number of buffered requests */
+    public int getMaxBufferedRequests() {
+        return maxBufferedRequests;
+    }
+
+    /** @return the maximum batch size in bytes */
+    public long getMaxBatchSizeInBytes() {
+        return maxBatchSizeInBytes;
+    }
+
+    /** @return the maximum time in buffer in milliseconds */
+    public long getMaxTimeInBufferMS() {
+        return maxTimeInBufferMS;
+    }
+
+    /** @return the maximum record size in bytes */
+    public long getMaxRecordSizeInBytes() {
+        return maxRecordSizeInBytes;
+    }
+
+    /** @return the network configuration */
+    public NetworkConfig getNetworkConfig() {
+        return networkConfig;
+    }
+
+    /** @return the list of Elasticsearch hosts, as reported by the network configuration */
+    public List<HttpHost> getHosts() {
+        return networkConfig.getHosts();
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java
new file mode 100644
index 000000000..1a3721634
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.serializer;
+
+/**
+ * Enumeration of column types supported in the Elasticsearch connector. These types represent the
+ * various data types that can be used in database columns and are relevant for serialization and
+ * deserialization processes.
+ *
+ * <p>{@code ElasticsearchEventSerializer} resolves a constant with {@code
+ * ColumnType.valueOf(column.getType().getTypeRoot().name())}, so every constant name here has to
+ * equal the name of the type root it stands for; renaming a constant breaks that lookup.
+ */
+public enum ColumnType {
+    BOOLEAN,
+    TINYINT,
+    SMALLINT,
+    INTEGER,
+    BIGINT,
+    FLOAT,
+    DOUBLE,
+    CHAR,
+    VARCHAR,
+    BINARY,
+    VARBINARY,
+    DECIMAL,
+    DATE,
+    TIME_WITHOUT_TIME_ZONE,
+    TIMESTAMP_WITHOUT_TIME_ZONE,
+    TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+    TIMESTAMP_WITH_TIME_ZONE,
+    ARRAY,
+    MAP,
+    ROW
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
new file mode 100644
index 000000000..f60231103
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.serializer;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.*;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.*;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.cdc.common.types.DataTypeChecks.*;
+
+/** A serializer for Event to BulkOperationVariant. */
+public class ElasticsearchEventSerializer implements ElementConverter<Event, 
BulkOperationVariant> {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private final Map<TableId, Schema> schemaMaps = new HashMap<>();
+    private final ConcurrentHashMap<TableId, 
List<ElasticsearchRowConverter.SerializationConverter>>
+            converterCache = new ConcurrentHashMap<>();
+
+    /** Format DATE type data. */
+    public static final DateTimeFormatter DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+    /** Format timestamp-related type data. */
+    public static final DateTimeFormatter DATE_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+
+    /** ZoneId from pipeline config to support timestamp with local time zone. 
*/
+    private final ZoneId pipelineZoneId;
+
+    public ElasticsearchEventSerializer(ZoneId zoneId) {
+        this.pipelineZoneId = zoneId;
+    }
+
+    @Override
+    public BulkOperationVariant apply(Event event, SinkWriter.Context context) 
{
+        try {
+            if (event instanceof DataChangeEvent) {
+                return applyDataChangeEvent((DataChangeEvent) event);
+            } else if (event instanceof SchemaChangeEvent) {
+                IndexOperation<Map<String, Object>> indexOperation =
+                        applySchemaChangeEvent((SchemaChangeEvent) event);
+                if (indexOperation != null) {
+                    return indexOperation;
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize event", e);
+        }
+        return null;
+    }
+
+    private IndexOperation<Map<String, Object>> applySchemaChangeEvent(
+            SchemaChangeEvent schemaChangeEvent) throws IOException {
+        TableId tableId = schemaChangeEvent.tableId();
+
+        if (schemaChangeEvent instanceof CreateTableEvent) {
+            Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema();
+            schemaMaps.put(tableId, schema);
+            return createSchemaIndexOperation(tableId, schema);
+        } else if (schemaChangeEvent instanceof AddColumnEvent
+                || schemaChangeEvent instanceof DropColumnEvent
+                || schemaChangeEvent instanceof RenameColumnEvent) {
+            if (!schemaMaps.containsKey(tableId)) {
+                throw new RuntimeException("Schema of " + tableId + " does not 
exist.");
+            }
+            Schema updatedSchema =
+                    
SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent);
+            schemaMaps.put(tableId, updatedSchema);
+            return createSchemaIndexOperation(tableId, updatedSchema);
+        } else {
+            if (!schemaMaps.containsKey(tableId)) {
+                throw new RuntimeException("Schema of " + tableId + " does not 
exist.");
+            }
+            Schema updatedSchema =
+                    
SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent);
+            schemaMaps.put(tableId, updatedSchema);
+        }
+        return null;
+    }
+
+    private IndexOperation<Map<String, Object>> createSchemaIndexOperation(
+            TableId tableId, Schema schema) {
+        Map<String, Object> schemaMap = new HashMap<>();
+        schemaMap.put(
+                "columns",
+                schema.getColumns().stream()
+                        .map(Column::asSummaryString)
+                        .collect(Collectors.toList()));
+        schemaMap.put("primaryKeys", schema.primaryKeys());
+        schemaMap.put("options", schema.options());
+
+        return new IndexOperation.Builder<Map<String, Object>>()
+                .index(tableId.toString())
+                .id(tableId.getTableName())
+                .document(schemaMap)
+                .build();
+    }
+
+    private BulkOperationVariant applyDataChangeEvent(DataChangeEvent event)
+            throws JsonProcessingException {
+        TableId tableId = event.tableId();
+        Schema schema = schemaMaps.get(tableId);
+        Preconditions.checkNotNull(schema, event.tableId() + " does not 
exist.");
+        Map<String, Object> valueMap;
+        OperationType op = event.op();
+
+        Object[] uniqueId =
+                generateUniqueId(
+                        op == OperationType.DELETE ? event.before() : 
event.after(), schema);
+        String id = 
Arrays.stream(uniqueId).map(Object::toString).collect(Collectors.joining("_"));
+
+        switch (op) {
+            case INSERT:
+            case REPLACE:
+            case UPDATE:
+                valueMap = serializeRecord(tableId, event.after(), schema, 
pipelineZoneId);
+                return new IndexOperation.Builder<>()
+                        .index(tableId.toString())
+                        .id(id)
+                        .document(valueMap)
+                        .build();
+            case DELETE:
+                return new 
DeleteOperation.Builder().index(tableId.toString()).id(id).build();
+            default:
+                throw new UnsupportedOperationException("Unsupported Operation 
" + op);
+        }
+    }
+
+    private Object[] generateUniqueId(RecordData recordData, Schema schema) {
+        List<String> primaryKeys = schema.primaryKeys();
+        return primaryKeys.stream()
+                .map(
+                        primaryKey -> {
+                            Column column =
+                                    schema.getColumns().stream()
+                                            .filter(col -> 
col.getName().equals(primaryKey))
+                                            .findFirst()
+                                            .orElseThrow(
+                                                    () ->
+                                                            new 
IllegalStateException(
+                                                                    "Primary 
key column not found: "
+                                                                            + 
primaryKey));
+                            int index = schema.getColumns().indexOf(column);
+                            return getFieldValue(recordData, column.getType(), 
index);
+                        })
+                .toArray();
+    }
+
+    private Object getFieldValue(RecordData recordData, DataType dataType, int 
index) {
+        switch (dataType.getTypeRoot()) {
+            case BOOLEAN:
+                return recordData.getBoolean(index);
+            case TINYINT:
+                return recordData.getByte(index);
+            case SMALLINT:
+                return recordData.getShort(index);
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                return recordData.getInt(index);
+            case BIGINT:
+                return recordData.getLong(index);
+            case FLOAT:
+                return recordData.getFloat(index);
+            case DOUBLE:
+                return recordData.getDouble(index);
+            case CHAR:
+            case VARCHAR:
+                return recordData.getString(index);
+            case DECIMAL:
+                return recordData.getDecimal(index, getPrecision(dataType), 
getScale(dataType));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return recordData.getTimestamp(index, getPrecision(dataType));
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return recordData.getLocalZonedTimestampData(index, 
getPrecision(dataType));
+            case TIMESTAMP_WITH_TIME_ZONE:
+                return recordData.getZonedTimestamp(index, 
getPrecision(dataType));
+            case BINARY:
+            case VARBINARY:
+                return recordData.getBinary(index);
+            case ARRAY:
+                return recordData.getArray(index);
+            case MAP:
+                return recordData.getMap(index);
+            case ROW:
+                return recordData.getRow(index, getFieldCount(dataType));
+            default:
+                throw new IllegalArgumentException("Unsupported type: " + 
dataType);
+        }
+    }
+
+    public Map<String, Object> serializeRecord(
+            TableId tableId, RecordData recordData, Schema schema, ZoneId 
pipelineZoneId) {
+        List<Column> columns = schema.getColumns();
+        Map<String, Object> record = new HashMap<>();
+        Preconditions.checkState(
+                columns.size() == recordData.getArity(),
+                "Column size does not match the data size.");
+
+        List<ElasticsearchRowConverter.SerializationConverter> converters =
+                getOrCreateConverters(tableId, schema);
+
+        for (int i = 0; i < recordData.getArity(); i++) {
+            Column column = columns.get(i);
+            Object field = converters.get(i).serialize(i, recordData);
+            record.put(column.getName(), field);
+        }
+
+        return record;
+    }
+
+    private List<ElasticsearchRowConverter.SerializationConverter> 
getOrCreateConverters(
+            TableId tableId, Schema schema) {
+        return converterCache.computeIfAbsent(
+                tableId,
+                id -> {
+                    List<ElasticsearchRowConverter.SerializationConverter> 
converters =
+                            new ArrayList<>();
+                    for (Column column : schema.getColumns()) {
+                        ColumnType columnType =
+                                
ColumnType.valueOf(column.getType().getTypeRoot().name());
+                        converters.add(
+                                
ElasticsearchRowConverter.createNullableExternalConverter(
+                                        columnType, pipelineZoneId));
+                    }
+                    return converters;
+                });
+    }
+
+    @Override
+    public void open(Sink.InitContext context) {
+        ElementConverter.super.open(context);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java
new file mode 100644
index 000000000..948c012a5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.serializer;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.TimestampData;
+
+import java.time.*;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * Factory for per-column converters that turn {@link RecordData} fields into values that can be
+ * placed in an Elasticsearch document.
+ *
+ * <p>Boolean, numeric and binary fields are returned as their Java values, character fields as
+ * {@link String}, decimals as the {@code toString()} form of their {@code BigDecimal}, temporal
+ * fields as formatted strings, arrays and maps as the objects returned by {@link RecordData}, and
+ * rows as their {@code toString()} form.
+ */
+public class ElasticsearchRowConverter {
+
+    // Date and time formatters for various temporal types
+    private static final DateTimeFormatter DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    private static final DateTimeFormatter TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
+    private static final DateTimeFormatter DATE_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+
+    /**
+     * Creates a null-tolerant converter for the given column type and time zone.
+     *
+     * @param columnType The type of the column to convert.
+     * @param zoneId The time zone used when formatting {@code TIMESTAMP_WITH_TIME_ZONE} values.
+     * @return A SerializationConverter that returns {@code null} for null fields.
+     * @throws IllegalArgumentException if {@code columnType} is not supported.
+     */
+    public static SerializationConverter createNullableExternalConverter(
+            ColumnType columnType, ZoneId zoneId) {
+        return wrapIntoNullableExternalConverter(createExternalConverter(columnType, zoneId));
+    }
+
+    /**
+     * Wraps a SerializationConverter so that null records and null fields yield {@code null}
+     * instead of being passed to the wrapped converter.
+     *
+     * @param serializationConverter The converter to delegate to for non-null fields.
+     * @return A SerializationConverter that returns null for null input.
+     */
+    static SerializationConverter wrapIntoNullableExternalConverter(
+            SerializationConverter serializationConverter) {
+        return (pos, data) -> {
+            if (data == null || data.isNullAt(pos)) {
+                return null;
+            }
+            return serializationConverter.serialize(pos, data);
+        };
+    }
+
+    /**
+     * Creates a converter for the given column type and time zone. The returned converter does
+     * not guard against null fields (except for {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE}); use
+     * {@link #createNullableExternalConverter(ColumnType, ZoneId)} for that.
+     *
+     * @param columnType The type of the column to convert.
+     * @param zoneId The time zone used when formatting {@code TIMESTAMP_WITH_TIME_ZONE} values.
+     * @return A SerializationConverter for the specified column type.
+     * @throws IllegalArgumentException if {@code columnType} is not supported.
+     */
+    static SerializationConverter createExternalConverter(ColumnType columnType, ZoneId zoneId) {
+        switch (columnType) {
+                // Basic types
+            case BOOLEAN:
+                return (pos, data) -> data.getBoolean(pos);
+            case INTEGER:
+                return (pos, data) -> data.getInt(pos);
+            case DOUBLE:
+                return (pos, data) -> data.getDouble(pos);
+            case VARCHAR:
+            case CHAR:
+                return (pos, data) -> data.getString(pos).toString();
+            case FLOAT:
+                return (pos, data) -> data.getFloat(pos);
+            case BIGINT:
+                return (pos, data) -> data.getLong(pos);
+            case TINYINT:
+                return (pos, data) -> data.getByte(pos);
+            case SMALLINT:
+                return (pos, data) -> data.getShort(pos);
+            case BINARY:
+            case VARBINARY:
+                return (pos, data) -> data.getBinary(pos);
+
+                // Decimal type
+            case DECIMAL:
+                // NOTE(review): precision 17 / scale 2 are fixed because only the type root is
+                // passed in; columns declared with another precision or scale are presumably
+                // decoded incorrectly - confirm and thread the real data type through if so.
+                return (pos, data) -> {
+                    DecimalData decimalData = data.getDecimal(pos, 17, 2);
+                    return decimalData != null ? decimalData.toBigDecimal().toString() : null;
+                };
+
+                // Date and time types
+            case DATE:
+                // The field holds the number of days since 1970-01-01.
+                return (pos, data) -> LocalDate.ofEpochDay(data.getInt(pos)).format(DATE_FORMATTER);
+            case TIME_WITHOUT_TIME_ZONE:
+                return (pos, data) -> {
+                    // The field holds the milliseconds of the day.
+                    int milliseconds = data.getInt(pos);
+                    return LocalTime.ofNanoOfDay(milliseconds * 1_000_000L).format(TIME_FORMATTER);
+                };
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return (pos, data) ->
+                        LocalDateTime.ofInstant(
+                                        toInstant(data.getTimestamp(pos, 6)), ZoneOffset.UTC)
+                                .format(DATE_TIME_FORMATTER);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                // Conversion failures propagate to the caller so that a malformed value is never
+                // indexed as a placeholder string.
+                return (pos, data) -> {
+                    if (data.isNullAt(pos)) {
+                        return null;
+                    }
+
+                    long milliseconds;
+                    int nanos = 0;
+                    try {
+                        TimestampData timestampData = data.getTimestamp(pos, 6);
+                        milliseconds = timestampData.getMillisecond();
+                        nanos = timestampData.getNanoOfMillisecond();
+                    } catch (Exception ignored) {
+                        // Fall back to reading the field as raw epoch milliseconds when it cannot
+                        // be read as a timestamp.
+                        milliseconds = data.getLong(pos);
+                    }
+
+                    // Always rendered in UTC, e.g. 2024-07-12T08:59:18.123456Z
+                    Instant instant = Instant.ofEpochMilli(milliseconds).plusNanos(nanos);
+                    return DateTimeFormatter.ISO_INSTANT.format(instant);
+                };
+            case TIMESTAMP_WITH_TIME_ZONE:
+                // NOTE(review): the value is read with getTimestamp; confirm that this is the right
+                // accessor for zoned timestamps.
+                return (pos, data) ->
+                        // atZone resolves the offset valid at the value's own instant, so dates on
+                        // the other side of a daylight-saving change are rendered correctly.
+                        toInstant(data.getTimestamp(pos, 6))
+                                .atZone(zoneId)
+                                .format(DATE_TIME_FORMATTER);
+
+                // Complex types
+            case ARRAY:
+                return (pos, data) -> data.getArray(pos);
+            case MAP:
+                return (pos, data) -> data.getMap(pos);
+            case ROW:
+                // NOTE(review): the nested row is read with a fixed arity of 5; rows with another
+                // number of fields are presumably not handled - confirm against the column type.
+                return (pos, data) -> {
+                    RecordData rowData = data.getRow(pos, 5);
+                    return rowData != null ? rowData.toString() : null;
+                };
+            default:
+                throw new IllegalArgumentException("Unsupported column type: " + columnType);
+        }
+    }
+
+    /**
+     * Converts a timestamp to an {@link Instant}, keeping the sub-millisecond part.
+     *
+     * <p>{@link Instant#ofEpochMilli(long)} uses floor semantics, so values before 1970-01-01 are
+     * converted without producing a negative nano-of-second.
+     */
+    private static Instant toInstant(TimestampData timestampData) {
+        return Instant.ofEpochMilli(timestampData.getMillisecond())
+                .plusNanos(timestampData.getNanoOfMillisecond());
+    }
+
+    /** Interface for serialization converters. */
+    @FunctionalInterface
+    public interface SerializationConverter {
+        /**
+         * Serializes a value from the given position in the RecordData.
+         *
+         * @param pos The position of the value in the RecordData.
+         * @param data The RecordData containing the value to serialize.
+         * @return The serialized object.
+         */
+        Object serialize(int pos, RecordData data);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
new file mode 100644
index 000000000..3507774f1
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.sink.EventSinkProvider;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import 
org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
+import 
org.apache.flink.cdc.connectors.elasticsearch.serializer.ElasticsearchEventSerializer;
+import 
org.apache.flink.cdc.connectors.elasticsearch.v2.Elasticsearch8AsyncSinkBuilder;
+
+import org.apache.http.HttpHost;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+
+/**
+ * A {@link DataSink} implementation for the Elasticsearch connector.
+ *
+ * <p>The sink writes events through an {@link Elasticsearch8AsyncSinkBuilder}-built sink and
+ * performs no action on schema change events.
+ *
+ * @param <InputT> The input type of the sink.
+ */
+public class ElasticsearchDataSink<InputT> implements DataSink, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The Elasticsearch sink options. */
+    private final ElasticsearchSinkOptions elasticsearchOptions;
+
+    /** The time zone handed to the event serializer for time-related conversions. */
+    private final ZoneId zoneId;
+
+    /**
+     * Constructs an ElasticsearchDataSink with the given options and time zone.
+     *
+     * @param elasticsearchOptions The Elasticsearch sink options.
+     * @param zoneId The time zone handed to the event serializer.
+     */
+    public ElasticsearchDataSink(ElasticsearchSinkOptions elasticsearchOptions, ZoneId zoneId) {
+        this.elasticsearchOptions = elasticsearchOptions;
+        this.zoneId = zoneId;
+    }
+
+    /**
+     * Builds the Flink sink that writes events to Elasticsearch.
+     *
+     * <p>Only the hosts and the buffering limits from the sink options are forwarded to the
+     * builder here.
+     *
+     * @return a provider wrapping the configured asynchronous Elasticsearch sink
+     */
+    @Override
+    public EventSinkProvider getEventSinkProvider() {
+        // NOTE(review): credentials held by the sink options are not forwarded in this method -
+        // confirm whether the builder is expected to receive them.
+        return FlinkSinkProvider.of(
+                new Elasticsearch8AsyncSinkBuilder<Event>()
+                        .setHosts(elasticsearchOptions.getHosts().toArray(new HttpHost[0]))
+                        // Use the zone chosen for this sink rather than the JVM default, so that
+                        // the configured pipeline time zone takes effect.
+                        .setElementConverter(new ElasticsearchEventSerializer(zoneId))
+                        .setMaxBatchSize(elasticsearchOptions.getMaxBatchSize())
+                        .setMaxInFlightRequests(elasticsearchOptions.getMaxInFlightRequests())
+                        .setMaxBufferedRequests(elasticsearchOptions.getMaxBufferedRequests())
+                        .setMaxBatchSizeInBytes(elasticsearchOptions.getMaxBatchSizeInBytes())
+                        .setMaxTimeInBufferMS(elasticsearchOptions.getMaxTimeInBufferMS())
+                        .setMaxRecordSizeInBytes(elasticsearchOptions.getMaxRecordSizeInBytes())
+                        .build());
+    }
+
+    /**
+     * Returns a metadata applier that ignores every schema change event.
+     *
+     * @return a no-op {@link MetadataApplier}
+     */
+    @Override
+    public MetadataApplier getMetadataApplier() {
+        // Currently, no metadata application is needed for Elasticsearch
+        return schemaChangeEvent -> {};
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
new file mode 100644
index 000000000..a84c4691a
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.factories.DataSinkFactory;
+import org.apache.flink.cdc.common.factories.FactoryHelper;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.sink.DataSink;
+import 
org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
+import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
+
+import org.apache.http.HttpHost;
+
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.*;
+
+/** Factory for creating {@link ElasticsearchDataSink}. */
+public class ElasticsearchDataSinkFactory implements DataSinkFactory {
+
+    public static final String IDENTIFIER = "elasticsearch";
+
+    /**
+     * Validates the factory configuration and creates the Elasticsearch data sink.
+     *
+     * @param context the factory context carrying the sink and pipeline configuration
+     * @return the configured {@link ElasticsearchDataSink}
+     */
+    @Override
+    public DataSink createDataSink(Context context) {
+        FactoryHelper.createFactoryHelper(this, context).validate();
+
+        Configuration configuration =
+                Configuration.fromMap(context.getFactoryConfiguration().toMap());
+        ZoneId zoneId = determineZoneId(context);
+
+        ElasticsearchSinkOptions sinkOptions = buildSinkConnectorOptions(configuration);
+        return new ElasticsearchDataSink<>(sinkOptions, zoneId);
+    }
+
+    /**
+     * Resolves the time zone for the sink: the JVM default zone when the pipeline option is left
+     * at its default value, otherwise the zone named by the option.
+     */
+    private ZoneId determineZoneId(Context context) {
+        String configuredZone =
+                context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE);
+        String defaultZone = PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue();
+
+        return Objects.equals(configuredZone, defaultZone)
+                ? ZoneId.systemDefault()
+                : ZoneId.of(configuredZone);
+    }
+
+    /** Translates the connector configuration into {@link ElasticsearchSinkOptions}. */
+    private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) {
+        List<HttpHost> hosts = parseHosts(cdcConfig.get(HOSTS));
+        String username = cdcConfig.get(USERNAME);
+        String password = cdcConfig.get(PASSWORD);
+        NetworkConfig networkConfig =
+                new NetworkConfig(hosts, username, password, null, null, null);
+        return new ElasticsearchSinkOptions(
+                cdcConfig.get(MAX_BATCH_SIZE),
+                cdcConfig.get(MAX_IN_FLIGHT_REQUESTS),
+                cdcConfig.get(MAX_BUFFERED_REQUESTS),
+                cdcConfig.get(MAX_BATCH_SIZE_IN_BYTES),
+                cdcConfig.get(MAX_TIME_IN_BUFFER_MS),
+                cdcConfig.get(MAX_RECORD_SIZE_IN_BYTES),
+                networkConfig);
+    }
+
+    /**
+     * Parses a comma-separated host list such as {@code "http://es1:9200, http://es2:9200"}.
+     *
+     * <p>Whitespace around each entry is removed and empty entries are skipped, so a list written
+     * with spaces after the commas is accepted.
+     *
+     * @param hostsStr the raw value of the hosts option
+     * @return the parsed hosts, never empty
+     * @throws IllegalArgumentException if no host remains after trimming, or an entry is invalid
+     */
+    private List<HttpHost> parseHosts(String hostsStr) {
+        List<HttpHost> hosts =
+                Arrays.stream(hostsStr.split(","))
+                        .map(String::trim)
+                        .filter(host -> !host.isEmpty())
+                        .map(HttpHost::create)
+                        .collect(Collectors.toList());
+        if (hosts.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Option 'hosts' must contain at least one Elasticsearch host, but was: '"
+                            + hostsStr
+                            + "'");
+        }
+        return hosts;
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Returns the options that must be present in the sink configuration.
+     *
+     * @return the hosts and index options
+     */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        // NOTE(review): INDEX is required here but is not read anywhere in this class - confirm
+        // where it is consumed.
+        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+        requiredOptions.add(HOSTS);
+        requiredOptions.add(INDEX);
+        return requiredOptions;
+    }
+
+    /**
+     * Returns the options that may be present in the sink configuration.
+     *
+     * @return the buffering limits and the credentials options
+     */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+        optionalOptions.add(MAX_BATCH_SIZE);
+        optionalOptions.add(MAX_IN_FLIGHT_REQUESTS);
+        optionalOptions.add(MAX_BUFFERED_REQUESTS);
+        optionalOptions.add(MAX_BATCH_SIZE_IN_BYTES);
+        optionalOptions.add(MAX_TIME_IN_BUFFER_MS);
+        optionalOptions.add(MAX_RECORD_SIZE_IN_BYTES);
+        optionalOptions.add(USERNAME);
+        optionalOptions.add(PASSWORD);
+        return optionalOptions;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
new file mode 100644
index 000000000..c5e414527
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+/**
+ * Configuration options understood by the Elasticsearch data sink, grouped into connection
+ * settings and buffering limits.
+ */
+public class ElasticsearchDataSinkOptions {
+
+    private ElasticsearchDataSinkOptions() {
+        // Holder of constants only; not meant to be instantiated
+    }
+
+    // ------------------------------------------------------------------------
+    //  Connection
+    // ------------------------------------------------------------------------
+
+    /** Comma-separated Elasticsearch hosts; has no default value. */
+    public static final ConfigOption<String> HOSTS =
+            ConfigOptions.key("hosts")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The comma-separated list of Elasticsearch hosts to connect to.");
+
+    /** Name of the target Elasticsearch index; has no default value. */
+    public static final ConfigOption<String> INDEX =
+            ConfigOptions.key("index")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The Elasticsearch index name to write to.");
+
+    /** User name for Elasticsearch authentication; has no default value. */
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The username for Elasticsearch authentication.");
+
+    /** Password for Elasticsearch authentication; has no default value. */
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The password for Elasticsearch authentication.");
+
+    // ------------------------------------------------------------------------
+    //  Buffering and batching
+    // ------------------------------------------------------------------------
+
+    /** Upper bound on the number of actions per bulk request; defaults to 500. */
+    public static final ConfigOption<Integer> MAX_BATCH_SIZE =
+            ConfigOptions.key("batch.size.max")
+                    .intType()
+                    .defaultValue(500)
+                    .withDescription(
+                            "The maximum number of actions to buffer for each bulk request.");
+
+    /** Upper bound on concurrently executing requests; defaults to 5. */
+    public static final ConfigOption<Integer> MAX_IN_FLIGHT_REQUESTS =
+            ConfigOptions.key("inflight.requests.max")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The maximum number of concurrent requests that the sink will try to execute.");
+
+    /** Upper bound on requests held in the in-memory buffer; defaults to 1000. */
+    public static final ConfigOption<Integer> MAX_BUFFERED_REQUESTS =
+            ConfigOptions.key("buffered.requests.max")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The maximum number of requests to keep in the in-memory buffer.");
+
+    /** Upper bound on the size of one batch in bytes; defaults to 5 MiB. */
+    public static final ConfigOption<Long> MAX_BATCH_SIZE_IN_BYTES =
+            ConfigOptions.key("batch.size.max.bytes")
+                    .longType()
+                    .defaultValue(5L * 1024L * 1024L)
+                    .withDescription("The maximum size of batch requests in bytes.");
+
+    /** Longest time an incomplete batch may wait before it is flushed; defaults to 5000. */
+    public static final ConfigOption<Long> MAX_TIME_IN_BUFFER_MS =
+            ConfigOptions.key("buffer.time.max.ms")
+                    .longType()
+                    .defaultValue(5000L)
+                    .withDescription(
+                            "The maximum time to wait for incomplete batches before flushing.");
+
+    /** Upper bound on the size of a single record in bytes; defaults to 10 MiB. */
+    public static final ConfigOption<Long> MAX_RECORD_SIZE_IN_BYTES =
+            ConfigOptions.key("record.size.max.bytes")
+                    .longType()
+                    .defaultValue(10L * 1024L * 1024L)
+                    .withDescription("The maximum size of a single record in bytes.");
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java
new file mode 100644
index 000000000..9206ed7bc
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink;
+
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+
+/**
+ * Placeholder {@link MetadataApplier} for Elasticsearch. Applying a schema change is not
+ * implemented yet: every call to {@link #applySchemaChange(SchemaChangeEvent)} throws an {@link
+ * UnsupportedOperationException}.
+ */
+public class ElasticsearchMetadataApplier implements MetadataApplier {
+
+    private static final String NOT_IMPLEMENTED_MESSAGE =
+            "Schema change application is not yet implemented for Elasticsearch.";
+
+    /**
+     * Always fails, because schema changes cannot be applied to Elasticsearch yet.
+     *
+     * @param event The schema change event that was requested to be applied.
+     * @throws UnsupportedOperationException on every call.
+     */
+    @Override
+    public void applySchemaChange(SchemaChangeEvent event) {
+        // TODO: Implement the logic to apply schema changes to Elasticsearch
+        // This might include:
+        // - Creating new indices
+        // - Updating existing index mappings
+        // - Handling column additions or deletions
+        throw new UnsupportedOperationException(NOT_IMPLEMENTED_MESSAGE);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
new file mode 100644
index 000000000..960c64ba2
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.v2;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * An asynchronous sink, built on Apache Flink's {@link AsyncSinkBase}, that submits {@link
+ * Operation}s to an Elasticsearch cluster.
+ *
+ * @param <InputT> type of records that will be converted into {@link Operation}; see {@link
+ *     Elasticsearch8AsyncSinkBuilder} on how to construct valid instances
+ */
+public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Operation> {
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncSink.class);
+
+    @VisibleForTesting protected final NetworkConfig networkConfig;
+
+    protected Elasticsearch8AsyncSink(
+            ElementConverter<InputT, Operation> converter,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInByte,
+            NetworkConfig networkConfig) {
+        super(
+                converter,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInByte);
+
+        this.networkConfig = networkConfig;
+    }
+
+    /** Creates a writer that starts without any previously buffered requests. */
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
+            InitContext context) {
+        return newWriter(context, Collections.emptyList());
+    }
+
+    /** Creates a writer that starts from the buffered requests in {@code recoveredState}. */
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> restoreWriter(
+            InitContext context, Collection<BufferedRequestState<Operation>> recoveredState) {
+        return newWriter(context, recoveredState);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<BufferedRequestState<Operation>> getWriterStateSerializer() {
+        return new Elasticsearch8AsyncSinkSerializer();
+    }
+
+    /** Builds a writer from this sink's settings and the given initial buffer state. */
+    private StatefulSinkWriter<InputT, BufferedRequestState<Operation>> newWriter(
+            InitContext context, Collection<BufferedRequestState<Operation>> bufferedState) {
+        return new Elasticsearch8AsyncWriter<>(
+                getElementConverter(),
+                context,
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                networkConfig,
+                bufferedState);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java
new file mode 100644
index 000000000..2e6598e09
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.v2;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+import co.elastic.clients.transport.TransportUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.ssl.SSLContexts;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Builder that constructs an {@link Elasticsearch8AsyncSink}.
+ *
+ * @param <InputT> the type of records to be sunk into an Elasticsearch cluster
+ */
+public class Elasticsearch8AsyncSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, Operation, 
Elasticsearch8AsyncSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 500;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 5 * 1024 * 1024;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1024 * 1024;
+
+    /** The hosts where the Elasticsearch cluster is reachable. */
+    private List<HttpHost> hosts;
+
+    /** The headers to be sent with the requests made to Elasticsearch 
cluster. */
+    private List<Header> headers;
+
+    /** The username to authenticate the connection with the Elasticsearch 
cluster. */
+    private String username;
+
+    /** The password to authenticate the connection with the Elasticsearch 
cluster. */
+    private String password;
+
+    /**
+     * The element converter that will be called on every stream element to be 
processed and
+     * buffered.
+     */
+    private ElementConverter<InputT, BulkOperationVariant> elementConverter;
+
+    private SerializableSupplier<SSLContext> sslContextSupplier;
+
+    private SerializableSupplier<HostnameVerifier> sslHostnameVerifier;
+
+    /**
+     * Sets the hosts where the Elasticsearch cluster is reachable.
+     *
+     * @param hosts the host addresses; must be non-null and contain at least one element
+     * @return {@code Elasticsearch8AsyncSinkBuilder}
+     */
+    public Elasticsearch8AsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) {
+        final HttpHost[] givenHosts = checkNotNull(hosts);
+        checkArgument(givenHosts.length > 0, "Hosts cannot be empty");
+        this.hosts = Arrays.asList(givenHosts);
+        return this;
+    }
+
+    /**
+     * Sets the headers to be sent with the requests made to the Elasticsearch cluster.
+     *
+     * @param headers the headers; must be non-null and contain at least one element
+     * @return {@code Elasticsearch8AsyncSinkBuilder}
+     */
+    public Elasticsearch8AsyncSinkBuilder<InputT> setHeaders(Header... headers) {
+        // Validate the argument itself: the hosts may legitimately not be configured yet when
+        // headers are set, so checking them here would reject a valid call order.
+        checkNotNull(headers);
+        checkArgument(headers.length > 0, "Headers cannot be empty");
+        this.headers = Arrays.asList(headers);
+        return this;
+    }
+
+    /**
+     * Installs an SSL context supplier that trusts every certificate chain, which allows
+     * connecting to insecure network endpoints (for example, servers which use self-signed
+     * certificates).
+     *
+     * <p>The supplier replaces any SSL context supplier configured earlier on this builder.
+     *
+     * @return this builder
+     */
+    public Elasticsearch8AsyncSinkBuilder<InputT> allowInsecure() {
+        final SerializableSupplier<SSLContext> trustAllSupplier =
+                () -> {
+                    SSLContext trustAllContext;
+                    try {
+                        trustAllContext =
+                                SSLContexts.custom()
+                                        .loadTrustMaterial(TrustAllStrategy.INSTANCE)
+                                        .build();
+                    } catch (final NoSuchAlgorithmException
+                            | KeyStoreException
+                            | KeyManagementException cause) {
+                        throw new IllegalStateException(
+                                "Unable to create custom SSL context", cause);
+                    }
+                    return trustAllContext;
+                };
+        this.sslContextSupplier = trustAllSupplier;
+        return this;
+    }
+
+    /**
+     * Set the certificate fingerprint to be used to verify the HTTPS 
connection.
+     *
+     * @param certificateFingerprint the certificate fingerprint
+     * @return {@code Elasticsearch8AsyncSinkBuilder}
+     */
+    public Elasticsearch8AsyncSinkBuilder<InputT> setCertificateFingerprint(
+            String certificateFingerprint) {
+        checkNotNull(certificateFingerprint, "certificateFingerprint must not 
be null");
+        this.sslContextSupplier =
+                () -> 
TransportUtils.sslContextFromCaFingerprint(certificateFingerprint);
+        return this;
+    }
+
+    /**
+     * Registers the supplier from which the {@link SSLContext} instance is obtained.
+     *
+     * @param sslContextSupplier the serializable SSLContext supplier function; must not be null
+     * @return this builder
+     */
+    public Elasticsearch8AsyncSinkBuilder<InputT> setSslContextSupplier(
+            SerializableSupplier<SSLContext> sslContextSupplier) {
+        checkNotNull(sslContextSupplier);
+        this.sslContextSupplier = sslContextSupplier;
+        return this;
+    }
+
+    /**
+     * Registers the supplier from which the SSL {@link HostnameVerifier} instance is obtained.
+     *
+     * <p>The value is stored as given; no null check is performed here.
+     *
+     * @param sslHostnameVerifierSupplier the serializable hostname verifier supplier function
+     * @return this builder
+     */
+    public Elasticsearch8AsyncSinkBuilder<InputT> setSslHostnameVerifier(
+            SerializableSupplier<HostnameVerifier> sslHostnameVerifierSupplier) {
+        this.sslHostnameVerifier = sslHostnameVerifierSupplier;
+        return this;
+    }
+
+    /**
+     * Sets the username used to authenticate the connection with the Elasticsearch cluster.
+     *
+     * @param username the auth username; must not be null
+     * @return {@code Elasticsearch8AsyncSinkBuilder}
+     */
+    public Elasticsearch8AsyncSinkBuilder<InputT> setUsername(String username) {
+        this.username = checkNotNull(username, "Username must not be null");
+        return this;
+    }
+
+    /**
+     * Sets the password used to authenticate the connection with the Elasticsearch cluster.
+     *
+     * @param password the auth password; must not be null
+     * @return {@code Elasticsearch8AsyncSinkBuilder}
+     */
+    public Elasticsearch8AsyncSinkBuilder<InputT> setPassword(String password) {
+        this.password = checkNotNull(password, "Password must not be null");
+        return this;
+    }
+
+    /**
+     * Sets the element converter that is called for every stream element to be processed and
+     * buffered.
+     *
+     * @param elementConverter the converter producing a bulk operation per element; must not be
+     *     null
+     * @return {@code Elasticsearch8AsyncSinkBuilder}
+     */
+    public Elasticsearch8AsyncSinkBuilder<InputT> setElementConverter(
+            ElementConverter<InputT, BulkOperationVariant> elementConverter) {
+        this.elementConverter = checkNotNull(elementConverter);
+        return this;
+    }
+
+    /**
+     * Creates a new builder with nothing configured yet.
+     *
+     * @param <T> the type of records the resulting sink accepts
+     * @return a new {@code Elasticsearch8AsyncSinkBuilder}
+     */
+    public static <T> Elasticsearch8AsyncSinkBuilder<T> builder() {
+        final Elasticsearch8AsyncSinkBuilder<T> freshBuilder =
+                new Elasticsearch8AsyncSinkBuilder<>();
+        return freshBuilder;
+    }
+
+    /**
+     * Creates an {@link Elasticsearch8AsyncSink} from the values configured on this builder.
+     *
+     * <p>Every batching/buffering setting that was left unset (i.e. is {@code null}) falls back
+     * to the corresponding {@code DEFAULT_*} constant of this class.
+     *
+     * @return {@link Elasticsearch8AsyncSink}
+     */
+    @Override
+    public Elasticsearch8AsyncSink<InputT> build() {
+        final OperationConverter<InputT> operationConverter =
+                buildOperationConverter(elementConverter);
+        final int maxBatchSize =
+                Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE);
+        final int maxInFlightRequests =
+                Optional.ofNullable(getMaxInFlightRequests())
+                        .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS);
+        final int maxBufferedRequests =
+                Optional.ofNullable(getMaxBufferedRequests())
+                        .orElse(DEFAULT_MAX_BUFFERED_REQUESTS);
+        final long maxBatchSizeInBytes =
+                Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B);
+        final long maxTimeInBufferMs =
+                Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
+        final long maxRecordSizeInBytes =
+                Optional.ofNullable(getMaxRecordSizeInBytes())
+                        .orElse(DEFAULT_MAX_RECORD_SIZE_IN_B);
+        // Built last, exactly as before, so that host validation happens after the other values
+        // have been resolved.
+        final NetworkConfig networkConfig = buildNetworkConfig();
+
+        return new Elasticsearch8AsyncSink<>(
+                operationConverter,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMs,
+                maxRecordSizeInBytes,
+                networkConfig);
+    }
+
+    /**
+     * Wraps the given converter into an {@link OperationConverter}.
+     *
+     * @param converter the user-supplied converter, possibly {@code null}
+     * @return the wrapping converter, or {@code null} when no converter was supplied
+     */
+    private OperationConverter<InputT> buildOperationConverter(
+            ElementConverter<InputT, BulkOperationVariant> converter) {
+        if (converter == null) {
+            return null;
+        }
+        return new OperationConverter<>(converter);
+    }
+
+    /**
+     * Bundles the connection settings of this builder into a {@link NetworkConfig}.
+     *
+     * @return the network configuration handed to the sink
+     * @throws NullPointerException if no hosts were configured on this builder
+     * @throws IllegalArgumentException if the configured host list is empty
+     */
+    private NetworkConfig buildNetworkConfig() {
+        // Without this check an unset host list surfaces as a message-less
+        // NullPointerException from hosts.isEmpty(); the exception type is kept, the message
+        // now tells the user what is missing.
+        checkNotNull(hosts, "Hosts must be set before building the sink.");
+        checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
+        return new NetworkConfig(
+                hosts, username, password, headers, sslContextSupplier, sslHostnameVerifier);
+    }
+
+    /**
+     * Adapter that turns the result of a {@code BulkOperationVariant} converter into an {@link
+     * Operation}, since a BulkOperationVariant is not Serializable.
+     *
+     * @param <T> the type of the input elements
+     */
+    public static class OperationConverter<T> implements ElementConverter<T, Operation> {
+        /** The wrapped converter that produces the actual bulk operation. */
+        private final ElementConverter<T, BulkOperationVariant> delegate;
+
+        public OperationConverter(ElementConverter<T, BulkOperationVariant> converter) {
+            this.delegate = converter;
+        }
+
+        /** Converts the element with the wrapped converter and wraps the result. */
+        @Override
+        public Operation apply(T element, SinkWriter.Context context) {
+            final BulkOperationVariant variant = delegate.apply(element, context);
+            return new Operation(variant);
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java
new file mode 100644
index 000000000..df3520274
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.v2;
+
+import 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * Writer-state serializer of the Elasticsearch 8 async sink. Each buffered {@link Operation} is
+ * written and read by delegating to an {@link OperationSerializer}.
+ */
+public class Elasticsearch8AsyncSinkSerializer extends AsyncSinkWriterStateSerializer<Operation> {
+
+    /** Writes one {@link Operation} to the stream using a new {@link OperationSerializer}. */
+    @Override
+    protected void serializeRequestToStream(Operation request, DataOutputStream out) {
+        final OperationSerializer operationSerializer = new OperationSerializer();
+        operationSerializer.serialize(request, out);
+    }
+
+    /** Reads one {@link Operation} from the stream using a new {@link OperationSerializer}. */
+    @Override
+    protected Operation deserializeRequestFromStream(long requestSize, DataInputStream in) {
+        final OperationSerializer operationSerializer = new OperationSerializer();
+        return operationSerializer.deserialize(requestSize, in);
+    }
+
+    /** Returns the version of the serialization format, currently {@code 1}. */
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
new file mode 100644
index 000000000..7753d824b
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
@@ -0,0 +1,217 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.v2;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@link AsyncSinkWriter} for Apache Flink that submits {@link Operation}s to an
+ * Elasticsearch cluster.
+ *
+ * @param <InputT> type of the input elements that are converted into Operations
+ */
+public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, 
Operation> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch8AsyncWriter.class);
+
+    private final ElasticsearchAsyncClient esClient;
+
+    private boolean close = false;
+
+    private final Counter numRecordsOutErrorsCounter;
+    /**
+     * A counter to track number of records that are returned by Elasticsearch 
as failed and then
+     * retried by this writer.
+     */
+    private final Counter numRecordsSendPartialFailureCounter;
+    /** A counter to track the number of bulk requests that are sent to 
Elasticsearch. */
+    private final Counter numRequestSubmittedCounter;
+
+    private static final FatalExceptionClassifier 
ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER =
+            FatalExceptionClassifier.createChain(
+                    new FatalExceptionClassifier(
+                            err ->
+                                    err instanceof NoRouteToHostException
+                                            || err instanceof ConnectException,
+                            err ->
+                                    new FlinkRuntimeException(
+                                            "Could not connect to 
Elasticsearch cluster using the provided hosts",
+                                            err)));
+
+    /**
+     * Creates the writer together with its Elasticsearch client and its metric counters.
+     *
+     * @param elementConverter converts input elements into {@link Operation}s
+     * @param context the sink init context; its metric group provides the counters
+     * @param maxBatchSize forwarded to the async sink writer configuration
+     * @param maxInFlightRequests forwarded to the async sink writer configuration
+     * @param maxBufferedRequests forwarded to the async sink writer configuration
+     * @param maxBatchSizeInBytes forwarded to the async sink writer configuration
+     * @param maxTimeInBufferMS forwarded to the async sink writer configuration
+     * @param maxRecordSizeInBytes forwarded to the async sink writer configuration
+     * @param networkConfig used to create the Elasticsearch client
+     * @param state buffered request states passed on to the base writer
+     */
+    public Elasticsearch8AsyncWriter(
+            ElementConverter<InputT, Operation> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            NetworkConfig networkConfig,
+            Collection<BufferedRequestState<Operation>> state) {
+        super(
+                elementConverter,
+                context,
+                AsyncSinkWriterConfiguration.builder()
+                        .setMaxBatchSize(maxBatchSize)
+                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setMaxBufferedRequests(maxBufferedRequests)
+                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
+                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .build(),
+                state);
+
+        this.esClient = networkConfig.createEsClient();
+
+        final SinkWriterMetricGroup writerMetrics = checkNotNull(context.metricGroup());
+        this.numRecordsOutErrorsCounter = writerMetrics.getNumRecordsOutErrorsCounter();
+        this.numRecordsSendPartialFailureCounter =
+                writerMetrics.counter("numRecordsSendPartialFailure");
+        this.numRequestSubmittedCounter = writerMetrics.counter("numRequestSubmitted");
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<Operation> requestEntries, Consumer<List<Operation>> 
requestResult) {
+        numRequestSubmittedCounter.inc();
+        LOG.debug("submitRequestEntries with {} items", requestEntries.size());
+
+        BulkRequest.Builder br = new BulkRequest.Builder();
+        for (Operation operation : requestEntries) {
+            //            if (operation.getBulkOperationVariant() == 
null&&requestEntries.size()==1)
+            // {
+            //                return; // 跳过当前循环迭代
+            //            }
+            if (operation.getBulkOperationVariant() == null) {
+                continue;
+            }
+            br.operations(new 
BulkOperation(operation.getBulkOperationVariant()));
+        }
+
+        esClient.bulk(br.build())
+                .whenComplete(
+                        (response, error) -> {
+                            if (error != null) {
+                                handleFailedRequest(requestEntries, 
requestResult, error);
+                            } else if (response.errors()) {
+                                handlePartiallyFailedRequest(
+                                        requestEntries, requestResult, 
response);
+                            } else {
+                                handleSuccessfulRequest(requestResult, 
response);
+                            }
+                        });
+    }
+
+    private void handleFailedRequest(
+            List<Operation> requestEntries,
+            Consumer<List<Operation>> requestResult,
+            Throwable error) {
+        LOG.warn(
+                "The BulkRequest of {} operation(s) has failed due to: {}",
+                requestEntries.size(),
+                error.getMessage());
+        LOG.debug("The BulkRequest has failed", error);
+        numRecordsOutErrorsCounter.inc(requestEntries.size());
+
+        if (isRetryable(error.getCause())) {
+            requestResult.accept(requestEntries);
+        }
+    }
+
+    private void handlePartiallyFailedRequest(
+            List<Operation> requestEntries,
+            Consumer<List<Operation>> requestResult,
+            BulkResponse response) {
+        LOG.debug("The BulkRequest has failed partially. Response: {}", 
response);
+        ArrayList<Operation> failedItems = new ArrayList<>();
+        for (int i = 0; i < response.items().size(); i++) {
+            BulkResponseItem item = response.items().get(i);
+            if (item.error() != null) {
+                failedItems.add(requestEntries.get(i));
+
+                LOG.error("Failed operation {}: {}", i, item.error().reason());
+            }
+        }
+
+        numRecordsOutErrorsCounter.inc(failedItems.size());
+        numRecordsSendPartialFailureCounter.inc(failedItems.size());
+        LOG.info(
+                "The BulkRequest with {} operation(s) has {} failure(s). It 
took {}ms",
+                requestEntries.size(),
+                failedItems.size(),
+                response.took());
+        requestResult.accept(failedItems);
+    }
+
+    private void handleSuccessfulRequest(
+            Consumer<List<Operation>> requestResult, BulkResponse response) {
+        LOG.debug(
+                "The BulkRequest of {} operation(s) completed successfully. It 
took {}ms",
+                response.items().size(),
+                response.took());
+        requestResult.accept(Collections.emptyList());
+    }
+
+    private boolean isRetryable(Throwable error) {
+        return !ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER.isFatal(error, 
getFatalExceptionCons());
+    }
+
+    @Override
+    protected long getSizeInBytes(Operation requestEntry) {
+        return new OperationSerializer().size(requestEntry);
+    }
+
+    /** Shuts down the Elasticsearch client; repeated calls after the first one do nothing. */
+    @Override
+    public void close() {
+        if (close) {
+            return;
+        }
+        close = true;
+        esClient.shutdown();
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java
new file mode 100644
index 000000000..2a6f4d8a6
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.v2;
+
+import org.apache.flink.util.function.SerializableSupplier;
+
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Serializable holder of the connection settings (hosts, headers, credentials, SSL suppliers)
+ * that creates {@link ElasticsearchAsyncClient} instances from them.
+ */
+public class NetworkConfig implements Serializable {
+    private final List<HttpHost> hosts;
+
+    private final List<Header> headers;
+
+    private final String username;
+
+    private final String password;
+
+    @Nullable private final SerializableSupplier<SSLContext> sslContextSupplier;
+
+    @Nullable private final SerializableSupplier<HostnameVerifier> sslHostnameVerifier;
+
+    /**
+     * Stores the given settings as they are.
+     *
+     * @param hosts the cluster hosts; must not be empty
+     * @param username the auth username; credentials are only used when the password is set too
+     * @param password the auth password; credentials are only used when the username is set too
+     * @param headers default headers for every request, or {@code null} for none
+     * @param sslContextSupplier supplier of the SSL context, or {@code null} for none
+     * @param sslHostnameVerifier supplier of the hostname verifier, or {@code null} for none
+     */
+    public NetworkConfig(
+            List<HttpHost> hosts,
+            String username,
+            String password,
+            List<Header> headers,
+            SerializableSupplier<SSLContext> sslContextSupplier,
+            SerializableSupplier<HostnameVerifier> sslHostnameVerifier) {
+        final boolean hasHosts = !hosts.isEmpty();
+        checkState(hasHosts, "Hosts must not be empty");
+        this.hosts = hosts;
+        this.username = username;
+        this.password = password;
+        this.headers = headers;
+        this.sslContextSupplier = sslContextSupplier;
+        this.sslHostnameVerifier = sslHostnameVerifier;
+    }
+
+    /** Creates a new asynchronous client on top of a newly built REST client. */
+    public ElasticsearchAsyncClient createEsClient() {
+        final RestClientTransport transport =
+                new RestClientTransport(this.getRestClient(), new JacksonJsonpMapper());
+        return new ElasticsearchAsyncClient(transport);
+    }
+
+    /**
+     * Builds the low-level REST client for the configured hosts, applying credentials, SSL
+     * context, hostname verifier and default headers wherever they are set.
+     */
+    private RestClient getRestClient() {
+        final HttpHost[] hostArray = hosts.toArray(new HttpHost[0]);
+        final RestClientBuilder clientBuilder = RestClient.builder(hostArray);
+
+        clientBuilder.setHttpClientConfigCallback(
+                httpClient -> {
+                    // Basic auth is only configured when both parts are present.
+                    final boolean hasCredentials = username != null && password != null;
+                    if (hasCredentials) {
+                        httpClient.setDefaultCredentialsProvider(getCredentials());
+                    }
+
+                    if (sslContextSupplier != null) {
+                        httpClient.setSSLContext(sslContextSupplier.get());
+                    }
+
+                    if (sslHostnameVerifier != null) {
+                        httpClient.setSSLHostnameVerifier(sslHostnameVerifier.get());
+                    }
+
+                    return httpClient;
+                });
+
+        if (headers != null) {
+            final Header[] defaultHeaders = headers.toArray(new Header[0]);
+            clientBuilder.setDefaultHeaders(defaultHeaders);
+        }
+
+        return clientBuilder.build();
+    }
+
+    /** Creates a credentials provider holding username and password for any auth scope. */
+    private CredentialsProvider getCredentials() {
+        final CredentialsProvider provider = new BasicCredentialsProvider();
+        final UsernamePasswordCredentials basicAuth =
+                new UsernamePasswordCredentials(username, password);
+        provider.setCredentials(AuthScope.ANY, basicAuth);
+        return provider;
+    }
+
+    /** Returns the configured hosts. */
+    public List<HttpHost> getHosts() {
+        return this.hosts;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java
new file mode 100644
index 000000000..0d2ba0cca
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.v2;
+
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A single stream element which contains a BulkOperationVariant.
+ *
+ * <p>Two operations are equal when their wrapped variants are equal, which keeps {@link
+ * #equals(Object)} consistent with {@link #hashCode()}.
+ */
+public class Operation implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** The wrapped bulk operation; the constructor accepts {@code null}. */
+    private final BulkOperationVariant bulkOperationVariant;
+
+    /**
+     * Wraps the given bulk operation.
+     *
+     * @param bulkOperation the operation to wrap; stored as given, without a null check
+     */
+    public Operation(BulkOperationVariant bulkOperation) {
+        this.bulkOperationVariant = bulkOperation;
+    }
+
+    /** Returns the wrapped bulk operation, which may be {@code null}. */
+    public BulkOperationVariant getBulkOperationVariant() {
+        return bulkOperationVariant;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final Operation that = (Operation) o;
+        return Objects.equals(bulkOperationVariant, that.bulkOperationVariant);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bulkOperationVariant);
+    }
+
+    @Override
+    public String toString() {
+        return "Operation{" + "bulkOperationVariant=" + bulkOperationVariant + '}';
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java
new file mode 100644
index 000000000..127ce11b1
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java
@@ -0,0 +1,50 @@
+package org.apache.flink.cdc.connectors.elasticsearch.v2;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * OperationSerializer is responsible for serialization and deserialization of an {@link
+ * Operation} using Kryo.
+ *
+ * <p>The streams passed to {@link #serialize} and {@link #deserialize} belong to the caller and
+ * are never closed by this class.
+ */
+public class OperationSerializer {
+    private final Kryo kryo = new Kryo();
+
+    /** Creates a serializer whose Kryo instance accepts unregistered classes. */
+    public OperationSerializer() {
+        kryo.setRegistrationRequired(false);
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+    }
+
+    /**
+     * Writes the given operation to the stream.
+     *
+     * @param request the operation to write; may be {@code null}
+     * @param out the caller-owned stream to write to; it is flushed but left open
+     */
+    public void serialize(Operation request, DataOutputStream out) {
+        // Deliberately not wrapped in try-with-resources: closing a Kryo Output also closes the
+        // underlying stream, which the caller owns and may keep writing to.
+        Output output = new Output(out);
+        kryo.writeObjectOrNull(output, request, Operation.class);
+        output.flush();
+    }
+
+    /**
+     * Reads one operation from the stream.
+     *
+     * @param requestSize used (narrowed to {@code int}) as the Kryo input buffer size
+     * @param in the caller-owned stream to read from; it is left open
+     * @return the operation read, or {@code null} when no bytes are available or reading fails
+     */
+    public Operation deserialize(long requestSize, DataInputStream in) {
+        try {
+            // Not closed on purpose: closing a Kryo Input would close the caller's stream.
+            Input input = new Input(in, (int) requestSize);
+            if (input.available() > 0) {
+                return kryo.readObject(input, Operation.class);
+            }
+            return null; // Skip if input stream is empty
+        } catch (Exception e) {
+            // NOTE(review): a failure is only reported on stderr and turned into null, so the
+            // caller cannot tell a corrupt entry from an empty stream - confirm this is intended.
+            System.err.println("Failed to deserialize Operation: " + e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     * Computes the serialized size of the given operation.
+     *
+     * @param operation the operation to measure; may be {@code null}
+     * @return the number of bytes Kryo writes for the operation
+     */
+    public int size(Operation operation) {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        // The buffer is local to this method, so closing the Output here is safe.
+        try (Output output = new Output(byteArrayOutputStream)) {
+            kryo.writeObjectOrNull(output, operation, Operation.class);
+            output.flush();
+            return byteArrayOutputStream.size();
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
new file mode 100644
index 000000000..327aa202e
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkFactory
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
new file mode 100644
index 000000000..6195b5f06
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.factories.DataSinkFactory;
+import org.apache.flink.cdc.common.factories.FactoryHelper;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.*;
+
+/** Unit tests for sink creation and option validation in {@link ElasticsearchDataSinkFactory}. */
+public class ElasticsearchDataSinkFactoryTest {
+
+    private static final String ELASTICSEARCH_IDENTIFIER = "elasticsearch";
+
+    /** A configuration holding only {@code hosts} and {@code index} yields an Elasticsearch sink. */
+    @Test
+    void testCreateDataSink() {
+        DataSinkFactory factory = getElasticsearchDataSinkFactory();
+
+        DataSink createdSink = createDataSink(factory, createValidConfiguration());
+
+        Assertions.assertThat(createdSink).isInstanceOf(ElasticsearchDataSink.class);
+    }
+
+    /** Removing any single required option must be rejected with a validation error. */
+    @Test
+    void testLackRequiredOption() {
+        DataSinkFactory factory = getElasticsearchDataSinkFactory();
+        Map<String, String> completeOptions = createValidOptions();
+
+        for (String missingKey : getRequiredKeys(factory)) {
+            // Start from the complete option set and drop exactly one required key.
+            Map<String, String> incompleteOptions = new HashMap<>(completeOptions);
+            incompleteOptions.remove(missingKey);
+            Configuration conf = Configuration.fromMap(incompleteOptions);
+
+            String expectedMessage =
+                    "One or more required options are missing.\n\n"
+                            + "Missing required options are:\n\n"
+                            + missingKey;
+
+            Assertions.assertThatThrownBy(() -> createDataSink(factory, conf))
+                    .isInstanceOf(ValidationException.class)
+                    .hasMessageContaining(expectedMessage);
+        }
+    }
+
+    /** An option key the factory does not know must be rejected with a validation error. */
+    @Test
+    void testUnsupportedOption() {
+        DataSinkFactory factory = getElasticsearchDataSinkFactory();
+
+        Map<String, String> options =
+                ImmutableMap.of(
+                        "hosts", "localhost:9200",
+                        "index", "test-index",
+                        "unsupported_key", "unsupported_value");
+        Configuration conf = Configuration.fromMap(options);
+
+        String expectedMessage =
+                "Unsupported options found for 'elasticsearch'.\n\n"
+                        + "Unsupported options:\n\n"
+                        + "unsupported_key";
+
+        Assertions.assertThatThrownBy(() -> createDataSink(factory, conf))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(expectedMessage);
+    }
+
+    /**
+     * Sink creation succeeds when {@code batch.size.max} and {@code inflight.requests.max} are
+     * supplied in addition to {@code hosts} and {@code index}.
+     */
+    @Test
+    void testPrefixedRequiredOption() {
+        DataSinkFactory factory = getElasticsearchDataSinkFactory();
+
+        Map<String, String> options =
+                ImmutableMap.of(
+                        "hosts", "localhost:9200",
+                        "index", "test-index",
+                        "batch.size.max", "500",
+                        "inflight.requests.max", "5");
+
+        DataSink createdSink = createDataSink(factory, Configuration.fromMap(options));
+
+        Assertions.assertThat(createdSink).isInstanceOf(ElasticsearchDataSink.class);
+    }
+
+    // Helper methods
+
+    /** Looks up the sink factory by identifier and checks that it is the Elasticsearch one. */
+    private DataSinkFactory getElasticsearchDataSinkFactory() {
+        DataSinkFactory discovered =
+                FactoryDiscoveryUtils.getFactoryByIdentifier(
+                        ELASTICSEARCH_IDENTIFIER, DataSinkFactory.class);
+        Assertions.assertThat(discovered).isInstanceOf(ElasticsearchDataSinkFactory.class);
+        return discovered;
+    }
+
+    /** Builds a configuration that contains the {@code hosts} and {@code index} options. */
+    private Configuration createValidConfiguration() {
+        return Configuration.fromMap(createValidOptions());
+    }
+
+    /** Builds a mutable option map that contains the {@code hosts} and {@code index} options. */
+    private Map<String, String> createValidOptions() {
+        Map<String, String> validOptions = new HashMap<>();
+        validOptions.put("hosts", "localhost:9200");
+        validOptions.put("index", "test-index");
+        return validOptions;
+    }
+
+    /** Collects the keys of every option the factory declares as required. */
+    private List<String> getRequiredKeys(DataSinkFactory sinkFactory) {
+        return sinkFactory.requiredOptions().stream()
+                .map(ConfigOption::key)
+                .collect(Collectors.toList());
+    }
+
+    /** Creates a sink, using {@code conf} as both configurations of the factory context. */
+    private DataSink createDataSink(DataSinkFactory sinkFactory, Configuration conf) {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        FactoryHelper.DefaultContext context =
+                new FactoryHelper.DefaultContext(conf, conf, contextClassLoader);
+        return sinkFactory.createDataSink(context);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
new file mode 100644
index 000000000..5f9011b12
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.*;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.types.DataTypes;
+import 
org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
+import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.GetRequest;
+import co.elastic.clients.elasticsearch.core.GetResponse;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase tests for {@link ElasticsearchDataSink}. */
+@Testcontainers
+public class ElasticsearchDataSinkITCaseTest {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class);
+    private static final String ELASTICSEARCH_VERSION = "8.12.0";
+    private static final DockerImageName ELASTICSEARCH_IMAGE =
+            DockerImageName.parse(
+                            "docker.elastic.co/elasticsearch/elasticsearch:"
+                                    + ELASTICSEARCH_VERSION)
+                    
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+
+    @Container
+    private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
+            createElasticsearchContainer();
+
+    private ElasticsearchClient client;
+
+    @BeforeEach
+    public void setUp() {
+        client = createElasticsearchClient();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (client != null) {
+            client.shutdown();
+        }
+    }
+
+    @Test
+    public void testElasticsearchSink() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "table");
+        List<Event> events = createTestEvents(tableId);
+
+        runJobWithEvents(events);
+
+        verifyInsertedData(
+                tableId,
+                "1",
+                1,
+                1.0,
+                "value1",
+                true,
+                (byte) 1,
+                (short) 2,
+                100L,
+                1.0f,
+                new BigDecimal("10.00"),
+                1633024800000L);
+        verifyInsertedData(
+                tableId,
+                "2",
+                2,
+                2.0,
+                "value2",
+                false,
+                (byte) 2,
+                (short) 3,
+                200L,
+                2.0f,
+                new BigDecimal("20.00"),
+                1633111200000L);
+    }
+
+    @Test
+    public void testElasticsearchInsertAndDelete() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "table");
+        List<Event> events = createTestEventsWithDelete(tableId);
+
+        runJobWithEvents(events);
+
+        verifyDeletedData(tableId, "2");
+    }
+
+    @Test
+    public void testElasticsearchAddColumn() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "table");
+        List<Event> events = createTestEventsWithAddColumn(tableId);
+
+        runJobWithEvents(events);
+
+        verifyInsertedDataWithNewColumn(tableId, "3", 3, 3.0, "value3", true);
+    }
+
+    private static ElasticsearchContainer createElasticsearchContainer() {
+        return new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withEnv("logger.org.elasticsearch", "ERROR")
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5)));
+    }
+
+    private ElasticsearchClient createElasticsearchClient() {
+        RestClientTransport transport =
+                new RestClientTransport(
+                        RestClient.builder(
+                                        new HttpHost(
+                                                
ELASTICSEARCH_CONTAINER.getHost(),
+                                                
ELASTICSEARCH_CONTAINER.getFirstMappedPort(),
+                                                "http"))
+                                .build(),
+                        new JacksonJsonpMapper());
+        return new ElasticsearchClient(transport);
+    }
+
+    private void runJobWithEvents(List<Event> events) throws Exception {
+        ElasticsearchSinkOptions options = createSinkOptions();
+        StreamExecutionEnvironment env = createStreamExecutionEnvironment();
+        ElasticsearchDataSink<Event> sink =
+                new ElasticsearchDataSink<>(options, ZoneId.systemDefault());
+
+        DataStream<Event> stream = env.fromCollection(events, 
TypeInformation.of(Event.class));
+        Sink<Event> elasticsearchSink = ((FlinkSinkProvider) 
sink.getEventSinkProvider()).getSink();
+        stream.sinkTo(elasticsearchSink);
+
+        env.execute("Elasticsearch Sink Test");
+    }
+
+    private ElasticsearchSinkOptions createSinkOptions() {
+        NetworkConfig networkConfig =
+                new NetworkConfig(
+                        Collections.singletonList(
+                                new HttpHost(
+                                        ELASTICSEARCH_CONTAINER.getHost(),
+                                        
ELASTICSEARCH_CONTAINER.getFirstMappedPort())),
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        return new ElasticsearchSinkOptions(
+                5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, 
networkConfig);
+    }
+
+    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(3000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        return env;
+    }
+
+    private void verifyInsertedData(
+            TableId tableId,
+            String id,
+            int expectedId,
+            double expectedNumber,
+            String expectedName,
+            boolean expectedBool,
+            byte expectedTinyint,
+            short expectedSmallint,
+            long expectedBigint,
+            float expectedFloat,
+            BigDecimal expectedDecimal,
+            long expectedTimestamp)
+            throws Exception {
+        GetRequest getRequest = new 
GetRequest.Builder().index(tableId.toString()).id(id).build();
+        GetResponse<Map> response = client.get(getRequest, Map.class);
+
+        LOG.debug("Response source: {}", response.source());
+
+        assertThat(response.source()).isNotNull();
+        assertThat(((Number) 
response.source().get("id")).intValue()).isEqualTo(expectedId);
+        assertThat(((Number) response.source().get("number")).doubleValue())
+                .isEqualTo(expectedNumber);
+        assertThat(response.source().get("name")).isEqualTo(expectedName);
+        assertThat(response.source().get("bool")).isEqualTo(expectedBool);
+        assertThat(((Number) response.source().get("tinyint")).byteValue())
+                .isEqualTo(expectedTinyint);
+        assertThat(((Number) response.source().get("smallint")).shortValue())
+                .isEqualTo(expectedSmallint);
+        assertThat(((Number) response.source().get("bigint")).longValue())
+                .isEqualTo(expectedBigint);
+        assertThat(((Number) 
response.source().get("float")).floatValue()).isEqualTo(expectedFloat);
+        assertThat(new BigDecimal(response.source().get("decimal").toString()))
+                .isEqualTo(expectedDecimal);
+
+        String timestampString = response.source().get("timestamp").toString();
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSSSSS");
+        LocalDateTime dateTime = LocalDateTime.parse(timestampString, 
formatter);
+        long timestampMillis = 
dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
+        assertThat(timestampMillis).isEqualTo(expectedTimestamp);
+    }
+
+    private void verifyDeletedData(TableId tableId, String id) throws 
Exception {
+        GetRequest getRequest = new 
GetRequest.Builder().index(tableId.toString()).id(id).build();
+        GetResponse<Map> response = client.get(getRequest, Map.class);
+
+        assertThat(response.source()).isNull();
+    }
+
+    private void verifyInsertedDataWithNewColumn(
+            TableId tableId,
+            String id,
+            int expectedId,
+            double expectedNumber,
+            String expectedName,
+            boolean expectedExtraBool)
+            throws Exception {
+        GetRequest getRequest = new 
GetRequest.Builder().index(tableId.toString()).id(id).build();
+        GetResponse<Map> response = client.get(getRequest, Map.class);
+
+        assertThat(response.source()).isNotNull();
+        assertThat(response.source().get("id")).isEqualTo(expectedId);
+        assertThat(response.source().get("number")).isEqualTo(expectedNumber);
+        assertThat(response.source().get("name")).isEqualTo(expectedName);
+        
assertThat(response.source().get("extra_bool")).isEqualTo(expectedExtraBool);
+    }
+
+    private List<Event> createTestEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
+                        .column(new PhysicalColumn("bool", 
DataTypes.BOOLEAN(), null))
+                        .column(new PhysicalColumn("tinyint", 
DataTypes.TINYINT(), null))
+                        .column(new PhysicalColumn("smallint", 
DataTypes.SMALLINT(), null))
+                        .column(new PhysicalColumn("bigint", 
DataTypes.BIGINT(), null))
+                        .column(new PhysicalColumn("float", DataTypes.FLOAT(), 
null))
+                        .column(new PhysicalColumn("decimal", 
DataTypes.DECIMAL(10, 2), null))
+                        .column(new PhysicalColumn("timestamp", 
DataTypes.TIMESTAMP(), null))
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        org.apache.flink.cdc.common.types.RowType.of(
+                                
org.apache.flink.cdc.common.types.DataTypes.INT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.DOUBLE(),
+                                
org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                
org.apache.flink.cdc.common.types.DataTypes.BOOLEAN(),
+                                
org.apache.flink.cdc.common.types.DataTypes.TINYINT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.SMALLINT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.BIGINT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.FLOAT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.DECIMAL(10, 2),
+                                
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP()));
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    1,
+                                    1.0,
+                                    BinaryStringData.fromString("value1"),
+                                    true,
+                                    (byte) 1,
+                                    (short) 2,
+                                    100L,
+                                    1.0f,
+                                    DecimalData.fromBigDecimal(new 
BigDecimal("10.00"), 10, 2),
+                                    TimestampData.fromMillis(1633024800000L)
+                                })),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    2,
+                                    2.0,
+                                    BinaryStringData.fromString("value2"),
+                                    false,
+                                    (byte) 2,
+                                    (short) 3,
+                                    200L,
+                                    2.0f,
+                                    DecimalData.fromBigDecimal(new 
BigDecimal("20.00"), 10, 2),
+                                    TimestampData.fromMillis(1633111200000L)
+                                })));
+    }
+
+    private List<Event> createTestEventsWithDelete(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        org.apache.flink.cdc.common.types.RowType.of(
+                                
org.apache.flink.cdc.common.types.DataTypes.INT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.DOUBLE(),
+                                
org.apache.flink.cdc.common.types.DataTypes.STRING()));
+
+        BinaryRecordData insertRecord1 =
+                generator.generate(new Object[] {1, 1.0, 
BinaryStringData.fromString("value1")});
+        BinaryRecordData insertRecord2 =
+                generator.generate(new Object[] {2, 2.0, 
BinaryStringData.fromString("value2")});
+        BinaryRecordData deleteRecord =
+                generator.generate(new Object[] {2, 2.0, 
BinaryStringData.fromString("value2")});
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                DataChangeEvent.insertEvent(tableId, insertRecord1),
+                DataChangeEvent.insertEvent(tableId, insertRecord2),
+                DataChangeEvent.deleteEvent(tableId, deleteRecord));
+    }
+
+    private List<Event> createTestEventsWithAddColumn(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        org.apache.flink.cdc.common.types.RowType.of(
+                                
org.apache.flink.cdc.common.types.DataTypes.INT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.DOUBLE(),
+                                
org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                
org.apache.flink.cdc.common.types.DataTypes.BOOLEAN()));
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        new PhysicalColumn(
+                                                "extra_bool", 
DataTypes.BOOLEAN(), null)))),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    3, 3.0, 
BinaryStringData.fromString("value3"), true
+                                })));
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/log4j2-test.properties
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..1eebcb739
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/log4j2-test.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#      http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# The root logger level is INFO here, which makes test output verbose;
+# set it to OFF to avoid flooding build logs when debugging is not needed.
+rootLogger.level = INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties
new file mode 100644
index 000000000..d32577249
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties
@@ -0,0 +1 @@
+ryuk.container.image=testcontainers/ryuk:0.3.3
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
index 424372f4c..9d0f9769b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
@@ -34,6 +34,7 @@ limitations under the License.
         <module>flink-cdc-pipeline-connector-starrocks</module>
         <module>flink-cdc-pipeline-connector-kafka</module>
         <module>flink-cdc-pipeline-connector-paimon</module>
+        <module>flink-cdc-pipeline-connector-elasticsearch</module>
     </modules>
 
     <dependencies>

Reply via email to