fapaul commented on code in PR #26662:
URL: https://github.com/apache/flink/pull/26662#discussion_r2158361081


##########
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/resources/avro/input-record.avsc:
##########
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
- {"namespace": "example.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string", "default": ""},
-     {"name": "favoriteNumber",  "type": "string", "default": ""},
-     {"name": "favoriteColor", "type": "string", "default": ""},
-     {"name": "eventType","type": {"name": "EventType","type": "enum", 
"symbols": ["meeting"] }}
- ]
-}
+{
+  "namespace": "org.apache.flink.avro.generated",
+  "type": "record",
+  "name": "record",
+  "fields": [
+    {"name": "name", "type": ["null", "string"], "default": null},

Review Comment:
   Nit: Why did you change the type used in the example? Is this related to schema registration?



##########
flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java:
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.schema.registry.test;
+
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** E2E Test for Avro-Confluent integration. */
+@Testcontainers
+public class AvroConfluentITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class);
+
+    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+    private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry";
+
+    private static final String TOPIC = "test-avro-input";
+    private static final String RESULT_TOPIC = "test-avro-output";
+    private static final String MANUAL_TOPIC = "test-avro-input-manual";
+    private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual";
+
+    private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc");
+    private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc");
+
+    private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar");
+
+    private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar");
+    private final Path sqlConnectorUpsertTestJar =
+            ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar");
+    private final Path sqlAvroConfluentJar =
+            ResourceTestUtils.getResource(".*avro-confluent.*\\.jar");
+
+    private static final Network NETWORK = Network.newNetwork();
+
+    @Container
+    public static final KafkaContainer KAFKA =
+            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+
+    @Container
+    private static final GenericContainer<?> SCHEMA_REGISTRY =
+            new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withExposedPorts(8081)
+                    .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withEnv(
+                            "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
+                            INTER_CONTAINER_KAFKA_ALIAS + ":9092")
+                    .dependsOn(KAFKA);
+
+    private static final FlinkContainers FLINK =
+            FlinkContainers.builder()
+                    .withFlinkContainersSettings(
+                            FlinkContainersSettings.builder().numTaskManagers(1).build())
+                    .withTestcontainersSettings(
+                            TestcontainersSettings.builder()
+                                    .network(NETWORK)
+                                    .logger(LOG)
+                                    .dependsOn(KAFKA)
+                                    .build())
+                    .build();
+
+    private final HttpClient client = HttpClient.newHttpClient();
+    private final ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper();
+
+    @BeforeAll
+    public static void setup() throws Exception {
+        KAFKA.start();
+        SCHEMA_REGISTRY.start();
+        FLINK.start();
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        FLINK.stop();
+        SCHEMA_REGISTRY.stop();
+        KAFKA.stop();
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        // Create topics using external bootstrap servers since we're outside the Docker network
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        try (Admin admin = Admin.create(props)) {
+            admin.createTopics(
+                    Arrays.asList(
+                            new NewTopic(TOPIC, 1, (short) 1),
+                            new NewTopic(RESULT_TOPIC, 1, (short) 1),
+                            new NewTopic(MANUAL_TOPIC, 1, (short) 1),
+                            new NewTopic(MANUAL_RESULT_TOPIC, 1, (short) 1)));
+
+            // Poll for topic creation with timeout
+            try {
+                CommonTestUtils.waitUntilIgnoringExceptions(
+                        () -> {
+                            try {
+                                Set<String> topics = admin.listTopics().names().get();
+                                return topics.contains(TOPIC)
+                                        && topics.contains(RESULT_TOPIC)
+                                        && topics.contains(MANUAL_TOPIC)
+                                        && topics.contains(MANUAL_RESULT_TOPIC);
+                            } catch (Exception e) {
+                                LOG.warn("Exception while checking topic creation", e);
+                                return false;
+                            }
+                        },
+                        Duration.ofSeconds(30),
+                        Duration.ofMillis(100),
+                        "Topics were not created in time");
+            } catch (TimeoutException | InterruptedException e) {
+                throw new RuntimeException("Failed to wait for topic creation", e);
+            }
+
+            LOG.info(
+                    "Topics {}, {}, {}, and {} created successfully",
+                    TOPIC,
+                    RESULT_TOPIC,
+                    MANUAL_TOPIC,
+                    MANUAL_RESULT_TOPIC);
+        }
+    }
+
+    @Test
+    public void testAvroConfluentIntegrationWithAutoRegister() throws Exception {
+        // Combine all SQL statements into a single submission
+        List<String> allSqlStatements =
+                Arrays.asList(
+                        "SET 'table.dml-sync' = 'true';",
+                        "",
+                        "CREATE TABLE avro_input (",
+                        "  name STRING,",
+                        "  favoriteNumber STRING,",
+                        "  favoriteColor STRING,",
+                        "  eventType STRING",
+                        ") WITH (",
+                        "  'connector' = 'kafka',",
+                        "  'topic' = '" + TOPIC + "',",
+                        "  'properties.bootstrap.servers' = '"
+                                + INTER_CONTAINER_KAFKA_ALIAS
+                                + ":9092',",
+                        "  'properties.group.id' = 'test-group',",
+                        "  'scan.startup.mode' = 'earliest-offset',",
+                        "  'scan.bounded.mode' = 'latest-offset',",
+                        "  'format' = 'avro-confluent',",
+                        "  'avro-confluent.url' = 'http://";
+                                + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS
+                                + ":8081',",
+                        "  'avro-confluent.auto.register.schemas' = 'true'",
+                        ");",
+                        "",
+                        "CREATE TABLE avro_output (",
+                        "  name STRING,",
+                        "  favoriteNumber STRING,",
+                        "  favoriteColor STRING,",
+                        "  eventType STRING",
+                        ") WITH (",
+                        "  'connector' = 'kafka',",
+                        "  'topic' = '" + RESULT_TOPIC + "',",
+                        "  'properties.bootstrap.servers' = '"
+                                + INTER_CONTAINER_KAFKA_ALIAS
+                                + ":9092',",
+                        "  'properties.group.id' = 'test-group',",
+                        "  'scan.startup.mode' = 'earliest-offset',",
+                        "  'scan.bounded.mode' = 'latest-offset',",
+                        "  'format' = 'avro-confluent',",
+                        "  'avro-confluent.url' = 'http://";
+                                + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS
+                                + ":8081',",
+                        "  'avro-confluent.auto.register.schemas' = 'true'",
+                        ");",
+                        "",
+                        "INSERT INTO avro_input VALUES",
+                        "  ('Alice', '42', 'blue', 'INSERT'),",
+                        "  ('Bob', '7', 'red', 'INSERT'),",
+                        "  ('Charlie', '73', 'green', 'INSERT');",
+                        "",
+                        "INSERT INTO avro_output",
+                        "SELECT * FROM avro_input;");
+
+        LOG.info("Submitting SQL statements: {}", String.join("\n", 
allSqlStatements));
+
+        // Execute all SQL statements in a single submission
+        executeSql(allSqlStatements);
+
+        // Verify output
+        verifyNumberOfResultRecords(RESULT_TOPIC, 3);
+    }
+
+    @Test
+    public void testAvroConfluentIntegrationWithManualRegister() throws Exception {

Review Comment:
   I would have expected to see a test that runs a query and fails if the schema isn't registered.
   
   The current test IMO doesn't fully cover the behavior, since it can also pass if the SQL query does the registration.
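   
   For example, something along these lines (just a sketch — `buildSqlStatements` is an illustrative helper for the DDL/DML, and `assertThrows` comes from JUnit's `Assertions`):
   
   ```java
   @Test
   public void testWriteFailsWithoutRegisteredSchema() throws Exception {
       // Deliberately do NOT pre-register the output schema in the Schema Registry.
       // With 'avro-confluent.auto.register.schemas' = 'false' the INSERT should fail,
       // because the serializer can only look up an already existing schema id.
       List<String> sqlStatements =
               buildSqlStatements(MANUAL_TOPIC, MANUAL_RESULT_TOPIC, /* autoRegister= */ false);
   
       assertThrows(Exception.class, () -> executeSql(sqlStatements));
   }
   ```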



##########
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java:
##########
@@ -81,13 +100,29 @@ public Schema readSchema(InputStream in) throws IOException {
 
     @Override
     public void writeSchema(Schema schema, OutputStream out) throws IOException {
-        try {
-            int registeredId = schemaRegistryClient.register(subject, schema);
-            out.write(CONFLUENT_MAGIC_BYTE);
-            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
-            out.write(schemaIdBytes);
-        } catch (RestClientException e) {
-            throw new IOException("Could not register schema in registry", e);
+        int registeredId;
+        if (registerSchema()) {
+            try {
+                registeredId = schemaRegistryClient.register(subject, schema);
+            } catch (RestClientException e) {
+                throw new IOException("Could not register schema in registry", e);
+            }
+        } else {
+            try {
+                registeredId = schemaRegistryClient.getId(subject, schema);
+            } catch (RestClientException e) {
+                throw new IOException("Could not retrieve schema in registry", e);
+            }
         }
+        out.write(CONFLUENT_MAGIC_BYTE);
+        byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
+        out.write(schemaIdBytes);
+    }
+
+    private boolean registerSchema() {

Review Comment:
   Why is `registerSchema` a method instead of parsing the value once in the ctor?
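   
   i.e. something like this (just a sketch — assuming the registry configs are available in the ctor; adjust to however the option is actually passed in this PR):
   
   ```java
   private final boolean autoRegisterSchemas;
   
   public ConfluentSchemaRegistryCoder(
           String subject, SchemaRegistryClient schemaRegistryClient, Map<String, ?> registryConfigs) {
       this.subject = subject;
       this.schemaRegistryClient = schemaRegistryClient;
       // Parse the option once instead of re-evaluating it on every writeSchema() call.
       this.autoRegisterSchemas =
               Boolean.parseBoolean(
                       String.valueOf(registryConfigs.getOrDefault("auto.register.schemas", "true")));
   }
   ```
   
   `writeSchema` could then simply check the final field.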



##########
flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml:
##########
@@ -43,101 +43,160 @@ under the License.
        <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java</artifactId>
+                       <artifactId>flink-table-common</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-               <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-kafka</artifactId>
-                       <version>3.0.0-1.17</version>
+                       <artifactId>flink-table-api-java-bridge</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
                </dependency>
-               <!-- Make sure that Shaded Guava matches the one used in the flink-connector-kafka,
-                or remove when FLINK-32462 is resolved -->
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-guava</artifactId>
-                       <version>30.1.1-jre-16.1</version>
+                       <artifactId>flink-end-to-end-tests-common</artifactId>
+                       <version>${project.version}</version>
                </dependency>
+
+               <!-- This enables the WebUI during tests. -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-avro</artifactId>
+                       <artifactId>flink-runtime-web</artifactId>
                        <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- The following dependencies are for connector/format sql-jars that
+                       we copy using the maven-dependency-plugin. When extending the test
+                       to cover more connectors/formats, add a dependency here and an entry
+                       to the dependency-plugin configuration below.
+                       This ensures that all modules we actually need (as defined by the
+                       dependency-plugin configuration) are built before this module. -->
+               <dependency>
+                       <!-- Used by maven-dependency-plugin -->
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-sql-connector-kafka</artifactId>
+                       <version>4.0.0-2.0</version>
+                       <scope>provided</scope>
                </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-avro-confluent-registry</artifactId>
+                       <artifactId>flink-sql-avro-confluent-registry</artifactId>
                        <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+                       <version>3.9.0</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>kafka</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.confluent</groupId>
+                       <artifactId>kafka-avro-serializer</artifactId>
+                       <version>7.2.2</version>

Review Comment:
   Afaik the serializer is versioned similarly to Kafka: 7.2.2 corresponds to Kafka 3.2. Can we upgrade the serializer to `7.9.0` to be in line with the Kafka version 3.9.0 used here?



##########
flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java:
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.schema.registry.test;
+
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** E2E Test for Avro-Confluent integration. */
+@Testcontainers
+public class AvroConfluentITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class);
+
+    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+    private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry";
+
+    private static final String TOPIC = "test-avro-input";
+    private static final String RESULT_TOPIC = "test-avro-output";
+    private static final String MANUAL_TOPIC = "test-avro-input-manual";
+    private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual";
+
+    private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc");
+    private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc");
+
+    private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar");
+
+    private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar");
+    private final Path sqlConnectorUpsertTestJar =
+            ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar");
+    private final Path sqlAvroConfluentJar =
+            ResourceTestUtils.getResource(".*avro-confluent.*\\.jar");
+
+    private static final Network NETWORK = Network.newNetwork();
+
+    @Container
+    public static final KafkaContainer KAFKA =
+            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+
+    @Container
+    private static final GenericContainer<?> SCHEMA_REGISTRY =
+            new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withExposedPorts(8081)
+                    .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withEnv(
+                            "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
+                            INTER_CONTAINER_KAFKA_ALIAS + ":9092")
+                    .dependsOn(KAFKA);
+
+    private static final FlinkContainers FLINK =
+            FlinkContainers.builder()
+                    .withFlinkContainersSettings(
+                            FlinkContainersSettings.builder().numTaskManagers(1).build())
+                    .withTestcontainersSettings(
+                            TestcontainersSettings.builder()
+                                    .network(NETWORK)
+                                    .logger(LOG)
+                                    .dependsOn(KAFKA)
+                                    .build())
+                    .build();
+
+    private final HttpClient client = HttpClient.newHttpClient();
+    private final ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper();
+
+    @BeforeAll
+    public static void setup() throws Exception {
+        KAFKA.start();
+        SCHEMA_REGISTRY.start();
+        FLINK.start();
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        FLINK.stop();
+        SCHEMA_REGISTRY.stop();
+        KAFKA.stop();
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        // Create topics using external bootstrap servers since we're outside the Docker network
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        try (Admin admin = Admin.create(props)) {
+            admin.createTopics(
+                    Arrays.asList(
+                            new NewTopic(TOPIC, 1, (short) 1),
+                            new NewTopic(RESULT_TOPIC, 1, (short) 1),
+                            new NewTopic(MANUAL_TOPIC, 1, (short) 1),
+                            new NewTopic(MANUAL_RESULT_TOPIC, 1, (short) 1)));
+
+            // Poll for topic creation with timeout
+            try {
+                CommonTestUtils.waitUntilIgnoringExceptions(
+                        () -> {
+                            try {
+                                Set<String> topics = admin.listTopics().names().get();
+                                return topics.contains(TOPIC)
+                                        && topics.contains(RESULT_TOPIC)
+                                        && topics.contains(MANUAL_TOPIC)
+                                        && topics.contains(MANUAL_RESULT_TOPIC);
+                            } catch (Exception e) {
+                                LOG.warn("Exception while checking topic creation", e);
+                                return false;
+                            }
+                        },
+                        Duration.ofSeconds(30),
+                        Duration.ofMillis(100),
+                        "Topics were not created in time");
+            } catch (TimeoutException | InterruptedException e) {
+                throw new RuntimeException("Failed to wait for topic creation", e);
+            }
+
+            LOG.info(
+                    "Topics {}, {}, {}, and {} created successfully",
+                    TOPIC,
+                    RESULT_TOPIC,
+                    MANUAL_TOPIC,
+                    MANUAL_RESULT_TOPIC);
+        }
+    }
+
+    @Test
+    public void testAvroConfluentIntegrationWithAutoRegister() throws Exception {
+        // Combine all SQL statements into a single submission
+        List<String> allSqlStatements =
+                Arrays.asList(
+                        "SET 'table.dml-sync' = 'true';",
+                        "",
+                        "CREATE TABLE avro_input (",
+                        "  name STRING,",
+                        "  favoriteNumber STRING,",
+                        "  favoriteColor STRING,",
+                        "  eventType STRING",
+                        ") WITH (",
+                        "  'connector' = 'kafka',",
+                        "  'topic' = '" + TOPIC + "',",
+                        "  'properties.bootstrap.servers' = '"
+                                + INTER_CONTAINER_KAFKA_ALIAS
+                                + ":9092',",
+                        "  'properties.group.id' = 'test-group',",
+                        "  'scan.startup.mode' = 'earliest-offset',",
+                        "  'scan.bounded.mode' = 'latest-offset',",
+                        "  'format' = 'avro-confluent',",
+                        "  'avro-confluent.url' = 'http://";
+                                + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS
+                                + ":8081',",
+                        "  'avro-confluent.auto.register.schemas' = 'true'",
+                        ");",
+                        "",
+                        "CREATE TABLE avro_output (",
+                        "  name STRING,",
+                        "  favoriteNumber STRING,",
+                        "  favoriteColor STRING,",
+                        "  eventType STRING",
+                        ") WITH (",
+                        "  'connector' = 'kafka',",
+                        "  'topic' = '" + RESULT_TOPIC + "',",
+                        "  'properties.bootstrap.servers' = '"
+                                + INTER_CONTAINER_KAFKA_ALIAS
+                                + ":9092',",
+                        "  'properties.group.id' = 'test-group',",
+                        "  'scan.startup.mode' = 'earliest-offset',",
+                        "  'scan.bounded.mode' = 'latest-offset',",
+                        "  'format' = 'avro-confluent',",
+                        "  'avro-confluent.url' = 'http://";
+                                + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS
+                                + ":8081',",
+                        "  'avro-confluent.auto.register.schemas' = 'true'",
+                        ");",
+                        "",
+                        "INSERT INTO avro_input VALUES",
+                        "  ('Alice', '42', 'blue', 'INSERT'),",
+                        "  ('Bob', '7', 'red', 'INSERT'),",
+                        "  ('Charlie', '73', 'green', 'INSERT');",
+                        "",
+                        "INSERT INTO avro_output",
+                        "SELECT * FROM avro_input;");

Review Comment:
   Nit: Can you extract the SQL string into a template? Currently, it's hard to review and spot the difference between the two test cases.
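   
   For example a small helper like this (sketch), so that both tests only differ in the topic names and the auto-register flag:
   
   ```java
   private static String createTableDdl(String tableName, String topic, boolean autoRegister) {
       return String.join(
               "\n",
               "CREATE TABLE " + tableName + " (",
               "  name STRING,",
               "  favoriteNumber STRING,",
               "  favoriteColor STRING,",
               "  eventType STRING",
               ") WITH (",
               "  'connector' = 'kafka',",
               "  'topic' = '" + topic + "',",
               "  'properties.bootstrap.servers' = '" + INTER_CONTAINER_KAFKA_ALIAS + ":9092',",
               "  'properties.group.id' = 'test-group',",
               "  'scan.startup.mode' = 'earliest-offset',",
               "  'scan.bounded.mode' = 'latest-offset',",
               "  'format' = 'avro-confluent',",
               "  'avro-confluent.url' = 'http://" + INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS + ":8081',",
               "  'avro-confluent.auto.register.schemas' = '" + autoRegister + "'",
               ");");
   }
   ```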



##########
flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java:
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.schema.registry.test;
+
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** E2E Test for Avro-Confluent integration. */
+@Testcontainers
+public class AvroConfluentITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class);
+
+    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+    private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry";
+
+    private static final String TOPIC = "test-avro-input";
+    private static final String RESULT_TOPIC = "test-avro-output";
+    private static final String MANUAL_TOPIC = "test-avro-input-manual";
+    private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual";
+
+    private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc");
+    private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc");
+
+    private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar");
+
+    private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar");
+    private final Path sqlConnectorUpsertTestJar =
+            ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar");
+    private final Path sqlAvroConfluentJar =
+            ResourceTestUtils.getResource(".*avro-confluent.*\\.jar");
+
+    private static final Network NETWORK = Network.newNetwork();
+
+    @Container
+    public static final KafkaContainer KAFKA =
+            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+
+    @Container
+    private static final GenericContainer<?> SCHEMA_REGISTRY =
+            new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withExposedPorts(8081)
+                    .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withEnv(
+                            "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
+                            INTER_CONTAINER_KAFKA_ALIAS + ":9092")
+                    .dependsOn(KAFKA);
+
+    private static final FlinkContainers FLINK =
+            FlinkContainers.builder()
+                    .withFlinkContainersSettings(
+                            FlinkContainersSettings.builder().numTaskManagers(1).build())
+                    .withTestcontainersSettings(
+                            TestcontainersSettings.builder()
+                                    .network(NETWORK)
+                                    .logger(LOG)
+                                    .dependsOn(KAFKA)
+                                    .build())
+                    .build();
+
+    private final HttpClient client = HttpClient.newHttpClient();
+    private final ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper();
+
+    @BeforeAll
+    public static void setup() throws Exception {
+        KAFKA.start();
+        SCHEMA_REGISTRY.start();
+        FLINK.start();
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        FLINK.stop();
+        SCHEMA_REGISTRY.stop();
+        KAFKA.stop();
+    }

Review Comment:
   These methods should be obsolete: the lifecycle of the Kafka/SR containers is already handled by the `@Container` annotation, and for the Flink container see the comment above.



##########
flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java:
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.schema.registry.test;
+
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** E2E Test for Avro-Confluent integration. */
+@Testcontainers
+public class AvroConfluentITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class);
+
+    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+    private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry";
+
+    private static final String TOPIC = "test-avro-input";
+    private static final String RESULT_TOPIC = "test-avro-output";
+    private static final String MANUAL_TOPIC = "test-avro-input-manual";
+    private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual";
+
+    private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc");
+    private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc");
+
+    private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar");
+
+    private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar");
+    private final Path sqlConnectorUpsertTestJar =
+            ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar");
+    private final Path sqlAvroConfluentJar =
+            ResourceTestUtils.getResource(".*avro-confluent.*\\.jar");
+
+    private static final Network NETWORK = Network.newNetwork();
+
+    @Container
+    public static final KafkaContainer KAFKA =
+            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+
+    @Container
+    private static final GenericContainer<?> SCHEMA_REGISTRY =
+            new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withExposedPorts(8081)
+                    .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withEnv(
+                            "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
+                            INTER_CONTAINER_KAFKA_ALIAS + ":9092")
+                    .dependsOn(KAFKA);
+

Review Comment:
   You can use
   ```suggestion
   @RegisterExtension
   ```
   to avoid manually starting the Flink cluster and let JUnit handle the lifecycle.
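   
   i.e. something like this (sketch — iirc `FlinkContainers` implements the JUnit 5 lifecycle callbacks, so the extension starts and stops the cluster):
   
   ```java
   // replaces the manual FLINK.start()/FLINK.stop() calls in setup()/tearDown()
   @RegisterExtension
   static final FlinkContainers FLINK =
           FlinkContainers.builder()
                   .withFlinkContainersSettings(
                           FlinkContainersSettings.builder().numTaskManagers(1).build())
                   .withTestcontainersSettings(
                           TestcontainersSettings.builder()
                                   .network(NETWORK)
                                   .logger(LOG)
                                   .dependsOn(KAFKA)
                                   .build())
                   .build();
   ```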
   



##########
flink-end-to-end-tests/flink-sql-client-test/pom.xml:
##########
@@ -61,14 +61,21 @@ under the License.
                        <!-- Used by maven-dependency-plugin -->
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-sql-connector-kafka</artifactId>
-                       <version>3.0.0-1.17</version>
+                       <version>4.0.0-2.0</version>
                        <scope>provided</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-sql-avro-confluent-registry</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-clients</artifactId>
-                       <version>3.2.3</version>
+                       <version>3.9.0</version>

Review Comment:
   Nit: Since you also upgraded the Flink connector to be compatible with Kafka 4.0, can you also use Kafka 4.0 here?



##########
flink-end-to-end-tests/flink-confluent-schema-registry/src/test/resources/log4j2-test.properties:
##########
@@ -0,0 +1,34 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO

Review Comment:
   Please set this to OFF before merging



##########
flink-end-to-end-tests/flink-confluent-schema-registry/src/test/java/org/apache/flink/schema/registry/test/AvroConfluentITCase.java:
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.schema.registry.test;
+
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** E2E Test for Avro-Confluent integration. */
+@Testcontainers
+public class AvroConfluentITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentITCase.class);
+
+    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+    private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schema-registry";
+
+    private static final String TOPIC = "test-avro-input";
+    private static final String RESULT_TOPIC = "test-avro-output";
+    private static final String MANUAL_TOPIC = "test-avro-input-manual";
+    private static final String MANUAL_RESULT_TOPIC = "test-avro-output-manual";
+
+    private static final String INPUT_SCHEMA = SchemaLoader.loadSchema("avro/input-record.avsc");
+    private static final String OUTPUT_SCHEMA = SchemaLoader.loadSchema("avro/output-record.avsc");
+
+    private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*/SqlToolbox\\.jar");
+
+    private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.*\\.jar");
+    private final Path sqlConnectorUpsertTestJar =
+            ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar");
+    private final Path sqlAvroConfluentJar =
+            ResourceTestUtils.getResource(".*avro-confluent.*\\.jar");
+
+    private static final Network NETWORK = Network.newNetwork();
+
+    @Container
+    public static final KafkaContainer KAFKA =
+            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+
+    @Container
+    private static final GenericContainer<?> SCHEMA_REGISTRY =
+            new GenericContainer<>(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withExposedPorts(8081)
+                    .withEnv("SCHEMA_REGISTRY_HOST_NAME", INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS)
+                    .withEnv(
+                            "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
+                            INTER_CONTAINER_KAFKA_ALIAS + ":9092")
+                    .dependsOn(KAFKA);
+
+    private static final FlinkContainers FLINK =
+            FlinkContainers.builder()
+                    .withFlinkContainersSettings(
+                            FlinkContainersSettings.builder().numTaskManagers(1).build())
+                    .withTestcontainersSettings(
+                            TestcontainersSettings.builder()
+                                    .network(NETWORK)
+                                    .logger(LOG)
+                                    .dependsOn(KAFKA)
+                                    .build())
+                    .build();
+
+    private final HttpClient client = HttpClient.newHttpClient();
+    private final ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper();

Review Comment:
   Nit: I usually make the ObjectMapper static to avoid any reinitialization.
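   
   i.e.:
   
   ```java
   private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
   ```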



##########
docs/content.zh/docs/connectors/table/formats/avro-confluent.md:
##########
@@ -280,6 +280,13 @@ Format 参数
             <td>String</td>
            <td>The URL of the Confluent Schema Registry to fetch/register schemas.</td>
         </tr>
+        <tr>
+            <td><h5>auto.register.schemas</h5></td>
+            <td>optional</td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to automatically register schemas with the Confluent Schema Registry if they don't exist. When set to <code>false</code>, schemas must be manually registered in the Schema Registry before being used. When set to <code>true</code>, schemas will be automatically registered during serialization if they don't already exist. The default value is <code>true</code>.</td>

Review Comment:
   Did we document anywhere in which scenarios we auto-register schemas, for example when reading from or writing to a table?



##########
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java:
##########
@@ -81,13 +100,29 @@ public Schema readSchema(InputStream in) throws IOException {
 
     @Override
     public void writeSchema(Schema schema, OutputStream out) throws IOException {
-        try {
-            int registeredId = schemaRegistryClient.register(subject, schema);
-            out.write(CONFLUENT_MAGIC_BYTE);
-            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
-            out.write(schemaIdBytes);
-        } catch (RestClientException e) {
-            throw new IOException("Could not register schema in registry", e);
+        int registeredId;
+        if (registerSchema()) {
+            try {
+                registeredId = schemaRegistryClient.register(subject, schema);
+            } catch (RestClientException e) {
+                throw new IOException("Could not register schema in registry", e);
+            }
+        } else {
+            try {
+                registeredId = schemaRegistryClient.getId(subject, schema);
+            } catch (RestClientException e) {
+                throw new IOException("Could not retrieve schema in registry", e);

Review Comment:
   Does it throw an IOException if the schema is not present?
   
   Nit: Can we maybe throw a better exception, e.g. a FlinkException, to make it clear that this is "expected" when the schema is not found?



##########
flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java:
##########
@@ -229,6 +229,74 @@ public void testSerializationSchemaWithInvalidOptionalSchema() {
                                 null, SCHEMA.toPhysicalRowDataType()));
     }
 
+    @Test

Review Comment:
   Nit: Can we turn the three (or at least the first and third) tests into a `@ParameterizedTest` to avoid the code duplication?
   
   It's a bit unclear to me why only the second test verifies the behavior for both sinks and sources while the other tests only look at sinks.
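   
   Something like this could cover both values of the flag for sink and source in one go (sketch — `getDefaultOptions`, `createTableSink`/`createTableSource` stand in for whatever helpers this test class already uses):
   
   ```java
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   void testAutoRegisterSchemasOption(boolean autoRegisterSchemas) {
       final Map<String, String> options = new HashMap<>(getDefaultOptions());
       options.put("avro-confluent.auto.register.schemas", String.valueOf(autoRegisterSchemas));
   
       final DynamicTableSink sink = createTableSink(SCHEMA, options);
       final DynamicTableSource source = createTableSource(SCHEMA, options);
       // assert the expected (de)serialization schema for both sink and source here
   }
   ```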



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
