This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 74f58cd31757970c6d784aa7982275cd18583b55 Author: Kunni <[email protected]> AuthorDate: Mon Aug 19 21:20:01 2024 +0800 [FLINK-36082][pipeline-connector][kafka] Fix lamda NotSerializableException in KafkaDataSink This closes #3549. --- .../cdc/connectors/kafka/sink/KafkaDataSink.java | 35 +------------ .../kafka/sink/KafkaMetaDataApplier.java | 57 ++++++++++++++++++++++ 2 files changed, 58 insertions(+), 34 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java index 2dfc021b3..51e8b180d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java @@ -19,9 +19,6 @@ package org.apache.flink.cdc.connectors.kafka.sink; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.SchemaChangeEventType; -import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.common.sink.EventSinkProvider; import org.apache.flink.cdc.common.sink.FlinkSinkProvider; @@ -33,10 +30,7 @@ import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder; import org.apache.kafka.clients.producer.ProducerConfig; import java.time.ZoneId; -import java.util.Arrays; import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; /** A {@link DataSink} for "Kafka" connector. */ public class KafkaDataSink implements DataSink { @@ -104,33 +98,6 @@ public class KafkaDataSink implements DataSink { @Override public MetadataApplier getMetadataApplier() { - return new MetadataApplier() { - - private Set<SchemaChangeEventType> enabledEventTypes = - Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()); - - @Override - public MetadataApplier setAcceptedSchemaEvolutionTypes( - Set<SchemaChangeEventType> schemaEvolutionTypes) { - enabledEventTypes = schemaEvolutionTypes; - return this; - } - - @Override - public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) { - return enabledEventTypes.contains(schemaChangeEventType); - } - - @Override - public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() { - // All schema change events are supported. - return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()); - } - - @Override - public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { - // simply do nothing here because Kafka do not maintain the schemas. - } - }; + return new KafkaMetaDataApplier(); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaMetaDataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaMetaDataApplier.java new file mode 100644 index 000000000..839dd9ea9 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaMetaDataApplier.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.kafka.sink; + +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; +import org.apache.flink.cdc.common.sink.MetadataApplier; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** Supports {@link KafkaDataSink} to schema evolution. */ +public class KafkaMetaDataApplier implements MetadataApplier { + + private Set<SchemaChangeEventType> enabledEventTypes = + Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()); + + @Override + public MetadataApplier setAcceptedSchemaEvolutionTypes( + Set<SchemaChangeEventType> schemaEvolutionTypes) { + enabledEventTypes = schemaEvolutionTypes; + return this; + } + + @Override + public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) { + return enabledEventTypes.contains(schemaChangeEventType); + } + + @Override + public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() { + // All schema change events are supported. + return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()); + } + + @Override + public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { + // simply do nothing here because Kafka do not maintain the schemas. + } +}
