This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new d993c6ae8 [FLINK-38455][elasticsearch][fix] Fix Elasticsearch Missing required property 'BulkRequest.operations' error (#4270)
d993c6ae8 is described below
commit d993c6ae8afd93e06caa5e2df5e0f7ed475cf024
Author: Jia Fan <[email protected]>
AuthorDate: Wed Feb 25 10:18:32 2026 +0800
[FLINK-38455][elasticsearch][fix] Fix Elasticsearch Missing required property 'BulkRequest.operations' error (#4270)
---
.../elasticsearch/v2/Elasticsearch8AsyncWriter.java | 10 ++++++++++
.../sink/ElasticsearchDataSinkITCaseTest.java | 8 ++++++++
.../sink/ElasticsearchEventSerializerTest.java | 15 +++++++++++++++
.../sink/utils/ElasticsearchTestUtils.java | 20 ++++++++++++++++++++
4 files changed, 53 insertions(+)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
index c5fe2f656..86d081697 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
@@ -123,11 +123,21 @@ public class Elasticsearch8AsyncWriter<InputT> extends
AsyncSinkWriter<InputT, O
LOG.debug("submitRequestEntries with {} items", requestEntries.size());
BulkRequest.Builder br = new BulkRequest.Builder();
+ boolean hasOperations = false;
for (Operation operation : requestEntries) {
if (operation.getBulkOperationVariant() == null) {
continue;
}
br.operations(new
BulkOperation(operation.getBulkOperationVariant()));
+ hasOperations = true;
+ }
+
+ if (!hasOperations) {
+ LOG.debug(
+ "Skipping empty BulkRequest, all {} operation(s) have null
BulkOperationVariant",
+ requestEntries.size());
+ requestResult.accept(Collections.emptyList());
+ return;
}
esClient.bulk(br.build())
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
index 7b67d18b4..23f16055a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java
@@ -146,6 +146,14 @@ class ElasticsearchDataSinkITCaseTest {
verifyInsertedDataWithNewColumn(tableId, "3", 3, 3.0, "value3", true);
}
+ @Test
+ void testElasticsearchSinkWithOnlySchemaChangeEvents() throws Exception {
+ TableId tableId = TableId.tableId("default", "schema",
"schema_only_table");
+ List<Event> events =
ElasticsearchTestUtils.createTestEventsWithOnlySchemaChange(tableId);
+
+ runJobWithEvents(events);
+ }
+
private static ElasticsearchContainer createElasticsearchContainer() {
ElasticsearchContainer esContainer = new
ElasticsearchContainer(ELASTICSEARCH_VERSION);
esContainer.withLogConsumer(new Slf4jLogConsumer(LOG));
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java
index 103359724..e5d6b0e2b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchEventSerializerTest.java
@@ -90,6 +90,21 @@ public class ElasticsearchEventSerializerTest {
assertThat(index).isEqualTo("test$2025-01-01");
}
+ @Test
+ void testSchemaChangeEventReturnsNull() {
+ Schema tableSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("name",
DataTypes.VARCHAR(255).notNull())
+ .primaryKey("id")
+ .build();
+
+ ElasticsearchEventSerializer serializer =
+ new ElasticsearchEventSerializer(ZoneId.of("UTC"));
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId,
tableSchema);
+ assertThat(serializer.apply(createTableEvent, new
MockContext())).isNull();
+ }
+
private String getShardingString(Map<TableId, String> shardingKey, String
shardingSeparator) {
RowType rowType =
RowType.of(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
index 31cd4df38..c76b32bfb 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java
@@ -186,4 +186,24 @@ public class ElasticsearchTestUtils {
3, 3.0,
BinaryStringData.fromString("value3"), true
})));
}
+
+ /**
+ * Creates a list of test events that only contain SchemaChangeEvents
(CreateTableEvent) without
+ * any DataChangeEvents. This simulates the scenario where a batch only
has schema change
+ * events, which previously caused "Missing required property
'BulkRequest.operations'" error.
+ *
+ * @param tableId the identifier of the table.
+ * @return a list of events containing only CreateTableEvent.
+ */
+ public static List<Event> createTestEventsWithOnlySchemaChange(TableId
tableId) {
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id",
DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number",
DataTypes.DOUBLE(), null))
+ .column(new PhysicalColumn("name",
DataTypes.VARCHAR(17), null))
+ .primaryKey("id")
+ .build();
+
+ return Collections.singletonList(new CreateTableEvent(tableId,
schema));
+ }
}