This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3455316981 [Bugfix][Elasticsearch] Fix add column event (#9069)
3455316981 is described below
commit 34553169810d52c29f3176caf9eed525a22fdcd6
Author: hailin0 <[email protected]>
AuthorDate: Mon Apr 7 15:39:04 2025 +0800
[Bugfix][Elasticsearch] Fix add column event (#9069)
---
.../sink/ElasticsearchSinkWriter.java | 16 ++++++++++++-
.../elasticsearch/ElasticsearchSchemaChangeIT.java | 27 +++++++++++++++++++---
2 files changed, 39 insertions(+), 4 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 46c49cc4b0..b5da93d37b 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -23,11 +23,14 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
+import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -66,12 +69,14 @@ public class ElasticsearchSinkWriter
private final int maxBatchSize;
- private final SeaTunnelRowSerializer seaTunnelRowSerializer;
+ private SeaTunnelRowSerializer seaTunnelRowSerializer;
private final List<String> requestEsList;
private EsRestClient esRestClient;
private RetryMaterial retryMaterial;
private static final long DEFAULT_SLEEP_TIME_MS = 200L;
private final IndexInfo indexInfo;
+ private TableSchema tableSchema;
+ private final TableSchemaChangeEventHandler tableSchemaChangeEventHandler;
public ElasticsearchSinkWriter(
Context context,
@@ -94,6 +99,8 @@ public class ElasticsearchSinkWriter
this.requestEsList = new ArrayList<>(maxBatchSize);
this.retryMaterial =
new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS);
+ this.tableSchema = catalogTable.getTableSchema();
+ this.tableSchemaChangeEventHandler = new TableSchemaChangeEventDispatcher();
}
@Override
@@ -120,6 +127,13 @@ public class ElasticsearchSinkWriter
} else {
throw new UnsupportedOperationException("Unsupported alter table event: " + event);
}
+
+ this.tableSchema = tableSchemaChangeEventHandler.reset(tableSchema).apply(event);
+ this.seaTunnelRowSerializer =
+ new ElasticsearchRowSerializer(
+ esRestClient.getClusterInfo(),
+ indexInfo,
+ tableSchema.toPhysicalRowDataType());
}
private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
index b05cbc098d..d09954fcc1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
@@ -192,9 +192,30 @@ public class ElasticsearchSchemaChangeIT extends TestSuiteBase implements TestRe
this.container.execInContainer(
"bash",
"-c",
- "curl -k -u elastic:elasticsearch https://localhost:9200/schema_change_index/_count");
- Assertions.assertTrue(
- indexCountResult.getStdout().contains("\"count\":18"));
+ "curl -k -u elastic:elasticsearch -H \"Content-Type:application/json\" -d '{ \"from\": 0, \"size\": 10000, \"query\": { \"match_all\": {}}}' https://localhost:9200/schema_change_index/_search");
+ log.info("indexCountResult: {}", indexCountResult.getStdout());
+ ObjectNode jsonNode =
+ JsonUtils.parseObject(indexCountResult.getStdout());
+ JsonNode hits = jsonNode.get("hits");
+ long totalCount = hits.get("total").get("value").asLong();
+ Assertions.assertEquals(18L, totalCount);
+
+ hits.get("hits")
+ .forEach(
+ hit -> {
+ JsonNode source = hit.get("_source");
+ int id = source.get("id").asInt();
+ if (id >= 119 && id <= 127) {
+ Assertions.assertTrue(
+ source.has("add_column1"));
+ Assertions.assertFalse(
+ source.get("add_column1").isNull());
+ Assertions.assertTrue(
+ source.has("add_column2"));
+ Assertions.assertFalse(
+ source.get("add_column2").isNull());
+ }
+ });
});
}