lvyanquan commented on code in PR #3495: URL: https://github.com/apache/flink-cdc/pull/3495#discussion_r1699586793
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.*; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.*; + +/** A serializer for Event to BulkOperationVariant. */ +public class ElasticsearchEventSerializer implements ElementConverter<Event, BulkOperationVariant> { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Map<TableId, Schema> schemaMaps = new HashMap<>(); + private final ConcurrentHashMap<TableId, List<ElasticsearchRowConverter.SerializationConverter>> + converterCache = new ConcurrentHashMap<>(); + + /** Format DATE type data. */ + public static final DateTimeFormatter DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + /** Format timestamp-related type data. */ + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + /** ZoneId from pipeline config to support timestamp with local time zone. */ + private final ZoneId pipelineZoneId; + + public ElasticsearchEventSerializer(ZoneId zoneId) { + this.pipelineZoneId = zoneId; + } + + @Override + public BulkOperationVariant apply(Event event, SinkWriter.Context context) { + try { + if (event instanceof DataChangeEvent) { + return applyDataChangeEvent((DataChangeEvent) event); + } else if (event instanceof SchemaChangeEvent) { + IndexOperation<Map<String, Object>> indexOperation = + applySchemaChangeEvent((SchemaChangeEvent) event); + if (indexOperation != null) { + return indexOperation; + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to serialize event", e); + } + return null; + } + + private IndexOperation<Map<String, Object>> applySchemaChangeEvent( + SchemaChangeEvent schemaChangeEvent) throws IOException { + TableId tableId = schemaChangeEvent.tableId(); + + if (schemaChangeEvent instanceof CreateTableEvent) { + Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema(); + schemaMaps.put(tableId, schema); + return createSchemaIndexOperation(tableId, schema); Review Comment: We should move this logic to `ElasticsearchMetadataApplier` as there will have more than one subtasks to receive the SchemaChangeEvent. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.sink; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; + +/** Options for the Elasticsearch data sink. */ +public class ElasticsearchDataSinkOptions { + + /** The comma-separated list of Elasticsearch hosts to connect to. */ + public static final ConfigOption<String> HOSTS = + ConfigOptions.key("hosts") + .stringType() + .noDefaultValue() + .withDescription( + "The comma-separated list of Elasticsearch hosts to connect to."); + + /** The maximum number of actions to buffer for each bulk request. */ + public static final ConfigOption<Integer> MAX_BATCH_SIZE = + ConfigOptions.key("batch.size.max") + .intType() + .defaultValue(500) + .withDescription( + "The maximum number of actions to buffer for each bulk request."); + + /** The maximum number of concurrent requests that the sink will try to execute. */ + public static final ConfigOption<Integer> MAX_IN_FLIGHT_REQUESTS = + ConfigOptions.key("inflight.requests.max") + .intType() + .defaultValue(5) + .withDescription( + "The maximum number of concurrent requests that the sink will try to execute."); + + /** The maximum number of requests to keep in the in-memory buffer. */ + public static final ConfigOption<Integer> MAX_BUFFERED_REQUESTS = + ConfigOptions.key("buffered.requests.max") + .intType() + .defaultValue(1000) + .withDescription( + "The maximum number of requests to keep in the in-memory buffer."); + + /** The maximum size of batch requests in bytes. */ + public static final ConfigOption<Long> MAX_BATCH_SIZE_IN_BYTES = + ConfigOptions.key("batch.size.max.bytes") + .longType() + .defaultValue(5L * 1024L * 1024L) + .withDescription("The maximum size of batch requests in bytes."); + + /** The maximum time to wait for incomplete batches before flushing. */ + public static final ConfigOption<Long> MAX_TIME_IN_BUFFER_MS = + ConfigOptions.key("buffer.time.max.ms") + .longType() + .defaultValue(5000L) + .withDescription( + "The maximum time to wait for incomplete batches before flushing."); + + /** The maximum size of a single record in bytes. */ + public static final ConfigOption<Long> MAX_RECORD_SIZE_IN_BYTES = + ConfigOptions.key("record.size.max.bytes") + .longType() + .defaultValue(10L * 1024L * 1024L) + .withDescription("The maximum size of a single record in bytes."); + + /** The Elasticsearch index name to write to. */ + public static final ConfigOption<String> INDEX = + ConfigOptions.key("index") + .stringType() + .noDefaultValue() + .withDescription("The Elasticsearch index name to write to."); Review Comment: Actually, I don't see any place that uses this option. What about setting all index name to this value if this config was set, otherwise use tableId as index name, like [topic in Kafka connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/pipeline-connectors/kafka/#pipeline-connector-options). ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchRowConverter.java: ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.TimestampData; + +import java.time.*; +import java.time.format.DateTimeFormatter; + +/** Converter class for serializing row data to Elasticsearch compatible formats. */ +public class ElasticsearchRowConverter { + + // Date and time formatters for various temporal types + private static final DateTimeFormatter DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private static final DateTimeFormatter TIME_FORMATTER = + DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"); + private static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + /** + * Creates a nullable external converter for the given column type and time zone. + * + * @param columnType The type of the column to convert. + * @param zoneId The time zone to use for temporal conversions. + * @return A SerializationConverter that can handle null values. + */ + public static SerializationConverter createNullableExternalConverter( + ColumnType columnType, java.time.ZoneId zoneId) { + return wrapIntoNullableExternalConverter(createExternalConverter(columnType, zoneId)); + } + + /** + * Wraps a SerializationConverter to handle null values. + * + * @param serializationConverter The original SerializationConverter. + * @return A SerializationConverter that returns null for null input. + */ + static SerializationConverter wrapIntoNullableExternalConverter( + SerializationConverter serializationConverter) { + return (pos, data) -> { + if (data == null || data.isNullAt(pos)) { + return null; + } else { + return serializationConverter.serialize(pos, data); + } + }; + } + + /** + * Creates an external converter for the given column type and time zone. + * + * @param columnType The type of the column to convert. + * @param zoneId The time zone to use for temporal conversions. + * @return A SerializationConverter for the specified column type. + */ + static SerializationConverter createExternalConverter( + ColumnType columnType, java.time.ZoneId zoneId) { + switch (columnType) { + // Basic types + case BOOLEAN: + return (pos, data) -> data.getBoolean(pos); + case INTEGER: + return (pos, data) -> data.getInt(pos); + case DOUBLE: + return (pos, data) -> data.getDouble(pos); + case VARCHAR: + case CHAR: + return (pos, data) -> data.getString(pos).toString(); + case FLOAT: + return (pos, data) -> data.getFloat(pos); + case BIGINT: + return (pos, data) -> data.getLong(pos); + case TINYINT: + return (pos, data) -> data.getByte(pos); + case SMALLINT: + return (pos, data) -> data.getShort(pos); + case BINARY: + case VARBINARY: + return (pos, data) -> data.getBinary(pos); + + // Decimal type + case DECIMAL: + return (pos, data) -> { + DecimalData decimalData = data.getDecimal(pos, 17, 2); Review Comment: We should get the precise from original data type. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.*; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.*; + +/** A serializer for Event to BulkOperationVariant. */ +public class ElasticsearchEventSerializer implements ElementConverter<Event, BulkOperationVariant> { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Map<TableId, Schema> schemaMaps = new HashMap<>(); + private final ConcurrentHashMap<TableId, List<ElasticsearchRowConverter.SerializationConverter>> + converterCache = new ConcurrentHashMap<>(); + + /** Format DATE type data. */ + public static final DateTimeFormatter DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + /** Format timestamp-related type data. */ + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + /** ZoneId from pipeline config to support timestamp with local time zone. */ + private final ZoneId pipelineZoneId; + + public ElasticsearchEventSerializer(ZoneId zoneId) { + this.pipelineZoneId = zoneId; + } + + @Override + public BulkOperationVariant apply(Event event, SinkWriter.Context context) { + try { + if (event instanceof DataChangeEvent) { + return applyDataChangeEvent((DataChangeEvent) event); + } else if (event instanceof SchemaChangeEvent) { + IndexOperation<Map<String, Object>> indexOperation = + applySchemaChangeEvent((SchemaChangeEvent) event); + if (indexOperation != null) { + return indexOperation; + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to serialize event", e); + } + return null; + } + + private IndexOperation<Map<String, Object>> applySchemaChangeEvent( + SchemaChangeEvent schemaChangeEvent) throws IOException { + TableId tableId = schemaChangeEvent.tableId(); + + if (schemaChangeEvent instanceof CreateTableEvent) { + Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema(); + schemaMaps.put(tableId, schema); + return createSchemaIndexOperation(tableId, schema); + } else if (schemaChangeEvent instanceof AddColumnEvent + || schemaChangeEvent instanceof DropColumnEvent + || schemaChangeEvent instanceof RenameColumnEvent) { + if (!schemaMaps.containsKey(tableId)) { + throw new RuntimeException("Schema of " + tableId + " does not exist."); + } + Schema updatedSchema = + SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); + schemaMaps.put(tableId, updatedSchema); + return createSchemaIndexOperation(tableId, updatedSchema); + } else { + if (!schemaMaps.containsKey(tableId)) { + throw new RuntimeException("Schema of " + tableId + " does not exist."); + } + Schema updatedSchema = + SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); + schemaMaps.put(tableId, updatedSchema); + } + return null; + } + + private IndexOperation<Map<String, Object>> createSchemaIndexOperation( + TableId tableId, Schema schema) { + Map<String, Object> schemaMap = new HashMap<>(); + schemaMap.put( + "columns", + schema.getColumns().stream() + .map(Column::asSummaryString) + .collect(Collectors.toList())); + schemaMap.put("primaryKeys", schema.primaryKeys()); + schemaMap.put("options", schema.options()); + + return new IndexOperation.Builder<Map<String, Object>>() + .index(tableId.toString()) + .id(tableId.getTableName()) + .document(schemaMap) + .build(); + } + + private BulkOperationVariant applyDataChangeEvent(DataChangeEvent event) Review Comment: It's better to named this method createBulkOperationVariant. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.*; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.*; + +/** A serializer for Event to BulkOperationVariant. */ +public class ElasticsearchEventSerializer implements ElementConverter<Event, BulkOperationVariant> { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Map<TableId, Schema> schemaMaps = new HashMap<>(); + private final ConcurrentHashMap<TableId, List<ElasticsearchRowConverter.SerializationConverter>> + converterCache = new ConcurrentHashMap<>(); + + /** Format DATE type data. */ + public static final DateTimeFormatter DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + /** Format timestamp-related type data. */ + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + /** ZoneId from pipeline config to support timestamp with local time zone. */ + private final ZoneId pipelineZoneId; + + public ElasticsearchEventSerializer(ZoneId zoneId) { + this.pipelineZoneId = zoneId; + } + + @Override + public BulkOperationVariant apply(Event event, SinkWriter.Context context) { + try { + if (event instanceof DataChangeEvent) { + return applyDataChangeEvent((DataChangeEvent) event); + } else if (event instanceof SchemaChangeEvent) { + IndexOperation<Map<String, Object>> indexOperation = + applySchemaChangeEvent((SchemaChangeEvent) event); + if (indexOperation != null) { + return indexOperation; + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to serialize event", e); + } + return null; + } + + private IndexOperation<Map<String, Object>> applySchemaChangeEvent( + SchemaChangeEvent schemaChangeEvent) throws IOException { + TableId tableId = schemaChangeEvent.tableId(); + + if (schemaChangeEvent instanceof CreateTableEvent) { + Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema(); + schemaMaps.put(tableId, schema); + return createSchemaIndexOperation(tableId, schema); + } else if (schemaChangeEvent instanceof AddColumnEvent + || schemaChangeEvent instanceof DropColumnEvent + || schemaChangeEvent instanceof RenameColumnEvent) { + if (!schemaMaps.containsKey(tableId)) { + throw new RuntimeException("Schema of " + tableId + " does not exist."); + } + Schema updatedSchema = + SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); + schemaMaps.put(tableId, updatedSchema); + return createSchemaIndexOperation(tableId, updatedSchema); + } else { + if (!schemaMaps.containsKey(tableId)) { + throw new RuntimeException("Schema of " + tableId + " does not exist."); + } + Schema updatedSchema = + SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); + schemaMaps.put(tableId, updatedSchema); + } + return null; + } + + private IndexOperation<Map<String, Object>> createSchemaIndexOperation( + TableId tableId, Schema schema) { + Map<String, Object> schemaMap = new HashMap<>(); + schemaMap.put( + "columns", + schema.getColumns().stream() + .map(Column::asSummaryString) + .collect(Collectors.toList())); + schemaMap.put("primaryKeys", schema.primaryKeys()); + schemaMap.put("options", schema.options()); + + return new IndexOperation.Builder<Map<String, Object>>() + .index(tableId.toString()) + .id(tableId.getTableName()) + .document(schemaMap) + .build(); + } + + private BulkOperationVariant applyDataChangeEvent(DataChangeEvent event) + throws JsonProcessingException { + TableId tableId = event.tableId(); + Schema schema = schemaMaps.get(tableId); + Preconditions.checkNotNull(schema, event.tableId() + " does not exist."); + Map<String, Object> valueMap; + OperationType op = event.op(); + + Object[] uniqueId = + generateUniqueId( + op == OperationType.DELETE ? event.before() : event.after(), schema); + String id = Arrays.stream(uniqueId).map(Object::toString).collect(Collectors.joining("_")); + + switch (op) { + case INSERT: + case REPLACE: + case UPDATE: + valueMap = serializeRecord(tableId, event.after(), schema, pipelineZoneId); + return new IndexOperation.Builder<>() + .index(tableId.toString()) + .id(id) + .document(valueMap) + .build(); + case DELETE: + return new DeleteOperation.Builder().index(tableId.toString()).id(id).build(); + default: + throw new UnsupportedOperationException("Unsupported Operation " + op); + } + } + + private Object[] generateUniqueId(RecordData recordData, Schema schema) { + List<String> primaryKeys = schema.primaryKeys(); + return primaryKeys.stream() + .map( + primaryKey -> { + Column column = + schema.getColumns().stream() + .filter(col -> col.getName().equals(primaryKey)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "Primary key column not found: " + + primaryKey)); + int index = schema.getColumns().indexOf(column); + return getFieldValue(recordData, column.getType(), index); + }) + .toArray(); + } + + private Object getFieldValue(RecordData recordData, DataType dataType, int index) { + switch (dataType.getTypeRoot()) { + case BOOLEAN: + return recordData.getBoolean(index); + case TINYINT: + return recordData.getByte(index); + case SMALLINT: + return recordData.getShort(index); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return recordData.getInt(index); + case BIGINT: + return recordData.getLong(index); + case FLOAT: + return recordData.getFloat(index); + case DOUBLE: + return recordData.getDouble(index); + case CHAR: + case VARCHAR: + return recordData.getString(index); + case DECIMAL: + return recordData.getDecimal(index, getPrecision(dataType), getScale(dataType)); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return recordData.getTimestamp(index, getPrecision(dataType)); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return recordData.getLocalZonedTimestampData(index, getPrecision(dataType)); + case TIMESTAMP_WITH_TIME_ZONE: + return recordData.getZonedTimestamp(index, getPrecision(dataType)); + case BINARY: + case VARBINARY: + return recordData.getBinary(index); + case ARRAY: + return recordData.getArray(index); + case MAP: + return recordData.getMap(index); + case ROW: + return recordData.getRow(index, getFieldCount(dataType)); + default: + throw new IllegalArgumentException("Unsupported type: " + dataType); + } + } + + public Map<String, Object> serializeRecord( + TableId tableId, RecordData recordData, Schema schema, ZoneId pipelineZoneId) { + List<Column> columns = schema.getColumns(); + Map<String, Object> record = new HashMap<>(); + Preconditions.checkState( + columns.size() == recordData.getArity(), + "Column size does not match the data size."); + + List<ElasticsearchRowConverter.SerializationConverter> converters = + getOrCreateConverters(tableId, schema); + + for (int i = 0; i < recordData.getArity(); i++) { + Column column = columns.get(i); + Object field = converters.get(i).serialize(i, recordData); + record.put(column.getName(), field); + } + + return record; + } + + private List<ElasticsearchRowConverter.SerializationConverter> getOrCreateConverters( + TableId tableId, Schema schema) { + return converterCache.computeIfAbsent( + tableId, + id -> { + List<ElasticsearchRowConverter.SerializationConverter> converters = + new ArrayList<>(); + for (Column column : schema.getColumns()) { + ColumnType columnType = + ColumnType.valueOf(column.getType().getTypeRoot().name()); Review Comment: This conversion will lead to precise lost, and it's unnecessary. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.*; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.common.types.DataTypeChecks.*; + +/** A serializer for Event to BulkOperationVariant. */ +public class ElasticsearchEventSerializer implements ElementConverter<Event, BulkOperationVariant> { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Map<TableId, Schema> schemaMaps = new HashMap<>(); + private final ConcurrentHashMap<TableId, List<ElasticsearchRowConverter.SerializationConverter>> + converterCache = new ConcurrentHashMap<>(); + + /** Format DATE type data. */ + public static final DateTimeFormatter DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + /** Format timestamp-related type data. */ + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + /** ZoneId from pipeline config to support timestamp with local time zone. */ + private final ZoneId pipelineZoneId; + + public ElasticsearchEventSerializer(ZoneId zoneId) { + this.pipelineZoneId = zoneId; + } + + @Override + public BulkOperationVariant apply(Event event, SinkWriter.Context context) { + try { + if (event instanceof DataChangeEvent) { + return applyDataChangeEvent((DataChangeEvent) event); + } else if (event instanceof SchemaChangeEvent) { + IndexOperation<Map<String, Object>> indexOperation = + applySchemaChangeEvent((SchemaChangeEvent) event); + if (indexOperation != null) { + return indexOperation; + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to serialize event", e); + } + return null; + } + + private IndexOperation<Map<String, Object>> applySchemaChangeEvent( + SchemaChangeEvent schemaChangeEvent) throws IOException { + TableId tableId = schemaChangeEvent.tableId(); + + if (schemaChangeEvent instanceof CreateTableEvent) { + Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema(); + schemaMaps.put(tableId, schema); + return createSchemaIndexOperation(tableId, schema); + } else if (schemaChangeEvent instanceof AddColumnEvent + || schemaChangeEvent instanceof DropColumnEvent + || schemaChangeEvent instanceof RenameColumnEvent) { + if (!schemaMaps.containsKey(tableId)) { + throw new RuntimeException("Schema of " + tableId + " does not exist."); + } + Schema updatedSchema = + SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); + schemaMaps.put(tableId, updatedSchema); + return createSchemaIndexOperation(tableId, updatedSchema); + } else { + if (!schemaMaps.containsKey(tableId)) { + throw new RuntimeException("Schema of " + tableId + " does not exist."); + } + Schema updatedSchema = + SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent); + schemaMaps.put(tableId, updatedSchema); + } + return null; + } + + private IndexOperation<Map<String, Object>> createSchemaIndexOperation( + TableId tableId, Schema schema) { + Map<String, Object> schemaMap = new HashMap<>(); + schemaMap.put( + "columns", + schema.getColumns().stream() + .map(Column::asSummaryString) + .collect(Collectors.toList())); + schemaMap.put("primaryKeys", schema.primaryKeys()); + schemaMap.put("options", schema.options()); + + return new IndexOperation.Builder<Map<String, Object>>() + .index(tableId.toString()) + .id(tableId.getTableName()) + .document(schemaMap) + .build(); + } + + private BulkOperationVariant applyDataChangeEvent(DataChangeEvent event) + throws JsonProcessingException { + TableId tableId = event.tableId(); + Schema schema = schemaMaps.get(tableId); + Preconditions.checkNotNull(schema, event.tableId() + " does not exist."); + Map<String, Object> valueMap; + OperationType op = event.op(); + + Object[] uniqueId = + generateUniqueId( + op == OperationType.DELETE ? event.before() : event.after(), schema); + String id = Arrays.stream(uniqueId).map(Object::toString).collect(Collectors.joining("_")); + + switch (op) { + case INSERT: + case REPLACE: + case UPDATE: + valueMap = serializeRecord(tableId, event.after(), schema, pipelineZoneId); + return new IndexOperation.Builder<>() + .index(tableId.toString()) + .id(id) + .document(valueMap) + .build(); + case DELETE: + return new DeleteOperation.Builder().index(tableId.toString()).id(id).build(); + default: + throw new UnsupportedOperationException("Unsupported Operation " + op); + } + } + + private Object[] generateUniqueId(RecordData recordData, Schema schema) { + List<String> primaryKeys = schema.primaryKeys(); + return primaryKeys.stream() + .map( + primaryKey -> { + Column column = + schema.getColumns().stream() + .filter(col -> col.getName().equals(primaryKey)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "Primary key column not found: " + + primaryKey)); + int index = schema.getColumns().indexOf(column); + return getFieldValue(recordData, column.getType(), index); Review Comment: Can be replaced with ``` converterCache.get(TableId.tableId("your_tableId")).get(index).serialize(index, recordData); ``` here you could pass TableId of this RecordData. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
