This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 8137f9d746ba296aeff70b0b7e18cdbcce4a142d Author: wuzexian <[email protected]> AuthorDate: Fri Aug 2 09:30:05 2024 +0800 [FLINK-35894][pipeline-connector][es] Support for ElasticSearch 6, 7 versions This closes #3495. --- .github/labeler.yml | 2 + .../pom.xml | 63 ++++- .../config/ElasticsearchSinkOptions.java | 23 +- .../elasticsearch/serializer/ColumnType.java | 46 ---- .../serializer/Elasticsearch6RequestCreator.java | 74 ++++++ .../serializer/ElasticsearchEventSerializer.java | 162 +++++------- .../serializer/ElasticsearchRowConverter.java | 220 +++++++++-------- .../elasticsearch/sink/ElasticsearchDataSink.java | 111 ++++++++- .../sink/ElasticsearchDataSinkFactory.java | 46 +++- .../sink/ElasticsearchDataSinkOptions.java | 12 +- .../sink/ElasticsearchMetadataApplier.java | 45 ---- .../elasticsearch/v2/Elasticsearch8AsyncSink.java | 41 ++- .../v2/Elasticsearch8AsyncSinkBuilder.java | 2 - .../v2/Elasticsearch8AsyncSinkSerializer.java | 25 +- .../v2/Elasticsearch8AsyncWriter.java | 6 - .../connectors/elasticsearch/v2/NetworkConfig.java | 2 - .../cdc/connectors/elasticsearch/v2/Operation.java | 17 +- .../elasticsearch/v2/OperationSerializer.java | 49 +++- .../sink/Elasticsearch6DataSinkITCaseTest.java | 275 +++++++++++++++++++++ .../sink/Elasticsearch7DataSinkITCaseTest.java | 274 ++++++++++++++++++++ .../sink/ElasticsearchDataSinkFactoryTest.java | 42 ++-- .../sink/ElasticsearchDataSinkITCaseTest.java | 150 +---------- .../sink/utils/ElasticsearchTestUtils.java | 189 ++++++++++++++ .../src/test/resources/testcontainers.properties | 1 - tools/ci/license_check.rb | 15 +- 25 files changed, 1374 insertions(+), 518 deletions(-) diff --git a/.github/labeler.yml b/.github/labeler.yml index 20bacf366..267690ec4 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -84,3 +84,5 @@ doris-pipeline-connector: - flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/**/* starrocks-pipeline-connector: - 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/**/* +elasticsearch-pipeline-connector: + - flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/**/* diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml index 157a260c3..d9b5cd4b6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml @@ -1,3 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> @@ -20,31 +37,21 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <elasticsearch.version>8.12.1</elasticsearch.version> <flink.version>1.18.0</flink.version> - <scala.binary.version>2.12</scala.binary.version> + <scala.binary.version>4.0</scala.binary.version> <jackson.version>2.13.2</jackson.version> <surefire.module.config>--add-opens=java.base/java.util=ALL-UNNAMED</surefire.module.config> <testcontainers.version>1.16.0</testcontainers.version> + <flink.connector.elasticsearch.version>4.0-SNAPSHOT</flink.connector.elasticsearch.version> <httpclient.version>4.5.13</httpclient.version> <junit.jupiter.version>5.7.1</junit.jupiter.version> <assertj.version>3.18.1</assertj.version> <junit.version>4.13.2</junit.version> <slf4j.version>1.7.32</slf4j.version> <junit.platform.version>1.10.2</junit.platform.version> - <paimon.version>0.7.0-incubating</paimon.version> - <hadoop.version>2.8.5</hadoop.version> - <hive.version>2.3.9</hive.version> - <mockito.version>3.4.6</mockito.version> <jakarta.json.version>2.0.2</jakarta.json.version> </properties> <dependencies> - <!-- Flink Dependencies --> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> @@ -108,7 +115,27 @@ <scope>test</scope> </dependency> - <!-- Elasticsearch Clients --> + <!-- Elasticsearch 6 --> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>elasticsearch-rest-client</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-sql-connector-elasticsearch6</artifactId> + <version>${flink.connector.elasticsearch.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-elasticsearch7</artifactId> + <version>${flink.connector.elasticsearch.version}</version> + </dependency> + + + <!-- Elasticsearch 8 Client --> <dependency> <groupId>co.elastic.clients</groupId> <artifactId>elasticsearch-java</artifactId> @@ -204,6 +231,16 @@ </goals> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> </configuration> </execution> </executions> diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java index 17ec76fd4..97e954520 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java @@ -34,6 +34,9 @@ public class ElasticsearchSinkOptions implements Serializable { private final long maxTimeInBufferMS; private final long maxRecordSizeInBytes; private final NetworkConfig networkConfig; + private final int version; + private final String username; + private final String password; /** Constructor for ElasticsearchSinkOptions. 
*/ public ElasticsearchSinkOptions( @@ -43,7 +46,10 @@ public class ElasticsearchSinkOptions implements Serializable { long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, - NetworkConfig networkConfig) { + NetworkConfig networkConfig, + int version, + String username, + String password) { this.maxBatchSize = maxBatchSize; this.maxInFlightRequests = maxInFlightRequests; this.maxBufferedRequests = maxBufferedRequests; @@ -51,6 +57,9 @@ public class ElasticsearchSinkOptions implements Serializable { this.maxTimeInBufferMS = maxTimeInBufferMS; this.maxRecordSizeInBytes = maxRecordSizeInBytes; this.networkConfig = networkConfig; + this.version = version; + this.username = username; + this.password = password; } /** @return the maximum batch size */ @@ -92,4 +101,16 @@ public class ElasticsearchSinkOptions implements Serializable { public List<HttpHost> getHosts() { return networkConfig.getHosts(); } + + public int getVersion() { + return version; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java deleted file mode 100644 index 1a3721634..000000000 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.connectors.elasticsearch.serializer; - -/** - * Enumeration of column types supported in the Elasticsearch connector. These types represent the - * various data types that can be used in database columns and are relevant for serialization and - * deserialization processes. - */ -public enum ColumnType { - BOOLEAN, - TINYINT, - SMALLINT, - INTEGER, - BIGINT, - FLOAT, - DOUBLE, - CHAR, - VARCHAR, - BINARY, - VARBINARY, - DECIMAL, - DATE, - TIME_WITHOUT_TIME_ZONE, - TIMESTAMP_WITHOUT_TIME_ZONE, - TIMESTAMP_WITH_LOCAL_TIME_ZONE, - TIMESTAMP_WITH_TIME_ZONE, - ARRAY, - MAP, - ROW -} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/Elasticsearch6RequestCreator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/Elasticsearch6RequestCreator.java new file mode 100644 index 000000000..d2b962b4a --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/Elasticsearch6RequestCreator.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.delete.DeleteRequest; +import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.index.IndexRequest; +import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.Requests; + +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.Map; + +/** + * A utility class that creates Elasticsearch 6.x specific requests. + * + * <p>This class provides methods to create {@link IndexRequest} and {@link DeleteRequest} objects + * that are compatible with Elasticsearch 6.x based on the operations provided. + */ +public class Elasticsearch6RequestCreator { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Creates an Elasticsearch 6 IndexRequest. + * + * @param operation IndexOperation object. + * @return IndexRequest object. 
+ */ + public static IndexRequest createIndexRequest(IndexOperation<?> operation) { + // Convert the document to Map<String, Object> + Map<String, Object> documentMap = + objectMapper.convertValue(operation.document(), Map.class); + + // Create and return IndexRequest, ensuring type field is set + return Requests.indexRequest() + .index(operation.index()) + .type("_doc") // Assuming type is "_doc", adjust as necessary + .id(operation.id()) + .source(documentMap); + } + + /** + * Creates an Elasticsearch 6 DeleteRequest. + * + * @param operation DeleteOperation object. + * @return DeleteRequest object. + */ + public static DeleteRequest createDeleteRequest(DeleteOperation operation) { + String index = operation.index(); + String id = operation.id(); + + // Create and return DeleteRequest, ensuring type field is set + return Requests.deleteRequest(index) + .type("_doc") // Assuming type is "_doc", adjust as necessary + .id(id); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java index f60231103..2968babb0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java @@ -20,10 +20,17 @@ package org.apache.flink.cdc.connectors.elasticsearch.serializer; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import 
org.apache.flink.cdc.common.data.RecordData; -import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.OperationType; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; -import org.apache.flink.cdc.common.types.*; import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.connector.base.sink.writer.ElementConverter; @@ -37,12 +44,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import static org.apache.flink.cdc.common.types.DataTypeChecks.*; - /** A serializer for Event to BulkOperationVariant. 
*/ public class ElasticsearchEventSerializer implements ElementConverter<Event, BulkOperationVariant> { private final ObjectMapper objectMapper = new ObjectMapper(); @@ -69,7 +78,7 @@ public class ElasticsearchEventSerializer implements ElementConverter<Event, Bul public BulkOperationVariant apply(Event event, SinkWriter.Context context) { try { if (event instanceof DataChangeEvent) { - return applyDataChangeEvent((DataChangeEvent) event); + return createBulkOperationVariant((DataChangeEvent) event); } else if (event instanceof SchemaChangeEvent) { IndexOperation<Map<String, Object>> indexOperation = applySchemaChangeEvent((SchemaChangeEvent) event); @@ -86,11 +95,11 @@ public class ElasticsearchEventSerializer implements ElementConverter<Event, Bul private IndexOperation<Map<String, Object>> applySchemaChangeEvent( SchemaChangeEvent schemaChangeEvent) throws IOException { TableId tableId = schemaChangeEvent.tableId(); - if (schemaChangeEvent instanceof CreateTableEvent) { Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema(); schemaMaps.put(tableId, schema); - return createSchemaIndexOperation(tableId, schema); + // Cache new converters + getOrCreateConverters(tableId, schema); } else if (schemaChangeEvent instanceof AddColumnEvent || schemaChangeEvent instanceof DropColumnEvent || schemaChangeEvent instanceof RenameColumnEvent) { @@ -100,7 +109,8 @@ public class ElasticsearchEventSerializer implements ElementConverter<Event, Bul Schema updatedSchema = SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); schemaMaps.put(tableId, updatedSchema); - return createSchemaIndexOperation(tableId, updatedSchema); + // Update cached converters + getOrCreateConverters(tableId, updatedSchema); } else { if (!schemaMaps.containsKey(tableId)) { throw new RuntimeException("Schema of " + tableId + " does not exist."); @@ -108,41 +118,27 @@ public class ElasticsearchEventSerializer implements ElementConverter<Event, Bul Schema updatedSchema = 
SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); schemaMaps.put(tableId, updatedSchema); + // Update cached converters + getOrCreateConverters(tableId, updatedSchema); } return null; } - private IndexOperation<Map<String, Object>> createSchemaIndexOperation( - TableId tableId, Schema schema) { - Map<String, Object> schemaMap = new HashMap<>(); - schemaMap.put( - "columns", - schema.getColumns().stream() - .map(Column::asSummaryString) - .collect(Collectors.toList())); - schemaMap.put("primaryKeys", schema.primaryKeys()); - schemaMap.put("options", schema.options()); - - return new IndexOperation.Builder<Map<String, Object>>() - .index(tableId.toString()) - .id(tableId.getTableName()) - .document(schemaMap) - .build(); - } - - private BulkOperationVariant applyDataChangeEvent(DataChangeEvent event) + private BulkOperationVariant createBulkOperationVariant(DataChangeEvent event) throws JsonProcessingException { TableId tableId = event.tableId(); Schema schema = schemaMaps.get(tableId); Preconditions.checkNotNull(schema, event.tableId() + " does not exist."); + // Ensure converters are cached + getOrCreateConverters(tableId, schema); Map<String, Object> valueMap; OperationType op = event.op(); - Object[] uniqueId = generateUniqueId( - op == OperationType.DELETE ? event.before() : event.after(), schema); + op == OperationType.DELETE ? 
event.before() : event.after(), + schema, + tableId); String id = Arrays.stream(uniqueId).map(Object::toString).collect(Collectors.joining("_")); - switch (op) { case INSERT: case REPLACE: @@ -160,67 +156,37 @@ public class ElasticsearchEventSerializer implements ElementConverter<Event, Bul } } - private Object[] generateUniqueId(RecordData recordData, Schema schema) { + private Object[] generateUniqueId(RecordData recordData, Schema schema, TableId tableId) { List<String> primaryKeys = schema.primaryKeys(); - return primaryKeys.stream() - .map( - primaryKey -> { - Column column = - schema.getColumns().stream() - .filter(col -> col.getName().equals(primaryKey)) - .findFirst() - .orElseThrow( - () -> - new IllegalStateException( - "Primary key column not found: " - + primaryKey)); - int index = schema.getColumns().indexOf(column); - return getFieldValue(recordData, column.getType(), index); - }) - .toArray(); - } + List<ElasticsearchRowConverter.SerializationConverter> converters = + converterCache.get(tableId); + Preconditions.checkNotNull(converters, "No converters found for table: " + tableId); - private Object getFieldValue(RecordData recordData, DataType dataType, int index) { - switch (dataType.getTypeRoot()) { - case BOOLEAN: - return recordData.getBoolean(index); - case TINYINT: - return recordData.getByte(index); - case SMALLINT: - return recordData.getShort(index); - case INTEGER: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - return recordData.getInt(index); - case BIGINT: - return recordData.getLong(index); - case FLOAT: - return recordData.getFloat(index); - case DOUBLE: - return recordData.getDouble(index); - case CHAR: - case VARCHAR: - return recordData.getString(index); - case DECIMAL: - return recordData.getDecimal(index, getPrecision(dataType), getScale(dataType)); - case TIMESTAMP_WITHOUT_TIME_ZONE: - return recordData.getTimestamp(index, getPrecision(dataType)); - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return 
recordData.getLocalZonedTimestampData(index, getPrecision(dataType)); - case TIMESTAMP_WITH_TIME_ZONE: - return recordData.getZonedTimestamp(index, getPrecision(dataType)); - case BINARY: - case VARBINARY: - return recordData.getBinary(index); - case ARRAY: - return recordData.getArray(index); - case MAP: - return recordData.getMap(index); - case ROW: - return recordData.getRow(index, getFieldCount(dataType)); - default: - throw new IllegalArgumentException("Unsupported type: " + dataType); + Object[] uniqueId = new Object[primaryKeys.size()]; + List<Column> columns = schema.getColumns(); + + for (int i = 0; i < primaryKeys.size(); i++) { + String primaryKey = primaryKeys.get(i); + Column column = null; + int index = -1; + + for (int j = 0; j < columns.size(); j++) { + if (columns.get(j).getName().equals(primaryKey)) { + column = columns.get(j); + index = j; + break; + } + } + + if (column == null) { + throw new IllegalStateException("Primary key column not found: " + primaryKey); + } + + checkIndex(index, converters.size()); + uniqueId[i] = converters.get(index).serialize(index, recordData); } + + return uniqueId; } public Map<String, Object> serializeRecord( @@ -230,37 +196,39 @@ public class ElasticsearchEventSerializer implements ElementConverter<Event, Bul Preconditions.checkState( columns.size() == recordData.getArity(), "Column size does not match the data size."); - List<ElasticsearchRowConverter.SerializationConverter> converters = getOrCreateConverters(tableId, schema); - for (int i = 0; i < recordData.getArity(); i++) { Column column = columns.get(i); + checkIndex(i, converters.size()); Object field = converters.get(i).serialize(i, recordData); record.put(column.getName(), field); } - return record; } private List<ElasticsearchRowConverter.SerializationConverter> getOrCreateConverters( TableId tableId, Schema schema) { - return converterCache.computeIfAbsent( + return converterCache.compute( tableId, - id -> { + (id, existingConverters) -> { 
List<ElasticsearchRowConverter.SerializationConverter> converters = new ArrayList<>(); for (Column column : schema.getColumns()) { - ColumnType columnType = - ColumnType.valueOf(column.getType().getTypeRoot().name()); converters.add( ElasticsearchRowConverter.createNullableExternalConverter( - columnType, pipelineZoneId)); + column.getType(), pipelineZoneId)); } return converters; }); } + private void checkIndex(int index, int size) { + if (index < 0 || index >= size) { + throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size); + } + } + @Override public void open(Sink.InitContext context) { ElementConverter.super.open(context); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java index 948c012a5..f3c77d7ba 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java @@ -17,15 +17,31 @@ package org.apache.flink.cdc.connectors.elasticsearch.serializer; -import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.ArrayData; +import org.apache.flink.cdc.common.data.GenericArrayData; +import org.apache.flink.cdc.common.data.GenericMapData; +import org.apache.flink.cdc.common.data.MapData; import org.apache.flink.cdc.common.data.RecordData; -import org.apache.flink.cdc.common.data.TimestampData; - -import java.time.*; +import 
org.apache.flink.cdc.common.types.DataField; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.common.types.DecimalType; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.common.types.ZonedTimestampType; +import org.apache.flink.elasticsearch6.shaded.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.time.LocalDate; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** Converter class for serializing row data to Elasticsearch compatible formats. */ public class ElasticsearchRowConverter { + private static final ObjectMapper objectMapper = new ObjectMapper(); // Date and time formatters for various temporal types private static final DateTimeFormatter DATE_FORMATTER = @@ -43,7 +59,7 @@ public class ElasticsearchRowConverter { * @return A SerializationConverter that can handle null values. */ public static SerializationConverter createNullableExternalConverter( - ColumnType columnType, java.time.ZoneId zoneId) { + DataType columnType, java.time.ZoneId zoneId) { return wrapIntoNullableExternalConverter(createExternalConverter(columnType, zoneId)); } @@ -71,123 +87,111 @@ public class ElasticsearchRowConverter { * @param zoneId The time zone to use for temporal conversions. * @return A SerializationConverter for the specified column type. 
*/ - static SerializationConverter createExternalConverter( - ColumnType columnType, java.time.ZoneId zoneId) { - switch (columnType) { - // Basic types - case BOOLEAN: - return (pos, data) -> data.getBoolean(pos); - case INTEGER: - return (pos, data) -> data.getInt(pos); - case DOUBLE: - return (pos, data) -> data.getDouble(pos); - case VARCHAR: + static ElasticsearchRowConverter.SerializationConverter createExternalConverter( + DataType columnType, ZoneId zoneId) { + switch (columnType.getTypeRoot()) { case CHAR: + case VARCHAR: return (pos, data) -> data.getString(pos).toString(); - case FLOAT: - return (pos, data) -> data.getFloat(pos); - case BIGINT: - return (pos, data) -> data.getLong(pos); - case TINYINT: - return (pos, data) -> data.getByte(pos); - case SMALLINT: - return (pos, data) -> data.getShort(pos); + case BOOLEAN: + return (pos, data) -> data.getBoolean(pos); case BINARY: case VARBINARY: return (pos, data) -> data.getBinary(pos); - - // Decimal type case DECIMAL: - return (pos, data) -> { - DecimalData decimalData = data.getDecimal(pos, 17, 2); - return decimalData != null ? 
decimalData.toBigDecimal().toString() : null; - }; - - // Date and time types + final int decimalPrecision = ((DecimalType) columnType).getPrecision(); + final int decimalScale = ((DecimalType) columnType).getScale(); + return (pos, data) -> + data.getDecimal(pos, decimalPrecision, decimalScale) + .toBigDecimal() + .toString(); + case TINYINT: + return (pos, data) -> data.getByte(pos); + case SMALLINT: + return (pos, data) -> data.getShort(pos); + case INTEGER: + return (pos, data) -> data.getInt(pos); + case BIGINT: + return (pos, data) -> data.getLong(pos); + case FLOAT: + return (pos, data) -> data.getFloat(pos); + case DOUBLE: + return (pos, data) -> data.getDouble(pos); case DATE: - return (pos, data) -> { - int days = data.getInt(pos); - LocalDate date = LocalDate.ofEpochDay(days); - return date.format(DATE_FORMATTER); - }; - case TIME_WITHOUT_TIME_ZONE: - return (pos, data) -> { - int milliseconds = data.getInt(pos); - LocalTime time = LocalTime.ofNanoOfDay(milliseconds * 1_000_000L); - return time.format(TIME_FORMATTER); - }; + return (pos, data) -> LocalDate.ofEpochDay(data.getInt(pos)).format(DATE_FORMATTER); case TIMESTAMP_WITHOUT_TIME_ZONE: - return (pos, data) -> { - long milliseconds = data.getTimestamp(pos, 6).getMillisecond(); - LocalDateTime dateTime = - LocalDateTime.ofEpochSecond( - milliseconds / 1000, - (int) (milliseconds % 1000) * 1_000_000, - java.time.ZoneOffset.UTC); - return dateTime.format(DATE_TIME_FORMATTER); - }; + return (pos, data) -> + data.getTimestamp(pos, DataTypeChecks.getPrecision(columnType)) + .toLocalDateTime() + .format(DATE_TIME_FORMATTER); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return (pos, data) -> { - try { - if (data.isNullAt(pos)) { - return null; - } - - // Debug logging - System.out.println( - "Processing TIMESTAMP_WITH_LOCAL_TIME_ZONE at position: " + pos); - System.out.println("Data type: " + data.getClass().getName()); - - // Attempt to retrieve timestamp data - long milliseconds; - int nanos = 0; - - try { - // 
Try using getTimestamp method - TimestampData timestampData = data.getTimestamp(pos, 6); - milliseconds = timestampData.getMillisecond(); - nanos = timestampData.getNanoOfMillisecond(); - } catch (Exception e) { - // Fallback to getLong if getTimestamp fails - milliseconds = data.getLong(pos); - } - - // Create Instant object - Instant instant = - Instant.ofEpochSecond( - milliseconds / 1000, - (milliseconds % 1000) * 1_000_000L + nanos); - - // Format timestamp using UTC timezone - return DateTimeFormatter.ISO_INSTANT.format(instant); - - } catch (Exception e) { - return "ERROR_PROCESSING_TIMESTAMP"; - } - }; + return (pos, data) -> + data.getTimestamp(pos, DataTypeChecks.getPrecision(columnType)) + .toLocalDateTime() + .atZone(zoneId) + .format(DATE_TIME_FORMATTER); case TIMESTAMP_WITH_TIME_ZONE: - return (pos, data) -> { - long milliseconds = data.getTimestamp(pos, 6).getMillisecond(); - LocalDateTime dateTime = - LocalDateTime.ofEpochSecond( - milliseconds / 1000, - (int) (milliseconds % 1000) * 1_000_000, - zoneId.getRules().getOffset(java.time.Instant.now())); - return dateTime.atZone(zoneId).format(DATE_TIME_FORMATTER); - }; - - // Complex types + final int zonedP = ((ZonedTimestampType) columnType).getPrecision(); + return (pos, data) -> + data.getTimestamp(pos, zonedP) + .toTimestamp() + .toInstant() + .atZone(zoneId) + .format(DATE_TIME_FORMATTER); case ARRAY: - return (pos, data) -> data.getArray(pos); + return (pos, data) -> convertArrayData(data.getArray(pos), columnType); case MAP: - return (pos, data) -> data.getMap(pos); + return (pos, data) -> + writeValueAsString(convertMapData(data.getMap(pos), columnType)); case ROW: - return (pos, data) -> { - RecordData rowData = data.getRow(pos, 5); // Assuming 5 fields, adjust as needed - return rowData != null ? 
rowData.toString() : null; - }; + return (pos, data) -> + writeValueAsString(convertRowData(data, pos, columnType, zoneId)); default: - throw new IllegalArgumentException("Unsupported column type: " + columnType); + throw new UnsupportedOperationException("Unsupported type: " + columnType); + } + } + + private static List<Object> convertArrayData(ArrayData array, DataType type) { + if (array instanceof GenericArrayData) { + return Arrays.asList(((GenericArrayData) array).toObjectArray()); + } + throw new UnsupportedOperationException("Unsupported array data: " + array.getClass()); + } + + private static Object convertMapData(MapData map, DataType type) { + Map<Object, Object> result = new HashMap<>(); + if (map instanceof GenericMapData) { + GenericMapData gMap = (GenericMapData) map; + for (Object key : ((GenericArrayData) gMap.keyArray()).toObjectArray()) { + result.put(key, gMap.get(key)); + } + return result; + } + throw new UnsupportedOperationException("Unsupported map data: " + map.getClass()); + } + + private static Object convertRowData( + RecordData val, int index, DataType type, ZoneId pipelineZoneId) { + RowType rowType = (RowType) type; + Map<String, Object> value = new HashMap<>(); + RecordData row = val.getRow(index, rowType.getFieldCount()); + + List<DataField> fields = rowType.getFields(); + for (int i = 0; i < fields.size(); i++) { + DataField rowField = fields.get(i); + SerializationConverter converter = + createNullableExternalConverter(rowField.getType(), pipelineZoneId); + Object valTmp = converter.serialize(i, row); + value.put(rowField.getName(), valTmp.toString()); + } + return value; + } + + private static String writeValueAsString(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (IOException e) { + throw new RuntimeException(e); } } diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java index 3507774f1..53227ef84 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java @@ -23,23 +23,28 @@ import org.apache.flink.cdc.common.sink.EventSinkProvider; import org.apache.flink.cdc.common.sink.FlinkSinkProvider; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.serializer.Elasticsearch6RequestCreator; import org.apache.flink.cdc.connectors.elasticsearch.serializer.ElasticsearchEventSerializer; import org.apache.flink.cdc.connectors.elasticsearch.v2.Elasticsearch8AsyncSinkBuilder; +import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder; -import org.apache.http.HttpHost; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; import java.io.Serializable; import java.time.ZoneId; /** - * A {@link DataSink} implementation for Elasticsearch connector. + * An implementation of {@link DataSink} for writing events to Elasticsearch. * - * @param <InputT> The input type of the sink. 
+ * <p>This class is responsible for configuring and managing the lifecycle of an Elasticsearch sink, + * including handling different versions of Elasticsearch (6, 7, 8). + * + * @param <InputT> The type of input elements that this sink can process. */ public class ElasticsearchDataSink<InputT> implements DataSink, Serializable { - private static final long serialVersionUID = 1L; - /** The Elasticsearch sink options. */ private final ElasticsearchSinkOptions elasticsearchOptions; @@ -59,11 +64,101 @@ public class ElasticsearchDataSink<InputT> implements DataSink, Serializable { @Override public EventSinkProvider getEventSinkProvider() { + switch (elasticsearchOptions.getVersion()) { + case 6: + return getElasticsearch6SinkProvider(); + case 7: + return getElasticsearch7SinkProvider(); + case 8: + return getElasticsearch8SinkProvider(); + default: + throw new IllegalArgumentException( + "Unsupported Elasticsearch version: " + elasticsearchOptions.getVersion()); + } + } + + private EventSinkProvider getElasticsearch6SinkProvider() { + ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId); + org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts = + elasticsearchOptions.getHosts().stream() + .map( + host -> + new org.apache.flink.elasticsearch6.shaded.org.apache.http + .HttpHost( + host.getHostName(), + host.getPort(), + host.getSchemeName())) + .toArray( + org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] + ::new); + + return FlinkSinkProvider.of( + new Elasticsearch6SinkBuilder<Event>() + .setHosts(hosts) + .setEmitter( + (element, context, indexer) -> { + BulkOperationVariant operation = + serializer.apply(element, context); + if (operation instanceof IndexOperation) { + indexer.add( + Elasticsearch6RequestCreator.createIndexRequest( + (IndexOperation<?>) operation)); + } else if (operation instanceof DeleteOperation) { + indexer.add( + Elasticsearch6RequestCreator.createDeleteRequest( + 
(DeleteOperation) operation)); + } + }) + .setBulkFlushMaxActions(elasticsearchOptions.getMaxBatchSize()) + .setBulkFlushInterval(elasticsearchOptions.getMaxTimeInBufferMS()) + .build()); + } + + private EventSinkProvider getElasticsearch7SinkProvider() { + ElasticsearchEventSerializer serializer = new ElasticsearchEventSerializer(zoneId); + org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] hosts = + elasticsearchOptions.getHosts().stream() + .map( + host -> + new org.apache.flink.elasticsearch6.shaded.org.apache.http + .HttpHost( + host.getHostName(), + host.getPort(), + host.getSchemeName())) + .toArray( + org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost[] + ::new); + + return FlinkSinkProvider.of( + new Elasticsearch6SinkBuilder<Event>() + .setHosts(hosts) + .setEmitter( + (element, context, indexer) -> { + BulkOperationVariant operation = + serializer.apply(element, context); + if (operation instanceof IndexOperation) { + indexer.add( + Elasticsearch6RequestCreator.createIndexRequest( + (IndexOperation<?>) operation)); + } else if (operation instanceof DeleteOperation) { + indexer.add( + Elasticsearch6RequestCreator.createDeleteRequest( + (DeleteOperation) operation)); + } + }) + .setBulkFlushMaxActions(elasticsearchOptions.getMaxBatchSize()) + .setBulkFlushInterval(elasticsearchOptions.getMaxTimeInBufferMS()) + .build()); + } + + private EventSinkProvider getElasticsearch8SinkProvider() { return FlinkSinkProvider.of( new Elasticsearch8AsyncSinkBuilder<Event>() - .setHosts(elasticsearchOptions.getHosts().toArray(new HttpHost[0])) - .setElementConverter( - new ElasticsearchEventSerializer(ZoneId.systemDefault())) + .setHosts( + elasticsearchOptions + .getHosts() + .toArray(new org.apache.http.HttpHost[0])) + .setElementConverter(new ElasticsearchEventSerializer(zoneId)) .setMaxBatchSize(elasticsearchOptions.getMaxBatchSize()) .setMaxInFlightRequests(elasticsearchOptions.getMaxInFlightRequests()) 
.setMaxBufferedRequests(elasticsearchOptions.getMaxBufferedRequests()) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java index a84c4691a..b8bcf59b9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; +import org.apache.flink.table.api.ValidationException; import org.apache.http.HttpHost; @@ -36,7 +37,16 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.*; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.HOSTS; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_BATCH_SIZE; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_BATCH_SIZE_IN_BYTES; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_BUFFERED_REQUESTS; +import static 
org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_IN_FLIGHT_REQUESTS; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_RECORD_SIZE_IN_BYTES; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.MAX_TIME_IN_BUFFER_MS; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.USERNAME; +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.VERSION; /** Factory for creating {@link ElasticsearchDataSink}. */ public class ElasticsearchDataSinkFactory implements DataSinkFactory { @@ -45,12 +55,17 @@ public class ElasticsearchDataSinkFactory implements DataSinkFactory { @Override public DataSink createDataSink(Context context) { + // Validate the configuration FactoryHelper.createFactoryHelper(this, context).validate(); + // Get the configuration directly from the context Configuration configuration = Configuration.fromMap(context.getFactoryConfiguration().toMap()); - ZoneId zoneId = determineZoneId(context); + // Validate required options + validateRequiredOptions(configuration); + + ZoneId zoneId = determineZoneId(context); ElasticsearchSinkOptions sinkOptions = buildSinkConnectorOptions(configuration); return new ElasticsearchDataSink(sinkOptions, zoneId); } @@ -69,6 +84,7 @@ public class ElasticsearchDataSinkFactory implements DataSinkFactory { List<HttpHost> hosts = parseHosts(cdcConfig.get(HOSTS)); String username = cdcConfig.get(USERNAME); String password = cdcConfig.get(PASSWORD); + int version = cdcConfig.get(VERSION); NetworkConfig networkConfig = new NetworkConfig(hosts, username, password, null, null, null); return new ElasticsearchSinkOptions( @@ -78,7 +94,10 @@ public class ElasticsearchDataSinkFactory implements DataSinkFactory { 
cdcConfig.get(MAX_BATCH_SIZE_IN_BYTES), cdcConfig.get(MAX_TIME_IN_BUFFER_MS), cdcConfig.get(MAX_RECORD_SIZE_IN_BYTES), - networkConfig); + networkConfig, + version, + username, + password); } private List<HttpHost> parseHosts(String hostsStr) { @@ -96,7 +115,7 @@ public class ElasticsearchDataSinkFactory implements DataSinkFactory { public Set<ConfigOption<?>> requiredOptions() { Set<ConfigOption<?>> requiredOptions = new HashSet<>(); requiredOptions.add(HOSTS); - requiredOptions.add(INDEX); + requiredOptions.add(VERSION); return requiredOptions; } @@ -113,4 +132,23 @@ public class ElasticsearchDataSinkFactory implements DataSinkFactory { optionalOptions.add(PASSWORD); return optionalOptions; } + + private void validateRequiredOptions(Configuration configuration) { + Set<ConfigOption<?>> missingOptions = new HashSet<>(); + for (ConfigOption<?> option : requiredOptions()) { + if (!configuration.contains(option)) { + missingOptions.add(option); + } + } + if (!missingOptions.isEmpty()) { + throw new ValidationException( + String.format( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "%s", + missingOptions.stream() + .map(ConfigOption::key) + .collect(Collectors.joining("\n")))); + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java index c5e414527..b26290400 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java @@ -77,12 +77,12 @@ public class ElasticsearchDataSinkOptions { .defaultValue(10L * 1024L * 1024L) .withDescription("The maximum size of a single record in bytes."); - /** The Elasticsearch index name to write to. */ - public static final ConfigOption<String> INDEX = - ConfigOptions.key("index") - .stringType() - .noDefaultValue() - .withDescription("The Elasticsearch index name to write to."); + /** The version of Elasticsearch to connect to. */ + public static final ConfigOption<Integer> VERSION = + ConfigOptions.key("version") + .intType() + .defaultValue(7) + .withDescription("The version of Elasticsearch to connect to (6, 7, or 8)."); /** The username for Elasticsearch authentication. */ public static final ConfigOption<String> USERNAME = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java deleted file mode 100644 index 9206ed7bc..000000000 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.connectors.elasticsearch.sink; - -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.sink.MetadataApplier; - -/** - * A metadata applier for Elasticsearch that handles schema change events. This class is responsible - * for applying metadata changes to the Elasticsearch index based on schema change events from the - * CDC source. - */ -public class ElasticsearchMetadataApplier implements MetadataApplier { - - /** - * Applies the given schema change event to the Elasticsearch index. - * - * @param event The schema change event to apply. 
- */ - @Override - public void applySchemaChange(SchemaChangeEvent event) { - // TODO: Implement the logic to apply schema changes to Elasticsearch - // This might include: - // - Creating new indices - // - Updating existing index mappings - // - Handling column additions or deletions - throw new UnsupportedOperationException( - "Schema change application is not yet implemented for Elasticsearch."); - } -} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java index 960c64ba2..4a237e3a3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ package org.apache.flink.cdc.connectors.elasticsearch.v2; @@ -34,17 +32,30 @@ import java.util.Collection; import java.util.Collections; /** - * Elasticsearch8AsyncSink Apache Flink's Async Sink that submits Operations into an Elasticsearch - * cluster. + * Elasticsearch8AsyncSink is Apache Flink's Async Sink that submits Operations into an + * Elasticsearch cluster. 
* - * @param <InputT> type of records that will be converted into {@link Operation} see {@link - * Elasticsearch8AsyncSinkBuilder} on how to construct valid instances + * @param <InputT> type of records that will be converted into {@link Operation}. See {@link + * Elasticsearch8AsyncSinkBuilder} on how to construct valid instances. */ public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Operation> { private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncSink.class); @VisibleForTesting protected final NetworkConfig networkConfig; + /** + * Constructs an Elasticsearch8AsyncSink. + * + * @param converter the converter that transforms input records to Elasticsearch operations. + * @param maxBatchSize the maximum number of records to be included in a single batch. + * @param maxInFlightRequests the maximum number of in-flight requests. + * @param maxBufferedRequests the maximum number of buffered requests. + * @param maxBatchSizeInBytes the maximum size of a batch in bytes. + * @param maxTimeInBufferMS the maximum time a request can stay in the buffer before being + * flushed. + * @param maxRecordSizeInByte the maximum size of a single record in bytes. + * @param networkConfig the network configuration for Elasticsearch. + */ protected Elasticsearch8AsyncSink( ElementConverter<InputT, Operation> converter, int maxBatchSize, @@ -66,6 +77,12 @@ public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Opera this.networkConfig = networkConfig; } + /** + * Creates a new {@link StatefulSinkWriter} for writing elements to Elasticsearch. + * + * @param context the initialization context. + * @return a new instance of {@link Elasticsearch8AsyncWriter}. 
+ */ @Override public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter( InitContext context) { @@ -82,6 +99,13 @@ public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Opera Collections.emptyList()); } + /** + * Restores a {@link StatefulSinkWriter} from a previously saved state. + * + * @param context the initialization context. + * @param recoveredState the recovered state. + * @return a restored instance of {@link Elasticsearch8AsyncWriter}. + */ @Override public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> restoreWriter( InitContext context, Collection<BufferedRequestState<Operation>> recoveredState) { @@ -98,6 +122,11 @@ public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Opera recoveredState); } + /** + * Returns the serializer for the writer's state. + * + * @return an instance of {@link Elasticsearch8AsyncSinkSerializer}. + */ @Override public SimpleVersionedSerializer<BufferedRequestState<Operation>> getWriterStateSerializer() { return new Elasticsearch8AsyncSinkSerializer(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java index 2e6598e09..17d2d36e5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more 
contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ package org.apache.flink.cdc.connectors.elasticsearch.v2; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java index df3520274..efdb74ee9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ package org.apache.flink.cdc.connectors.elasticsearch.v2; @@ -26,19 +24,40 @@ import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerialize import java.io.DataInputStream; import java.io.DataOutputStream; -/** Elasticsearch8AsyncSinkSerializer is used to serialize and deserialize an Operation. */ +/** + * {@code Elasticsearch8AsyncSinkSerializer} is used to serialize and deserialize {@link Operation} + * objects for Elasticsearch 8 async sink. 
+ */ public class Elasticsearch8AsyncSinkSerializer extends AsyncSinkWriterStateSerializer<Operation> { + /** + * Serializes the given {@link Operation} to the provided {@link DataOutputStream}. + * + * @param request the Operation to serialize. + * @param out the DataOutputStream to which the Operation will be serialized. + */ @Override protected void serializeRequestToStream(Operation request, DataOutputStream out) { new OperationSerializer().serialize(request, out); } + /** + * Deserializes an {@link Operation} from the provided {@link DataInputStream}. + * + * @param requestSize the size of the request in bytes. + * @param in the DataInputStream from which the Operation will be deserialized. + * @return the deserialized Operation. + */ @Override protected Operation deserializeRequestFromStream(long requestSize, DataInputStream in) { return new OperationSerializer().deserialize(requestSize, in); } + /** + * Returns the version of this serializer. + * + * @return the version number. + */ @Override public int getVersion() { return 1; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java index 7753d824b..50a606c87 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ package org.apache.flink.cdc.connectors.elasticsearch.v2; @@ -124,10 +122,6 @@ public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, O BulkRequest.Builder br = new BulkRequest.Builder(); for (Operation operation : requestEntries) { - // if (operation.getBulkOperationVariant() == null&&requestEntries.size()==1) - // { - // return; // 跳过当前循环迭代 - // } if (operation.getBulkOperationVariant() == null) { continue; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java index 2a6f4d8a6..a38f210b6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
- * */ package org.apache.flink.cdc.connectors.elasticsearch.v2; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java index 0d2ba0cca..855f2e3c8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ package org.apache.flink.cdc.connectors.elasticsearch.v2; @@ -26,16 +24,29 @@ import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; import java.io.Serializable; import java.util.Objects; -/** A single stream element which contains a BulkOperationVariant. */ +/** + * A single stream element which contains a {@link BulkOperationVariant}. This class represents an + * operation that will be performed in an Elasticsearch bulk request. + */ public class Operation implements Serializable { private static final long serialVersionUID = 1L; private final BulkOperationVariant bulkOperationVariant; + /** + * Constructs an Operation with the specified {@link BulkOperationVariant}. + * + * @param bulkOperation the BulkOperationVariant to be wrapped by this Operation. 
+ */ public Operation(BulkOperationVariant bulkOperation) { this.bulkOperationVariant = bulkOperation; } + /** + * Returns the {@link BulkOperationVariant} contained in this Operation. + * + * @return the BulkOperationVariant instance. + */ public BulkOperationVariant getBulkOperationVariant() { return bulkOperationVariant; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java index 127ce11b1..108425136 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java @@ -1,16 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.cdc.connectors.elasticsearch.v2; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import org.objenesis.strategy.StdInstantiatorStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -/** OperationSerializer is responsible for serialization and deserialization of an Operation. */ +/** + * OperationSerializer is responsible for the serialization and deserialization of an {@link + * Operation}. + */ public class OperationSerializer { + + private static final Logger LOG = LoggerFactory.getLogger(OperationSerializer.class); + private final Kryo kryo = new Kryo(); public OperationSerializer() { @@ -18,6 +43,12 @@ public class OperationSerializer { kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); } + /** + * Serializes the given {@link Operation} to the provided {@link DataOutputStream}. + * + * @param request the Operation to serialize + * @param out the DataOutputStream to which the Operation will be serialized + */ public void serialize(Operation request, DataOutputStream out) { try (Output output = new Output(out)) { kryo.writeObjectOrNull(output, request, Operation.class); @@ -25,6 +56,13 @@ public class OperationSerializer { } } + /** + * Deserializes an {@link Operation} from the provided {@link DataInputStream}. 
+ * + * @param requestSize the size of the request in bytes + * @param in the DataInputStream from which the Operation will be deserialized + * @return the deserialized Operation, or null if deserialization fails + */ public Operation deserialize(long requestSize, DataInputStream in) { try (Input input = new Input(in, (int) requestSize)) { if (input.available() > 0) { @@ -33,12 +71,17 @@ public class OperationSerializer { return null; // Skip if input stream is empty } } catch (Exception e) { - // Handle the exception as needed, e.g., log the error - System.err.println("Failed to deserialize Operation: " + e.getMessage()); + LOG.error("Failed to deserialize Operation: {}", e.getMessage(), e); return null; } } + /** + * Calculates the serialized size of the given {@link Operation}. + * + * @param operation the Operation whose size is to be calculated + * @return the size of the serialized Operation in bytes + */ public int size(Operation operation) { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); try (Output output = new Output(byteArrayOutputStream)) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java new file mode 100644 index 000000000..54eb4c54a --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch6DataSinkITCaseTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils; +import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; +import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost; +import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetRequest; +import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetResponse; +import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RequestOptions; +import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient; +import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient; +import org.apache.flink.streaming.api.datastream.DataStream; +import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.math.BigDecimal; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase tests for {@link ElasticsearchDataSink}. */ +@Testcontainers +public class Elasticsearch6DataSinkITCaseTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class); + private static final String ELASTICSEARCH_VERSION = "6.8.20"; + private static final DockerImageName ELASTICSEARCH_IMAGE = + DockerImageName.parse( + "docker.elastic.co/elasticsearch/elasticsearch:" + + ELASTICSEARCH_VERSION) + .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); + + @Container + private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = + createElasticsearchContainer(); + + private RestHighLevelClient client; + + @BeforeEach + public void setUp() { + client = createElasticsearchClient(); + } + + @AfterEach + public void tearDown() throws Exception { + if (client != null) { + client.close(); + } + } + + @Test + public void testElasticsearchSink() throws Exception { + TableId tableId = TableId.tableId("default", "schema", "table"); + List<Event> events = 
ElasticsearchTestUtils.createTestEvents(tableId); // 使用工具类 + + runJobWithEvents(events); + + verifyInsertedData( + tableId, + "1", + 1, + 1.0, + "value1", + true, + (byte) 1, + (short) 2, + 100L, + 1.0f, + new BigDecimal("10.00"), + 1633024800000L); + verifyInsertedData( + tableId, + "2", + 2, + 2.0, + "value2", + false, + (byte) 2, + (short) 3, + 200L, + 2.0f, + new BigDecimal("20.00"), + 1633111200000L); + } + + @Test + public void testElasticsearchInsertAndDelete() throws Exception { + TableId tableId = TableId.tableId("default", "schema", "table"); + List<Event> events = ElasticsearchTestUtils.createTestEventsWithDelete(tableId); // 使用工具类 + + runJobWithEvents(events); + + verifyDeletedData(tableId, "2"); + } + + @Test + public void testElasticsearchAddColumn() throws Exception { + TableId tableId = TableId.tableId("default", "schema", "table"); + List<Event> events = ElasticsearchTestUtils.createTestEventsWithAddColumn(tableId); // 使用工具类 + + runJobWithEvents(events); + + verifyInsertedDataWithNewColumn(tableId, "3", 3, 3.0, "value3", true); + } + + private static ElasticsearchContainer createElasticsearchContainer() { + return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withEnv("discovery.type", "single-node") + .withEnv("xpack.security.enabled", "false") + .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") + .withEnv("logger.org.elasticsearch", "ERROR") + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5))); + } + + private RestHighLevelClient createElasticsearchClient() { + return new RestHighLevelClient( + RestClient.builder( + new HttpHost( + ELASTICSEARCH_CONTAINER.getHost(), + ELASTICSEARCH_CONTAINER.getFirstMappedPort(), + "http"))); + } + + private void runJobWithEvents(List<Event> events) throws Exception { + ElasticsearchSinkOptions options = createSinkOptions(); + StreamExecutionEnvironment env = createStreamExecutionEnvironment(); + ElasticsearchDataSink<Event> sink = + new 
ElasticsearchDataSink<>(options, ZoneId.systemDefault()); + + DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class)); + Sink<Event> elasticsearchSink = ((FlinkSinkProvider) sink.getEventSinkProvider()).getSink(); + stream.sinkTo(elasticsearchSink); + + env.execute("Elasticsearch Sink Test"); + } + + private ElasticsearchSinkOptions createSinkOptions() { + + NetworkConfig networkConfig = + new NetworkConfig( + Collections.singletonList( + new org.apache.http.HttpHost( + ELASTICSEARCH_CONTAINER.getHost(), + ELASTICSEARCH_CONTAINER.getFirstMappedPort())), + null, + null, + null, + null, + null); + + return new ElasticsearchSinkOptions( + 5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, networkConfig, 6, null, null); + } + + private StreamExecutionEnvironment createStreamExecutionEnvironment() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(3000); + env.setRestartStrategy(RestartStrategies.noRestart()); + return env; + } + + private void verifyInsertedData( + TableId tableId, + String id, + int expectedId, + double expectedNumber, + String expectedName, + boolean expectedBool, + byte expectedTinyint, + short expectedSmallint, + long expectedBigint, + float expectedFloat, + BigDecimal expectedDecimal, + long expectedTimestamp) + throws Exception { + GetRequest getRequest = new GetRequest(tableId.toString()).id(id); + + GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); + + LOG.debug("Response source: {}", response.getSource()); + + assertThat(response.getSource()).isNotNull(); + assertThat(((Number) response.getSource().get("id")).intValue()).isEqualTo(expectedId); + assertThat(((Number) response.getSource().get("number")).doubleValue()) + .isEqualTo(expectedNumber); + assertThat(response.getSource().get("name")).isEqualTo(expectedName); + assertThat(response.getSource().get("bool")).isEqualTo(expectedBool); + 
assertThat(((Number) response.getSource().get("tinyint")).byteValue()) + .isEqualTo(expectedTinyint); + assertThat(((Number) response.getSource().get("smallint")).shortValue()) + .isEqualTo(expectedSmallint); + assertThat(((Number) response.getSource().get("bigint")).longValue()) + .isEqualTo(expectedBigint); + assertThat(((Number) response.getSource().get("float")).floatValue()) + .isEqualTo(expectedFloat); + assertThat(new BigDecimal(response.getSource().get("decimal").toString())) + .isEqualTo(expectedDecimal); + + String timestampString = response.getSource().get("timestamp").toString(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + LocalDateTime dateTime = LocalDateTime.parse(timestampString, formatter); + long timestampMillis = dateTime.toInstant(ZoneOffset.UTC).toEpochMilli(); + assertThat(timestampMillis).isEqualTo(expectedTimestamp); + } + + private void verifyDeletedData(TableId tableId, String id) throws Exception { + GetRequest getRequest = new GetRequest(tableId.toString()).id(id); + GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); + + assertThat(response.isExists()).isFalse(); + } + + private void verifyInsertedDataWithNewColumn( + TableId tableId, + String id, + int expectedId, + double expectedNumber, + String expectedName, + boolean expectedExtraBool) + throws Exception { + GetRequest getRequest = new GetRequest(tableId.toString()).id(id); + GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); + + assertThat(response.getSource()).isNotNull(); + assertThat(response.getSource().get("id")).isEqualTo(expectedId); + assertThat(response.getSource().get("number")).isEqualTo(expectedNumber); + assertThat(response.getSource().get("name")).isEqualTo(expectedName); + assertThat(response.getSource().get("extra_bool")).isEqualTo(expectedExtraBool); + } +} diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java new file mode 100644 index 000000000..fdc79aaf8 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/Elasticsearch7DataSinkITCaseTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils; +import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; +import org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost; +import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.get.GetRequest; +import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.get.GetResponse; +import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RequestOptions; +import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient; +import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.math.BigDecimal; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; 
+import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase tests for {@link ElasticsearchDataSink}. */ +@Testcontainers +public class Elasticsearch7DataSinkITCaseTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class); + private static final String ELASTICSEARCH_VERSION = "7.10.2"; + private static final DockerImageName ELASTICSEARCH_IMAGE = + DockerImageName.parse( + "docker.elastic.co/elasticsearch/elasticsearch:" + + ELASTICSEARCH_VERSION) + .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); + + @Container + private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = + createElasticsearchContainer(); + + private RestHighLevelClient client; + + @BeforeEach + public void setUp() { + client = createElasticsearchClient(); + } + + @AfterEach + public void tearDown() throws Exception { + if (client != null) { + client.close(); + } + } + + @Test + public void testElasticsearchSink() throws Exception { + TableId tableId = TableId.tableId("default", "schema", "table"); + List<Event> events = ElasticsearchTestUtils.createTestEvents(tableId); // 使用工具类 + + runJobWithEvents(events); + + verifyInsertedData( + tableId, + "1", + 1, + 1.0, + "value1", + true, + (byte) 1, + (short) 2, + 100L, + 1.0f, + new BigDecimal("10.00"), + 1633024800000L); + verifyInsertedData( + tableId, + "2", + 2, + 2.0, + "value2", + false, + (byte) 2, + (short) 3, + 200L, + 2.0f, + new BigDecimal("20.00"), + 1633111200000L); + } + + @Test + public void testElasticsearchInsertAndDelete() throws Exception { + TableId tableId = TableId.tableId("default", "schema", "table"); + List<Event> events = ElasticsearchTestUtils.createTestEventsWithDelete(tableId); // 使用工具类 + + runJobWithEvents(events); + + verifyDeletedData(tableId, "2"); + } + + @Test + public void testElasticsearchAddColumn() throws Exception { 
+ TableId tableId = TableId.tableId("default", "schema", "table"); + List<Event> events = ElasticsearchTestUtils.createTestEventsWithAddColumn(tableId); // 使用工具类 + + runJobWithEvents(events); + + verifyInsertedDataWithNewColumn(tableId, "3", 3, 3.0, "value3", true); + } + + private static ElasticsearchContainer createElasticsearchContainer() { + return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withEnv("discovery.type", "single-node") + .withEnv("xpack.security.enabled", "false") + .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") + .withEnv("logger.org.elasticsearch", "ERROR") + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5))); + } + + private RestHighLevelClient createElasticsearchClient() { + return new RestHighLevelClient( + RestClient.builder( + new HttpHost( + ELASTICSEARCH_CONTAINER.getHost(), + ELASTICSEARCH_CONTAINER.getFirstMappedPort(), + "http"))); + } + + private void runJobWithEvents(List<Event> events) throws Exception { + ElasticsearchSinkOptions options = createSinkOptions(); + StreamExecutionEnvironment env = createStreamExecutionEnvironment(); + ElasticsearchDataSink<Event> sink = + new ElasticsearchDataSink<>(options, ZoneId.systemDefault()); + + DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class)); + Sink<Event> elasticsearchSink = ((FlinkSinkProvider) sink.getEventSinkProvider()).getSink(); + stream.sinkTo(elasticsearchSink); + + env.execute("Elasticsearch Sink Test"); + } + + private ElasticsearchSinkOptions createSinkOptions() { + NetworkConfig networkConfig = + new NetworkConfig( + Collections.singletonList( + new org.apache.http.HttpHost( + ELASTICSEARCH_CONTAINER.getHost(), + ELASTICSEARCH_CONTAINER.getFirstMappedPort())), + null, + null, + null, + null, + null); + + return new ElasticsearchSinkOptions( + 5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, networkConfig, 7, null, null); + } + + private StreamExecutionEnvironment 
createStreamExecutionEnvironment() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(3000); + env.setRestartStrategy(RestartStrategies.noRestart()); + return env; + } + + private void verifyInsertedData( + TableId tableId, + String id, + int expectedId, + double expectedNumber, + String expectedName, + boolean expectedBool, + byte expectedTinyint, + short expectedSmallint, + long expectedBigint, + float expectedFloat, + BigDecimal expectedDecimal, + long expectedTimestamp) + throws Exception { + GetRequest getRequest = new GetRequest(tableId.toString()).id(id); + + GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); + + LOG.debug("Response source: {}", response.getSource()); + + assertThat(response.getSource()).isNotNull(); + assertThat(((Number) response.getSource().get("id")).intValue()).isEqualTo(expectedId); + assertThat(((Number) response.getSource().get("number")).doubleValue()) + .isEqualTo(expectedNumber); + assertThat(response.getSource().get("name")).isEqualTo(expectedName); + assertThat(response.getSource().get("bool")).isEqualTo(expectedBool); + assertThat(((Number) response.getSource().get("tinyint")).byteValue()) + .isEqualTo(expectedTinyint); + assertThat(((Number) response.getSource().get("smallint")).shortValue()) + .isEqualTo(expectedSmallint); + assertThat(((Number) response.getSource().get("bigint")).longValue()) + .isEqualTo(expectedBigint); + assertThat(((Number) response.getSource().get("float")).floatValue()) + .isEqualTo(expectedFloat); + assertThat(new BigDecimal(response.getSource().get("decimal").toString())) + .isEqualTo(expectedDecimal); + + String timestampString = response.getSource().get("timestamp").toString(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + LocalDateTime dateTime = LocalDateTime.parse(timestampString, formatter); + long timestampMillis = 
dateTime.toInstant(ZoneOffset.UTC).toEpochMilli(); + assertThat(timestampMillis).isEqualTo(expectedTimestamp); + } + + private void verifyDeletedData(TableId tableId, String id) throws Exception { + GetRequest getRequest = new GetRequest(tableId.toString()).id(id); + GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); + + assertThat(response.isExists()).isFalse(); + } + + private void verifyInsertedDataWithNewColumn( + TableId tableId, + String id, + int expectedId, + double expectedNumber, + String expectedName, + boolean expectedExtraBool) + throws Exception { + GetRequest getRequest = new GetRequest(tableId.toString()).id(id); + GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); + + assertThat(response.getSource()).isNotNull(); + assertThat(response.getSource().get("id")).isEqualTo(expectedId); + assertThat(response.getSource().get("number")).isEqualTo(expectedNumber); + assertThat(response.getSource().get("name")).isEqualTo(expectedName); + assertThat(response.getSource().get("extra_bool")).isEqualTo(expectedExtraBool); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java index 6195b5f06..32835349d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java @@ -1,7 +1,7 @@ /* - * Licensed to the Apache Software Foundation (ASF) 
under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. + * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -10,7 +10,7 @@ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ @@ -35,8 +35,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.*; - /** Tests for {@link ElasticsearchDataSinkFactory}.
*/ public class ElasticsearchDataSinkFactoryTest { @@ -57,16 +55,20 @@ public class ElasticsearchDataSinkFactoryTest { @Test void testLackRequiredOption() { DataSinkFactory sinkFactory = getElasticsearchDataSinkFactory(); - - Map<String, String> options = createValidOptions(); - List<String> requiredKeys = getRequiredKeys(sinkFactory); for (String requiredKey : requiredKeys) { - Map<String, String> remainingOptions = new HashMap<>(options); - remainingOptions.remove(requiredKey); - Configuration conf = Configuration.fromMap(remainingOptions); - + // Create a fresh options map containing all required options + Map<String, String> options = new HashMap<>(createValidOptions()); + // Remove the required option currently under test + options.remove(requiredKey); + Configuration conf = Configuration.fromMap(options); + // Log which missing required option is being tested + System.out.println("Testing missing required option: " + requiredKey); + + // Attempt to create the DataSink Assertions.assertThatThrownBy(() -> createDataSink(sinkFactory, conf)) + + // Assertions to check for missing required option .isInstanceOf(ValidationException.class) .hasMessageContaining( String.format( @@ -86,10 +88,14 @@ public class ElasticsearchDataSinkFactoryTest { Configuration.fromMap( ImmutableMap.<String, String>builder() .put("hosts", "localhost:9200") - .put("index", "test-index") + .put("version", "7") .put("unsupported_key", "unsupported_value") .build()); + // Log that the unsupported-option case is being tested + System.out.println("Testing unsupported option"); + + // Assertions to check for unsupported options Assertions.assertThatThrownBy(() -> createDataSink(sinkFactory, conf)) .isInstanceOf(ValidationException.class) .hasMessageContaining( @@ -110,14 +116,18 @@ public class ElasticsearchDataSinkFactoryTest { Configuration.fromMap( ImmutableMap.<String, String>builder() .put("hosts", "localhost:9200") - .put("index", "test-index") .put("batch.size.max", "500") .put("inflight.requests.max", "5") + .put("version", "7") // Added version to the test configuration .build()); + // Log that the prefixed required-option case is being tested +
System.out.println("Testing prefixed required option"); + DataSink dataSink = createDataSink(sinkFactory, conf); Assertions.assertThat(dataSink).isInstanceOf(ElasticsearchDataSink.class); } + // Helper methods private DataSinkFactory getElasticsearchDataSinkFactory() { @@ -132,14 +142,14 @@ public class ElasticsearchDataSinkFactoryTest { return Configuration.fromMap( ImmutableMap.<String, String>builder() .put("hosts", "localhost:9200") - .put("index", "test-index") + .put("version", "7") // Added version to the valid configuration .build()); } private Map<String, String> createValidOptions() { Map<String, String> options = new HashMap<>(); options.put("hosts", "localhost:9200"); - options.put("index", "test-index"); + options.put("version", "7"); // Added version to the valid options return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java index 5f9011b12..8772d3fbc 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java @@ -20,18 +20,12 @@ package org.apache.flink.cdc.connectors.elasticsearch.sink; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.cdc.common.data.DecimalData; -import 
org.apache.flink.cdc.common.data.TimestampData; -import org.apache.flink.cdc.common.data.binary.BinaryRecordData; -import org.apache.flink.cdc.common.data.binary.BinaryStringData; -import org.apache.flink.cdc.common.event.*; -import org.apache.flink.cdc.common.schema.PhysicalColumn; -import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.sink.FlinkSinkProvider; -import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.sink.utils.ElasticsearchTestUtils; import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; -import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -60,7 +54,6 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -73,7 +66,7 @@ public class ElasticsearchDataSinkITCaseTest { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class); - private static final String ELASTICSEARCH_VERSION = "8.12.0"; + private static final String ELASTICSEARCH_VERSION = "8.12.1"; private static final DockerImageName ELASTICSEARCH_IMAGE = DockerImageName.parse( "docker.elastic.co/elasticsearch/elasticsearch:" @@ -101,7 +94,7 @@ public class ElasticsearchDataSinkITCaseTest { @Test public void testElasticsearchSink() throws Exception { TableId tableId = TableId.tableId("default", "schema", "table"); - List<Event> events = createTestEvents(tableId); + List<Event> events = ElasticsearchTestUtils.createTestEvents(tableId); // 使用工具类 
runJobWithEvents(events); @@ -136,7 +129,7 @@ public class ElasticsearchDataSinkITCaseTest { @Test public void testElasticsearchInsertAndDelete() throws Exception { TableId tableId = TableId.tableId("default", "schema", "table"); - List<Event> events = createTestEventsWithDelete(tableId); + List<Event> events = ElasticsearchTestUtils.createTestEventsWithDelete(tableId); // 使用工具类 runJobWithEvents(events); @@ -146,7 +139,7 @@ public class ElasticsearchDataSinkITCaseTest { @Test public void testElasticsearchAddColumn() throws Exception { TableId tableId = TableId.tableId("default", "schema", "table"); - List<Event> events = createTestEventsWithAddColumn(tableId); + List<Event> events = ElasticsearchTestUtils.createTestEventsWithAddColumn(tableId); // 使用工具类 runJobWithEvents(events); @@ -203,7 +196,7 @@ public class ElasticsearchDataSinkITCaseTest { null); return new ElasticsearchSinkOptions( - 5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, networkConfig); + 5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, networkConfig, 8, null, null); } private StreamExecutionEnvironment createStreamExecutionEnvironment() { @@ -280,131 +273,4 @@ public class ElasticsearchDataSinkITCaseTest { assertThat(response.source().get("name")).isEqualTo(expectedName); assertThat(response.source().get("extra_bool")).isEqualTo(expectedExtraBool); } - - private List<Event> createTestEvents(TableId tableId) { - Schema schema = - Schema.newBuilder() - .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) - .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) - .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) - .column(new PhysicalColumn("bool", DataTypes.BOOLEAN(), null)) - .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), null)) - .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), null)) - .column(new PhysicalColumn("bigint", DataTypes.BIGINT(), null)) - .column(new PhysicalColumn("float", DataTypes.FLOAT(), null)) - .column(new 
PhysicalColumn("decimal", DataTypes.DECIMAL(10, 2), null)) - .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), null)) - .primaryKey("id") - .build(); - - BinaryRecordDataGenerator generator = - new BinaryRecordDataGenerator( - org.apache.flink.cdc.common.types.RowType.of( - org.apache.flink.cdc.common.types.DataTypes.INT(), - org.apache.flink.cdc.common.types.DataTypes.DOUBLE(), - org.apache.flink.cdc.common.types.DataTypes.STRING(), - org.apache.flink.cdc.common.types.DataTypes.BOOLEAN(), - org.apache.flink.cdc.common.types.DataTypes.TINYINT(), - org.apache.flink.cdc.common.types.DataTypes.SMALLINT(), - org.apache.flink.cdc.common.types.DataTypes.BIGINT(), - org.apache.flink.cdc.common.types.DataTypes.FLOAT(), - org.apache.flink.cdc.common.types.DataTypes.DECIMAL(10, 2), - org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP())); - - return Arrays.asList( - new CreateTableEvent(tableId, schema), - DataChangeEvent.insertEvent( - tableId, - generator.generate( - new Object[] { - 1, - 1.0, - BinaryStringData.fromString("value1"), - true, - (byte) 1, - (short) 2, - 100L, - 1.0f, - DecimalData.fromBigDecimal(new BigDecimal("10.00"), 10, 2), - TimestampData.fromMillis(1633024800000L) - })), - DataChangeEvent.insertEvent( - tableId, - generator.generate( - new Object[] { - 2, - 2.0, - BinaryStringData.fromString("value2"), - false, - (byte) 2, - (short) 3, - 200L, - 2.0f, - DecimalData.fromBigDecimal(new BigDecimal("20.00"), 10, 2), - TimestampData.fromMillis(1633111200000L) - }))); - } - - private List<Event> createTestEventsWithDelete(TableId tableId) { - Schema schema = - Schema.newBuilder() - .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) - .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) - .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) - .primaryKey("id") - .build(); - - BinaryRecordDataGenerator generator = - new BinaryRecordDataGenerator( - org.apache.flink.cdc.common.types.RowType.of( - 
org.apache.flink.cdc.common.types.DataTypes.INT(), - org.apache.flink.cdc.common.types.DataTypes.DOUBLE(), - org.apache.flink.cdc.common.types.DataTypes.STRING())); - - BinaryRecordData insertRecord1 = - generator.generate(new Object[] {1, 1.0, BinaryStringData.fromString("value1")}); - BinaryRecordData insertRecord2 = - generator.generate(new Object[] {2, 2.0, BinaryStringData.fromString("value2")}); - BinaryRecordData deleteRecord = - generator.generate(new Object[] {2, 2.0, BinaryStringData.fromString("value2")}); - - return Arrays.asList( - new CreateTableEvent(tableId, schema), - DataChangeEvent.insertEvent(tableId, insertRecord1), - DataChangeEvent.insertEvent(tableId, insertRecord2), - DataChangeEvent.deleteEvent(tableId, deleteRecord)); - } - - private List<Event> createTestEventsWithAddColumn(TableId tableId) { - Schema schema = - Schema.newBuilder() - .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) - .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) - .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) - .primaryKey("id") - .build(); - - BinaryRecordDataGenerator generator = - new BinaryRecordDataGenerator( - org.apache.flink.cdc.common.types.RowType.of( - org.apache.flink.cdc.common.types.DataTypes.INT(), - org.apache.flink.cdc.common.types.DataTypes.DOUBLE(), - org.apache.flink.cdc.common.types.DataTypes.STRING(), - org.apache.flink.cdc.common.types.DataTypes.BOOLEAN())); - - return Arrays.asList( - new CreateTableEvent(tableId, schema), - new AddColumnEvent( - tableId, - Collections.singletonList( - new AddColumnEvent.ColumnWithPosition( - new PhysicalColumn( - "extra_bool", DataTypes.BOOLEAN(), null)))), - DataChangeEvent.insertEvent( - tableId, - generator.generate( - new Object[] { - 3, 3.0, BinaryStringData.fromString("value3"), true - }))); - } } diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java new file mode 100644 index 000000000..31cd4df38 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/utils/ElasticsearchTestUtils.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink.utils; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** Utility class for creating test events for Elasticsearch integration tests. */ +public class ElasticsearchTestUtils { + + /** + * Creates a list of test events that simulate inserting rows into a table. + * + * @param tableId the identifier of the table. + * @return a list of events simulating inserts into the table. 
+ */ + public static List<Event> createTestEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .column(new PhysicalColumn("bool", DataTypes.BOOLEAN(), null)) + .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), null)) + .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), null)) + .column(new PhysicalColumn("bigint", DataTypes.BIGINT(), null)) + .column(new PhysicalColumn("float", DataTypes.FLOAT(), null)) + .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(10, 2), null)) + .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), null)) + .primaryKey("id") + .build(); + + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + org.apache.flink.cdc.common.types.RowType.of( + org.apache.flink.cdc.common.types.DataTypes.INT(), + org.apache.flink.cdc.common.types.DataTypes.DOUBLE(), + org.apache.flink.cdc.common.types.DataTypes.STRING(), + org.apache.flink.cdc.common.types.DataTypes.BOOLEAN(), + org.apache.flink.cdc.common.types.DataTypes.TINYINT(), + org.apache.flink.cdc.common.types.DataTypes.SMALLINT(), + org.apache.flink.cdc.common.types.DataTypes.BIGINT(), + org.apache.flink.cdc.common.types.DataTypes.FLOAT(), + org.apache.flink.cdc.common.types.DataTypes.DECIMAL(10, 2), + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP())); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 1, + 1.0, + BinaryStringData.fromString("value1"), + true, + (byte) 1, + (short) 2, + 100L, + 1.0f, + DecimalData.fromBigDecimal(new BigDecimal("10.00"), 10, 2), + TimestampData.fromMillis(1633024800000L) + })), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 2, + 2.0, + 
BinaryStringData.fromString("value2"), + false, + (byte) 2, + (short) 3, + 200L, + 2.0f, + DecimalData.fromBigDecimal(new BigDecimal("20.00"), 10, 2), + TimestampData.fromMillis(1633111200000L) + }))); + } + + /** + * Creates a list of test events that simulate inserting and then deleting a row from a table. + * + * @param tableId the identifier of the table. + * @return a list of events simulating inserts and deletes from the table. + */ + public static List<Event> createTestEventsWithDelete(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + org.apache.flink.cdc.common.types.RowType.of( + org.apache.flink.cdc.common.types.DataTypes.INT(), + org.apache.flink.cdc.common.types.DataTypes.DOUBLE(), + org.apache.flink.cdc.common.types.DataTypes.STRING())); + + BinaryRecordData insertRecord1 = + generator.generate(new Object[] {1, 1.0, BinaryStringData.fromString("value1")}); + BinaryRecordData insertRecord2 = + generator.generate(new Object[] {2, 2.0, BinaryStringData.fromString("value2")}); + BinaryRecordData deleteRecord = + generator.generate(new Object[] {2, 2.0, BinaryStringData.fromString("value2")}); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, insertRecord1), + DataChangeEvent.insertEvent(tableId, insertRecord2), + DataChangeEvent.deleteEvent(tableId, deleteRecord)); + } + + /** + * Creates a list of test events that simulate adding a column and then inserting rows into a + * table. + * + * @param tableId the identifier of the table. + * @return a list of events simulating a schema change and inserts into the table. 
+ */ + public static List<Event> createTestEventsWithAddColumn(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + org.apache.flink.cdc.common.types.RowType.of( + org.apache.flink.cdc.common.types.DataTypes.INT(), + org.apache.flink.cdc.common.types.DataTypes.DOUBLE(), + org.apache.flink.cdc.common.types.DataTypes.STRING(), + org.apache.flink.cdc.common.types.DataTypes.BOOLEAN())); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "extra_bool", DataTypes.BOOLEAN(), null)))), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 3, 3.0, BinaryStringData.fromString("value3"), true + }))); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties deleted file mode 100644 index d32577249..000000000 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties +++ /dev/null @@ -1 +0,0 @@ -ryuk.container.image=testcontainers/ryuk:0.3.3 diff --git a/tools/ci/license_check.rb b/tools/ci/license_check.rb index ac74c52ca..3ba68da89 100755 --- a/tools/ci/license_check.rb +++ b/tools/ci/license_check.rb @@ -64,7 +64,7 @@ QUESTIONABLE_STATEMENTS = [ ].freeze # These file extensions are binary-formatted. No check needed. 
-BINARY_FILE_EXTENSIONS = %w[.class .dylib .so .dll .gif .ico].freeze +BINARY_FILE_EXTENSIONS = %w[.class .dylib .so .dll .gif .ico .SF .DSA .RSA].freeze # These packages are licensed under "Weak Copyleft" licenses. # According to Apache official guidelines, such software could be @@ -72,10 +72,13 @@ BINARY_FILE_EXTENSIONS = %w[.class .dylib .so .dll .gif .ico].freeze # See https://www.apache.org/legal/resolved.html for more details. EXCEPTION_PACKAGES = [ 'org/glassfish/jersey/', # dual-licensed under GPL 2 and EPL 2.0 - 'org.glassfish.jersey', # dual-licensed under GPL 2 and EPL 2.0 - 'org.glassfish.hk2', # dual-licensed under GPL 2 and EPL 2.0 - 'javax.ws.rs-api', # dual-licensed under GPL 2 and EPL 2.0 - 'jakarta.ws.rs' # dual-licensed under GPL 2 and EPL 2.0 + 'org.glassfish.jersey', # dual-licensed under GPL 2 and EPL 2.0 + 'org.glassfish.hk2', # dual-licensed under GPL 2 and EPL 2.0 + 'javax.ws.rs-api', # dual-licensed under GPL 2 and EPL 2.0 + 'jakarta.ws.rs', # dual-licensed under GPL 2 and EPL 2.0 + 'jakarta.json-api', # dual-licensed under GPL 2 and EPL 2.0 + 'org.eclipse.parsson', # EPL 2.0 + 'org/eclipse/parsson/' # EPL 2.0 ].freeze puts 'Start license check...' @@ -110,7 +113,7 @@ def check_jar_license(jar_file) Zip::File.open(jar_file) do |jar| jar.filter { |e| e.ftype == :file } .filter { |e| !File.basename(e.name).downcase.end_with?(*BINARY_FILE_EXTENSIONS) } - .filter { |e| !File.basename(e.name).downcase.start_with? 'license', 'dependencies' } + .filter { |e| !File.basename(e.name).downcase.start_with? 'license', 'dependencies', 'notice' } .filter { |e| EXCEPTION_PACKAGES.none? { |ex| e.name.include? ex } } .map do |e| content = e.get_input_stream.read.force_encoding('UTF-8')
