This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 8137f9d746ba296aeff70b0b7e18cdbcce4a142d
Author: wuzexian <[email protected]>
AuthorDate: Fri Aug 2 09:30:05 2024 +0800

    [FLINK-35894][pipeline-connector][es] Support for ElasticSearch 6, 7 
versions
    
    This closes #3495.
---
 .github/labeler.yml                                |   2 +
 .../pom.xml                                        |  63 ++++-
 .../config/ElasticsearchSinkOptions.java           |  23 +-
 .../elasticsearch/serializer/ColumnType.java       |  46 ----
 .../serializer/Elasticsearch6RequestCreator.java   |  74 ++++++
 .../serializer/ElasticsearchEventSerializer.java   | 162 +++++-------
 .../serializer/ElasticsearchRowConverter.java      | 220 +++++++++--------
 .../elasticsearch/sink/ElasticsearchDataSink.java  | 111 ++++++++-
 .../sink/ElasticsearchDataSinkFactory.java         |  46 +++-
 .../sink/ElasticsearchDataSinkOptions.java         |  12 +-
 .../sink/ElasticsearchMetadataApplier.java         |  45 ----
 .../elasticsearch/v2/Elasticsearch8AsyncSink.java  |  41 ++-
 .../v2/Elasticsearch8AsyncSinkBuilder.java         |   2 -
 .../v2/Elasticsearch8AsyncSinkSerializer.java      |  25 +-
 .../v2/Elasticsearch8AsyncWriter.java              |   6 -
 .../connectors/elasticsearch/v2/NetworkConfig.java |   2 -
 .../cdc/connectors/elasticsearch/v2/Operation.java |  17 +-
 .../elasticsearch/v2/OperationSerializer.java      |  49 +++-
 .../sink/Elasticsearch6DataSinkITCaseTest.java     | 275 +++++++++++++++++++++
 .../sink/Elasticsearch7DataSinkITCaseTest.java     | 274 ++++++++++++++++++++
 .../sink/ElasticsearchDataSinkFactoryTest.java     |  42 ++--
 .../sink/ElasticsearchDataSinkITCaseTest.java      | 150 +----------
 .../sink/utils/ElasticsearchTestUtils.java         | 189 ++++++++++++++
 .../src/test/resources/testcontainers.properties   |   1 -
 tools/ci/license_check.rb                          |  15 +-
 25 files changed, 1374 insertions(+), 518 deletions(-)

diff --git a/.github/labeler.yml b/.github/labeler.yml
index 20bacf366..267690ec4 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -84,3 +84,5 @@ doris-pipeline-connector:
   - 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/**/*
 starrocks-pipeline-connector:
   - 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/**/*
+elasticsearch-pipeline-connector:
+  - 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/**/*
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
index 157a260c3..d9b5cd4b6 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
@@ -1,3 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
@@ -20,31 +37,21 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <elasticsearch.version>8.12.1</elasticsearch.version>
         <flink.version>1.18.0</flink.version>
-        <scala.binary.version>2.12</scala.binary.version>
+        <scala.binary.version>4.0</scala.binary.version>
         <jackson.version>2.13.2</jackson.version>
         
<surefire.module.config>--add-opens=java.base/java.util=ALL-UNNAMED</surefire.module.config>
         <testcontainers.version>1.16.0</testcontainers.version>
+        
<flink.connector.elasticsearch.version>4.0-SNAPSHOT</flink.connector.elasticsearch.version>
         <httpclient.version>4.5.13</httpclient.version>
         <junit.jupiter.version>5.7.1</junit.jupiter.version>
         <assertj.version>3.18.1</assertj.version>
         <junit.version>4.13.2</junit.version>
         <slf4j.version>1.7.32</slf4j.version>
         <junit.platform.version>1.10.2</junit.platform.version>
-        <paimon.version>0.7.0-incubating</paimon.version>
-        <hadoop.version>2.8.5</hadoop.version>
-        <hive.version>2.3.9</hive.version>
-        <mockito.version>3.4.6</mockito.version>
         <jakarta.json.version>2.0.2</jakarta.json.version>
     </properties>
 
     <dependencies>
-        <!-- Flink Dependencies -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
@@ -108,7 +115,27 @@
             <scope>test</scope>
         </dependency>
 
-        <!-- Elasticsearch Clients -->
+        <!-- Elasticsearch 6 -->
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-client</artifactId>
+            <version>${elasticsearch.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-connector-elasticsearch6</artifactId>
+            <version>${flink.connector.elasticsearch.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-connector-elasticsearch7</artifactId>
+            <version>${flink.connector.elasticsearch.version}</version>
+        </dependency>
+
+
+        <!-- Elasticsearch 8 Client -->
         <dependency>
             <groupId>co.elastic.clients</groupId>
             <artifactId>elasticsearch-java</artifactId>
@@ -204,6 +231,16 @@
                         </goals>
                         <configuration>
                             
<createDependencyReducedPom>false</createDependencyReducedPom>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
index 17ec76fd4..97e954520 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
@@ -34,6 +34,9 @@ public class ElasticsearchSinkOptions implements Serializable 
{
     private final long maxTimeInBufferMS;
     private final long maxRecordSizeInBytes;
     private final NetworkConfig networkConfig;
+    private final int version;
+    private final String username;
+    private final String password;
 
     /** Constructor for ElasticsearchSinkOptions. */
     public ElasticsearchSinkOptions(
@@ -43,7 +46,10 @@ public class ElasticsearchSinkOptions implements 
Serializable {
             long maxBatchSizeInBytes,
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
-            NetworkConfig networkConfig) {
+            NetworkConfig networkConfig,
+            int version,
+            String username,
+            String password) {
         this.maxBatchSize = maxBatchSize;
         this.maxInFlightRequests = maxInFlightRequests;
         this.maxBufferedRequests = maxBufferedRequests;
@@ -51,6 +57,9 @@ public class ElasticsearchSinkOptions implements Serializable 
{
         this.maxTimeInBufferMS = maxTimeInBufferMS;
         this.maxRecordSizeInBytes = maxRecordSizeInBytes;
         this.networkConfig = networkConfig;
+        this.version = version;
+        this.username = username;
+        this.password = password;
     }
 
     /** @return the maximum batch size */
@@ -92,4 +101,16 @@ public class ElasticsearchSinkOptions implements 
Serializable {
     public List<HttpHost> getHosts() {
         return networkConfig.getHosts();
     }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java
deleted file mode 100644
index 1a3721634..000000000
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.connectors.elasticsearch.serializer;
-
-/**
- * Enumeration of column types supported in the Elasticsearch connector. These 
types represent the
- * various data types that can be used in database columns and are relevant 
for serialization and
- * deserialization processes.
- */
-public enum ColumnType {
-    BOOLEAN,
-    TINYINT,
-    SMALLINT,
-    INTEGER,
-    BIGINT,
-    FLOAT,
-    DOUBLE,
-    CHAR,
-    VARCHAR,
-    BINARY,
-    VARBINARY,
-    DECIMAL,
-    DATE,
-    TIME_WITHOUT_TIME_ZONE,
-    TIMESTAMP_WITHOUT_TIME_ZONE,
-    TIMESTAMP_WITH_LOCAL_TIME_ZONE,
-    TIMESTAMP_WITH_TIME_ZONE,
-    ARRAY,
-    MAP,
-    ROW
-}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/Elasticsearch6RequestCreator.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/Elasticsearch6RequestCreator.java
new file mode 100644
index 000000000..d2b962b4a
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/Elasticsearch6RequestCreator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.serializer;
+
+import 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.delete.DeleteRequest;
+import 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.index.IndexRequest;
+import 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.Requests;
+
+import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Map;
+
+/**
+ * A utility class that creates Elasticsearch 6.x specific requests.
+ *
+ * <p>This class provides methods to create {@link IndexRequest} and {@link 
DeleteRequest} objects
+ * that are compatible with Elasticsearch 6.x based on the operations provided.
+ */
+public class Elasticsearch6RequestCreator {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Creates an Elasticsearch 6 IndexRequest.
+     *
+     * @param operation IndexOperation object.
+     * @return IndexRequest object.
+     */
+    public static IndexRequest createIndexRequest(IndexOperation<?> operation) 
{
+        // Convert the document to Map<String, Object>
+        Map<String, Object> documentMap =
+                objectMapper.convertValue(operation.document(), Map.class);
+
+        // Create and return IndexRequest, ensuring type field is set
+        return Requests.indexRequest()
+                .index(operation.index())
+                .type("_doc") // Assuming type is "_doc", adjust as necessary
+                .id(operation.id())
+                .source(documentMap);
+    }
+
+    /**
+     * Creates an Elasticsearch 6 DeleteRequest.
+     *
+     * @param operation DeleteOperation object.
+     * @return DeleteRequest object.
+     */
+    public static DeleteRequest createDeleteRequest(DeleteOperation operation) 
{
+        String index = operation.index();
+        String id = operation.id();
+
+        // Create and return DeleteRequest, ensuring type field is set
+        return Requests.deleteRequest(index)
+                .type("_doc") // Assuming type is "_doc", adjust as necessary
+                .id(id);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
index f60231103..2968babb0 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
@@ -20,10 +20,17 @@ package 
org.apache.flink.cdc.connectors.elasticsearch.serializer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.cdc.common.data.RecordData;
-import org.apache.flink.cdc.common.event.*;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.types.*;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -37,12 +44,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.cdc.common.types.DataTypeChecks.*;
-
 /** A serializer for Event to BulkOperationVariant. */
 public class ElasticsearchEventSerializer implements ElementConverter<Event, 
BulkOperationVariant> {
     private final ObjectMapper objectMapper = new ObjectMapper();
@@ -69,7 +78,7 @@ public class ElasticsearchEventSerializer implements 
ElementConverter<Event, Bul
     public BulkOperationVariant apply(Event event, SinkWriter.Context context) 
{
         try {
             if (event instanceof DataChangeEvent) {
-                return applyDataChangeEvent((DataChangeEvent) event);
+                return createBulkOperationVariant((DataChangeEvent) event);
             } else if (event instanceof SchemaChangeEvent) {
                 IndexOperation<Map<String, Object>> indexOperation =
                         applySchemaChangeEvent((SchemaChangeEvent) event);
@@ -86,11 +95,11 @@ public class ElasticsearchEventSerializer implements 
ElementConverter<Event, Bul
     private IndexOperation<Map<String, Object>> applySchemaChangeEvent(
             SchemaChangeEvent schemaChangeEvent) throws IOException {
         TableId tableId = schemaChangeEvent.tableId();
-
         if (schemaChangeEvent instanceof CreateTableEvent) {
             Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema();
             schemaMaps.put(tableId, schema);
-            return createSchemaIndexOperation(tableId, schema);
+            // Cache new converters
+            getOrCreateConverters(tableId, schema);
         } else if (schemaChangeEvent instanceof AddColumnEvent
                 || schemaChangeEvent instanceof DropColumnEvent
                 || schemaChangeEvent instanceof RenameColumnEvent) {
@@ -100,7 +109,8 @@ public class ElasticsearchEventSerializer implements 
ElementConverter<Event, Bul
             Schema updatedSchema =
                     
SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent);
             schemaMaps.put(tableId, updatedSchema);
-            return createSchemaIndexOperation(tableId, updatedSchema);
+            // Update cached converters
+            getOrCreateConverters(tableId, updatedSchema);
         } else {
             if (!schemaMaps.containsKey(tableId)) {
                 throw new RuntimeException("Schema of " + tableId + " does not 
exist.");
@@ -108,41 +118,27 @@ public class ElasticsearchEventSerializer implements 
ElementConverter<Event, Bul
             Schema updatedSchema =
                     
SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent);
             schemaMaps.put(tableId, updatedSchema);
+            // Update cached converters
+            getOrCreateConverters(tableId, updatedSchema);
         }
         return null;
     }
 
-    private IndexOperation<Map<String, Object>> createSchemaIndexOperation(
-            TableId tableId, Schema schema) {
-        Map<String, Object> schemaMap = new HashMap<>();
-        schemaMap.put(
-                "columns",
-                schema.getColumns().stream()
-                        .map(Column::asSummaryString)
-                        .collect(Collectors.toList()));
-        schemaMap.put("primaryKeys", schema.primaryKeys());
-        schemaMap.put("options", schema.options());
-
-        return new IndexOperation.Builder<Map<String, Object>>()
-                .index(tableId.toString())
-                .id(tableId.getTableName())
-                .document(schemaMap)
-                .build();
-    }
-
-    private BulkOperationVariant applyDataChangeEvent(DataChangeEvent event)
+    private BulkOperationVariant createBulkOperationVariant(DataChangeEvent 
event)
             throws JsonProcessingException {
         TableId tableId = event.tableId();
         Schema schema = schemaMaps.get(tableId);
         Preconditions.checkNotNull(schema, event.tableId() + " does not 
exist.");
+        // Ensure converters are cached
+        getOrCreateConverters(tableId, schema);
         Map<String, Object> valueMap;
         OperationType op = event.op();
-
         Object[] uniqueId =
                 generateUniqueId(
-                        op == OperationType.DELETE ? event.before() : 
event.after(), schema);
+                        op == OperationType.DELETE ? event.before() : 
event.after(),
+                        schema,
+                        tableId);
         String id = 
Arrays.stream(uniqueId).map(Object::toString).collect(Collectors.joining("_"));
-
         switch (op) {
             case INSERT:
             case REPLACE:
@@ -160,67 +156,37 @@ public class ElasticsearchEventSerializer implements 
ElementConverter<Event, Bul
         }
     }
 
-    private Object[] generateUniqueId(RecordData recordData, Schema schema) {
+    private Object[] generateUniqueId(RecordData recordData, Schema schema, 
TableId tableId) {
         List<String> primaryKeys = schema.primaryKeys();
-        return primaryKeys.stream()
-                .map(
-                        primaryKey -> {
-                            Column column =
-                                    schema.getColumns().stream()
-                                            .filter(col -> 
col.getName().equals(primaryKey))
-                                            .findFirst()
-                                            .orElseThrow(
-                                                    () ->
-                                                            new 
IllegalStateException(
-                                                                    "Primary 
key column not found: "
-                                                                            + 
primaryKey));
-                            int index = schema.getColumns().indexOf(column);
-                            return getFieldValue(recordData, column.getType(), 
index);
-                        })
-                .toArray();
-    }
+        List<ElasticsearchRowConverter.SerializationConverter> converters =
+                converterCache.get(tableId);
+        Preconditions.checkNotNull(converters, "No converters found for table: 
" + tableId);
 
-    private Object getFieldValue(RecordData recordData, DataType dataType, int 
index) {
-        switch (dataType.getTypeRoot()) {
-            case BOOLEAN:
-                return recordData.getBoolean(index);
-            case TINYINT:
-                return recordData.getByte(index);
-            case SMALLINT:
-                return recordData.getShort(index);
-            case INTEGER:
-            case DATE:
-            case TIME_WITHOUT_TIME_ZONE:
-                return recordData.getInt(index);
-            case BIGINT:
-                return recordData.getLong(index);
-            case FLOAT:
-                return recordData.getFloat(index);
-            case DOUBLE:
-                return recordData.getDouble(index);
-            case CHAR:
-            case VARCHAR:
-                return recordData.getString(index);
-            case DECIMAL:
-                return recordData.getDecimal(index, getPrecision(dataType), 
getScale(dataType));
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return recordData.getTimestamp(index, getPrecision(dataType));
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return recordData.getLocalZonedTimestampData(index, 
getPrecision(dataType));
-            case TIMESTAMP_WITH_TIME_ZONE:
-                return recordData.getZonedTimestamp(index, 
getPrecision(dataType));
-            case BINARY:
-            case VARBINARY:
-                return recordData.getBinary(index);
-            case ARRAY:
-                return recordData.getArray(index);
-            case MAP:
-                return recordData.getMap(index);
-            case ROW:
-                return recordData.getRow(index, getFieldCount(dataType));
-            default:
-                throw new IllegalArgumentException("Unsupported type: " + 
dataType);
+        Object[] uniqueId = new Object[primaryKeys.size()];
+        List<Column> columns = schema.getColumns();
+
+        for (int i = 0; i < primaryKeys.size(); i++) {
+            String primaryKey = primaryKeys.get(i);
+            Column column = null;
+            int index = -1;
+
+            for (int j = 0; j < columns.size(); j++) {
+                if (columns.get(j).getName().equals(primaryKey)) {
+                    column = columns.get(j);
+                    index = j;
+                    break;
+                }
+            }
+
+            if (column == null) {
+                throw new IllegalStateException("Primary key column not found: 
" + primaryKey);
+            }
+
+            checkIndex(index, converters.size());
+            uniqueId[i] = converters.get(index).serialize(index, recordData);
         }
+
+        return uniqueId;
     }
 
     public Map<String, Object> serializeRecord(
@@ -230,37 +196,39 @@ public class ElasticsearchEventSerializer implements 
ElementConverter<Event, Bul
         Preconditions.checkState(
                 columns.size() == recordData.getArity(),
                 "Column size does not match the data size.");
-
         List<ElasticsearchRowConverter.SerializationConverter> converters =
                 getOrCreateConverters(tableId, schema);
-
         for (int i = 0; i < recordData.getArity(); i++) {
             Column column = columns.get(i);
+            checkIndex(i, converters.size());
             Object field = converters.get(i).serialize(i, recordData);
             record.put(column.getName(), field);
         }
-
         return record;
     }
 
     private List<ElasticsearchRowConverter.SerializationConverter> 
getOrCreateConverters(
             TableId tableId, Schema schema) {
-        return converterCache.computeIfAbsent(
+        return converterCache.compute(
                 tableId,
-                id -> {
+                (id, existingConverters) -> {
                     List<ElasticsearchRowConverter.SerializationConverter> 
converters =
                             new ArrayList<>();
                     for (Column column : schema.getColumns()) {
-                        ColumnType columnType =
-                                
ColumnType.valueOf(column.getType().getTypeRoot().name());
                         converters.add(
                                 
ElasticsearchRowConverter.createNullableExternalConverter(
-                                        columnType, pipelineZoneId));
+                                        column.getType(), pipelineZoneId));
                     }
                     return converters;
                 });
     }
 
+    private void checkIndex(int index, int size) {
+        if (index < 0 || index >= size) {
+            throw new IndexOutOfBoundsException("Index: " + index + ", Size: " 
+ size);
+        }
+    }
+
     @Override
     public void open(Sink.InitContext context) {
         ElementConverter.super.open(context);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java
index 948c012a5..f3c77d7ba 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java
@@ -17,15 +17,31 @@
 
 package org.apache.flink.cdc.connectors.elasticsearch.serializer;
 
-import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.MapData;
 import org.apache.flink.cdc.common.data.RecordData;
-import org.apache.flink.cdc.common.data.TimestampData;
-
-import java.time.*;
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeChecks;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import 
org.apache.flink.elasticsearch6.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /** Converter class for serializing row data to Elasticsearch compatible 
formats. */
 public class ElasticsearchRowConverter {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
 
     // Date and time formatters for various temporal types
     private static final DateTimeFormatter DATE_FORMATTER =
@@ -43,7 +59,7 @@ public class ElasticsearchRowConverter {
      * @return A SerializationConverter that can handle null values.
      */
     public static SerializationConverter createNullableExternalConverter(
-            ColumnType columnType, java.time.ZoneId zoneId) {
+            DataType columnType, java.time.ZoneId zoneId) {
         return 
wrapIntoNullableExternalConverter(createExternalConverter(columnType, zoneId));
     }
 
@@ -71,123 +87,111 @@ public class ElasticsearchRowConverter {
      * @param zoneId The time zone to use for temporal conversions.
      * @return A SerializationConverter for the specified column type.
      */
-    static SerializationConverter createExternalConverter(
-            ColumnType columnType, java.time.ZoneId zoneId) {
-        switch (columnType) {
-                // Basic types
-            case BOOLEAN:
-                return (pos, data) -> data.getBoolean(pos);
-            case INTEGER:
-                return (pos, data) -> data.getInt(pos);
-            case DOUBLE:
-                return (pos, data) -> data.getDouble(pos);
-            case VARCHAR:
+    static ElasticsearchRowConverter.SerializationConverter 
createExternalConverter(
+            DataType columnType, ZoneId zoneId) {
+        switch (columnType.getTypeRoot()) {
             case CHAR:
+            case VARCHAR:
                 return (pos, data) -> data.getString(pos).toString();
-            case FLOAT:
-                return (pos, data) -> data.getFloat(pos);
-            case BIGINT:
-                return (pos, data) -> data.getLong(pos);
-            case TINYINT:
-                return (pos, data) -> data.getByte(pos);
-            case SMALLINT:
-                return (pos, data) -> data.getShort(pos);
+            case BOOLEAN:
+                return (pos, data) -> data.getBoolean(pos);
             case BINARY:
             case VARBINARY:
                 return (pos, data) -> data.getBinary(pos);
-
-                // Decimal type
             case DECIMAL:
-                return (pos, data) -> {
-                    DecimalData decimalData = data.getDecimal(pos, 17, 2);
-                    return decimalData != null ? 
decimalData.toBigDecimal().toString() : null;
-                };
-
-                // Date and time types
+                final int decimalPrecision = ((DecimalType) 
columnType).getPrecision();
+                final int decimalScale = ((DecimalType) columnType).getScale();
+                return (pos, data) ->
+                        data.getDecimal(pos, decimalPrecision, decimalScale)
+                                .toBigDecimal()
+                                .toString();
+            case TINYINT:
+                return (pos, data) -> data.getByte(pos);
+            case SMALLINT:
+                return (pos, data) -> data.getShort(pos);
+            case INTEGER:
+                return (pos, data) -> data.getInt(pos);
+            case BIGINT:
+                return (pos, data) -> data.getLong(pos);
+            case FLOAT:
+                return (pos, data) -> data.getFloat(pos);
+            case DOUBLE:
+                return (pos, data) -> data.getDouble(pos);
             case DATE:
-                return (pos, data) -> {
-                    int days = data.getInt(pos);
-                    LocalDate date = LocalDate.ofEpochDay(days);
-                    return date.format(DATE_FORMATTER);
-                };
-            case TIME_WITHOUT_TIME_ZONE:
-                return (pos, data) -> {
-                    int milliseconds = data.getInt(pos);
-                    LocalTime time = LocalTime.ofNanoOfDay(milliseconds * 
1_000_000L);
-                    return time.format(TIME_FORMATTER);
-                };
+                return (pos, data) -> 
LocalDate.ofEpochDay(data.getInt(pos)).format(DATE_FORMATTER);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return (pos, data) -> {
-                    long milliseconds = data.getTimestamp(pos, 
6).getMillisecond();
-                    LocalDateTime dateTime =
-                            LocalDateTime.ofEpochSecond(
-                                    milliseconds / 1000,
-                                    (int) (milliseconds % 1000) * 1_000_000,
-                                    java.time.ZoneOffset.UTC);
-                    return dateTime.format(DATE_TIME_FORMATTER);
-                };
+                return (pos, data) ->
+                        data.getTimestamp(pos, 
DataTypeChecks.getPrecision(columnType))
+                                .toLocalDateTime()
+                                .format(DATE_TIME_FORMATTER);
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return (pos, data) -> {
-                    try {
-                        if (data.isNullAt(pos)) {
-                            return null;
-                        }
-
-                        // Debug logging
-                        System.out.println(
-                                "Processing TIMESTAMP_WITH_LOCAL_TIME_ZONE at 
position: " + pos);
-                        System.out.println("Data type: " + 
data.getClass().getName());
-
-                        // Attempt to retrieve timestamp data
-                        long milliseconds;
-                        int nanos = 0;
-
-                        try {
-                            // Try using getTimestamp method
-                            TimestampData timestampData = 
data.getTimestamp(pos, 6);
-                            milliseconds = timestampData.getMillisecond();
-                            nanos = timestampData.getNanoOfMillisecond();
-                        } catch (Exception e) {
-                            // Fallback to getLong if getTimestamp fails
-                            milliseconds = data.getLong(pos);
-                        }
-
-                        // Create Instant object
-                        Instant instant =
-                                Instant.ofEpochSecond(
-                                        milliseconds / 1000,
-                                        (milliseconds % 1000) * 1_000_000L + 
nanos);
-
-                        // Format timestamp using UTC timezone
-                        return DateTimeFormatter.ISO_INSTANT.format(instant);
-
-                    } catch (Exception e) {
-                        return "ERROR_PROCESSING_TIMESTAMP";
-                    }
-                };
+                return (pos, data) ->
+                        data.getTimestamp(pos, 
DataTypeChecks.getPrecision(columnType))
+                                .toLocalDateTime()
+                                .atZone(zoneId)
+                                .format(DATE_TIME_FORMATTER);
             case TIMESTAMP_WITH_TIME_ZONE:
-                return (pos, data) -> {
-                    long milliseconds = data.getTimestamp(pos, 
6).getMillisecond();
-                    LocalDateTime dateTime =
-                            LocalDateTime.ofEpochSecond(
-                                    milliseconds / 1000,
-                                    (int) (milliseconds % 1000) * 1_000_000,
-                                    
zoneId.getRules().getOffset(java.time.Instant.now()));
-                    return dateTime.atZone(zoneId).format(DATE_TIME_FORMATTER);
-                };
-
-                // Complex types
+                final int zonedP = ((ZonedTimestampType) 
columnType).getPrecision();
+                return (pos, data) ->
+                        data.getTimestamp(pos, zonedP)
+                                .toTimestamp()
+                                .toInstant()
+                                .atZone(zoneId)
+                                .format(DATE_TIME_FORMATTER);
             case ARRAY:
-                return (pos, data) -> data.getArray(pos);
+                return (pos, data) -> convertArrayData(data.getArray(pos), 
columnType);
             case MAP:
-                return (pos, data) -> data.getMap(pos);
+                return (pos, data) ->
+                        writeValueAsString(convertMapData(data.getMap(pos), 
columnType));
             case ROW:
-                return (pos, data) -> {
-                    RecordData rowData = data.getRow(pos, 5); // Assuming 5 
fields, adjust as needed
-                    return rowData != null ? rowData.toString() : null;
-                };
+                return (pos, data) ->
+                        writeValueAsString(convertRowData(data, pos, 
columnType, zoneId));
             default:
-                throw new IllegalArgumentException("Unsupported column type: " 
+ columnType);
+                throw new UnsupportedOperationException("Unsupported type: " + 
columnType);
+        }
+    }
+
+    private static List<Object> convertArrayData(ArrayData array, DataType 
type) {
+        if (array instanceof GenericArrayData) {
+            return Arrays.asList(((GenericArrayData) array).toObjectArray());
+        }
+        throw new UnsupportedOperationException("Unsupported array data: " + 
array.getClass());
+    }
+
+    private static Object convertMapData(MapData map, DataType type) {
+        Map<Object, Object> result = new HashMap<>();
+        if (map instanceof GenericMapData) {
+            GenericMapData gMap = (GenericMapData) map;
+            for (Object key : ((GenericArrayData) 
gMap.keyArray()).toObjectArray()) {
+                result.put(key, gMap.get(key));
+            }
+            return result;
+        }
+        throw new UnsupportedOperationException("Unsupported map data: " + 
map.getClass());
+    }
+
+    private static Object convertRowData(
+            RecordData val, int index, DataType type, ZoneId pipelineZoneId) {
+        RowType rowType = (RowType) type;
+        Map<String, Object> value = new HashMap<>();
+        RecordData row = val.getRow(index, rowType.getFieldCount());
+
+        List<DataField> fields = rowType.getFields();
+        for (int i = 0; i < fields.size(); i++) {
+            DataField rowField = fields.get(i);
+            SerializationConverter converter =
+                    createNullableExternalConverter(rowField.getType(), 
pipelineZoneId);
+            Object valTmp = converter.serialize(i, row);
+            value.put(rowField.getName(), valTmp.toString());
+        }
+        return value;
+    }
+
+    private static String writeValueAsString(Object object) {
+        try {
+            return objectMapper.writeValueAsString(object);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
index 3507774f1..53227ef84 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java
@@ -23,23 +23,28 @@ import org.apache.flink.cdc.common.sink.EventSinkProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import 
org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
+import 
org.apache.flink.cdc.connectors.elasticsearch.serializer.Elasticsearch6RequestCreator;
 import 
org.apache.flink.cdc.connectors.elasticsearch.serializer.ElasticsearchEventSerializer;
 import 
org.apache.flink.cdc.connectors.elasticsearch.v2.Elasticsearch8AsyncSinkBuilder;
+import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
 
-import org.apache.http.HttpHost;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
 
 import java.io.Serializable;
 import java.time.ZoneId;
 
 /**
- * A {@link DataSink} implementation for Elasticsearch connector.
+ * An implementation of {@link DataSink} for writing events to Elasticsearch.
  *
- * @param <InputT> The input type of the sink.
+ * <p>This class is responsible for configuring and managing the lifecycle of 
an Elasticsearch sink,
+ * including handling different versions of Elasticsearch (6, 7, 8).
+ *
+ * @param <InputT> The type of input elements that this sink can process.
  */
 public class ElasticsearchDataSink<InputT> implements DataSink, Serializable {
 
-    private static final long serialVersionUID = 1L;
-
     /** The Elasticsearch sink options. */
     private final ElasticsearchSinkOptions elasticsearchOptions;
 
@@ -59,11 +64,101 @@ public class ElasticsearchDataSink<InputT> implements 
DataSink, Serializable {
 
     @Override
     public EventSinkProvider getEventSinkProvider() {
+        switch (elasticsearchOptions.getVersion()) {
+            case 6:
+                return getElasticsearch6SinkProvider();
+            case 7:
+                return getElasticsearch7SinkProvider();
+            case 8:
+                return getElasticsearch8SinkProvider();
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported Elasticsearch version: " + 
elasticsearchOptions.getVersion());
+        }
+    }
+
+    private EventSinkProvider getElasticsearch6SinkProvider() {
+        ElasticsearchEventSerializer serializer = new 
ElasticsearchEventSerializer(zoneId);
+        org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] 
hosts =
+                elasticsearchOptions.getHosts().stream()
+                        .map(
+                                host ->
+                                        new 
org.apache.flink.elasticsearch6.shaded.org.apache.http
+                                                .HttpHost(
+                                                host.getHostName(),
+                                                host.getPort(),
+                                                host.getSchemeName()))
+                        .toArray(
+                                
org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[]
+                                        ::new);
+
+        return FlinkSinkProvider.of(
+                new Elasticsearch6SinkBuilder<Event>()
+                        .setHosts(hosts)
+                        .setEmitter(
+                                (element, context, indexer) -> {
+                                    BulkOperationVariant operation =
+                                            serializer.apply(element, context);
+                                    if (operation instanceof IndexOperation) {
+                                        indexer.add(
+                                                
Elasticsearch6RequestCreator.createIndexRequest(
+                                                        (IndexOperation<?>) 
operation));
+                                    } else if (operation instanceof 
DeleteOperation) {
+                                        indexer.add(
+                                                
Elasticsearch6RequestCreator.createDeleteRequest(
+                                                        (DeleteOperation) 
operation));
+                                    }
+                                })
+                        
.setBulkFlushMaxActions(elasticsearchOptions.getMaxBatchSize())
+                        
.setBulkFlushInterval(elasticsearchOptions.getMaxTimeInBufferMS())
+                        .build());
+    }
+
+    private EventSinkProvider getElasticsearch7SinkProvider() {
+        ElasticsearchEventSerializer serializer = new 
ElasticsearchEventSerializer(zoneId);
+        org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] 
hosts =
+                elasticsearchOptions.getHosts().stream()
+                        .map(
+                                host ->
+                                        new 
org.apache.flink.elasticsearch6.shaded.org.apache.http
+                                                .HttpHost(
+                                                host.getHostName(),
+                                                host.getPort(),
+                                                host.getSchemeName()))
+                        .toArray(
+                                
org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[]
+                                        ::new);
+
+        return FlinkSinkProvider.of(
+                new Elasticsearch6SinkBuilder<Event>()
+                        .setHosts(hosts)
+                        .setEmitter(
+                                (element, context, indexer) -> {
+                                    BulkOperationVariant operation =
+                                            serializer.apply(element, context);
+                                    if (operation instanceof IndexOperation) {
+                                        indexer.add(
+                                                
Elasticsearch6RequestCreator.createIndexRequest(
+                                                        (IndexOperation<?>) 
operation));
+                                    } else if (operation instanceof 
DeleteOperation) {
+                                        indexer.add(
+                                                
Elasticsearch6RequestCreator.createDeleteRequest(
+                                                        (DeleteOperation) 
operation));
+                                    }
+                                })
+                        
.setBulkFlushMaxActions(elasticsearchOptions.getMaxBatchSize())
+                        
.setBulkFlushInterval(elasticsearchOptions.getMaxTimeInBufferMS())
+                        .build());
+    }
+
+    private EventSinkProvider getElasticsearch8SinkProvider() {
         return FlinkSinkProvider.of(
                 new Elasticsearch8AsyncSinkBuilder<Event>()
-                        .setHosts(elasticsearchOptions.getHosts().toArray(new 
HttpHost[0]))
-                        .setElementConverter(
-                                new 
ElasticsearchEventSerializer(ZoneId.systemDefault()))
+                        .setHosts(
+                                elasticsearchOptions
+                                        .getHosts()
+                                        .toArray(new 
org.apache.http.HttpHost[0]))
+                        .setElementConverter(new 
ElasticsearchEventSerializer(zoneId))
                         
.setMaxBatchSize(elasticsearchOptions.getMaxBatchSize())
                         
.setMaxInFlightRequests(elasticsearchOptions.getMaxInFlightRequests())
                         
.setMaxBufferedRequests(elasticsearchOptions.getMaxBufferedRequests())
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
index a84c4691a..b8bcf59b9 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.sink.DataSink;
 import 
org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
 import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
+import org.apache.flink.table.api.ValidationException;
 
 import org.apache.http.HttpHost;
 
@@ -36,7 +37,16 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.*;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.HOSTS;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_BATCH_SIZE;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_BATCH_SIZE_IN_BYTES;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_BUFFERED_REQUESTS;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_IN_FLIGHT_REQUESTS;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_RECORD_SIZE_IN_BYTES;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_TIME_IN_BUFFER_MS;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.PASSWORD;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.USERNAME;
+import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.VERSION;
 
 /** Factory for creating {@link ElasticsearchDataSink}. */
 public class ElasticsearchDataSinkFactory implements DataSinkFactory {
@@ -45,12 +55,17 @@ public class ElasticsearchDataSinkFactory implements 
DataSinkFactory {
 
     @Override
     public DataSink createDataSink(Context context) {
+        // Validate the configuration
         FactoryHelper.createFactoryHelper(this, context).validate();
 
+        // Get the configuration directly from the context
         Configuration configuration =
                 
Configuration.fromMap(context.getFactoryConfiguration().toMap());
-        ZoneId zoneId = determineZoneId(context);
 
+        // Validate required options
+        validateRequiredOptions(configuration);
+
+        ZoneId zoneId = determineZoneId(context);
         ElasticsearchSinkOptions sinkOptions = 
buildSinkConnectorOptions(configuration);
         return new ElasticsearchDataSink(sinkOptions, zoneId);
     }
@@ -69,6 +84,7 @@ public class ElasticsearchDataSinkFactory implements 
DataSinkFactory {
         List<HttpHost> hosts = parseHosts(cdcConfig.get(HOSTS));
         String username = cdcConfig.get(USERNAME);
         String password = cdcConfig.get(PASSWORD);
+        int version = cdcConfig.get(VERSION);
         NetworkConfig networkConfig =
                 new NetworkConfig(hosts, username, password, null, null, null);
         return new ElasticsearchSinkOptions(
@@ -78,7 +94,10 @@ public class ElasticsearchDataSinkFactory implements 
DataSinkFactory {
                 cdcConfig.get(MAX_BATCH_SIZE_IN_BYTES),
                 cdcConfig.get(MAX_TIME_IN_BUFFER_MS),
                 cdcConfig.get(MAX_RECORD_SIZE_IN_BYTES),
-                networkConfig);
+                networkConfig,
+                version,
+                username,
+                password);
     }
 
     private List<HttpHost> parseHosts(String hostsStr) {
@@ -96,7 +115,7 @@ public class ElasticsearchDataSinkFactory implements 
DataSinkFactory {
     public Set<ConfigOption<?>> requiredOptions() {
         Set<ConfigOption<?>> requiredOptions = new HashSet<>();
         requiredOptions.add(HOSTS);
-        requiredOptions.add(INDEX);
+        requiredOptions.add(VERSION);
         return requiredOptions;
     }
 
@@ -113,4 +132,23 @@ public class ElasticsearchDataSinkFactory implements 
DataSinkFactory {
         optionalOptions.add(PASSWORD);
         return optionalOptions;
     }
+
+    private void validateRequiredOptions(Configuration configuration) {
+        Set<ConfigOption<?>> missingOptions = new HashSet<>();
+        for (ConfigOption<?> option : requiredOptions()) {
+            if (!configuration.contains(option)) {
+                missingOptions.add(option);
+            }
+        }
+        if (!missingOptions.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "One or more required options are missing.\n\n"
+                                    + "Missing required options are:\n\n"
+                                    + "%s",
+                            missingOptions.stream()
+                                    .map(ConfigOption::key)
+                                    .collect(Collectors.joining("\n"))));
+        }
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
index c5e414527..b26290400 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
@@ -77,12 +77,12 @@ public class ElasticsearchDataSinkOptions {
                     .defaultValue(10L * 1024L * 1024L)
                     .withDescription("The maximum size of a single record in 
bytes.");
 
-    /** The Elasticsearch index name to write to. */
-    public static final ConfigOption<String> INDEX =
-            ConfigOptions.key("index")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The Elasticsearch index name to write 
to.");
+    /** The version of Elasticsearch to connect to. */
+    public static final ConfigOption<Integer> VERSION =
+            ConfigOptions.key("version")
+                    .intType()
+                    .defaultValue(7)
+                    .withDescription("The version of Elasticsearch to connect 
to (6, 7, or 8).");
 
     /** The username for Elasticsearch authentication. */
     public static final ConfigOption<String> USERNAME =
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java
deleted file mode 100644
index 9206ed7bc..000000000
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.connectors.elasticsearch.sink;
-
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.sink.MetadataApplier;
-
-/**
- * A metadata applier for Elasticsearch that handles schema change events. 
This class is responsible
- * for applying metadata changes to the Elasticsearch index based on schema 
change events from the
- * CDC source.
- */
-public class ElasticsearchMetadataApplier implements MetadataApplier {
-
-    /**
-     * Applies the given schema change event to the Elasticsearch index.
-     *
-     * @param event The schema change event to apply.
-     */
-    @Override
-    public void applySchemaChange(SchemaChangeEvent event) {
-        // TODO: Implement the logic to apply schema changes to Elasticsearch
-        // This might include:
-        // - Creating new indices
-        // - Updating existing index mappings
-        // - Handling column additions or deletions
-        throw new UnsupportedOperationException(
-                "Schema change application is not yet implemented for 
Elasticsearch.");
-    }
-}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
index 960c64ba2..4a237e3a3 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.flink.cdc.connectors.elasticsearch.v2;
@@ -34,17 +32,30 @@ import java.util.Collection;
 import java.util.Collections;
 
 /**
- * Elasticsearch8AsyncSink Apache Flink's Async Sink that submits Operations 
into an Elasticsearch
- * cluster.
+ * Elasticsearch8AsyncSink is Apache Flink's Async Sink that submits 
Operations into an
+ * Elasticsearch cluster.
  *
- * @param <InputT> type of records that will be converted into {@link 
Operation} see {@link
- *     Elasticsearch8AsyncSinkBuilder} on how to construct valid instances
+ * @param <InputT> type of records that will be converted into {@link 
Operation}. See {@link
+ *     Elasticsearch8AsyncSinkBuilder} on how to construct valid instances.
  */
 public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, 
Operation> {
     private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch8AsyncSink.class);
 
     @VisibleForTesting protected final NetworkConfig networkConfig;
 
+    /**
+     * Constructs an Elasticsearch8AsyncSink.
+     *
+     * @param converter the converter that transforms input records to 
Elasticsearch operations.
+     * @param maxBatchSize the maximum number of records to be included in a 
single batch.
+     * @param maxInFlightRequests the maximum number of in-flight requests.
+     * @param maxBufferedRequests the maximum number of buffered requests.
+     * @param maxBatchSizeInBytes the maximum size of a batch in bytes.
+     * @param maxTimeInBufferMS the maximum time a request can stay in the 
buffer before being
+     *     flushed.
+     * @param maxRecordSizeInByte the maximum size of a single record in bytes.
+     * @param networkConfig the network configuration for Elasticsearch.
+     */
     protected Elasticsearch8AsyncSink(
             ElementConverter<InputT, Operation> converter,
             int maxBatchSize,
@@ -66,6 +77,12 @@ public class Elasticsearch8AsyncSink<InputT> extends 
AsyncSinkBase<InputT, Opera
         this.networkConfig = networkConfig;
     }
 
+    /**
+     * Creates a new {@link StatefulSinkWriter} for writing elements to 
Elasticsearch.
+     *
+     * @param context the initialization context.
+     * @return a new instance of {@link Elasticsearch8AsyncWriter}.
+     */
     @Override
     public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> 
createWriter(
             InitContext context) {
@@ -82,6 +99,13 @@ public class Elasticsearch8AsyncSink<InputT> extends 
AsyncSinkBase<InputT, Opera
                 Collections.emptyList());
     }
 
+    /**
+     * Restores a {@link StatefulSinkWriter} from a previously saved state.
+     *
+     * @param context the initialization context.
+     * @param recoveredState the recovered state.
+     * @return a restored instance of {@link Elasticsearch8AsyncWriter}.
+     */
     @Override
     public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> 
restoreWriter(
             InitContext context, Collection<BufferedRequestState<Operation>> 
recoveredState) {
@@ -98,6 +122,11 @@ public class Elasticsearch8AsyncSink<InputT> extends 
AsyncSinkBase<InputT, Opera
                 recoveredState);
     }
 
+    /**
+     * Returns the serializer for the writer's state.
+     *
+     * @return an instance of {@link Elasticsearch8AsyncSinkSerializer}.
+     */
     @Override
     public SimpleVersionedSerializer<BufferedRequestState<Operation>> 
getWriterStateSerializer() {
         return new Elasticsearch8AsyncSinkSerializer();
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java
index 2e6598e09..17d2d36e5 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.flink.cdc.connectors.elasticsearch.v2;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java
index df3520274..efdb74ee9 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.flink.cdc.connectors.elasticsearch.v2;
@@ -26,19 +24,40 @@ import 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerialize
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 
-/** Elasticsearch8AsyncSinkSerializer is used to serialize and deserialize an 
Operation. */
+/**
+ * {@code Elasticsearch8AsyncSinkSerializer} is used to serialize and 
deserialize {@link Operation}
+ * objects for Elasticsearch 8 async sink.
+ */
 public class Elasticsearch8AsyncSinkSerializer extends 
AsyncSinkWriterStateSerializer<Operation> {
 
+    /**
+     * Serializes the given {@link Operation} to the provided {@link 
DataOutputStream}.
+     *
+     * @param request the Operation to serialize.
+     * @param out the DataOutputStream to which the Operation will be 
serialized.
+     */
     @Override
     protected void serializeRequestToStream(Operation request, 
DataOutputStream out) {
         new OperationSerializer().serialize(request, out);
     }
 
+    /**
+     * Deserializes an {@link Operation} from the provided {@link 
DataInputStream}.
+     *
+     * @param requestSize the size of the request in bytes.
+     * @param in the DataInputStream from which the Operation will be 
deserialized.
+     * @return the deserialized Operation.
+     */
     @Override
     protected Operation deserializeRequestFromStream(long requestSize, 
DataInputStream in) {
         return new OperationSerializer().deserialize(requestSize, in);
     }
 
+    /**
+     * Returns the version of this serializer.
+     *
+     * @return the version number.
+     */
     @Override
     public int getVersion() {
         return 1;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
index 7753d824b..50a606c87 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.flink.cdc.connectors.elasticsearch.v2;
@@ -124,10 +122,6 @@ public class Elasticsearch8AsyncWriter<InputT> extends 
AsyncSinkWriter<InputT, O
 
         BulkRequest.Builder br = new BulkRequest.Builder();
         for (Operation operation : requestEntries) {
-            //            if (operation.getBulkOperationVariant() == 
null&&requestEntries.size()==1)
-            // {
-            //                return; // 跳过当前循环迭代
-            //            }
             if (operation.getBulkOperationVariant() == null) {
                 continue;
             }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java
index 2a6f4d8a6..a38f210b6 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.flink.cdc.connectors.elasticsearch.v2;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java
index 0d2ba0cca..855f2e3c8 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.flink.cdc.connectors.elasticsearch.v2;
@@ -26,16 +24,29 @@ import 
co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
 import java.io.Serializable;
 import java.util.Objects;
 
-/** A single stream element which contains a BulkOperationVariant. */
+/**
+ * A single stream element which contains a {@link BulkOperationVariant}. This 
class represents an
+ * operation that will be performed in an Elasticsearch bulk request.
+ */
 public class Operation implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final BulkOperationVariant bulkOperationVariant;
 
+    /**
+     * Constructs an Operation with the specified {@link BulkOperationVariant}.
+     *
+     * @param bulkOperation the BulkOperationVariant to be wrapped by this 
Operation.
+     */
     public Operation(BulkOperationVariant bulkOperation) {
         this.bulkOperationVariant = bulkOperation;
     }
 
+    /**
+     * Returns the {@link BulkOperationVariant} contained in this Operation.
+     *
+     * @return the BulkOperationVariant instance.
+     */
     public BulkOperationVariant getBulkOperationVariant() {
         return bulkOperationVariant;
     }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java
index 127ce11b1..108425136 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java
@@ -1,16 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.cdc.connectors.elasticsearch.v2;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 
-/** OperationSerializer is responsible for serialization and deserialization 
of an Operation. */
+/**
+ * OperationSerializer is responsible for the serialization and 
deserialization of an {@link
+ * Operation}.
+ */
 public class OperationSerializer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(OperationSerializer.class);
+
     private final Kryo kryo = new Kryo();
 
     public OperationSerializer() {
@@ -18,6 +43,12 @@ public class OperationSerializer {
         kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
     }
 
+    /**
+     * Serializes the given {@link Operation} to the provided {@link 
DataOutputStream}.
+     *
+     * @param request the Operation to serialize
+     * @param out the DataOutputStream to which the Operation will be 
serialized
+     */
     public void serialize(Operation request, DataOutputStream out) {
         try (Output output = new Output(out)) {
             kryo.writeObjectOrNull(output, request, Operation.class);
@@ -25,6 +56,13 @@ public class OperationSerializer {
         }
     }
 
+    /**
+     * Deserializes an {@link Operation} from the provided {@link 
DataInputStream}.
+     *
+     * @param requestSize the size of the request in bytes
+     * @param in the DataInputStream from which the Operation will be 
deserialized
+     * @return the deserialized Operation, or null if deserialization fails
+     */
     public Operation deserialize(long requestSize, DataInputStream in) {
         try (Input input = new Input(in, (int) requestSize)) {
             if (input.available() > 0) {
@@ -33,12 +71,17 @@ public class OperationSerializer {
                 return null; // Skip if input stream is empty
             }
         } catch (Exception e) {
-            // Handle the exception as needed, e.g., log the error
-            System.err.println("Failed to deserialize Operation: " + 
e.getMessage());
+            LOG.error("Failed to deserialize Operation: {}", e.getMessage(), 
e);
             return null;
         }
     }
 
+    /**
+     * Calculates the serialized size of the given {@link Operation}.
+     *
+     * @param operation the Operation whose size is to be calculated
+     * @return the size of the serialized Operation in bytes
+     */
     public int size(Operation operation) {
         ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
         try (Output output = new Output(byteArrayOutputStream)) {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java
new file mode 100644
index 000000000..54eb4c54a
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import 
org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
+import 
org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;
+import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
+import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost;
+import 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetRequest;
+import 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetResponse;
+import 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RequestOptions;
+import 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient;
+import 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase tests for {@link ElasticsearchDataSink}. */
+@Testcontainers
+public class Elasticsearch6DataSinkITCaseTest {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class);
+    private static final String ELASTICSEARCH_VERSION = "6.8.20";
+    private static final DockerImageName ELASTICSEARCH_IMAGE =
+            DockerImageName.parse(
+                            "docker.elastic.co/elasticsearch/elasticsearch:"
+                                    + ELASTICSEARCH_VERSION)
+                    
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+
+    @Container
+    private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
+            createElasticsearchContainer();
+
+    private RestHighLevelClient client;
+
+    @BeforeEach
+    public void setUp() {
+        client = createElasticsearchClient();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testElasticsearchSink() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "table");
+        List<Event> events = ElasticsearchTestUtils.createTestEvents(tableId); 
// 使用工具类
+
+        runJobWithEvents(events);
+
+        verifyInsertedData(
+                tableId,
+                "1",
+                1,
+                1.0,
+                "value1",
+                true,
+                (byte) 1,
+                (short) 2,
+                100L,
+                1.0f,
+                new BigDecimal("10.00"),
+                1633024800000L);
+        verifyInsertedData(
+                tableId,
+                "2",
+                2,
+                2.0,
+                "value2",
+                false,
+                (byte) 2,
+                (short) 3,
+                200L,
+                2.0f,
+                new BigDecimal("20.00"),
+                1633111200000L);
+    }
+
+    @Test
+    public void testElasticsearchInsertAndDelete() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "table");
+        List<Event> events = 
ElasticsearchTestUtils.createTestEventsWithDelete(tableId); // 使用工具类
+
+        runJobWithEvents(events);
+
+        verifyDeletedData(tableId, "2");
+    }
+
+    @Test
+    public void testElasticsearchAddColumn() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "table");
+        List<Event> events = 
ElasticsearchTestUtils.createTestEventsWithAddColumn(tableId); // 使用工具类
+
+        runJobWithEvents(events);
+
+        verifyInsertedDataWithNewColumn(tableId, "3", 3, 3.0, "value3", true);
+    }
+
+    private static ElasticsearchContainer createElasticsearchContainer() {
+        return new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withEnv("logger.org.elasticsearch", "ERROR")
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5)));
+    }
+
+    private RestHighLevelClient createElasticsearchClient() {
+        return new RestHighLevelClient(
+                RestClient.builder(
+                        new HttpHost(
+                                ELASTICSEARCH_CONTAINER.getHost(),
+                                ELASTICSEARCH_CONTAINER.getFirstMappedPort(),
+                                "http")));
+    }
+
+    private void runJobWithEvents(List<Event> events) throws Exception {
+        ElasticsearchSinkOptions options = createSinkOptions();
+        StreamExecutionEnvironment env = createStreamExecutionEnvironment();
+        ElasticsearchDataSink<Event> sink =
+                new ElasticsearchDataSink<>(options, ZoneId.systemDefault());
+
+        DataStream<Event> stream = env.fromCollection(events, 
TypeInformation.of(Event.class));
+        Sink<Event> elasticsearchSink = ((FlinkSinkProvider) 
sink.getEventSinkProvider()).getSink();
+        stream.sinkTo(elasticsearchSink);
+
+        env.execute("Elasticsearch Sink Test");
+    }
+
+    private ElasticsearchSinkOptions createSinkOptions() {
+
+        NetworkConfig networkConfig =
+                new NetworkConfig(
+                        Collections.singletonList(
+                                new org.apache.http.HttpHost(
+                                        ELASTICSEARCH_CONTAINER.getHost(),
+                                        
ELASTICSEARCH_CONTAINER.getFirstMappedPort())),
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        return new ElasticsearchSinkOptions(
+                5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, 
networkConfig, 6, null, null);
+    }
+
+    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(3000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        return env;
+    }
+
+    private void verifyInsertedData(
+            TableId tableId,
+            String id,
+            int expectedId,
+            double expectedNumber,
+            String expectedName,
+            boolean expectedBool,
+            byte expectedTinyint,
+            short expectedSmallint,
+            long expectedBigint,
+            float expectedFloat,
+            BigDecimal expectedDecimal,
+            long expectedTimestamp)
+            throws Exception {
+        GetRequest getRequest = new GetRequest(tableId.toString()).id(id);
+
+        GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+
+        LOG.debug("Response source: {}", response.getSource());
+
+        assertThat(response.getSource()).isNotNull();
+        assertThat(((Number) 
response.getSource().get("id")).intValue()).isEqualTo(expectedId);
+        assertThat(((Number) response.getSource().get("number")).doubleValue())
+                .isEqualTo(expectedNumber);
+        assertThat(response.getSource().get("name")).isEqualTo(expectedName);
+        assertThat(response.getSource().get("bool")).isEqualTo(expectedBool);
+        assertThat(((Number) response.getSource().get("tinyint")).byteValue())
+                .isEqualTo(expectedTinyint);
+        assertThat(((Number) 
response.getSource().get("smallint")).shortValue())
+                .isEqualTo(expectedSmallint);
+        assertThat(((Number) response.getSource().get("bigint")).longValue())
+                .isEqualTo(expectedBigint);
+        assertThat(((Number) response.getSource().get("float")).floatValue())
+                .isEqualTo(expectedFloat);
+        assertThat(new 
BigDecimal(response.getSource().get("decimal").toString()))
+                .isEqualTo(expectedDecimal);
+
+        String timestampString = 
response.getSource().get("timestamp").toString();
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSSSSS");
+        LocalDateTime dateTime = LocalDateTime.parse(timestampString, 
formatter);
+        long timestampMillis = 
dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
+        assertThat(timestampMillis).isEqualTo(expectedTimestamp);
+    }
+
+    private void verifyDeletedData(TableId tableId, String id) throws 
Exception {
+        GetRequest getRequest = new GetRequest(tableId.toString()).id(id);
+        GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+
+        assertThat(response.isExists()).isFalse();
+    }
+
+    private void verifyInsertedDataWithNewColumn(
+            TableId tableId,
+            String id,
+            int expectedId,
+            double expectedNumber,
+            String expectedName,
+            boolean expectedExtraBool)
+            throws Exception {
+        GetRequest getRequest = new GetRequest(tableId.toString()).id(id);
+        GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+
+        assertThat(response.getSource()).isNotNull();
+        assertThat(response.getSource().get("id")).isEqualTo(expectedId);
+        
assertThat(response.getSource().get("number")).isEqualTo(expectedNumber);
+        assertThat(response.getSource().get("name")).isEqualTo(expectedName);
+        
assertThat(response.getSource().get("extra_bool")).isEqualTo(expectedExtraBool);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java
new file mode 100644
index 000000000..fdc79aaf8
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import 
org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
+import 
org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;
+import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
+import org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost;
+import 
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.get.GetRequest;
+import 
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.get.GetResponse;
+import 
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RequestOptions;
+import 
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient;
+import 
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase tests for {@link ElasticsearchDataSink}. */
+@Testcontainers
+public class Elasticsearch7DataSinkITCaseTest {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class);
+    private static final String ELASTICSEARCH_VERSION = "7.10.2";
+    private static final DockerImageName ELASTICSEARCH_IMAGE =
+            DockerImageName.parse(
+                            "docker.elastic.co/elasticsearch/elasticsearch:"
+                                    + ELASTICSEARCH_VERSION)
+                    
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+
+    @Container
+    private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
+            createElasticsearchContainer();
+
+    private RestHighLevelClient client;
+
+    @BeforeEach
+    public void setUp() {
+        client = createElasticsearchClient();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testElasticsearchSink() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "table");
+        List<Event> events = ElasticsearchTestUtils.createTestEvents(tableId); 
// 使用工具类
+
+        runJobWithEvents(events);
+
+        verifyInsertedData(
+                tableId,
+                "1",
+                1,
+                1.0,
+                "value1",
+                true,
+                (byte) 1,
+                (short) 2,
+                100L,
+                1.0f,
+                new BigDecimal("10.00"),
+                1633024800000L);
+        verifyInsertedData(
+                tableId,
+                "2",
+                2,
+                2.0,
+                "value2",
+                false,
+                (byte) 2,
+                (short) 3,
+                200L,
+                2.0f,
+                new BigDecimal("20.00"),
+                1633111200000L);
+    }
+
+    @Test
+    public void testElasticsearchInsertAndDelete() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "table");
+        List<Event> events = 
ElasticsearchTestUtils.createTestEventsWithDelete(tableId); // 使用工具类
+
+        runJobWithEvents(events);
+
+        verifyDeletedData(tableId, "2");
+    }
+
+    @Test
+    public void testElasticsearchAddColumn() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "table");
+        List<Event> events = 
ElasticsearchTestUtils.createTestEventsWithAddColumn(tableId); // 使用工具类
+
+        runJobWithEvents(events);
+
+        verifyInsertedDataWithNewColumn(tableId, "3", 3, 3.0, "value3", true);
+    }
+
+    private static ElasticsearchContainer createElasticsearchContainer() {
+        return new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withEnv("logger.org.elasticsearch", "ERROR")
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                
.waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5)));
+    }
+
+    private RestHighLevelClient createElasticsearchClient() {
+        return new RestHighLevelClient(
+                RestClient.builder(
+                        new HttpHost(
+                                ELASTICSEARCH_CONTAINER.getHost(),
+                                ELASTICSEARCH_CONTAINER.getFirstMappedPort(),
+                                "http")));
+    }
+
+    private void runJobWithEvents(List<Event> events) throws Exception {
+        ElasticsearchSinkOptions options = createSinkOptions();
+        StreamExecutionEnvironment env = createStreamExecutionEnvironment();
+        ElasticsearchDataSink<Event> sink =
+                new ElasticsearchDataSink<>(options, ZoneId.systemDefault());
+
+        DataStream<Event> stream = env.fromCollection(events, 
TypeInformation.of(Event.class));
+        Sink<Event> elasticsearchSink = ((FlinkSinkProvider) 
sink.getEventSinkProvider()).getSink();
+        stream.sinkTo(elasticsearchSink);
+
+        env.execute("Elasticsearch Sink Test");
+    }
+
+    private ElasticsearchSinkOptions createSinkOptions() {
+        NetworkConfig networkConfig =
+                new NetworkConfig(
+                        Collections.singletonList(
+                                new org.apache.http.HttpHost(
+                                        ELASTICSEARCH_CONTAINER.getHost(),
+                                        
ELASTICSEARCH_CONTAINER.getFirstMappedPort())),
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        return new ElasticsearchSinkOptions(
+                5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, 
networkConfig, 7, null, null);
+    }
+
+    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(3000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        return env;
+    }
+
+    private void verifyInsertedData(
+            TableId tableId,
+            String id,
+            int expectedId,
+            double expectedNumber,
+            String expectedName,
+            boolean expectedBool,
+            byte expectedTinyint,
+            short expectedSmallint,
+            long expectedBigint,
+            float expectedFloat,
+            BigDecimal expectedDecimal,
+            long expectedTimestamp)
+            throws Exception {
+        GetRequest getRequest = new GetRequest(tableId.toString()).id(id);
+
+        GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+
+        LOG.debug("Response source: {}", response.getSource());
+
+        assertThat(response.getSource()).isNotNull();
+        assertThat(((Number) 
response.getSource().get("id")).intValue()).isEqualTo(expectedId);
+        assertThat(((Number) response.getSource().get("number")).doubleValue())
+                .isEqualTo(expectedNumber);
+        assertThat(response.getSource().get("name")).isEqualTo(expectedName);
+        assertThat(response.getSource().get("bool")).isEqualTo(expectedBool);
+        assertThat(((Number) response.getSource().get("tinyint")).byteValue())
+                .isEqualTo(expectedTinyint);
+        assertThat(((Number) 
response.getSource().get("smallint")).shortValue())
+                .isEqualTo(expectedSmallint);
+        assertThat(((Number) response.getSource().get("bigint")).longValue())
+                .isEqualTo(expectedBigint);
+        assertThat(((Number) response.getSource().get("float")).floatValue())
+                .isEqualTo(expectedFloat);
+        assertThat(new 
BigDecimal(response.getSource().get("decimal").toString()))
+                .isEqualTo(expectedDecimal);
+
+        String timestampString = 
response.getSource().get("timestamp").toString();
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSSSSS");
+        LocalDateTime dateTime = LocalDateTime.parse(timestampString, 
formatter);
+        long timestampMillis = 
dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
+        assertThat(timestampMillis).isEqualTo(expectedTimestamp);
+    }
+
+    private void verifyDeletedData(TableId tableId, String id) throws 
Exception {
+        GetRequest getRequest = new GetRequest(tableId.toString()).id(id);
+        GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+
+        assertThat(response.isExists()).isFalse();
+    }
+
+    private void verifyInsertedDataWithNewColumn(
+            TableId tableId,
+            String id,
+            int expectedId,
+            double expectedNumber,
+            String expectedName,
+            boolean expectedExtraBool)
+            throws Exception {
+        GetRequest getRequest = new GetRequest(tableId.toString()).id(id);
+        GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+
+        assertThat(response.getSource()).isNotNull();
+        assertThat(response.getSource().get("id")).isEqualTo(expectedId);
+        
assertThat(response.getSource().get("number")).isEqualTo(expectedNumber);
+        assertThat(response.getSource().get("name")).isEqualTo(expectedName);
+        
assertThat(response.getSource().get("extra_bool")).isEqualTo(expectedExtraBool);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
index 6195b5f06..32835349d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java
@@ -1,7 +1,7 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ * Licensed to the Apache Software Foundation (ASF) under one或多个
  * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
+ * this work for additional信息 regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
@@ -10,7 +10,7 @@
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express或 implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
@@ -35,8 +35,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.*;
-
 /** Tests for {@link ElasticsearchDataSinkFactory}. */
 public class ElasticsearchDataSinkFactoryTest {
 
@@ -57,16 +55,20 @@ public class ElasticsearchDataSinkFactoryTest {
     @Test
     void testLackRequiredOption() {
         DataSinkFactory sinkFactory = getElasticsearchDataSinkFactory();
-
-        Map<String, String> options = createValidOptions();
-
         List<String> requiredKeys = getRequiredKeys(sinkFactory);
         for (String requiredKey : requiredKeys) {
-            Map<String, String> remainingOptions = new HashMap<>(options);
-            remainingOptions.remove(requiredKey);
-            Configuration conf = Configuration.fromMap(remainingOptions);
-
+            // 创建一个新的配置 Map,包含所有必需选项
+            Map<String, String> options = new HashMap<>(createValidOptions());
+            // 移除当前正在测试的必需选项
+            options.remove(requiredKey);
+            Configuration conf = Configuration.fromMap(options);
+            // 打印日志以确保我们在测试缺少必需选项的情况
+            System.out.println("Testing missing required option: " + 
requiredKey);
+
+            // 添加创建 DataSink 对象的代码
             Assertions.assertThatThrownBy(() -> createDataSink(sinkFactory, 
conf))
+
+                    // Assertions to check for missing required option
                     .isInstanceOf(ValidationException.class)
                     .hasMessageContaining(
                             String.format(
@@ -86,10 +88,14 @@ public class ElasticsearchDataSinkFactoryTest {
                 Configuration.fromMap(
                         ImmutableMap.<String, String>builder()
                                 .put("hosts", "localhost:9200")
-                                .put("index", "test-index")
+                                .put("version", "7")
                                 .put("unsupported_key", "unsupported_value")
                                 .build());
 
+        // 打印日志以确保我们在测试不受支持的选项
+        System.out.println("Testing unsupported option");
+
+        // Assertions to check for unsupported options
         Assertions.assertThatThrownBy(() -> createDataSink(sinkFactory, conf))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
@@ -110,14 +116,18 @@ public class ElasticsearchDataSinkFactoryTest {
                 Configuration.fromMap(
                         ImmutableMap.<String, String>builder()
                                 .put("hosts", "localhost:9200")
-                                .put("index", "test-index")
                                 .put("batch.size.max", "500")
                                 .put("inflight.requests.max", "5")
+                                .put("version", "7") // Added version to the 
test configuration
                                 .build());
 
+        // 打印日志以确保我们在测试带前缀的必需选项
+        System.out.println("Testing prefixed required option");
+
         DataSink dataSink = createDataSink(sinkFactory, conf);
         
Assertions.assertThat(dataSink).isInstanceOf(ElasticsearchDataSink.class);
     }
+
     // Helper methods
 
     private DataSinkFactory getElasticsearchDataSinkFactory() {
@@ -132,14 +142,14 @@ public class ElasticsearchDataSinkFactoryTest {
         return Configuration.fromMap(
                 ImmutableMap.<String, String>builder()
                         .put("hosts", "localhost:9200")
-                        .put("index", "test-index")
+                        .put("version", "7") // Added version to the valid 
configuration
                         .build());
     }
 
     private Map<String, String> createValidOptions() {
         Map<String, String> options = new HashMap<>();
         options.put("hosts", "localhost:9200");
-        options.put("index", "test-index");
+        options.put("version", "7"); // Added version to the valid options
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
index 5f9011b12..8772d3fbc 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
@@ -20,18 +20,12 @@ package org.apache.flink.cdc.connectors.elasticsearch.sink;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.cdc.common.data.DecimalData;
-import org.apache.flink.cdc.common.data.TimestampData;
-import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
-import org.apache.flink.cdc.common.data.binary.BinaryStringData;
-import org.apache.flink.cdc.common.event.*;
-import org.apache.flink.cdc.common.schema.PhysicalColumn;
-import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
-import org.apache.flink.cdc.common.types.DataTypes;
 import 
org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions;
+import 
org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils;
 import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig;
-import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
@@ -60,7 +54,6 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -73,7 +66,7 @@ public class ElasticsearchDataSinkITCaseTest {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class);
-    private static final String ELASTICSEARCH_VERSION = "8.12.0";
+    private static final String ELASTICSEARCH_VERSION = "8.12.1";
     private static final DockerImageName ELASTICSEARCH_IMAGE =
             DockerImageName.parse(
                             "docker.elastic.co/elasticsearch/elasticsearch:"
@@ -101,7 +94,7 @@ public class ElasticsearchDataSinkITCaseTest {
     @Test
     public void testElasticsearchSink() throws Exception {
         TableId tableId = TableId.tableId("default", "schema", "table");
-        List<Event> events = createTestEvents(tableId);
+        List<Event> events = ElasticsearchTestUtils.createTestEvents(tableId); 
// 使用工具类
 
         runJobWithEvents(events);
 
@@ -136,7 +129,7 @@ public class ElasticsearchDataSinkITCaseTest {
     @Test
     public void testElasticsearchInsertAndDelete() throws Exception {
         TableId tableId = TableId.tableId("default", "schema", "table");
-        List<Event> events = createTestEventsWithDelete(tableId);
+        List<Event> events = 
ElasticsearchTestUtils.createTestEventsWithDelete(tableId); // 使用工具类
 
         runJobWithEvents(events);
 
@@ -146,7 +139,7 @@ public class ElasticsearchDataSinkITCaseTest {
     @Test
     public void testElasticsearchAddColumn() throws Exception {
         TableId tableId = TableId.tableId("default", "schema", "table");
-        List<Event> events = createTestEventsWithAddColumn(tableId);
+        List<Event> events = 
ElasticsearchTestUtils.createTestEventsWithAddColumn(tableId); // 使用工具类
 
         runJobWithEvents(events);
 
@@ -203,7 +196,7 @@ public class ElasticsearchDataSinkITCaseTest {
                         null);
 
         return new ElasticsearchSinkOptions(
-                5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, 
networkConfig);
+                5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, 
networkConfig, 8, null, null);
     }
 
     private StreamExecutionEnvironment createStreamExecutionEnvironment() {
@@ -280,131 +273,4 @@ public class ElasticsearchDataSinkITCaseTest {
         assertThat(response.source().get("name")).isEqualTo(expectedName);
         
assertThat(response.source().get("extra_bool")).isEqualTo(expectedExtraBool);
     }
-
-    private List<Event> createTestEvents(TableId tableId) {
-        Schema schema =
-                Schema.newBuilder()
-                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
-                        .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
-                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
-                        .column(new PhysicalColumn("bool", 
DataTypes.BOOLEAN(), null))
-                        .column(new PhysicalColumn("tinyint", 
DataTypes.TINYINT(), null))
-                        .column(new PhysicalColumn("smallint", 
DataTypes.SMALLINT(), null))
-                        .column(new PhysicalColumn("bigint", 
DataTypes.BIGINT(), null))
-                        .column(new PhysicalColumn("float", DataTypes.FLOAT(), 
null))
-                        .column(new PhysicalColumn("decimal", 
DataTypes.DECIMAL(10, 2), null))
-                        .column(new PhysicalColumn("timestamp", 
DataTypes.TIMESTAMP(), null))
-                        .primaryKey("id")
-                        .build();
-
-        BinaryRecordDataGenerator generator =
-                new BinaryRecordDataGenerator(
-                        org.apache.flink.cdc.common.types.RowType.of(
-                                
org.apache.flink.cdc.common.types.DataTypes.INT(),
-                                
org.apache.flink.cdc.common.types.DataTypes.DOUBLE(),
-                                
org.apache.flink.cdc.common.types.DataTypes.STRING(),
-                                
org.apache.flink.cdc.common.types.DataTypes.BOOLEAN(),
-                                
org.apache.flink.cdc.common.types.DataTypes.TINYINT(),
-                                
org.apache.flink.cdc.common.types.DataTypes.SMALLINT(),
-                                
org.apache.flink.cdc.common.types.DataTypes.BIGINT(),
-                                
org.apache.flink.cdc.common.types.DataTypes.FLOAT(),
-                                
org.apache.flink.cdc.common.types.DataTypes.DECIMAL(10, 2),
-                                
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP()));
-
-        return Arrays.asList(
-                new CreateTableEvent(tableId, schema),
-                DataChangeEvent.insertEvent(
-                        tableId,
-                        generator.generate(
-                                new Object[] {
-                                    1,
-                                    1.0,
-                                    BinaryStringData.fromString("value1"),
-                                    true,
-                                    (byte) 1,
-                                    (short) 2,
-                                    100L,
-                                    1.0f,
-                                    DecimalData.fromBigDecimal(new 
BigDecimal("10.00"), 10, 2),
-                                    TimestampData.fromMillis(1633024800000L)
-                                })),
-                DataChangeEvent.insertEvent(
-                        tableId,
-                        generator.generate(
-                                new Object[] {
-                                    2,
-                                    2.0,
-                                    BinaryStringData.fromString("value2"),
-                                    false,
-                                    (byte) 2,
-                                    (short) 3,
-                                    200L,
-                                    2.0f,
-                                    DecimalData.fromBigDecimal(new 
BigDecimal("20.00"), 10, 2),
-                                    TimestampData.fromMillis(1633111200000L)
-                                })));
-    }
-
-    private List<Event> createTestEventsWithDelete(TableId tableId) {
-        Schema schema =
-                Schema.newBuilder()
-                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
-                        .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
-                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
-                        .primaryKey("id")
-                        .build();
-
-        BinaryRecordDataGenerator generator =
-                new BinaryRecordDataGenerator(
-                        org.apache.flink.cdc.common.types.RowType.of(
-                                
org.apache.flink.cdc.common.types.DataTypes.INT(),
-                                
org.apache.flink.cdc.common.types.DataTypes.DOUBLE(),
-                                
org.apache.flink.cdc.common.types.DataTypes.STRING()));
-
-        BinaryRecordData insertRecord1 =
-                generator.generate(new Object[] {1, 1.0, 
BinaryStringData.fromString("value1")});
-        BinaryRecordData insertRecord2 =
-                generator.generate(new Object[] {2, 2.0, 
BinaryStringData.fromString("value2")});
-        BinaryRecordData deleteRecord =
-                generator.generate(new Object[] {2, 2.0, 
BinaryStringData.fromString("value2")});
-
-        return Arrays.asList(
-                new CreateTableEvent(tableId, schema),
-                DataChangeEvent.insertEvent(tableId, insertRecord1),
-                DataChangeEvent.insertEvent(tableId, insertRecord2),
-                DataChangeEvent.deleteEvent(tableId, deleteRecord));
-    }
-
-    private List<Event> createTestEventsWithAddColumn(TableId tableId) {
-        Schema schema =
-                Schema.newBuilder()
-                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
-                        .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
-                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
-                        .primaryKey("id")
-                        .build();
-
-        BinaryRecordDataGenerator generator =
-                new BinaryRecordDataGenerator(
-                        org.apache.flink.cdc.common.types.RowType.of(
-                                
org.apache.flink.cdc.common.types.DataTypes.INT(),
-                                
org.apache.flink.cdc.common.types.DataTypes.DOUBLE(),
-                                
org.apache.flink.cdc.common.types.DataTypes.STRING(),
-                                
org.apache.flink.cdc.common.types.DataTypes.BOOLEAN()));
-
-        return Arrays.asList(
-                new CreateTableEvent(tableId, schema),
-                new AddColumnEvent(
-                        tableId,
-                        Collections.singletonList(
-                                new AddColumnEvent.ColumnWithPosition(
-                                        new PhysicalColumn(
-                                                "extra_bool", 
DataTypes.BOOLEAN(), null)))),
-                DataChangeEvent.insertEvent(
-                        tableId,
-                        generator.generate(
-                                new Object[] {
-                                    3, 3.0, 
BinaryStringData.fromString("value3"), true
-                                })));
-    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
new file mode 100644
index 000000000..31cd4df38
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.sink.utils;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Utility class for creating test events for Elasticsearch integration 
tests. */
+public class ElasticsearchTestUtils {
+
+    /**
+     * Creates a list of test events that simulate inserting rows into a table.
+     *
+     * @param tableId the identifier of the table.
+     * @return a list of events simulating inserts into the table.
+     */
+    public static List<Event> createTestEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
+                        .column(new PhysicalColumn("bool", 
DataTypes.BOOLEAN(), null))
+                        .column(new PhysicalColumn("tinyint", 
DataTypes.TINYINT(), null))
+                        .column(new PhysicalColumn("smallint", 
DataTypes.SMALLINT(), null))
+                        .column(new PhysicalColumn("bigint", 
DataTypes.BIGINT(), null))
+                        .column(new PhysicalColumn("float", DataTypes.FLOAT(), 
null))
+                        .column(new PhysicalColumn("decimal", 
DataTypes.DECIMAL(10, 2), null))
+                        .column(new PhysicalColumn("timestamp", 
DataTypes.TIMESTAMP(), null))
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        org.apache.flink.cdc.common.types.RowType.of(
+                                
org.apache.flink.cdc.common.types.DataTypes.INT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.DOUBLE(),
+                                
org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                
org.apache.flink.cdc.common.types.DataTypes.BOOLEAN(),
+                                
org.apache.flink.cdc.common.types.DataTypes.TINYINT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.SMALLINT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.BIGINT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.FLOAT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.DECIMAL(10, 2),
+                                
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP()));
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    1,
+                                    1.0,
+                                    BinaryStringData.fromString("value1"),
+                                    true,
+                                    (byte) 1,
+                                    (short) 2,
+                                    100L,
+                                    1.0f,
+                                    DecimalData.fromBigDecimal(new 
BigDecimal("10.00"), 10, 2),
+                                    TimestampData.fromMillis(1633024800000L)
+                                })),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    2,
+                                    2.0,
+                                    BinaryStringData.fromString("value2"),
+                                    false,
+                                    (byte) 2,
+                                    (short) 3,
+                                    200L,
+                                    2.0f,
+                                    DecimalData.fromBigDecimal(new 
BigDecimal("20.00"), 10, 2),
+                                    TimestampData.fromMillis(1633111200000L)
+                                })));
+    }
+
+    /**
+     * Creates a list of test events that simulate inserting and then deleting 
a row from a table.
+     *
+     * @param tableId the identifier of the table.
+     * @return a list of events simulating inserts and deletes from the table.
+     */
+    public static List<Event> createTestEventsWithDelete(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        org.apache.flink.cdc.common.types.RowType.of(
+                                
org.apache.flink.cdc.common.types.DataTypes.INT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.DOUBLE(),
+                                
org.apache.flink.cdc.common.types.DataTypes.STRING()));
+
+        BinaryRecordData insertRecord1 =
+                generator.generate(new Object[] {1, 1.0, 
BinaryStringData.fromString("value1")});
+        BinaryRecordData insertRecord2 =
+                generator.generate(new Object[] {2, 2.0, 
BinaryStringData.fromString("value2")});
+        BinaryRecordData deleteRecord =
+                generator.generate(new Object[] {2, 2.0, 
BinaryStringData.fromString("value2")});
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                DataChangeEvent.insertEvent(tableId, insertRecord1),
+                DataChangeEvent.insertEvent(tableId, insertRecord2),
+                DataChangeEvent.deleteEvent(tableId, deleteRecord));
+    }
+
+    /**
+     * Creates a list of test events that simulate adding a column and then 
inserting rows into a
+     * table.
+     *
+     * @param tableId the identifier of the table.
+     * @return a list of events simulating a schema change and inserts into 
the table.
+     */
+    public static List<Event> createTestEventsWithAddColumn(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        org.apache.flink.cdc.common.types.RowType.of(
+                                
org.apache.flink.cdc.common.types.DataTypes.INT(),
+                                
org.apache.flink.cdc.common.types.DataTypes.DOUBLE(),
+                                
org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                
org.apache.flink.cdc.common.types.DataTypes.BOOLEAN()));
+
+        return Arrays.asList(
+                new CreateTableEvent(tableId, schema),
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        new PhysicalColumn(
+                                                "extra_bool", 
DataTypes.BOOLEAN(), null)))),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    3, 3.0, 
BinaryStringData.fromString("value3"), true
+                                })));
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties
deleted file mode 100644
index d32577249..000000000
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties
+++ /dev/null
@@ -1 +0,0 @@
-ryuk.container.image=testcontainers/ryuk:0.3.3
diff --git a/tools/ci/license_check.rb b/tools/ci/license_check.rb
index ac74c52ca..3ba68da89 100755
--- a/tools/ci/license_check.rb
+++ b/tools/ci/license_check.rb
@@ -64,7 +64,7 @@ QUESTIONABLE_STATEMENTS = [
 ].freeze
 
 # These file extensions are binary-formatted. No check needed.
-BINARY_FILE_EXTENSIONS = %w[.class .dylib .so .dll .gif .ico].freeze
+BINARY_FILE_EXTENSIONS = %w[.class .dylib .so .dll .gif .ico .SF .DSA 
.RSA].freeze
 
 # These packages are licensed under "Weak Copyleft" licenses.
 # According to Apache official guidelines, such software could be
@@ -72,10 +72,13 @@ BINARY_FILE_EXTENSIONS = %w[.class .dylib .so .dll .gif 
.ico].freeze
 # See https://www.apache.org/legal/resolved.html for more details.
 EXCEPTION_PACKAGES = [
   'org/glassfish/jersey/', # dual-licensed under GPL 2 and EPL 2.0
-  'org.glassfish.jersey', # dual-licensed under GPL 2 and EPL 2.0
-  'org.glassfish.hk2', # dual-licensed under GPL 2 and EPL 2.0
-  'javax.ws.rs-api', # dual-licensed under GPL 2 and EPL 2.0
-  'jakarta.ws.rs' # dual-licensed under GPL 2 and EPL 2.0
+  'org.glassfish.jersey',  # dual-licensed under GPL 2 and EPL 2.0
+  'org.glassfish.hk2',     # dual-licensed under GPL 2 and EPL 2.0
+  'javax.ws.rs-api',       # dual-licensed under GPL 2 and EPL 2.0
+  'jakarta.ws.rs',         # dual-licensed under GPL 2 and EPL 2.0
+  'jakarta.json-api',      # dual-licensed under GPL 2 and EPL 2.0
+  'org.eclipse.parsson',   # EPL 2.0
+  'org/eclipse/parsson/'   # EPL 2.0
 ].freeze
 
 puts 'Start license check...'
@@ -110,7 +113,7 @@ def check_jar_license(jar_file)
   Zip::File.open(jar_file) do |jar|
     jar.filter { |e| e.ftype == :file }
        .filter { |e| 
!File.basename(e.name).downcase.end_with?(*BINARY_FILE_EXTENSIONS) }
-       .filter { |e| !File.basename(e.name).downcase.start_with? 'license', 
'dependencies' }
+       .filter { |e| !File.basename(e.name).downcase.start_with? 'license', 
'dependencies', 'notice' }
        .filter { |e| EXCEPTION_PACKAGES.none? { |ex| e.name.include? ex } }
        .map do |e|
          content = e.get_input_stream.read.force_encoding('UTF-8')

Reply via email to