This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new d993c6ae8 [FLINK-38455][elasticsearch][fix] Fix Elasticsearch Missing required property 'BulkRequest.operations' error (#4270)
d993c6ae8 is described below

commit d993c6ae8afd93e06caa5e2df5e0f7ed475cf024
Author: Jia Fan <[email protected]>
AuthorDate: Wed Feb 25 10:18:32 2026 +0800

    [FLINK-38455][elasticsearch][fix] Fix Elasticsearch Missing required property 'BulkRequest.operations' error (#4270)
---
 .../elasticsearch/v2/Elasticsearch8AsyncWriter.java  | 10 ++++++++++
 .../sink/ElasticsearchDataSinkITCaseTest.java        |  8 ++++++++
 .../sink/ElasticsearchEventSerializerTest.java       | 15 +++++++++++++++
 .../sink/utils/ElasticsearchTestUtils.java           | 20 ++++++++++++++++++++
 4 files changed, 53 insertions(+)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
index c5fe2f656..86d081697 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
@@ -123,11 +123,21 @@ public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, O
         LOG.debug("submitRequestEntries with {} items", requestEntries.size());
 
         BulkRequest.Builder br = new BulkRequest.Builder();
+        boolean hasOperations = false;
         for (Operation operation : requestEntries) {
             if (operation.getBulkOperationVariant() == null) {
                 continue;
             }
             br.operations(new BulkOperation(operation.getBulkOperationVariant()));
+            hasOperations = true;
+        }
+
+        if (!hasOperations) {
+            LOG.debug(
+                    "Skipping empty BulkRequest, all {} operation(s) have null 
BulkOperationVariant",
+                    requestEntries.size());
+            requestResult.accept(Collections.emptyList());
+            return;
         }
 
         esClient.bulk(br.build())
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
index 7b67d18b4..23f16055a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
@@ -146,6 +146,14 @@ class ElasticsearchDataSinkITCaseTest {
         verifyInsertedDataWithNewColumn(tableId, "3", 3, 3.0, "value3", true);
     }
 
+    @Test
+    void testElasticsearchSinkWithOnlySchemaChangeEvents() throws Exception {
+        TableId tableId = TableId.tableId("default", "schema", "schema_only_table");
+        List<Event> events = ElasticsearchTestUtils.createTestEventsWithOnlySchemaChange(tableId);
+
+        runJobWithEvents(events);
+    }
+
     private static ElasticsearchContainer createElasticsearchContainer() {
         ElasticsearchContainer esContainer = new ElasticsearchContainer(ELASTICSEARCH_VERSION);
         esContainer.withLogConsumer(new Slf4jLogConsumer(LOG));
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java
index 103359724..e5d6b0e2b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java
@@ -90,6 +90,21 @@ public class ElasticsearchEventSerializerTest {
         assertThat(index).isEqualTo("test$2025-01-01");
     }
 
+    @Test
+    void testSchemaChangeEventReturnsNull() {
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull())
+                        .primaryKey("id")
+                        .build();
+
+        ElasticsearchEventSerializer serializer =
+                new ElasticsearchEventSerializer(ZoneId.of("UTC"));
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, tableSchema);
+        assertThat(serializer.apply(createTableEvent, new MockContext())).isNull();
+    }
+
     private String getShardingString(Map<TableId, String> shardingKey, String shardingSeparator) {
         RowType rowType =
                 RowType.of(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
index 31cd4df38..c76b32bfb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
@@ -186,4 +186,24 @@ public class ElasticsearchTestUtils {
                                     3, 3.0, BinaryStringData.fromString("value3"), true
                                 })));
     }
+
+    /**
+     * Creates a list of test events that only contain SchemaChangeEvents (CreateTableEvent) without
+     * any DataChangeEvents. This simulates the scenario where a batch only has schema change
+     * events, which previously caused "Missing required property 'BulkRequest.operations'" error.
+     *
+     * @param tableId the identifier of the table.
+     * @return a list of events containing only CreateTableEvent.
+     */
+    public static List<Event> createTestEventsWithOnlySchemaChange(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
+                        .primaryKey("id")
+                        .build();
+
+        return Collections.singletonList(new CreateTableEvent(tableId, schema));
+    }
 }
 }

Reply via email to