This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5dd99eddef34e0f90ed9a1bc8648735bd464c4b4
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Sep 14 16:04:25 2021 +0200

    [FLINK-24281][connectors/kafka] Migrate all format tests from FlinkKafkaProducer to KafkaSink
---
 .../kafka/table/KafkaChangelogTableITCase.java     | 82 ++++++++--------
 .../connectors/kafka/table/KafkaTableTestBase.java |  3 +
 .../registry/test/TestAvroConsumerConfluent.java   | 48 ++++++++-----
 3 files changed, 66 insertions(+), 67 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 70d5060..501d470 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -21,12 +21,15 @@ package org.apache.flink.streaming.connectors.kafka.table;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableResult;
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -36,8 +39,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE;
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults;
 
@@ -65,23 +66,8 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
 
         // ---------- Write the Debezium json into Kafka -------------------
         List<String> lines = readLines("debezium-data-schema-exclude.txt");
-        DataStreamSource<String> stream = env.fromCollection(lines);
-        SerializationSchema<String> serSchema = new SimpleStringSchema();
-        FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
-        // the producer must not produce duplicates
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
         try {
-            stream.addSink(
-                    new FlinkKafkaProducer<>(
-                            topic,
-                            serSchema,
-                            producerProperties,
-                            partitioner,
-                            EXACTLY_ONCE,
-                            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
-            env.execute("Write sequence");
+            writeRecordsToKafka(topic, lines);
         } catch (Exception e) {
             throw new Exception("Failed to write debezium data to Kafka.", e);
         }
@@ -208,23 +194,8 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
 
         // ---------- Write the Canal json into Kafka -------------------
         List<String> lines = readLines("canal-data.txt");
-        DataStreamSource<String> stream = env.fromCollection(lines);
-        SerializationSchema<String> serSchema = new SimpleStringSchema();
-        FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
-        // the producer must not produce duplicates
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
         try {
-            stream.addSink(
-                    new FlinkKafkaProducer<>(
-                            topic,
-                            serSchema,
-                            producerProperties,
-                            partitioner,
-                            EXACTLY_ONCE,
-                            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
-            env.execute("Write sequence");
+            writeRecordsToKafka(topic, lines);
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
@@ -361,23 +332,8 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
 
         // ---------- Write the Maxwell json into Kafka -------------------
         List<String> lines = readLines("maxwell-data.txt");
-        DataStreamSource<String> stream = env.fromCollection(lines);
-        SerializationSchema<String> serSchema = new SimpleStringSchema();
-        FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
-        // the producer must not produce duplicates
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
         try {
-            stream.addSink(
-                    new FlinkKafkaProducer<>(
-                            topic,
-                            serSchema,
-                            producerProperties,
-                            partitioner,
-                            EXACTLY_ONCE,
-                            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
-            env.execute("Write sequence");
+            writeRecordsToKafka(topic, lines);
         } catch (Exception e) {
             throw new Exception("Failed to write maxwell data to Kafka.", e);
         }
@@ -492,4 +448,28 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
         tableResult.getJobClient().get().cancel().get(); // stop the job
         deleteTestTopic(topic);
     }
+
+    private void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
+        DataStreamSource<String> stream = env.fromCollection(lines);
+        SerializationSchema<String> serSchema = new SimpleStringSchema();
+        FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
+
+        // the producer must not produce duplicates
+        Properties producerProperties = getStandardProps();
+        producerProperties.setProperty("retries", "0");
+        stream.sinkTo(
+                KafkaSink.<String>builder()
+                        .setBootstrapServers(
+                                producerProperties.getProperty(
+                                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(serSchema)
+                                        .setPartitioner(partitioner)
+                                        .build())
+                        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                        .build());
+        env.execute("Write sequence");
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index a562ec7..28c15a8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -76,6 +76,9 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
                         }.withEmbeddedZookeeper()
                                 .withNetwork(NETWORK)
                                 .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
+                                .withEnv(
+                                        "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+                                        String.valueOf(Duration.ofHours(2).toMillis()))
                                 // Disable log deletion to prevent records from being deleted during test run
                                 .withEnv("KAFKA_LOG_RETENTION_MS", "-1");
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
index 840aa9e..e3f4149 100644
--- a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -20,16 +20,18 @@ package org.apache.flink.schema.registry.test;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 
 import example.avro.User;
 import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.util.Properties;
 
@@ -74,22 +76,36 @@ public class TestAvroConsumerConfluent {
         SingleOutputStreamOperator<String> mapToString =
                 input.map((MapFunction<User, String>) SpecificRecordBase::toString);
 
-        FlinkKafkaProducer<String> stringFlinkKafkaProducer =
-                new FlinkKafkaProducer<>(
-                        parameterTool.getRequired("output-string-topic"),
-                        new SimpleStringSchema(),
-                        config);
-        mapToString.addSink(stringFlinkKafkaProducer);
+        KafkaSink<String> stringSink =
+                KafkaSink.<String>builder()
+                        .setBootstrapServers(
+                                config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setValueSerializationSchema(new SimpleStringSchema())
+                                        .setTopic(parameterTool.getRequired("output-string-topic"))
+                                        .build())
+                        .setKafkaProducerConfig(config)
+                        .build();
+        mapToString.sinkTo(stringSink);
 
-        FlinkKafkaProducer<User> avroFlinkKafkaProducer =
-                new FlinkKafkaProducer<>(
-                        parameterTool.getRequired("output-avro-topic"),
-                        ConfluentRegistryAvroSerializationSchema.forSpecific(
-                                User.class,
-                                parameterTool.getRequired("output-subject"),
-                                schemaRegistryUrl),
-                        config);
-        input.addSink(avroFlinkKafkaProducer);
+        KafkaSink<User> avroSink =
+                KafkaSink.<User>builder()
+                        .setBootstrapServers(
+                                config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setValueSerializationSchema(
+                                                ConfluentRegistryAvroSerializationSchema
+                                                        .forSpecific(
+                                                                User.class,
+                                                                parameterTool.getRequired(
+                                                                        "output-subject"),
+                                                                schemaRegistryUrl))
+                                        .setTopic(parameterTool.getRequired("output-avro-topic"))
+                                        .build())
+                        .build();
+        input.sinkTo(avroSink);
 
         env.execute("Kafka Confluent Schema Registry AVRO Example");
     }
