This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit ac14dba9fba67452cd28405b3f59d79d7440fe98
Author: wuzexian <[email protected]>
AuthorDate: Fri Jul 12 16:59:18 2024 +0800

    [FLINK-35894][pipeline-connector][es] Introduce Elasticsearch Sink Connector for Flink CDC Pipeline
---
 .../pom.xml | 222 +++++++++++
 .../config/ElasticsearchSinkOptions.java | 95 +++++
 .../elasticsearch/serializer/ColumnType.java | 46 +++
 .../serializer/ElasticsearchEventSerializer.java | 268 ++++++++++++++
 .../serializer/ElasticsearchRowConverter.java | 205 +++++++++++
 .../elasticsearch/sink/ElasticsearchDataSink.java | 81 ++++
 .../sink/ElasticsearchDataSinkFactory.java | 116 ++++++
 .../sink/ElasticsearchDataSinkOptions.java | 104 ++++++
 .../sink/ElasticsearchMetadataApplier.java | 45 +++
 .../elasticsearch/v2/Elasticsearch8AsyncSink.java | 105 ++++++
 .../v2/Elasticsearch8AsyncSinkBuilder.java | 258 +++++++++++++
 .../v2/Elasticsearch8AsyncSinkSerializer.java | 46 +++
 .../v2/Elasticsearch8AsyncWriter.java | 217 +++++++++++
 .../connectors/elasticsearch/v2/NetworkConfig.java | 123 +++++++
 .../cdc/connectors/elasticsearch/v2/Operation.java | 52 +++
 .../elasticsearch/v2/OperationSerializer.java | 50 +++
 .../org.apache.flink.cdc.common.factories.Factory | 16 +
 .../sink/ElasticsearchDataSinkFactoryTest.java | 157 ++++++++
 .../sink/ElasticsearchDataSinkITCaseTest.java | 410 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties | 25 ++
 .../src/test/resources/testcontainers.properties | 1 +
 .../flink-cdc-pipeline-connectors/pom.xml | 1 +
 22 files changed, 2643 insertions(+)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
new file mode 100644
index 000000000..157a260c3
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
@@ -0,0 +1,222 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cdc-pipeline-connectors</artifactId>
        <version>${revision}</version>
    </parent>

    <artifactId>flink-cdc-pipeline-connector-elasticsearch</artifactId>
    <packaging>jar</packaging>

    <name>flink-cdc-pipeline-connector-elasticsearch</name>
    <url>http://maven.apache.org</url>

    <!-- Only properties that are referenced below (or read by inherited plugin configuration)
         are declared here; the paimon/hadoop/hive/mockito versions that were never used by
         this module have been dropped. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <elasticsearch.version>8.12.1</elasticsearch.version>
        <flink.version>1.18.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <jackson.version>2.13.2</jackson.version>
        <surefire.module.config>--add-opens=java.base/java.util=ALL-UNNAMED</surefire.module.config>
        <testcontainers.version>1.16.0</testcontainers.version>
        <httpclient.version>4.5.13</httpclient.version>
        <junit.jupiter.version>5.7.1</junit.jupiter.version>
        <assertj.version>3.18.1</assertj.version>
        <junit.version>4.13.2</junit.version>
        <slf4j.version>1.7.32</slf4j.version>
        <!-- NOTE(review): platform 1.10.x is normally paired with Jupiter 5.10.x, while
             junit.jupiter.version above is 5.7.1; confirm the two are meant to differ. -->
        <junit.platform.version>1.10.2</junit.platform.version>
        <jakarta.json.version>2.0.2</jakarta.json.version>
    </properties>

    <dependencies>
        <!-- Flink Dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${httpclient.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Jackson Databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <!-- Testcontainers -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${testcontainers.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${testcontainers.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Elasticsearch Clients -->
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>

        <!-- SLF4J API -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>${junit.jupiter.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit.jupiter.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>${assertj.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Architecture Tests -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-architecture-tests-test</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-architecture-tests-production</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-launcher</artifactId>
            <version>${junit.platform.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-composer</artifactId>
            <version>${revision}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Pins the versions of transitive dependencies pulled in by the Elasticsearch client. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>jakarta.json</groupId>
                <artifactId>jakarta.json-api</artifactId>
                <version>${jakarta.json.version}</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>${jackson.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <!-- Builds the bundled connector jar during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
new file mode 100644
index 000000000..17ec76fd4
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java
@@ -0,0 +1,95 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.flink.cdc.connectors.elasticsearch.config; + +import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; + +import org.apache.http.HttpHost; + +import java.io.Serializable; +import java.util.List; + +/** Elasticsearch DataSink Options reference {@link ElasticsearchSinkOptions}. */ +public class ElasticsearchSinkOptions implements Serializable { + + private final int maxBatchSize; + private final int maxInFlightRequests; + private final int maxBufferedRequests; + private final long maxBatchSizeInBytes; + private final long maxTimeInBufferMS; + private final long maxRecordSizeInBytes; + private final NetworkConfig networkConfig; + + /** Constructor for ElasticsearchSinkOptions. */ + public ElasticsearchSinkOptions( + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + NetworkConfig networkConfig) { + this.maxBatchSize = maxBatchSize; + this.maxInFlightRequests = maxInFlightRequests; + this.maxBufferedRequests = maxBufferedRequests; + this.maxBatchSizeInBytes = maxBatchSizeInBytes; + this.maxTimeInBufferMS = maxTimeInBufferMS; + this.maxRecordSizeInBytes = maxRecordSizeInBytes; + this.networkConfig = networkConfig; + } + + /** @return the maximum batch size */ + public int getMaxBatchSize() { + return maxBatchSize; + } + + /** @return the maximum number of in-flight requests */ + public int getMaxInFlightRequests() { + return maxInFlightRequests; + } + + /** @return the maximum number of buffered requests */ + public int getMaxBufferedRequests() { + return maxBufferedRequests; + } + + /** @return the maximum batch size in bytes */ + public long getMaxBatchSizeInBytes() { + return maxBatchSizeInBytes; + } + + /** @return the maximum time in buffer in milliseconds */ + public long getMaxTimeInBufferMS() { + return maxTimeInBufferMS; + } + + /** @return the maximum record size in bytes */ + public long getMaxRecordSizeInBytes() { 
+ return maxRecordSizeInBytes; + } + + /** @return the network configuration */ + public NetworkConfig getNetworkConfig() { + return networkConfig; + } + + /** @return the list of Elasticsearch hosts */ + public List<HttpHost> getHosts() { + return networkConfig.getHosts(); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java new file mode 100644 index 000000000..1a3721634 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ColumnType.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +/** + * Enumeration of column types supported in the Elasticsearch connector. 
These types represent the + * various data types that can be used in database columns and are relevant for serialization and + * deserialization processes. + */ +public enum ColumnType { + BOOLEAN, + TINYINT, + SMALLINT, + INTEGER, + BIGINT, + FLOAT, + DOUBLE, + CHAR, + VARCHAR, + BINARY, + VARBINARY, + DECIMAL, + DATE, + TIME_WITHOUT_TIME_ZONE, + TIMESTAMP_WITHOUT_TIME_ZONE, + TIMESTAMP_WITH_LOCAL_TIME_ZONE, + TIMESTAMP_WITH_TIME_ZONE, + ARRAY, + MAP, + ROW +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java new file mode 100644 index 000000000..f60231103 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.*; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.*; + +/** A serializer for Event to BulkOperationVariant. */ +public class ElasticsearchEventSerializer implements ElementConverter<Event, BulkOperationVariant> { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Map<TableId, Schema> schemaMaps = new HashMap<>(); + private final ConcurrentHashMap<TableId, List<ElasticsearchRowConverter.SerializationConverter>> + converterCache = new ConcurrentHashMap<>(); + + /** Format DATE type data. */ + public static final DateTimeFormatter DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + /** Format timestamp-related type data. 
*/ + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + /** ZoneId from pipeline config to support timestamp with local time zone. */ + private final ZoneId pipelineZoneId; + + public ElasticsearchEventSerializer(ZoneId zoneId) { + this.pipelineZoneId = zoneId; + } + + @Override + public BulkOperationVariant apply(Event event, SinkWriter.Context context) { + try { + if (event instanceof DataChangeEvent) { + return applyDataChangeEvent((DataChangeEvent) event); + } else if (event instanceof SchemaChangeEvent) { + IndexOperation<Map<String, Object>> indexOperation = + applySchemaChangeEvent((SchemaChangeEvent) event); + if (indexOperation != null) { + return indexOperation; + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to serialize event", e); + } + return null; + } + + private IndexOperation<Map<String, Object>> applySchemaChangeEvent( + SchemaChangeEvent schemaChangeEvent) throws IOException { + TableId tableId = schemaChangeEvent.tableId(); + + if (schemaChangeEvent instanceof CreateTableEvent) { + Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema(); + schemaMaps.put(tableId, schema); + return createSchemaIndexOperation(tableId, schema); + } else if (schemaChangeEvent instanceof AddColumnEvent + || schemaChangeEvent instanceof DropColumnEvent + || schemaChangeEvent instanceof RenameColumnEvent) { + if (!schemaMaps.containsKey(tableId)) { + throw new RuntimeException("Schema of " + tableId + " does not exist."); + } + Schema updatedSchema = + SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); + schemaMaps.put(tableId, updatedSchema); + return createSchemaIndexOperation(tableId, updatedSchema); + } else { + if (!schemaMaps.containsKey(tableId)) { + throw new RuntimeException("Schema of " + tableId + " does not exist."); + } + Schema updatedSchema = + SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), 
schemaChangeEvent); + schemaMaps.put(tableId, updatedSchema); + } + return null; + } + + private IndexOperation<Map<String, Object>> createSchemaIndexOperation( + TableId tableId, Schema schema) { + Map<String, Object> schemaMap = new HashMap<>(); + schemaMap.put( + "columns", + schema.getColumns().stream() + .map(Column::asSummaryString) + .collect(Collectors.toList())); + schemaMap.put("primaryKeys", schema.primaryKeys()); + schemaMap.put("options", schema.options()); + + return new IndexOperation.Builder<Map<String, Object>>() + .index(tableId.toString()) + .id(tableId.getTableName()) + .document(schemaMap) + .build(); + } + + private BulkOperationVariant applyDataChangeEvent(DataChangeEvent event) + throws JsonProcessingException { + TableId tableId = event.tableId(); + Schema schema = schemaMaps.get(tableId); + Preconditions.checkNotNull(schema, event.tableId() + " does not exist."); + Map<String, Object> valueMap; + OperationType op = event.op(); + + Object[] uniqueId = + generateUniqueId( + op == OperationType.DELETE ? 
event.before() : event.after(), schema); + String id = Arrays.stream(uniqueId).map(Object::toString).collect(Collectors.joining("_")); + + switch (op) { + case INSERT: + case REPLACE: + case UPDATE: + valueMap = serializeRecord(tableId, event.after(), schema, pipelineZoneId); + return new IndexOperation.Builder<>() + .index(tableId.toString()) + .id(id) + .document(valueMap) + .build(); + case DELETE: + return new DeleteOperation.Builder().index(tableId.toString()).id(id).build(); + default: + throw new UnsupportedOperationException("Unsupported Operation " + op); + } + } + + private Object[] generateUniqueId(RecordData recordData, Schema schema) { + List<String> primaryKeys = schema.primaryKeys(); + return primaryKeys.stream() + .map( + primaryKey -> { + Column column = + schema.getColumns().stream() + .filter(col -> col.getName().equals(primaryKey)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "Primary key column not found: " + + primaryKey)); + int index = schema.getColumns().indexOf(column); + return getFieldValue(recordData, column.getType(), index); + }) + .toArray(); + } + + private Object getFieldValue(RecordData recordData, DataType dataType, int index) { + switch (dataType.getTypeRoot()) { + case BOOLEAN: + return recordData.getBoolean(index); + case TINYINT: + return recordData.getByte(index); + case SMALLINT: + return recordData.getShort(index); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return recordData.getInt(index); + case BIGINT: + return recordData.getLong(index); + case FLOAT: + return recordData.getFloat(index); + case DOUBLE: + return recordData.getDouble(index); + case CHAR: + case VARCHAR: + return recordData.getString(index); + case DECIMAL: + return recordData.getDecimal(index, getPrecision(dataType), getScale(dataType)); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return recordData.getTimestamp(index, getPrecision(dataType)); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return 
recordData.getLocalZonedTimestampData(index, getPrecision(dataType)); + case TIMESTAMP_WITH_TIME_ZONE: + return recordData.getZonedTimestamp(index, getPrecision(dataType)); + case BINARY: + case VARBINARY: + return recordData.getBinary(index); + case ARRAY: + return recordData.getArray(index); + case MAP: + return recordData.getMap(index); + case ROW: + return recordData.getRow(index, getFieldCount(dataType)); + default: + throw new IllegalArgumentException("Unsupported type: " + dataType); + } + } + + public Map<String, Object> serializeRecord( + TableId tableId, RecordData recordData, Schema schema, ZoneId pipelineZoneId) { + List<Column> columns = schema.getColumns(); + Map<String, Object> record = new HashMap<>(); + Preconditions.checkState( + columns.size() == recordData.getArity(), + "Column size does not match the data size."); + + List<ElasticsearchRowConverter.SerializationConverter> converters = + getOrCreateConverters(tableId, schema); + + for (int i = 0; i < recordData.getArity(); i++) { + Column column = columns.get(i); + Object field = converters.get(i).serialize(i, recordData); + record.put(column.getName(), field); + } + + return record; + } + + private List<ElasticsearchRowConverter.SerializationConverter> getOrCreateConverters( + TableId tableId, Schema schema) { + return converterCache.computeIfAbsent( + tableId, + id -> { + List<ElasticsearchRowConverter.SerializationConverter> converters = + new ArrayList<>(); + for (Column column : schema.getColumns()) { + ColumnType columnType = + ColumnType.valueOf(column.getType().getTypeRoot().name()); + converters.add( + ElasticsearchRowConverter.createNullableExternalConverter( + columnType, pipelineZoneId)); + } + return converters; + }); + } + + @Override + public void open(Sink.InitContext context) { + ElementConverter.super.open(context); + } +} diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java new file mode 100644 index 000000000..948c012a5 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.TimestampData; + +import java.time.*; +import java.time.format.DateTimeFormatter; + +/** Converter class for serializing row data to Elasticsearch compatible formats. 
*/ +public class ElasticsearchRowConverter { + + // Date and time formatters for various temporal types + private static final DateTimeFormatter DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private static final DateTimeFormatter TIME_FORMATTER = + DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"); + private static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + /** + * Creates a nullable external converter for the given column type and time zone. + * + * @param columnType The type of the column to convert. + * @param zoneId The time zone to use for temporal conversions. + * @return A SerializationConverter that can handle null values. + */ + public static SerializationConverter createNullableExternalConverter( + ColumnType columnType, java.time.ZoneId zoneId) { + return wrapIntoNullableExternalConverter(createExternalConverter(columnType, zoneId)); + } + + /** + * Wraps a SerializationConverter to handle null values. + * + * @param serializationConverter The original SerializationConverter. + * @return A SerializationConverter that returns null for null input. + */ + static SerializationConverter wrapIntoNullableExternalConverter( + SerializationConverter serializationConverter) { + return (pos, data) -> { + if (data == null || data.isNullAt(pos)) { + return null; + } else { + return serializationConverter.serialize(pos, data); + } + }; + } + + /** + * Creates an external converter for the given column type and time zone. + * + * @param columnType The type of the column to convert. + * @param zoneId The time zone to use for temporal conversions. + * @return A SerializationConverter for the specified column type. 
+ */ + static SerializationConverter createExternalConverter( + ColumnType columnType, java.time.ZoneId zoneId) { + switch (columnType) { + // Basic types + case BOOLEAN: + return (pos, data) -> data.getBoolean(pos); + case INTEGER: + return (pos, data) -> data.getInt(pos); + case DOUBLE: + return (pos, data) -> data.getDouble(pos); + case VARCHAR: + case CHAR: + return (pos, data) -> data.getString(pos).toString(); + case FLOAT: + return (pos, data) -> data.getFloat(pos); + case BIGINT: + return (pos, data) -> data.getLong(pos); + case TINYINT: + return (pos, data) -> data.getByte(pos); + case SMALLINT: + return (pos, data) -> data.getShort(pos); + case BINARY: + case VARBINARY: + return (pos, data) -> data.getBinary(pos); + + // Decimal type + case DECIMAL: + return (pos, data) -> { + DecimalData decimalData = data.getDecimal(pos, 17, 2); + return decimalData != null ? decimalData.toBigDecimal().toString() : null; + }; + + // Date and time types + case DATE: + return (pos, data) -> { + int days = data.getInt(pos); + LocalDate date = LocalDate.ofEpochDay(days); + return date.format(DATE_FORMATTER); + }; + case TIME_WITHOUT_TIME_ZONE: + return (pos, data) -> { + int milliseconds = data.getInt(pos); + LocalTime time = LocalTime.ofNanoOfDay(milliseconds * 1_000_000L); + return time.format(TIME_FORMATTER); + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (pos, data) -> { + long milliseconds = data.getTimestamp(pos, 6).getMillisecond(); + LocalDateTime dateTime = + LocalDateTime.ofEpochSecond( + milliseconds / 1000, + (int) (milliseconds % 1000) * 1_000_000, + java.time.ZoneOffset.UTC); + return dateTime.format(DATE_TIME_FORMATTER); + }; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return (pos, data) -> { + try { + if (data.isNullAt(pos)) { + return null; + } + + // Debug logging + System.out.println( + "Processing TIMESTAMP_WITH_LOCAL_TIME_ZONE at position: " + pos); + System.out.println("Data type: " + data.getClass().getName()); + + // Attempt to retrieve timestamp 
data + long milliseconds; + int nanos = 0; + + try { + // Try using getTimestamp method + TimestampData timestampData = data.getTimestamp(pos, 6); + milliseconds = timestampData.getMillisecond(); + nanos = timestampData.getNanoOfMillisecond(); + } catch (Exception e) { + // Fallback to getLong if getTimestamp fails + milliseconds = data.getLong(pos); + } + + // Create Instant object + Instant instant = + Instant.ofEpochSecond( + milliseconds / 1000, + (milliseconds % 1000) * 1_000_000L + nanos); + + // Format timestamp using UTC timezone + return DateTimeFormatter.ISO_INSTANT.format(instant); + + } catch (Exception e) { + return "ERROR_PROCESSING_TIMESTAMP"; + } + }; + case TIMESTAMP_WITH_TIME_ZONE: + return (pos, data) -> { + long milliseconds = data.getTimestamp(pos, 6).getMillisecond(); + LocalDateTime dateTime = + LocalDateTime.ofEpochSecond( + milliseconds / 1000, + (int) (milliseconds % 1000) * 1_000_000, + zoneId.getRules().getOffset(java.time.Instant.now())); + return dateTime.atZone(zoneId).format(DATE_TIME_FORMATTER); + }; + + // Complex types + case ARRAY: + return (pos, data) -> data.getArray(pos); + case MAP: + return (pos, data) -> data.getMap(pos); + case ROW: + return (pos, data) -> { + RecordData rowData = data.getRow(pos, 5); // Assuming 5 fields, adjust as needed + return rowData != null ? rowData.toString() : null; + }; + default: + throw new IllegalArgumentException("Unsupported column type: " + columnType); + } + } + + /** Interface for serialization converters. */ + public interface SerializationConverter { + /** + * Serializes a value from the given position in the RecordData. + * + * @param pos The position of the value in the RecordData. + * @param data The RecordData containing the value to serialize. + * @return The serialized object. 
+ */ + Object serialize(int pos, RecordData data); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java new file mode 100644 index 000000000..3507774f1 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.sink.EventSinkProvider; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.serializer.ElasticsearchEventSerializer; +import org.apache.flink.cdc.connectors.elasticsearch.v2.Elasticsearch8AsyncSinkBuilder; + +import org.apache.http.HttpHost; + +import java.io.Serializable; +import java.time.ZoneId; + +/** + * A {@link DataSink} implementation for Elasticsearch connector. + * + * @param <InputT> The input type of the sink. + */ +public class ElasticsearchDataSink<InputT> implements DataSink, Serializable { + + private static final long serialVersionUID = 1L; + + /** The Elasticsearch sink options. */ + private final ElasticsearchSinkOptions elasticsearchOptions; + + /** The time zone ID for handling time-related operations. */ + private final ZoneId zoneId; + + /** + * Constructs an ElasticsearchDataSink with the given options and time zone. + * + * @param elasticsearchOptions The Elasticsearch sink options. + * @param zoneId The time zone ID for handling time-related operations. 
+ */ + public ElasticsearchDataSink(ElasticsearchSinkOptions elasticsearchOptions, ZoneId zoneId) { + this.elasticsearchOptions = elasticsearchOptions; + this.zoneId = zoneId; + } + + @Override + public EventSinkProvider getEventSinkProvider() { + return FlinkSinkProvider.of( + new Elasticsearch8AsyncSinkBuilder<Event>() + .setHosts(elasticsearchOptions.getHosts().toArray(new HttpHost[0])) + .setElementConverter( + new ElasticsearchEventSerializer(ZoneId.systemDefault())) + .setMaxBatchSize(elasticsearchOptions.getMaxBatchSize()) + .setMaxInFlightRequests(elasticsearchOptions.getMaxInFlightRequests()) + .setMaxBufferedRequests(elasticsearchOptions.getMaxBufferedRequests()) + .setMaxBatchSizeInBytes(elasticsearchOptions.getMaxBatchSizeInBytes()) + .setMaxTimeInBufferMS(elasticsearchOptions.getMaxTimeInBufferMS()) + .setMaxRecordSizeInBytes(elasticsearchOptions.getMaxRecordSizeInBytes()) + .build()); + } + + @Override + public MetadataApplier getMetadataApplier() { + // Currently, no metadata application is needed for Elasticsearch + return schemaChangeEvent -> {}; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java new file mode 100644 index 000000000..a84c4691a --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; + +import org.apache.http.HttpHost; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.*; + +/** Factory for creating {@link ElasticsearchDataSink}. 
*/ +public class ElasticsearchDataSinkFactory implements DataSinkFactory { + + public static final String IDENTIFIER = "elasticsearch"; + + @Override + public DataSink createDataSink(Context context) { + FactoryHelper.createFactoryHelper(this, context).validate(); + + Configuration configuration = + Configuration.fromMap(context.getFactoryConfiguration().toMap()); + ZoneId zoneId = determineZoneId(context); + + ElasticsearchSinkOptions sinkOptions = buildSinkConnectorOptions(configuration); + return new ElasticsearchDataSink(sinkOptions, zoneId); + } + + private ZoneId determineZoneId(Context context) { + String configuredZone = + context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE); + String defaultZone = PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue(); + + return Objects.equals(configuredZone, defaultZone) + ? ZoneId.systemDefault() + : ZoneId.of(configuredZone); + } + + private ElasticsearchSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) { + List<HttpHost> hosts = parseHosts(cdcConfig.get(HOSTS)); + String username = cdcConfig.get(USERNAME); + String password = cdcConfig.get(PASSWORD); + NetworkConfig networkConfig = + new NetworkConfig(hosts, username, password, null, null, null); + return new ElasticsearchSinkOptions( + cdcConfig.get(MAX_BATCH_SIZE), + cdcConfig.get(MAX_IN_FLIGHT_REQUESTS), + cdcConfig.get(MAX_BUFFERED_REQUESTS), + cdcConfig.get(MAX_BATCH_SIZE_IN_BYTES), + cdcConfig.get(MAX_TIME_IN_BUFFER_MS), + cdcConfig.get(MAX_RECORD_SIZE_IN_BYTES), + networkConfig); + } + + private List<HttpHost> parseHosts(String hostsStr) { + return Arrays.stream(hostsStr.split(",")) + .map(HttpHost::create) + .collect(Collectors.toList()); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> requiredOptions = new HashSet<>(); + requiredOptions.add(HOSTS); + requiredOptions.add(INDEX); + return requiredOptions; + } 
+ + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> optionalOptions = new HashSet<>(); + optionalOptions.add(MAX_BATCH_SIZE); + optionalOptions.add(MAX_IN_FLIGHT_REQUESTS); + optionalOptions.add(MAX_BUFFERED_REQUESTS); + optionalOptions.add(MAX_BATCH_SIZE_IN_BYTES); + optionalOptions.add(MAX_TIME_IN_BUFFER_MS); + optionalOptions.add(MAX_RECORD_SIZE_IN_BYTES); + optionalOptions.add(USERNAME); + optionalOptions.add(PASSWORD); + return optionalOptions; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java new file mode 100644 index 000000000..c5e414527 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; + +/** Options for the Elasticsearch data sink. */ +public class ElasticsearchDataSinkOptions { + + /** The comma-separated list of Elasticsearch hosts to connect to. */ + public static final ConfigOption<String> HOSTS = + ConfigOptions.key("hosts") + .stringType() + .noDefaultValue() + .withDescription( + "The comma-separated list of Elasticsearch hosts to connect to."); + + /** The maximum number of actions to buffer for each bulk request. */ + public static final ConfigOption<Integer> MAX_BATCH_SIZE = + ConfigOptions.key("batch.size.max") + .intType() + .defaultValue(500) + .withDescription( + "The maximum number of actions to buffer for each bulk request."); + + /** The maximum number of concurrent requests that the sink will try to execute. */ + public static final ConfigOption<Integer> MAX_IN_FLIGHT_REQUESTS = + ConfigOptions.key("inflight.requests.max") + .intType() + .defaultValue(5) + .withDescription( + "The maximum number of concurrent requests that the sink will try to execute."); + + /** The maximum number of requests to keep in the in-memory buffer. */ + public static final ConfigOption<Integer> MAX_BUFFERED_REQUESTS = + ConfigOptions.key("buffered.requests.max") + .intType() + .defaultValue(1000) + .withDescription( + "The maximum number of requests to keep in the in-memory buffer."); + + /** The maximum size of batch requests in bytes. */ + public static final ConfigOption<Long> MAX_BATCH_SIZE_IN_BYTES = + ConfigOptions.key("batch.size.max.bytes") + .longType() + .defaultValue(5L * 1024L * 1024L) + .withDescription("The maximum size of batch requests in bytes."); + + /** The maximum time to wait for incomplete batches before flushing. 
*/ + public static final ConfigOption<Long> MAX_TIME_IN_BUFFER_MS = + ConfigOptions.key("buffer.time.max.ms") + .longType() + .defaultValue(5000L) + .withDescription( + "The maximum time to wait for incomplete batches before flushing."); + + /** The maximum size of a single record in bytes. */ + public static final ConfigOption<Long> MAX_RECORD_SIZE_IN_BYTES = + ConfigOptions.key("record.size.max.bytes") + .longType() + .defaultValue(10L * 1024L * 1024L) + .withDescription("The maximum size of a single record in bytes."); + + /** The Elasticsearch index name to write to. */ + public static final ConfigOption<String> INDEX = + ConfigOptions.key("index") + .stringType() + .noDefaultValue() + .withDescription("The Elasticsearch index name to write to."); + + /** The username for Elasticsearch authentication. */ + public static final ConfigOption<String> USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription("The username for Elasticsearch authentication."); + + /** The password for Elasticsearch authentication. 
*/ + public static final ConfigOption<String> PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription("The password for Elasticsearch authentication."); + + private ElasticsearchDataSinkOptions() { + // This class should not be instantiated + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java new file mode 100644 index 000000000..9206ed7bc --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchMetadataApplier.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.sink.MetadataApplier; + +/** + * A metadata applier for Elasticsearch that handles schema change events. This class is responsible + * for applying metadata changes to the Elasticsearch index based on schema change events from the + * CDC source. + */ +public class ElasticsearchMetadataApplier implements MetadataApplier { + + /** + * Applies the given schema change event to the Elasticsearch index. + * + * @param event The schema change event to apply. + */ + @Override + public void applySchemaChange(SchemaChangeEvent event) { + // TODO: Implement the logic to apply schema changes to Elasticsearch + // This might include: + // - Creating new indices + // - Updating existing index mappings + // - Handling column additions or deletions + throw new UnsupportedOperationException( + "Schema change application is not yet implemented for Elasticsearch."); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java new file mode 100644 index 000000000..960c64ba2 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.flink.cdc.connectors.elasticsearch.v2; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; + +/** + * Elasticsearch8AsyncSink Apache Flink's Async Sink that submits Operations into an Elasticsearch + * cluster. 
+ * + * @param <InputT> type of records that will be converted into {@link Operation} see {@link + * Elasticsearch8AsyncSinkBuilder} on how to construct valid instances + */ +public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Operation> { + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncSink.class); + + @VisibleForTesting protected final NetworkConfig networkConfig; + + protected Elasticsearch8AsyncSink( + ElementConverter<InputT, Operation> converter, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInByte, + NetworkConfig networkConfig) { + super( + converter, + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInByte); + + this.networkConfig = networkConfig; + } + + @Override + public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter( + InitContext context) { + return new Elasticsearch8AsyncWriter<>( + getElementConverter(), + context, + getMaxBatchSize(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxBatchSizeInBytes(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes(), + networkConfig, + Collections.emptyList()); + } + + @Override + public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> restoreWriter( + InitContext context, Collection<BufferedRequestState<Operation>> recoveredState) { + return new Elasticsearch8AsyncWriter<>( + getElementConverter(), + context, + getMaxBatchSize(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxBatchSizeInBytes(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes(), + networkConfig, + recoveredState); + } + + @Override + public SimpleVersionedSerializer<BufferedRequestState<Operation>> getWriterStateSerializer() { + return new Elasticsearch8AsyncSinkSerializer(); + } +} diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java new file mode 100644 index 000000000..2e6598e09 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkBuilder.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.flink.cdc.connectors.elasticsearch.v2; + +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.function.SerializableSupplier; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.transport.TransportUtils; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.ssl.SSLContexts; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Elasticsearch8AsyncSinkBuilder The builder to construct the Elasticsearch8Sink {@link + * Elasticsearch8AsyncSink}. + * + * @param <InputT> the type of records to be sunk into an Elasticsearch cluster + */ +public class Elasticsearch8AsyncSinkBuilder<InputT> + extends AsyncSinkBaseBuilder<InputT, Operation, Elasticsearch8AsyncSinkBuilder<InputT>> { + + private static final int DEFAULT_MAX_BATCH_SIZE = 500; + private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50; + private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000; + private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 5 * 1024 * 1024; + private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000; + private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1024 * 1024; + + /** The hosts where the Elasticsearch cluster is reachable. */ + private List<HttpHost> hosts; + + /** The headers to be sent with the requests made to Elasticsearch cluster. 
*/ + private List<Header> headers; + + /** The username to authenticate the connection with the Elasticsearch cluster. */ + private String username; + + /** The password to authenticate the connection with the Elasticsearch cluster. */ + private String password; + + /** + * The element converter that will be called on every stream element to be processed and + * buffered. + */ + private ElementConverter<InputT, BulkOperationVariant> elementConverter; + + private SerializableSupplier<SSLContext> sslContextSupplier; + + private SerializableSupplier<HostnameVerifier> sslHostnameVerifier; + + /** + * setHosts set the hosts where the Elasticsearch cluster is reachable. + * + * @param hosts the hosts address + * @return {@code Elasticsearch8AsyncSinkBuilder} + */ + public Elasticsearch8AsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) { + checkNotNull(hosts); + checkArgument(hosts.length > 0, "Hosts cannot be empty"); + this.hosts = Arrays.asList(hosts); + return this; + } + + /** + * setHeaders set the headers to be sent with the requests made to Elasticsearch cluster.. + * + * @param headers the headers + * @return {@code Elasticsearch8AsyncSinkBuilder} + */ + public Elasticsearch8AsyncSinkBuilder<InputT> setHeaders(Header... headers) { + checkNotNull(hosts); + checkArgument(headers.length > 0, "Headers cannot be empty"); + this.headers = Arrays.asList(headers); + return this; + } + + /** + * Allows to bypass the certificates chain validation and connect to insecure network endpoints + * (for example, servers which use self-signed certificates). 
+ * + * @return this builder + */ + public Elasticsearch8AsyncSinkBuilder<InputT> allowInsecure() { + this.sslContextSupplier = + () -> { + try { + return SSLContexts.custom() + .loadTrustMaterial(TrustAllStrategy.INSTANCE) + .build(); + } catch (final NoSuchAlgorithmException + | KeyStoreException + | KeyManagementException ex) { + throw new IllegalStateException("Unable to create custom SSL context", ex); + } + }; + return this; + } + + /** + * Set the certificate fingerprint to be used to verify the HTTPS connection. + * + * @param certificateFingerprint the certificate fingerprint + * @return {@code Elasticsearch8AsyncSinkBuilder} + */ + public Elasticsearch8AsyncSinkBuilder<InputT> setCertificateFingerprint( + String certificateFingerprint) { + checkNotNull(certificateFingerprint, "certificateFingerprint must not be null"); + this.sslContextSupplier = + () -> TransportUtils.sslContextFromCaFingerprint(certificateFingerprint); + return this; + } + + /** + * Sets the supplier for getting an {@link SSLContext} instance. + * + * @param sslContextSupplier the serializable SSLContext supplier function + * @return this builder + */ + public Elasticsearch8AsyncSinkBuilder<InputT> setSslContextSupplier( + SerializableSupplier<SSLContext> sslContextSupplier) { + this.sslContextSupplier = checkNotNull(sslContextSupplier); + return this; + } + + /** + * Sets the supplier for getting an SSL {@link HostnameVerifier} instance. + * + * @param sslHostnameVerifierSupplier the serializable hostname verifier supplier function + * @return this builder + */ + public Elasticsearch8AsyncSinkBuilder<InputT> setSslHostnameVerifier( + SerializableSupplier<HostnameVerifier> sslHostnameVerifierSupplier) { + this.sslHostnameVerifier = sslHostnameVerifierSupplier; + return this; + } + + /** + * setUsername set the username to authenticate the connection with the Elasticsearch cluster. 
+ * + * @param username the auth username + * @return {@code Elasticsearch8AsyncSinkBuilder} + */ + public Elasticsearch8AsyncSinkBuilder<InputT> setUsername(String username) { + checkNotNull(username, "Username must not be null"); + this.username = username; + return this; + } + + /** + * setPassword set the password to authenticate the connection with the Elasticsearch cluster. + * + * @param password the auth password + * @return {@code Elasticsearch8AsyncSinkBuilder} + */ + public Elasticsearch8AsyncSinkBuilder<InputT> setPassword(String password) { + checkNotNull(password, "Password must not be null"); + this.password = password; + return this; + } + + /** + * setElementConverter set the element converter that will be called at every stream element to + * be processed and buffered. + * + * @param elementConverter elementConverter operation + * @return {@code Elasticsearch8AsyncSinkBuilder} + */ + public Elasticsearch8AsyncSinkBuilder<InputT> setElementConverter( + ElementConverter<InputT, BulkOperationVariant> elementConverter) { + checkNotNull(elementConverter); + this.elementConverter = elementConverter; + return this; + } + + public static <T> Elasticsearch8AsyncSinkBuilder<T> builder() { + return new Elasticsearch8AsyncSinkBuilder<>(); + } + + /** + * Creates an ElasticsearchSink instance. 
+ * + * @return {@link Elasticsearch8AsyncSink} + */ + @Override + public Elasticsearch8AsyncSink<InputT> build() { + return new Elasticsearch8AsyncSink<>( + buildOperationConverter(elementConverter), + Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE), + Optional.ofNullable(getMaxInFlightRequests()) + .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS), + Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS), + Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B), + Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS), + Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B), + buildNetworkConfig()); + } + + private OperationConverter<InputT> buildOperationConverter( + ElementConverter<InputT, BulkOperationVariant> converter) { + return converter != null ? new OperationConverter<>(converter) : null; + } + + private NetworkConfig buildNetworkConfig() { + checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); + return new NetworkConfig( + hosts, username, password, headers, sslContextSupplier, sslHostnameVerifier); + } + + /** A wrapper that evolves the Operation, since a BulkOperationVariant is not Serializable. 
*/ + public static class OperationConverter<T> implements ElementConverter<T, Operation> { + private final ElementConverter<T, BulkOperationVariant> converter; + + public OperationConverter(ElementConverter<T, BulkOperationVariant> converter) { + this.converter = converter; + } + + @Override + public Operation apply(T element, SinkWriter.Context context) { + return new Operation(converter.apply(element, context)); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java new file mode 100644 index 000000000..df3520274 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSinkSerializer.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.flink.cdc.connectors.elasticsearch.v2; + +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; + +import java.io.DataInputStream; +import java.io.DataOutputStream; + +/** Elasticsearch8AsyncSinkSerializer is used to serialize and deserialize an Operation. */ +public class Elasticsearch8AsyncSinkSerializer extends AsyncSinkWriterStateSerializer<Operation> { + + @Override + protected void serializeRequestToStream(Operation request, DataOutputStream out) { + new OperationSerializer().serialize(request, out); + } + + @Override + protected Operation deserializeRequestFromStream(long requestSize, DataInputStream in) { + return new OperationSerializer().deserialize(requestSize, in); + } + + @Override + public int getVersion() { + return 1; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java new file mode 100644 index 000000000..7753d824b --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java @@ -0,0 +1,217 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.flink.cdc.connectors.elasticsearch.v2; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.util.FlinkRuntimeException; + +import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Elasticsearch8AsyncWriter Apache Flink's Async Sink Writer that submits Operations into an + * Elasticsearch cluster. 
+ * + * @param <InputT> type of Operations + */ +public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, Operation> { + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncWriter.class); + + private final ElasticsearchAsyncClient esClient; + + private boolean close = false; + + private final Counter numRecordsOutErrorsCounter; + /** + * A counter to track number of records that are returned by Elasticsearch as failed and then + * retried by this writer. + */ + private final Counter numRecordsSendPartialFailureCounter; + /** A counter to track the number of bulk requests that are sent to Elasticsearch. */ + private final Counter numRequestSubmittedCounter; + + private static final FatalExceptionClassifier ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER = + FatalExceptionClassifier.createChain( + new FatalExceptionClassifier( + err -> + err instanceof NoRouteToHostException + || err instanceof ConnectException, + err -> + new FlinkRuntimeException( + "Could not connect to Elasticsearch cluster using the provided hosts", + err))); + + public Elasticsearch8AsyncWriter( + ElementConverter<InputT, Operation> elementConverter, + Sink.InitContext context, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + NetworkConfig networkConfig, + Collection<BufferedRequestState<Operation>> state) { + super( + elementConverter, + context, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(maxBatchSize) + .setMaxBatchSizeInBytes(maxBatchSizeInBytes) + .setMaxInFlightRequests(maxInFlightRequests) + .setMaxBufferedRequests(maxBufferedRequests) + .setMaxTimeInBufferMS(maxTimeInBufferMS) + .setMaxRecordSizeInBytes(maxRecordSizeInBytes) + .build(), + state); + + this.esClient = networkConfig.createEsClient(); + final SinkWriterMetricGroup metricGroup = context.metricGroup(); + checkNotNull(metricGroup); + + 
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter(); + this.numRecordsSendPartialFailureCounter = + metricGroup.counter("numRecordsSendPartialFailure"); + this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted"); + } + + @Override + protected void submitRequestEntries( + List<Operation> requestEntries, Consumer<List<Operation>> requestResult) { + numRequestSubmittedCounter.inc(); + LOG.debug("submitRequestEntries with {} items", requestEntries.size()); + + BulkRequest.Builder br = new BulkRequest.Builder(); + for (Operation operation : requestEntries) { + // if (operation.getBulkOperationVariant() == null&&requestEntries.size()==1) + // { + // return; // 跳过当前循环迭代 + // } + if (operation.getBulkOperationVariant() == null) { + continue; + } + br.operations(new BulkOperation(operation.getBulkOperationVariant())); + } + + esClient.bulk(br.build()) + .whenComplete( + (response, error) -> { + if (error != null) { + handleFailedRequest(requestEntries, requestResult, error); + } else if (response.errors()) { + handlePartiallyFailedRequest( + requestEntries, requestResult, response); + } else { + handleSuccessfulRequest(requestResult, response); + } + }); + } + + private void handleFailedRequest( + List<Operation> requestEntries, + Consumer<List<Operation>> requestResult, + Throwable error) { + LOG.warn( + "The BulkRequest of {} operation(s) has failed due to: {}", + requestEntries.size(), + error.getMessage()); + LOG.debug("The BulkRequest has failed", error); + numRecordsOutErrorsCounter.inc(requestEntries.size()); + + if (isRetryable(error.getCause())) { + requestResult.accept(requestEntries); + } + } + + private void handlePartiallyFailedRequest( + List<Operation> requestEntries, + Consumer<List<Operation>> requestResult, + BulkResponse response) { + LOG.debug("The BulkRequest has failed partially. 
Response: {}", response); + ArrayList<Operation> failedItems = new ArrayList<>(); + for (int i = 0; i < response.items().size(); i++) { + BulkResponseItem item = response.items().get(i); + if (item.error() != null) { + failedItems.add(requestEntries.get(i)); + + LOG.error("Failed operation {}: {}", i, item.error().reason()); + } + } + + numRecordsOutErrorsCounter.inc(failedItems.size()); + numRecordsSendPartialFailureCounter.inc(failedItems.size()); + LOG.info( + "The BulkRequest with {} operation(s) has {} failure(s). It took {}ms", + requestEntries.size(), + failedItems.size(), + response.took()); + requestResult.accept(failedItems); + } + + private void handleSuccessfulRequest( + Consumer<List<Operation>> requestResult, BulkResponse response) { + LOG.debug( + "The BulkRequest of {} operation(s) completed successfully. It took {}ms", + response.items().size(), + response.took()); + requestResult.accept(Collections.emptyList()); + } + + private boolean isRetryable(Throwable error) { + return !ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER.isFatal(error, getFatalExceptionCons()); + } + + @Override + protected long getSizeInBytes(Operation requestEntry) { + return new OperationSerializer().size(requestEntry); + } + + @Override + public void close() { + if (!close) { + close = true; + esClient.shutdown(); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java new file mode 100644 index 000000000..2a6f4d8a6 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/NetworkConfig.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.flink.cdc.connectors.elasticsearch.v2; + +import org.apache.flink.util.function.SerializableSupplier; + +import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import javax.annotation.Nullable; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + +import java.io.Serializable; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkState; + +/** A factory that creates valid ElasticsearchClient instances. 
*/ +public class NetworkConfig implements Serializable { + private final List<HttpHost> hosts; + + private final List<Header> headers; + + private final String username; + + private final String password; + + @Nullable private final SerializableSupplier<SSLContext> sslContextSupplier; + + @Nullable private final SerializableSupplier<HostnameVerifier> sslHostnameVerifier; + + public NetworkConfig( + List<HttpHost> hosts, + String username, + String password, + List<Header> headers, + SerializableSupplier<SSLContext> sslContextSupplier, + SerializableSupplier<HostnameVerifier> sslHostnameVerifier) { + checkState(!hosts.isEmpty(), "Hosts must not be empty"); + this.hosts = hosts; + this.username = username; + this.password = password; + this.headers = headers; + this.sslContextSupplier = sslContextSupplier; + this.sslHostnameVerifier = sslHostnameVerifier; + } + + public ElasticsearchAsyncClient createEsClient() { + return new ElasticsearchAsyncClient( + new RestClientTransport(this.getRestClient(), new JacksonJsonpMapper())); + } + + private RestClient getRestClient() { + RestClientBuilder restClientBuilder = + RestClient.builder(hosts.toArray(new HttpHost[0])) + .setHttpClientConfigCallback( + httpClientBuilder -> { + if (username != null && password != null) { + httpClientBuilder.setDefaultCredentialsProvider( + getCredentials()); + } + + if (sslContextSupplier != null) { + httpClientBuilder.setSSLContext(sslContextSupplier.get()); + } + + if (sslHostnameVerifier != null) { + httpClientBuilder.setSSLHostnameVerifier( + sslHostnameVerifier.get()); + } + + return httpClientBuilder; + }); + + if (headers != null) { + restClientBuilder.setDefaultHeaders(headers.toArray(new Header[0])); + } + + return restClientBuilder.build(); + } + + private CredentialsProvider getCredentials() { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + + credentialsProvider.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(username, 
password)); + + return credentialsProvider; + } + + public List<HttpHost> getHosts() { + return this.hosts; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java new file mode 100644 index 000000000..0d2ba0cca --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Operation.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.flink.cdc.connectors.elasticsearch.v2; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; + +import java.io.Serializable; +import java.util.Objects; + +/** A single stream element which contains a BulkOperationVariant. 
*/ +public class Operation implements Serializable { + private static final long serialVersionUID = 1L; + + private final BulkOperationVariant bulkOperationVariant; + + public Operation(BulkOperationVariant bulkOperation) { + this.bulkOperationVariant = bulkOperation; + } + + public BulkOperationVariant getBulkOperationVariant() { + return bulkOperationVariant; + } + + @Override + public int hashCode() { + return Objects.hash(bulkOperationVariant); + } + + @Override + public String toString() { + return "Operation{" + "bulkOperationVariant=" + bulkOperationVariant + '}'; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java new file mode 100644 index 000000000..127ce11b1 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/OperationSerializer.java @@ -0,0 +1,50 @@ +package org.apache.flink.cdc.connectors.elasticsearch.v2; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; + +/** OperationSerializer is responsible for serialization and deserialization of an Operation. 
*/ +public class OperationSerializer { + private final Kryo kryo = new Kryo(); + + public OperationSerializer() { + kryo.setRegistrationRequired(false); + kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + } + + public void serialize(Operation request, DataOutputStream out) { + try (Output output = new Output(out)) { + kryo.writeObjectOrNull(output, request, Operation.class); + output.flush(); + } + } + + public Operation deserialize(long requestSize, DataInputStream in) { + try (Input input = new Input(in, (int) requestSize)) { + if (input.available() > 0) { + return kryo.readObject(input, Operation.class); + } else { + return null; // Skip if input stream is empty + } + } catch (Exception e) { + // Handle the exception as needed, e.g., log the error + System.err.println("Failed to deserialize Operation: " + e.getMessage()); + return null; + } + } + + public int size(Operation operation) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (Output output = new Output(byteArrayOutputStream)) { + kryo.writeObjectOrNull(output, operation, Operation.class); + output.flush(); + return byteArrayOutputStream.size(); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory new file mode 100644 index 000000000..327aa202e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkFactory diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java new file mode 100644 index 000000000..6195b5f06 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkFactoryTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; +import org.apache.flink.table.api.ValidationException; + +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.connectors.elasticsearch.sink.ElasticsearchDataSinkOptions.*; + +/** Tests for {@link ElasticsearchDataSinkFactory}. */ +public class ElasticsearchDataSinkFactoryTest { + + private static final String ELASTICSEARCH_IDENTIFIER = "elasticsearch"; + + /** Tests the creation of an Elasticsearch DataSink with valid configuration. */ + @Test + void testCreateDataSink() { + DataSinkFactory sinkFactory = getElasticsearchDataSinkFactory(); + + Configuration conf = createValidConfiguration(); + DataSink dataSink = createDataSink(sinkFactory, conf); + + Assertions.assertThat(dataSink).isInstanceOf(ElasticsearchDataSink.class); + } + + /** Tests the behavior when a required option is missing. 
*/ + @Test + void testLackRequiredOption() { + DataSinkFactory sinkFactory = getElasticsearchDataSinkFactory(); + + Map<String, String> options = createValidOptions(); + + List<String> requiredKeys = getRequiredKeys(sinkFactory); + for (String requiredKey : requiredKeys) { + Map<String, String> remainingOptions = new HashMap<>(options); + remainingOptions.remove(requiredKey); + Configuration conf = Configuration.fromMap(remainingOptions); + + Assertions.assertThatThrownBy(() -> createDataSink(sinkFactory, conf)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + String.format( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "%s", + requiredKey)); + } + } + + /** Tests the behavior when an unsupported option is provided. */ + @Test + void testUnsupportedOption() { + DataSinkFactory sinkFactory = getElasticsearchDataSinkFactory(); + + Configuration conf = + Configuration.fromMap( + ImmutableMap.<String, String>builder() + .put("hosts", "localhost:9200") + .put("index", "test-index") + .put("unsupported_key", "unsupported_value") + .build()); + + Assertions.assertThatThrownBy(() -> createDataSink(sinkFactory, conf)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Unsupported options found for 'elasticsearch'.\n\n" + + "Unsupported options:\n\n" + + "unsupported_key"); + } + + /** + * Tests the creation of an Elasticsearch DataSink with valid configuration using prefixed + * options. 
+ */ + @Test + void testPrefixedRequiredOption() { + DataSinkFactory sinkFactory = getElasticsearchDataSinkFactory(); + + Configuration conf = + Configuration.fromMap( + ImmutableMap.<String, String>builder() + .put("hosts", "localhost:9200") + .put("index", "test-index") + .put("batch.size.max", "500") + .put("inflight.requests.max", "5") + .build()); + + DataSink dataSink = createDataSink(sinkFactory, conf); + Assertions.assertThat(dataSink).isInstanceOf(ElasticsearchDataSink.class); + } + // Helper methods + + private DataSinkFactory getElasticsearchDataSinkFactory() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier( + ELASTICSEARCH_IDENTIFIER, DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(ElasticsearchDataSinkFactory.class); + return sinkFactory; + } + + private Configuration createValidConfiguration() { + return Configuration.fromMap( + ImmutableMap.<String, String>builder() + .put("hosts", "localhost:9200") + .put("index", "test-index") + .build()); + } + + private Map<String, String> createValidOptions() { + Map<String, String> options = new HashMap<>(); + options.put("hosts", "localhost:9200"); + options.put("index", "test-index"); + return options; + } + + private List<String> getRequiredKeys(DataSinkFactory sinkFactory) { + return sinkFactory.requiredOptions().stream() + .map(ConfigOption::key) + .collect(Collectors.toList()); + } + + private DataSink createDataSink(DataSinkFactory sinkFactory, Configuration conf) { + return sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, conf, Thread.currentThread().getContextClassLoader())); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java new file mode 100644 index 000000000..5f9011b12 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkITCaseTest.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.connectors.elasticsearch.config.ElasticsearchSinkOptions; +import org.apache.flink.cdc.connectors.elasticsearch.v2.NetworkConfig; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.GetRequest; +import co.elastic.clients.elasticsearch.core.GetResponse; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.junit.jupiter.Container; +import 
org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.math.BigDecimal; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase tests for {@link ElasticsearchDataSink}. */ +@Testcontainers +public class ElasticsearchDataSinkITCaseTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ElasticsearchDataSinkITCaseTest.class); + private static final String ELASTICSEARCH_VERSION = "8.12.0"; + private static final DockerImageName ELASTICSEARCH_IMAGE = + DockerImageName.parse( + "docker.elastic.co/elasticsearch/elasticsearch:" + + ELASTICSEARCH_VERSION) + .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); + + @Container + private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = + createElasticsearchContainer(); + + private ElasticsearchClient client; + + @BeforeEach + public void setUp() { + client = createElasticsearchClient(); + } + + @AfterEach + public void tearDown() throws Exception { + if (client != null) { + client.shutdown(); + } + } + + @Test + public void testElasticsearchSink() throws Exception { + TableId tableId = TableId.tableId("default", "schema", "table"); + List<Event> events = createTestEvents(tableId); + + runJobWithEvents(events); + + verifyInsertedData( + tableId, + "1", + 1, + 1.0, + "value1", + true, + (byte) 1, + (short) 2, + 100L, + 1.0f, + new BigDecimal("10.00"), + 1633024800000L); + verifyInsertedData( + tableId, + "2", + 2, + 2.0, + "value2", + false, + (byte) 2, + (short) 3, + 200L, + 2.0f, + new BigDecimal("20.00"), + 1633111200000L); + } + + @Test + public void testElasticsearchInsertAndDelete() throws Exception { + TableId tableId = 
TableId.tableId("default", "schema", "table"); + List<Event> events = createTestEventsWithDelete(tableId); + + runJobWithEvents(events); + + verifyDeletedData(tableId, "2"); + } + + @Test + public void testElasticsearchAddColumn() throws Exception { + TableId tableId = TableId.tableId("default", "schema", "table"); + List<Event> events = createTestEventsWithAddColumn(tableId); + + runJobWithEvents(events); + + verifyInsertedDataWithNewColumn(tableId, "3", 3, 3.0, "value3", true); + } + + private static ElasticsearchContainer createElasticsearchContainer() { + return new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withEnv("discovery.type", "single-node") + .withEnv("xpack.security.enabled", "false") + .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") + .withEnv("logger.org.elasticsearch", "ERROR") + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .waitingFor(Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(5))); + } + + private ElasticsearchClient createElasticsearchClient() { + RestClientTransport transport = + new RestClientTransport( + RestClient.builder( + new HttpHost( + ELASTICSEARCH_CONTAINER.getHost(), + ELASTICSEARCH_CONTAINER.getFirstMappedPort(), + "http")) + .build(), + new JacksonJsonpMapper()); + return new ElasticsearchClient(transport); + } + + private void runJobWithEvents(List<Event> events) throws Exception { + ElasticsearchSinkOptions options = createSinkOptions(); + StreamExecutionEnvironment env = createStreamExecutionEnvironment(); + ElasticsearchDataSink<Event> sink = + new ElasticsearchDataSink<>(options, ZoneId.systemDefault()); + + DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class)); + Sink<Event> elasticsearchSink = ((FlinkSinkProvider) sink.getEventSinkProvider()).getSink(); + stream.sinkTo(elasticsearchSink); + + env.execute("Elasticsearch Sink Test"); + } + + private ElasticsearchSinkOptions createSinkOptions() { + NetworkConfig networkConfig = + new NetworkConfig( + Collections.singletonList( + 
new HttpHost( + ELASTICSEARCH_CONTAINER.getHost(), + ELASTICSEARCH_CONTAINER.getFirstMappedPort())), + null, + null, + null, + null, + null); + + return new ElasticsearchSinkOptions( + 5, 1, 10, 50 * 1024 * 1024, 1000, 10 * 1024 * 1024, networkConfig); + } + + private StreamExecutionEnvironment createStreamExecutionEnvironment() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(3000); + env.setRestartStrategy(RestartStrategies.noRestart()); + return env; + } + + private void verifyInsertedData( + TableId tableId, + String id, + int expectedId, + double expectedNumber, + String expectedName, + boolean expectedBool, + byte expectedTinyint, + short expectedSmallint, + long expectedBigint, + float expectedFloat, + BigDecimal expectedDecimal, + long expectedTimestamp) + throws Exception { + GetRequest getRequest = new GetRequest.Builder().index(tableId.toString()).id(id).build(); + GetResponse<Map> response = client.get(getRequest, Map.class); + + LOG.debug("Response source: {}", response.source()); + + assertThat(response.source()).isNotNull(); + assertThat(((Number) response.source().get("id")).intValue()).isEqualTo(expectedId); + assertThat(((Number) response.source().get("number")).doubleValue()) + .isEqualTo(expectedNumber); + assertThat(response.source().get("name")).isEqualTo(expectedName); + assertThat(response.source().get("bool")).isEqualTo(expectedBool); + assertThat(((Number) response.source().get("tinyint")).byteValue()) + .isEqualTo(expectedTinyint); + assertThat(((Number) response.source().get("smallint")).shortValue()) + .isEqualTo(expectedSmallint); + assertThat(((Number) response.source().get("bigint")).longValue()) + .isEqualTo(expectedBigint); + assertThat(((Number) response.source().get("float")).floatValue()).isEqualTo(expectedFloat); + assertThat(new BigDecimal(response.source().get("decimal").toString())) + .isEqualTo(expectedDecimal); + + String 
timestampString = response.source().get("timestamp").toString(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + LocalDateTime dateTime = LocalDateTime.parse(timestampString, formatter); + long timestampMillis = dateTime.toInstant(ZoneOffset.UTC).toEpochMilli(); + assertThat(timestampMillis).isEqualTo(expectedTimestamp); + } + + /** Asserts that a GET for {@code id} in the index named {@code tableId.toString()} returns a response whose source is null. */ private void verifyDeletedData(TableId tableId, String id) throws Exception { + GetRequest getRequest = new GetRequest.Builder().index(tableId.toString()).id(id).build(); + GetResponse<Map> response = client.get(getRequest, Map.class); + + assertThat(response.source()).isNull(); + } + + /** Fetches document {@code id} from the index named {@code tableId.toString()} and asserts the "id", "number", "name" and "extra_bool" fields. NOTE(review): the values from the deserialized map are compared directly here, whereas verifyInsertedData casts numeric fields to Number first; confirm the mapper always yields Integer and Double for these fields. */ private void verifyInsertedDataWithNewColumn( + TableId tableId, + String id, + int expectedId, + double expectedNumber, + String expectedName, + boolean expectedExtraBool) + throws Exception { + GetRequest getRequest = new GetRequest.Builder().index(tableId.toString()).id(id).build(); + GetResponse<Map> response = client.get(getRequest, Map.class); + + assertThat(response.source()).isNotNull(); + assertThat(response.source().get("id")).isEqualTo(expectedId); + assertThat(response.source().get("number")).isEqualTo(expectedNumber); + assertThat(response.source().get("name")).isEqualTo(expectedName); + assertThat(response.source().get("extra_bool")).isEqualTo(expectedExtraBool); + } + + /** Builds the event list for the insert test: a CreateTableEvent for a ten-column schema with primary key "id", followed by two insert events for ids 1 and 2. NOTE(review): the schema declares "name" as VARCHAR(17) while the record generator's row type uses STRING for that position. */ private List<Event> createTestEvents(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .column(new PhysicalColumn("bool", DataTypes.BOOLEAN(), null)) + .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), null)) + .column(new PhysicalColumn("smallint", DataTypes.SMALLINT(), null)) + .column(new PhysicalColumn("bigint", DataTypes.BIGINT(), null)) + .column(new PhysicalColumn("float", DataTypes.FLOAT(), null)) + 
.column(new PhysicalColumn("decimal", DataTypes.DECIMAL(10, 2), null)) + .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), null)) + .primaryKey("id") + .build(); + + /* The row type lists one field type per schema column, in the same order. */ BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + org.apache.flink.cdc.common.types.RowType.of( + org.apache.flink.cdc.common.types.DataTypes.INT(), + org.apache.flink.cdc.common.types.DataTypes.DOUBLE(), + org.apache.flink.cdc.common.types.DataTypes.STRING(), + org.apache.flink.cdc.common.types.DataTypes.BOOLEAN(), + org.apache.flink.cdc.common.types.DataTypes.TINYINT(), + org.apache.flink.cdc.common.types.DataTypes.SMALLINT(), + org.apache.flink.cdc.common.types.DataTypes.BIGINT(), + org.apache.flink.cdc.common.types.DataTypes.FLOAT(), + org.apache.flink.cdc.common.types.DataTypes.DECIMAL(10, 2), + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP())); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 1, + 1.0, + BinaryStringData.fromString("value1"), + true, + (byte) 1, + (short) 2, + 100L, + 1.0f, + DecimalData.fromBigDecimal(new BigDecimal("10.00"), 10, 2), + /* 2021-09-30T18:00:00 when read as UTC epoch milliseconds */ TimestampData.fromMillis(1633024800000L) + })), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 2, + 2.0, + BinaryStringData.fromString("value2"), + false, + (byte) 2, + (short) 3, + 200L, + 2.0f, + DecimalData.fromBigDecimal(new BigDecimal("20.00"), 10, 2), + /* 2021-10-01T18:00:00 when read as UTC epoch milliseconds */ TimestampData.fromMillis(1633111200000L) + }))); + } + + /** Builds the event list for the delete test: a CreateTableEvent for a three-column schema (id, number, name; primary key "id"), insert events for ids 1 and 2, and finally a delete event whose record carries the same values as the id 2 row. */ private List<Event> createTestEventsWithDelete(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + org.apache.flink.cdc.common.types.RowType.of( 
+ org.apache.flink.cdc.common.types.DataTypes.INT(), + org.apache.flink.cdc.common.types.DataTypes.DOUBLE(), + org.apache.flink.cdc.common.types.DataTypes.STRING())); + + BinaryRecordData insertRecord1 = + generator.generate(new Object[] {1, 1.0, BinaryStringData.fromString("value1")}); + BinaryRecordData insertRecord2 = + generator.generate(new Object[] {2, 2.0, BinaryStringData.fromString("value2")}); + /* Same field values as insertRecord2, so the delete targets the id 2 row. */ BinaryRecordData deleteRecord = + generator.generate(new Object[] {2, 2.0, BinaryStringData.fromString("value2")}); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, insertRecord1), + DataChangeEvent.insertEvent(tableId, insertRecord2), + DataChangeEvent.deleteEvent(tableId, deleteRecord)); + } + + /** Builds the event list for the add-column test: a CreateTableEvent for a three-column schema (id, number, name; primary key "id"), an AddColumnEvent that adds the BOOLEAN column "extra_bool", and an insert event for id 3 that carries four field values. */ private List<Event> createTestEventsWithAddColumn(TableId tableId) { + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + /* The generator's row type already includes the BOOLEAN field for the column added by the AddColumnEvent below. */ BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + org.apache.flink.cdc.common.types.RowType.of( + org.apache.flink.cdc.common.types.DataTypes.INT(), + org.apache.flink.cdc.common.types.DataTypes.DOUBLE(), + org.apache.flink.cdc.common.types.DataTypes.STRING(), + org.apache.flink.cdc.common.types.DataTypes.BOOLEAN())); + + return Arrays.asList( + new CreateTableEvent(tableId, schema), + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn( + "extra_bool", DataTypes.BOOLEAN(), null)))), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 3, 3.0, BinaryStringData.fromString("value3"), true + }))); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/log4j2-test.properties 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..1eebcb739 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/log4j2-test.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# The root logger level is set to INFO for debugging purposes; +# set it to OFF to avoid flooding build logs +rootLogger.level = INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties new file mode 100644 index 000000000..d32577249 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties @@ -0,0 +1 @@ +ryuk.container.image=testcontainers/ryuk:0.3.3 diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml index 424372f4c..9d0f9769b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml @@ -34,6 +34,7 @@ limitations under the License. <module>flink-cdc-pipeline-connector-starrocks</module> <module>flink-cdc-pipeline-connector-kafka</module> <module>flink-cdc-pipeline-connector-paimon</module> + <module>flink-cdc-pipeline-connector-elasticsearch</module> </modules> <dependencies>
