dawidwys commented on a change in pull request #12919:
URL: https://github.com/apache/flink/pull/12919#discussion_r458585321



##########
File path: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** A {@link SchemaCoder.SchemaCoderProvider} that uses a cached schema registry
+ * client underneath. **/
+@Internal
+class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

Review comment:
       Do we still need to extract this class after the latest changes?

##########
File path: flink-formats/flink-avro/pom.xml
##########
@@ -140,6 +132,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- Avro RowData schema test dependency -->

Review comment:
       Unnecessary change.

##########
File path: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java
##########
@@ -92,25 +91,4 @@ private ConfluentRegistryAvroSerializationSchema(Class<T> recordClazz, Schema sc
                        new CachedSchemaCoderProvider(subject, schemaRegistryUrl, DEFAULT_IDENTITY_MAP_CAPACITY)
                );
        }
-
-       private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

Review comment:
       ditto

##########
File path: flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link RegistryAvroFormatFactory}.
+ */
+public class RegistryAvroFormatFactoryTest {
+       private TableSchema schema;
+       private RowType rowType;
+       private String subject;
+       private String registryURL;
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Before
+       public void before() {
+               this.schema = TableSchema.builder()
+                               .field("a", DataTypes.STRING())
+                               .field("b", DataTypes.INT())
+                               .field("c", DataTypes.BOOLEAN())
+                               .build();
+               this.rowType = (RowType) schema.toRowDataType().getLogicalType();
+               this.subject = "test-subject";
+               this.registryURL = "http://localhost:8081";
+       }
+
+       @Test
+       public void testSeDeSchema() {

Review comment:
       You have two completely independent tests in a single method. Please split it into two separate tests, as sketched below. We should always aim to test a single thing at a time. The benefits are:
   1. Both tests are always executed, independent of the result of the other.
   2. It's easier to debug: you don't need to run the first case if the second fails.
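
   For illustration, a rough sketch of the split (method names are suggestions; the bodies just reuse the code already in this method, with the `instanceOf` check mentioned in my other comment):

```java
@Test
public void testDeserializationSchema() {
	final AvroRowDataDeserializationSchema expectedDeser =
			new AvroRowDataDeserializationSchema(
					ConfluentRegistryAvroDeserializationSchema.forGeneric(
							AvroSchemaConverter.convertToSchema(rowType),
							registryURL),
					AvroToRowDataConverters.createRowConverter(rowType),
					InternalTypeInfo.of(rowType));

	final DynamicTableSource actualSource = createTableSource(getAllOptions());
	assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
	final TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
			(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;

	final DeserializationSchema<RowData> actualDeser = sourceMock.valueFormat
			.createRuntimeDecoder(ScanRuntimeProviderContext.INSTANCE, schema.toRowDataType());

	assertEquals(expectedDeser, actualDeser);
}

@Test
public void testSerializationSchema() {
	final AvroRowDataSerializationSchema expectedSer =
			new AvroRowDataSerializationSchema(
					rowType,
					ConfluentRegistryAvroSerializationSchema.forGeneric(
							subject,
							AvroSchemaConverter.convertToSchema(rowType),
							registryURL),
					RowDataToAvroConverters.createRowConverter(rowType));

	final DynamicTableSink actualSink = createTableSink(getAllOptions());
	assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
	final TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
			(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;

	final SerializationSchema<RowData> actualSer = sinkMock.valueFormat
			.createRuntimeEncoder(null, schema.toRowDataType());

	assertEquals(expectedSer, actualSer);
}
```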

##########
File path: flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link RegistryAvroFormatFactory}.
+ */
+public class RegistryAvroFormatFactoryTest {
+       private TableSchema schema;
+       private RowType rowType;
+       private String subject;
+       private String registryURL;
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Before
+       public void before() {
+               this.schema = TableSchema.builder()
+                               .field("a", DataTypes.STRING())
+                               .field("b", DataTypes.INT())
+                               .field("c", DataTypes.BOOLEAN())
+                               .build();
+               this.rowType = (RowType) schema.toRowDataType().getLogicalType();
+               this.subject = "test-subject";
+               this.registryURL = "http://localhost:8081";
+       }
+
+       @Test
+       public void testSeDeSchema() {
+               final AvroRowDataDeserializationSchema expectedDeser =
+                               new AvroRowDataDeserializationSchema(
+                                               ConfluentRegistryAvroDeserializationSchema.forGeneric(
+                                                               AvroSchemaConverter.convertToSchema(rowType),
+                                                               registryURL),
+                                               AvroToRowDataConverters.createRowConverter(rowType),
+                                               InternalTypeInfo.of(rowType));
+
+               final Map<String, String> options = getAllOptions();
+
+               final DynamicTableSource actualSource = createTableSource(options);
+               assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+               TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                               (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+               DeserializationSchema<RowData> actualDeser = scanSourceMock.valueFormat
+                               .createRuntimeDecoder(
+                                               ScanRuntimeProviderContext.INSTANCE,
+                                               schema.toRowDataType());
+
+               assertEquals(expectedDeser, actualDeser);
+
+               final AvroRowDataSerializationSchema expectedSer =
+                               new AvroRowDataSerializationSchema(
+                                               rowType,
+                                               ConfluentRegistryAvroSerializationSchema.forGeneric(
+                                                               subject,
+                                                               AvroSchemaConverter.convertToSchema(rowType),
+                                                               registryURL),
+                                               RowDataToAvroConverters.createRowConverter(rowType));
+
+               final DynamicTableSink actualSink = createTableSink(options);
+               assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
+               TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                               (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+               SerializationSchema<RowData> actualSer = sinkMock.valueFormat
+                               .createRuntimeEncoder(
+                                               null,
+                                               schema.toRowDataType());
+
+               assertEquals(expectedSer, actualSer);
+       }
+
+       @Test
+       public void testMissingSubjectForSink() {
+               thrown.expect(ValidationException.class);
+               thrown.expect(
+                               containsCause(
+                                               new ValidationException("Option avro-sr.schema-registry.subject "
+                                                               + "is required for serialization")));
+
+               final Map<String, String> options =
+                               getModifiedOptions(opts -> opts.remove("avro-sr.schema-registry.subject"));
+
+               createTableSink(options);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns the full options modified by the given consumer {@code optionModifier}.
+        *
+        * @param optionModifier Consumer to modify the options
+        */
+       private Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
+               Map<String, String> options = getAllOptions();
+               optionModifier.accept(options);
+               return options;
+       }
+
+       private Map<String, String> getAllOptions() {

Review comment:
       `getAllOptions` -> `getDefaultOptions`

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
##########
@@ -64,61 +41,59 @@
 
        private static final long serialVersionUID = 1L;
 
+       /** Nested schema to serialize the {@link GenericRecord} into bytes. **/
+       private final SerializationSchema<GenericRecord> nestedSchema;
+
        /**
         * Logical type describing the input type.
         */
        private final RowType rowType;
 
-       /**
-        * Runtime instance that performs the actual work.
-        */
-       private final SerializationRuntimeConverter runtimeConverter;
-
        /**
         * Avro serialization schema.
         */
        private transient Schema schema;
 
        /**
-        * Writer to serialize Avro record into a Avro bytes.
-        */
-       private transient DatumWriter<IndexedRecord> datumWriter;
-
-       /**
-        * Output stream to serialize records into byte array.
+        * Runtime instance that performs the actual work.
         */
-       private transient ByteArrayOutputStream arrayOutputStream;
+       private final RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter;
 
        /**
-        * Low-level class for serialization of Avro values.
+        * Creates an Avro serialization schema with the given record row type.
         */
-       private transient Encoder encoder;
+       public AvroRowDataSerializationSchema(RowType rowType) {

Review comment:
       How about we remove this ctor? IMO the logic from this ctor should live only in the factory. I know that this class is in theory `PublicEvolving`, but in practice it is only usable from the Table API through the factory, so in my opinion it is safe to drop this ctor.
   
   The same applies to `SerializationSchema`.
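
   For reference, the factory side can call the full-args ctor directly, along the lines of what the test in this PR already does (a sketch mirroring the test code, not the exact factory wiring):

```java
// Sketch of factory-side construction via the full-args ctor;
// names mirror the test code in this PR, the actual factory code may differ.
AvroRowDataSerializationSchema serializationSchema =
		new AvroRowDataSerializationSchema(
				rowType,
				ConfluentRegistryAvroSerializationSchema.forGeneric(
						subject,
						AvroSchemaConverter.convertToSchema(rowType),
						registryURL),
				RowDataToAvroConverters.createRowConverter(rowType));
```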

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroRowDataSeDeSchemaTest.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AvroRowDataDeserializationSchema} and
+ * {@link AvroRowDataSerializationSchema} for schema registry avro.
+ */
+public class RegistryAvroRowDataSeDeSchemaTest {
+       private static final String ADDRESS_SCHEMA = "" +

Review comment:
       IMO we should add a simple test for serializing and deserializing using the schema registry. It does not need to be very in-depth, just enough to check that everything is wired together correctly.
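
   A minimal round trip could look roughly like this. Note this is only a sketch: it assumes the `RegistryAvro*` ctors are accessible from the test, uses a hypothetical no-op `SchemaCoder` (both sides already know the schema, so the "registry" handshake writes/reads nothing), and the exact signatures and `open()` contract should be checked against this PR:

```java
@Test
public void testRegistryAvroRoundTrip() throws Exception {
	// imports: mostly as in this file, plus java.io.InputStream/OutputStream,
	// org.apache.avro.generic.GenericRecord, SchemaCoder and the RegistryAvro* schemas
	final RowType rowType = (RowType) DataTypes.ROW(
			DataTypes.FIELD("a", DataTypes.STRING()),
			DataTypes.FIELD("b", DataTypes.INT()),
			DataTypes.FIELD("c", DataTypes.BOOLEAN())).getLogicalType();
	final Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);

	// Hypothetical no-op coder: the schema is known out of band.
	final SchemaCoder.SchemaCoderProvider provider = () -> new SchemaCoder() {
		@Override
		public Schema readSchema(InputStream in) {
			return avroSchema;
		}

		@Override
		public void writeSchema(Schema schema, OutputStream out) {
			// nothing to write
		}
	};

	final AvroRowDataSerializationSchema ser = new AvroRowDataSerializationSchema(
			rowType,
			new RegistryAvroSerializationSchema<>(GenericRecord.class, avroSchema, provider),
			RowDataToAvroConverters.createRowConverter(rowType));
	final AvroRowDataDeserializationSchema deser = new AvroRowDataDeserializationSchema(
			new RegistryAvroDeserializationSchema<>(GenericRecord.class, avroSchema, provider),
			AvroToRowDataConverters.createRowConverter(rowType),
			InternalTypeInfo.of(rowType));
	ser.open(null);
	deser.open(null);

	final RowData row = GenericRowData.of(StringData.fromString("Flink"), 42, true);
	assertEquals(row, deser.deserialize(ser.serialize(row)));
}
```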

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -75,17 +44,10 @@
 @PublicEvolving
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {
 
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Used for converting Date type.
-        */
-       private static final int MILLIS_PER_DAY = 86400_000;
+       private static final long serialVersionUID = 9055890466043022732L;

Review comment:
       There is no point in using a large number here. Use `2L`.

##########
File path: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
##########
@@ -114,23 +113,4 @@ private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullab
                        new CachedSchemaCoderProvider(url, identityMapCapacity)
                );
        }
-
-       private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

Review comment:
       ditto

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap;
+import static org.joda.time.DateTimeConstants.MILLIS_PER_DAY;
+
+/** Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. **/
+public class AvroToRowDataConverters {

Review comment:
       `@Internal`

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroRowDataSeDeSchemaTest.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AvroRowDataDeserializationSchema} and
+ * {@link AvroRowDataSerializationSchema} for schema registry avro.
+ */
+public class RegistryAvroRowDataSeDeSchemaTest {

Review comment:
       Those tests have nothing to do with the schema registry.
   
   They test the same logic as `AvroRowDataDeSerializationSchemaTest`.

##########
File path: flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link RegistryAvroFormatFactory}.
+ */
+public class RegistryAvroFormatFactoryTest {
+       private TableSchema schema;
+       private RowType rowType;
+       private String subject;
+       private String registryURL;
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Before
+       public void before() {

Review comment:
       Why do we need that in the `@Before` block? Can't we just initialize it statically?
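
   E.g. something along these lines (a sketch; the constant names are suggestions):

```java
private static final TableSchema SCHEMA = TableSchema.builder()
		.field("a", DataTypes.STRING())
		.field("b", DataTypes.INT())
		.field("c", DataTypes.BOOLEAN())
		.build();
private static final RowType ROW_TYPE = (RowType) SCHEMA.toRowDataType().getLogicalType();
private static final String SUBJECT = "test-subject";
private static final String REGISTRY_URL = "http://localhost:8081";
```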

##########
File path: flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link RegistryAvroFormatFactory}.
+ */
+public class RegistryAvroFormatFactoryTest {
+       private TableSchema schema;
+       private RowType rowType;
+       private String subject;
+       private String registryURL;
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Before
+       public void before() {
+               this.schema = TableSchema.builder()
+                               .field("a", DataTypes.STRING())
+                               .field("b", DataTypes.INT())
+                               .field("c", DataTypes.BOOLEAN())
+                               .build();
+               this.rowType = (RowType) schema.toRowDataType().getLogicalType();
+               this.subject = "test-subject";
+               this.registryURL = "http://localhost:8081";
+       }
+
+       @Test
+       public void testSeDeSchema() {
+               final AvroRowDataDeserializationSchema expectedDeser =
+                               new AvroRowDataDeserializationSchema(
+                                               ConfluentRegistryAvroDeserializationSchema.forGeneric(
+                                                               AvroSchemaConverter.convertToSchema(rowType),
+                                                               registryURL),
+                                               AvroToRowDataConverters.createRowConverter(rowType),
+                                               InternalTypeInfo.of(rowType));
+
+               final Map<String, String> options = getAllOptions();
+
+               final DynamicTableSource actualSource = createTableSource(options);
+               assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;

Review comment:
       Please don't use `assert`. I can't think of a reason to use an `assert` in a test: it is disabled by default and only runs when the JVM is started with the `-ea` flag. Why would you want your assertions in tests to be skippable? If you want to check the type of `actualSource`, use e.g.
   `assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));`
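
   For completeness, with the imports (this `assertThat` is the JUnit 4 one already imported in this file; `instanceOf` ships with Hamcrest core, which JUnit 4 brings in):

```java
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;

assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
```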

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -169,6 +171,121 @@ private AvroSchemaConverter() {
                throw new IllegalArgumentException("Unsupported Avro type '" + 
schema.getType() + "'.");
        }
 
+       /**
+        * Converts an Avro schema string into a nested row structure with deterministic field order and data
+        * types that are compatible with Flink's Table & SQL API.
+        *
+        * @param avroSchemaString Avro schema definition string
+        *
+        * @return data type matching the schema
+        */
+       public static DataType convertToDataType(String avroSchemaString) {

Review comment:
       Could we add tests for this method?
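
   For example, something along these lines (a sketch; the exact nullability the converter assigns is my assumption and should be checked against the implementation):

```java
@Test
public void testConvertToDataType() {
	final String avroSchemaString =
			"{\"type\": \"record\", \"name\": \"record\", \"fields\": ["
					+ "{\"name\": \"name\", \"type\": \"string\"},"
					+ "{\"name\": \"age\", \"type\": \"int\"},"
					+ "{\"name\": \"nickname\", \"type\": [\"null\", \"string\"]}]}";

	final DataType expected = DataTypes.ROW(
					DataTypes.FIELD("name", DataTypes.STRING().notNull()),
					DataTypes.FIELD("age", DataTypes.INT().notNull()),
					DataTypes.FIELD("nickname", DataTypes.STRING()))
			.notNull();

	assertEquals(expected, AvroSchemaConverter.convertToDataType(avroSchemaString));
}
```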




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

