fapaul commented on code in PR #26662: URL: https://github.com/apache/flink/pull/26662#discussion_r2158361081
########## flink-end-to-end-tests/flink-confluent-schema-registry/src/main/resources/avro/input-record.avsc: ##########
@@ -15,13 +15,14 @@
 * limitations under the License.
 */
- {"namespace": "example.avro",
- "type": "record",
- "name": "User",
- "fields": [
- {"name": "name", "type": "string", "default": ""},
- {"name": "favoriteNumber", "type": "string", "default": ""},
- {"name": "favoriteColor", "type": "string", "default": ""},
- {"name": "eventType","type": {"name": "EventType","type": "enum", "symbols": ["meeting"] }}
- ]
-}
+{
+ "namespace": "org.apache.flink.avro.generated",
+ "type": "record",
+ "name": "record",
+ "fields": [
+ {"name": "name", "type": ["null", "string"], "default": null},

Review Comment: Nit: Why did you change the used type of the example? Is this related to registration?

########## flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java: ##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.schema.registry.test; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.test.resources.ResourceTestUtils; +import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** E2E Test for Avro-Confluent integration. 
*/ +@Testcontainers +public class AvroConfluentITCase { + + private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class); + + private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; + private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry"; + + private static final String TOPIC = "test-avro-input"; + private static final String RESULT_TOPIC = "test-avro-output"; + private static final String MANUAL_TOPIC = "test-avro-input-manual"; + private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual"; + + private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc"); + private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc"); + + private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar"); + + private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar"); + private final Path sqlConnectorUpsertTestJar = + ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar"); + private final Path sqlAvroConfluentJar = + ResourceTestUtils.getResource(".*avro-confluent.*\\.jar"); + + private static final Network NETWORK = Network.newNetwork(); + + @Container + public static final KafkaContainer KAFKA = + new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + + @Container + private static final GenericContainer<?> SCHEMA_REGISTRY = + new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withExposedPorts(8081) + .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + INTER_CONTAINER_KAFKA_ALIAS + ":9092") + .dependsOn(KAFKA); + + private static final FlinkContainers FLINK = + FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder().numTaskManagers(1).build()) + .withTestcontainersSettings( + TestcontainersSettings.builder() + .network(NETWORK) + .logger(LOG) + .dependsOn(KAFKA) + .build()) + .build(); + + private final HttpClient client = HttpClient.newHttpClient(); + private final ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper(); + + @BeforeAll + public static void setup() throws Exception { + KAFKA.start(); + SCHEMA_REGISTRY.start(); + FLINK.start(); + } + + @AfterAll + public static void tearDown() { + FLINK.stop(); + SCHEMA_REGISTRY.stop(); + KAFKA.stop(); + } + + @BeforeEach + public void before() throws Exception { + // Create topics using external bootstrap servers since we're outside the Docker network + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); + try (Admin admin = Admin.create(props)) { + admin.createTopics( + Arrays.asList( + new NewTopic(TOPIC, 1, (short) 1), + new NewTopic(RESULT_TOPIC, 1, (short) 1), + new NewTopic(MANUAL_TOPIC, 1, (short) 1), + new NewTopic(MANUAL_RESULT_TOPIC, 1, (short) 1))); + + // Poll for topic creation with timeout + try { + CommonTestUtils.waitUntilIgnoringExceptions( + () -> { + try { + Set<String> topics = admin.listTopics().names().get(); + return topics.contains(TOPIC) + && topics.contains(RESULT_TOPIC) + && topics.contains(MANUAL_TOPIC) + && topics.contains(MANUAL_RESULT_TOPIC); + } catch (Exception e) { + LOG.warn("Exception while 
checking topic creation", e); + return false; + } + }, + Duration.ofSeconds(30), + Duration.ofMillis(100), + "Topics were not created in time"); + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException("Failed to wait for topic creation", e); + } + + LOG.info( + "Topics {}, {}, {}, and {} created successfully", + TOPIC, + RESULT_TOPIC, + MANUAL_TOPIC, + MANUAL_RESULT_TOPIC); + } + } + + @Test + public void testAvroConfluentIntegrationWithAutoRegister() throws Exception { + // Combine all SQL statements into a single submission + List<String> allSqlStatements = + Arrays.asList( + "SET 'table.dml-sync' = 'true';", + "", + "CREATE TABLE avro_input (", + " name STRING,", + " favoriteNumber STRING,", + " favoriteColor STRING,", + " eventType STRING", + ") WITH (", + " 'connector' = 'kafka',", + " 'topic' = '" + TOPIC + "',", + " 'properties.bootstrap.servers' = '" + + INTER_CONTAINER_KAFKA_ALIAS + + ":9092',", + " 'properties.group.id' = 'test-group',", + " 'scan.startup.mode' = 'earliest-offset',", + " 'scan.bounded.mode' = 'latest-offset',", + " 'format' = 'avro-confluent',", + " 'avro-confluent.url' = 'http://" + + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS + + ":8081',", + " 'avro-confluent.auto.register.schemas' = 'true'", + ");", + "", + "CREATE TABLE avro_output (", + " name STRING,", + " favoriteNumber STRING,", + " favoriteColor STRING,", + " eventType STRING", + ") WITH (", + " 'connector' = 'kafka',", + " 'topic' = '" + RESULT_TOPIC + "',", + " 'properties.bootstrap.servers' = '" + + INTER_CONTAINER_KAFKA_ALIAS + + ":9092',", + " 'properties.group.id' = 'test-group',", + " 'scan.startup.mode' = 'earliest-offset',", + " 'scan.bounded.mode' = 'latest-offset',", + " 'format' = 'avro-confluent',", + " 'avro-confluent.url' = 'http://" + + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS + + ":8081',", + " 'avro-confluent.auto.register.schemas' = 'true'", + ");", + "", + "INSERT INTO avro_input VALUES", + " ('Alice', '42', 'blue', 'INSERT'),", + " ('Bob', '7', 'red', 'INSERT'),", + " ('Charlie', '73', 'green', 'INSERT');", + "", + "INSERT INTO avro_output", + "SELECT * FROM avro_input;"); + + LOG.info("Submitting SQL statements: {}", String.join("\n", allSqlStatements)); + + // Execute all SQL statements in a single submission + executeSql(allSqlStatements); + + // Verify output + verifyNumberOfResultRecords(RESULT_TOPIC, 3); + } + + @Test + public void testAvroConfluentIntegrationWithManualRegister() throws Exception { Review Comment: I would have expected to see a test that runs a query and fails, if the schema isn't registered. The current test IMO doesn't fully cover the behavior since it can also pass if the SQL query does the registration. 
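For illustration, a minimal sketch of the negative test described above — assuming the ITCase's existing `executeSql` helper, its topic and network-alias constants, and JUnit's `assertThrows`; the test name and the exact failure surface (which depends on how `SQLJobSubmission` reports errors) are hypothetical, not part of the PR:

```java
// Hypothetical addition to AvroConfluentITCase; needs
// import static org.junit.jupiter.api.Assertions.assertThrows;
@Test
public void testWriteFailsWhenSchemaNotRegistered() {
    // Sink table with auto-registration disabled and no schema registered
    // beforehand for the value subject of MANUAL_RESULT_TOPIC.
    List<String> statements =
            Arrays.asList(
                    "SET 'table.dml-sync' = 'true';",
                    "CREATE TABLE avro_output_manual ("
                            + " name STRING, favoriteNumber STRING, favoriteColor STRING, eventType STRING"
                            + ") WITH ("
                            + " 'connector' = 'kafka',"
                            + " 'topic' = '" + MANUAL_RESULT_TOPIC + "',"
                            + " 'properties.bootstrap.servers' = '" + INTER_CONTAINER_KAFKA_ALIAS + ":9092',"
                            + " 'format' = 'avro-confluent',"
                            + " 'avro-confluent.url' = 'http://" + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS + ":8081',"
                            + " 'avro-confluent.auto.register.schemas' = 'false');",
                    "INSERT INTO avro_output_manual VALUES ('Alice', '42', 'blue', 'INSERT');");

    // Serialization cannot resolve a schema id because nothing was registered,
    // so the INSERT job (and thus the SQL submission) is expected to fail.
    assertThrows(Exception.class, () -> executeSql(statements));
}
```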
########## flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java: ########## @@ -81,13 +100,29 @@ public Schema readSchema(InputStream in) throws IOException { @Override public void writeSchema(Schema schema, OutputStream out) throws IOException { - try { - int registeredId = schemaRegistryClient.register(subject, schema); - out.write(CONFLUENT_MAGIC_BYTE); - byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array(); - out.write(schemaIdBytes); - } catch (RestClientException e) { - throw new IOException("Could not register schema in registry", e); + int registeredId; + if (registerSchema()) { + try { + registeredId = schemaRegistryClient.register(subject, schema); + } catch (RestClientException e) { + throw new IOException("Could not register schema in registry", e); + } + } else { + try { + registeredId = schemaRegistryClient.getId(subject, schema); + } catch (RestClientException e) { + throw new IOException("Could not retrieve schema in registry", e); + } } + out.write(CONFLUENT_MAGIC_BYTE); + byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array(); + out.write(schemaIdBytes); + } + + private boolean registerSchema() { Review Comment: Why is `registerSchema` a method, instead of parsing the value once in the ctor? ########## flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml: ########## @@ -43,101 +43,160 @@ under the License. <dependencies> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> + <artifactId>flink-table-common</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> + <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka</artifactId> - <version>3.0.0-1.17</version> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${project.version}</version> + <scope>provided</scope> </dependency> - <!-- Make sure that Shaded Guava matches the one used in the flink-connector-kafka, - or remove when FLINK-32462 is resolved --> + <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-guava</artifactId> - <version>30.1.1-jre-16.1</version> + <artifactId>flink-end-to-end-tests-common</artifactId> + <version>${project.version}</version> </dependency> + + <!-- This enables the WebUI during tests. --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-avro</artifactId> + <artifactId>flink-runtime-web</artifactId> <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <!-- The following dependencies are for connector/format sql-jars that + we copy using the maven-dependency-plugin. When extending the test + to cover more connectors/formats, add a dependency here and an entry + to the dependency-plugin configuration below. + This ensures that all modules we actually need (as defined by the + dependency-plugin configuration) are built before this module. 
--> + <dependency> + <!-- Used by maven-dependency-plugin --> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-kafka</artifactId> + <version>4.0.0-2.0</version> + <scope>provided</scope> </dependency> + <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-avro-confluent-registry</artifactId> + <artifactId>flink-sql-avro-confluent-registry</artifactId> <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>3.9.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.confluent</groupId> + <artifactId>kafka-avro-serializer</artifactId> + <version>7.2.2</version> Review Comment: Afaik the serializer is versioned similarly to kafka. 7.2.2 should be Kafka 3.2. Can we upgrade the serializer to `7.9.0` to be inline with the used kafka version 3.9.0? ########## flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java: ########## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.schema.registry.test; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.test.resources.ResourceTestUtils; +import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** E2E Test for Avro-Confluent integration. 
*/ +@Testcontainers +public class AvroConfluentITCase { + + private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class); + + private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; + private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry"; + + private static final String TOPIC = "test-avro-input"; + private static final String RESULT_TOPIC = "test-avro-output"; + private static final String MANUAL_TOPIC = "test-avro-input-manual"; + private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual"; + + private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc"); + private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc"); + + private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar"); + + private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar"); + private final Path sqlConnectorUpsertTestJar = + ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar"); + private final Path sqlAvroConfluentJar = + ResourceTestUtils.getResource(".*avro-confluent.*\\.jar"); + + private static final Network NETWORK = Network.newNetwork(); + + @Container + public static final KafkaContainer KAFKA = + new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + + @Container + private static final GenericContainer<?> SCHEMA_REGISTRY = + new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withExposedPorts(8081) + .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + INTER_CONTAINER_KAFKA_ALIAS + ":9092") + .dependsOn(KAFKA); + + private static final FlinkContainers FLINK = + FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder().numTaskManagers(1).build()) + .withTestcontainersSettings( + TestcontainersSettings.builder() + .network(NETWORK) + .logger(LOG) + .dependsOn(KAFKA) + .build()) + .build(); + + private final HttpClient client = HttpClient.newHttpClient(); + private final ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper(); + + @BeforeAll + public static void setup() throws Exception { + KAFKA.start(); + SCHEMA_REGISTRY.start(); + FLINK.start(); + } + + @AfterAll + public static void tearDown() { + FLINK.stop(); + SCHEMA_REGISTRY.stop(); + KAFKA.stop(); + } + + @BeforeEach + public void before() throws Exception { + // Create topics using external bootstrap servers since we're outside the Docker network + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); + try (Admin admin = Admin.create(props)) { + admin.createTopics( + Arrays.asList( + new NewTopic(TOPIC, 1, (short) 1), + new NewTopic(RESULT_TOPIC, 1, (short) 1), + new NewTopic(MANUAL_TOPIC, 1, (short) 1), + new NewTopic(MANUAL_RESULT_TOPIC, 1, (short) 1))); + + // Poll for topic creation with timeout + try { + CommonTestUtils.waitUntilIgnoringExceptions( + () -> { + try { + Set<String> topics = admin.listTopics().names().get(); + return topics.contains(TOPIC) + && topics.contains(RESULT_TOPIC) + && topics.contains(MANUAL_TOPIC) + && topics.contains(MANUAL_RESULT_TOPIC); + } catch (Exception e) { + LOG.warn("Exception while 
checking topic creation", e); + return false; + } + }, + Duration.ofSeconds(30), + Duration.ofMillis(100), + "Topics were not created in time"); + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException("Failed to wait for topic creation", e); + } + + LOG.info( + "Topics {}, {}, {}, and {} created successfully", + TOPIC, + RESULT_TOPIC, + MANUAL_TOPIC, + MANUAL_RESULT_TOPIC); + } + } + + @Test + public void testAvroConfluentIntegrationWithAutoRegister() throws Exception { + // Combine all SQL statements into a single submission + List<String> allSqlStatements = + Arrays.asList( + "SET 'table.dml-sync' = 'true';", + "", + "CREATE TABLE avro_input (", + " name STRING,", + " favoriteNumber STRING,", + " favoriteColor STRING,", + " eventType STRING", + ") WITH (", + " 'connector' = 'kafka',", + " 'topic' = '" + TOPIC + "',", + " 'properties.bootstrap.servers' = '" + + INTER_CONTAINER_KAFKA_ALIAS + + ":9092',", + " 'properties.group.id' = 'test-group',", + " 'scan.startup.mode' = 'earliest-offset',", + " 'scan.bounded.mode' = 'latest-offset',", + " 'format' = 'avro-confluent',", + " 'avro-confluent.url' = 'http://" + + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS + + ":8081',", + " 'avro-confluent.auto.register.schemas' = 'true'", + ");", + "", + "CREATE TABLE avro_output (", + " name STRING,", + " favoriteNumber STRING,", + " favoriteColor STRING,", + " eventType STRING", + ") WITH (", + " 'connector' = 'kafka',", + " 'topic' = '" + RESULT_TOPIC + "',", + " 'properties.bootstrap.servers' = '" + + INTER_CONTAINER_KAFKA_ALIAS + + ":9092',", + " 'properties.group.id' = 'test-group',", + " 'scan.startup.mode' = 'earliest-offset',", + " 'scan.bounded.mode' = 'latest-offset',", + " 'format' = 'avro-confluent',", + " 'avro-confluent.url' = 'http://" + + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS + + ":8081',", + " 'avro-confluent.auto.register.schemas' = 'true'", + ");", + "", + "INSERT INTO avro_input VALUES", + " ('Alice', '42', 'blue', 'INSERT'),", + " ('Bob', '7', 'red', 'INSERT'),", + " ('Charlie', '73', 'green', 'INSERT');", + "", + "INSERT INTO avro_output", + "SELECT * FROM avro_input;"); Review Comment: Nit: Can you extract the String to a template? Currently, it's hard to review and spot the difference between the two test cases. ########## flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java: ########## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.schema.registry.test; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.test.resources.ResourceTestUtils; +import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** E2E Test for Avro-Confluent integration. 
*/ +@Testcontainers +public class AvroConfluentITCase { + + private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class); + + private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; + private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry"; + + private static final String TOPIC = "test-avro-input"; + private static final String RESULT_TOPIC = "test-avro-output"; + private static final String MANUAL_TOPIC = "test-avro-input-manual"; + private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual"; + + private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc"); + private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc"); + + private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar"); + + private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar"); + private final Path sqlConnectorUpsertTestJar = + ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar"); + private final Path sqlAvroConfluentJar = + ResourceTestUtils.getResource(".*avro-confluent.*\\.jar"); + + private static final Network NETWORK = Network.newNetwork(); + + @Container + public static final KafkaContainer KAFKA = + new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + + @Container + private static final GenericContainer<?> SCHEMA_REGISTRY = + new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withExposedPorts(8081) + .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + INTER_CONTAINER_KAFKA_ALIAS + ":9092") + .dependsOn(KAFKA); + + private static final FlinkContainers FLINK = + FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder().numTaskManagers(1).build()) + .withTestcontainersSettings( + TestcontainersSettings.builder() + .network(NETWORK) + .logger(LOG) + .dependsOn(KAFKA) + .build()) + .build(); + + private final HttpClient client = HttpClient.newHttpClient(); + private final ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper(); + + @BeforeAll + public static void setup() throws Exception { + KAFKA.start(); + SCHEMA_REGISTRY.start(); + FLINK.start(); + } + + @AfterAll + public static void tearDown() { + FLINK.stop(); + SCHEMA_REGISTRY.stop(); + KAFKA.stop(); + } Review Comment: These methods should be obsolete since for the Kafka/SR container the lifecycle is already handled by using the `@Container` annotation and for the Flink container see the commend above. ########## flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java: ########## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.schema.registry.test; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.test.resources.ResourceTestUtils; +import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** E2E Test for Avro-Confluent integration. 
*/ +@Testcontainers +public class AvroConfluentITCase { + + private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class); + + private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; + private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry"; + + private static final String TOPIC = "test-avro-input"; + private static final String RESULT_TOPIC = "test-avro-output"; + private static final String MANUAL_TOPIC = "test-avro-input-manual"; + private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual"; + + private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc"); + private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc"); + + private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar"); + + private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar"); + private final Path sqlConnectorUpsertTestJar = + ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar"); + private final Path sqlAvroConfluentJar = + ResourceTestUtils.getResource(".*avro-confluent.*\\.jar"); + + private static final Network NETWORK = Network.newNetwork(); + + @Container + public static final KafkaContainer KAFKA = + new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + + @Container + private static final GenericContainer<?> SCHEMA_REGISTRY = + new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withExposedPorts(8081) + .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + INTER_CONTAINER_KAFKA_ALIAS + ":9092") + .dependsOn(KAFKA); + Review Comment: You can use ```suggestion @RegisterExtension ``` to avoid manually starting the Flink Cluster and let Junit handle the lifecycle. ########## flink-end-to-end-tests/flink-sql-client-test/pom.xml: ########## @@ -61,14 +61,21 @@ under the License. <!-- Used by maven-dependency-plugin --> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka</artifactId> - <version>3.0.0-1.17</version> + <version>4.0.0-2.0</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-avro-confluent-registry</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>3.2.3</version> + <version>3.9.0</version> Review Comment: Nit: SInce you also upgraded the FLink connector to be compatible with Kafka 4.0, can you also use kafka 4.0 here? ########## flink-end-to-end-tests/flink-confluent-schema-registry/src/test/resources/log4j2-test.properties: ########## @@ -0,0 +1,34 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = INFO Review Comment: Please set this to OFF before merging ########## flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java: ########## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.schema.registry.test; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.test.resources.ResourceTestUtils; +import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import 
java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** E2E Test for Avro-Confluent integration. */ +@Testcontainers +public class AvroConfluentITCase { + + private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class); + + private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; + private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry"; + + private static final String TOPIC = "test-avro-input"; + private static final String RESULT_TOPIC = "test-avro-output"; + private static final String MANUAL_TOPIC = "test-avro-input-manual"; + private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual"; + + private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc"); + private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc"); + + private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar"); + + private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar"); + private final Path sqlConnectorUpsertTestJar = + ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar"); + private final Path sqlAvroConfluentJar = + ResourceTestUtils.getResource(".*avro-confluent.*\\.jar"); + + private static final Network NETWORK = Network.newNetwork(); + + @Container + public static final KafkaContainer KAFKA = + new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + + @Container + private static final GenericContainer<?> SCHEMA_REGISTRY = + new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY)) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withExposedPorts(8081) + .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS) + .withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + INTER_CONTAINER_KAFKA_ALIAS + ":9092") + .dependsOn(KAFKA); + + private static final FlinkContainers FLINK = + FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder().numTaskManagers(1).build()) + .withTestcontainersSettings( + TestcontainersSettings.builder() + .network(NETWORK) + .logger(LOG) + .dependsOn(KAFKA) + .build()) + .build(); + + private final HttpClient client = HttpClient.newHttpClient(); + private final ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper(); Review Comment: Nit: I usually make the ObjectMapper static to avoid any reinitialization ########## docs/content.zh/docs/connectors/table/formats/avro-confluent.md: ########## @@ -280,6 +280,13 @@ Format 参数 <td>String</td> <td>The URL of the Confluent Schema Registry to fetch/register schemas.</td> </tr> + <tr> + <td><h5>auto.register.schemas</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Whether to automatically register schemas with the Confluent Schema Registry if they don't exist. When set to <code>false</code>, schemas must be manually registered in the Schema Registry before being used. When set to <code>true</code>, schemas will be automatically registered during serialization if they don't already exist. 
The default value is <code>true</code>.</td>

Review Comment: Did we document anywhere in which scenarios we auto-register schemas, for example when reading from or writing to a table?

########## flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java: ##########
@@ -81,13 +100,29 @@ public Schema readSchema(InputStream in) throws IOException {
     @Override
     public void writeSchema(Schema schema, OutputStream out) throws IOException {
-        try {
-            int registeredId = schemaRegistryClient.register(subject, schema);
-            out.write(CONFLUENT_MAGIC_BYTE);
-            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
-            out.write(schemaIdBytes);
-        } catch (RestClientException e) {
-            throw new IOException("Could not register schema in registry", e);
+        int registeredId;
+        if (registerSchema()) {
+            try {
+                registeredId = schemaRegistryClient.register(subject, schema);
+            } catch (RestClientException e) {
+                throw new IOException("Could not register schema in registry", e);
+            }
+        } else {
+            try {
+                registeredId = schemaRegistryClient.getId(subject, schema);
+            } catch (RestClientException e) {
+                throw new IOException("Could not retrieve schema in registry", e);

Review Comment: Does it throw an IOException if the schema is not present? Nit: Can we maybe throw a more specific exception, e.g. a FlinkException, to make it clear that a missing schema is an "expected" failure?

########## flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java: ##########
@@ -229,6 +229,74 @@ public void testSerializationSchemaWithInvalidOptionalSchema() {
                         null,
                         SCHEMA.toPhysicalRowDataType()));
     }
+    @Test

Review Comment: Nit: Can we turn the three (or at least the first and third) tests into a `@ParameterizedTest` to avoid the code duplication? It's also a bit unclear to me why only the second test verifies the behavior for both sinks and sources while the other tests only look at sinks.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org