This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fd971b92ed [Improve] improve avro format convert (#6082)
fd971b92ed is described below

commit fd971b92edbac0d304049e7d145073dd2f34eadc
Author: Jarvis <[email protected]>
AuthorDate: Fri Jan 5 18:00:34 2024 +0800

    [Improve] improve avro format convert (#6082)
---
 docs/en/connector-v2/formats/avro.md               | 111 ++++++++++++
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 198 ++++++++++++++++++++-
 .../avro/fake_source_to_kafka_avro_format.conf     |   4 +-
 ...o_to_console.conf => kafka_avro_to_assert.conf} |  16 +-
 .../format/avro/AvroSerializationSchema.java       |   3 +-
 .../seatunnel/format/avro/AvroToRowConverter.java  |  93 +++-------
 .../seatunnel/format/avro/RowToAvroConverter.java  |  13 +-
 .../format/avro/AvroSerializationSchemaTest.java   |   4 +-
 8 files changed, 356 insertions(+), 86 deletions(-)

diff --git a/docs/en/connector-v2/formats/avro.md 
b/docs/en/connector-v2/formats/avro.md
new file mode 100644
index 0000000000..b9ee961daf
--- /dev/null
+++ b/docs/en/connector-v2/formats/avro.md
@@ -0,0 +1,111 @@
+# Avro format
+
+Avro is very popular in streaming data pipelines. Now SeaTunnel supports the Avro format in the Kafka connector.
+
+## How to use Avro format
+
+## Kafka usage example
+
+- This is an example of generating data from a fake source and sinking it to Kafka in Avro format.
+
+```bash
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 90
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_avro_topic_fake_source"
+    format = avro
+  }
+}
+```
+
+- This is an example of reading data from Kafka in Avro format and printing it to the console.
+
+```bash
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_avro_topic"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
+    format = avro
+    format_error_handle_way = skip
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "kafka_table"
+  }
+}
+```
+
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 0bf222bf87..d86aa89186 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -36,6 +36,7 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -47,6 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
@@ -294,17 +296,117 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
     }
 
     @TestTemplate
-    @DisabledOnContainer(TestContainerId.SPARK_2_4)
+    @DisabledOnContainer(value = {TestContainerId.SPARK_2_4})
     public void testFakeSourceToKafkaAvroFormat(TestContainer container)
             throws IOException, InterruptedException {
         Container.ExecResult execResult =
                 
container.executeJob("/avro/fake_source_to_kafka_avro_format.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+        String[] subField = {
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_bytes",
+            "c_date",
+            "c_decimal",
+            "c_timestamp"
+        };
+        SeaTunnelDataType<?>[] subFieldTypes = {
+            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+            ArrayType.INT_ARRAY_TYPE,
+            BasicType.STRING_TYPE,
+            BasicType.BOOLEAN_TYPE,
+            BasicType.BYTE_TYPE,
+            BasicType.SHORT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.LONG_TYPE,
+            BasicType.FLOAT_TYPE,
+            BasicType.DOUBLE_TYPE,
+            PrimitiveByteArrayType.INSTANCE,
+            LocalTimeType.LOCAL_DATE_TYPE,
+            new DecimalType(38, 18),
+            LocalTimeType.LOCAL_DATE_TIME_TYPE
+        };
+        SeaTunnelRowType subRow = new SeaTunnelRowType(subField, 
subFieldTypes);
+        String[] fieldNames = {
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_bytes",
+            "c_date",
+            "c_decimal",
+            "c_timestamp",
+            "c_row"
+        };
+        SeaTunnelDataType<?>[] fieldTypes = {
+            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+            ArrayType.INT_ARRAY_TYPE,
+            BasicType.STRING_TYPE,
+            BasicType.BOOLEAN_TYPE,
+            BasicType.BYTE_TYPE,
+            BasicType.SHORT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.LONG_TYPE,
+            BasicType.FLOAT_TYPE,
+            BasicType.DOUBLE_TYPE,
+            PrimitiveByteArrayType.INSTANCE,
+            LocalTimeType.LOCAL_DATE_TYPE,
+            new DecimalType(38, 18),
+            LocalTimeType.LOCAL_DATE_TIME_TYPE,
+            subRow
+        };
+        SeaTunnelRowType fake_source_row_type = new 
SeaTunnelRowType(fieldNames, fieldTypes);
+        AvroDeserializationSchema avroDeserializationSchema =
+                new AvroDeserializationSchema(fake_source_row_type);
+        List<SeaTunnelRow> kafkaSTRow =
+                getKafkaSTRow(
+                        "test_avro_topic_fake_source",
+                        value -> {
+                            try {
+                                return 
avroDeserializationSchema.deserialize(value);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        Assertions.assertEquals(90, kafkaSTRow.size());
+        kafkaSTRow.forEach(
+                row -> {
+                    Assertions.assertInstanceOf(Map.class, row.getField(0));
+                    Assertions.assertInstanceOf(Integer[].class, 
row.getField(1));
+                    Assertions.assertInstanceOf(String.class, row.getField(2));
+                    Assertions.assertEquals("fake_source_avro", 
row.getField(2).toString());
+                    Assertions.assertInstanceOf(Boolean.class, 
row.getField(3));
+                    Assertions.assertInstanceOf(Byte.class, row.getField(4));
+                    Assertions.assertInstanceOf(Short.class, row.getField(5));
+                    Assertions.assertInstanceOf(Integer.class, 
row.getField(6));
+                    Assertions.assertInstanceOf(Long.class, row.getField(7));
+                    Assertions.assertInstanceOf(Float.class, row.getField(8));
+                    Assertions.assertInstanceOf(Double.class, row.getField(9));
+                    Assertions.assertInstanceOf(byte[].class, 
row.getField(10));
+                    Assertions.assertInstanceOf(LocalDate.class, 
row.getField(11));
+                    Assertions.assertInstanceOf(BigDecimal.class, 
row.getField(12));
+                    Assertions.assertInstanceOf(LocalDateTime.class, 
row.getField(13));
+                    Assertions.assertInstanceOf(SeaTunnelRow.class, 
row.getField(14));
+                });
     }
 
     @TestTemplate
-    @DisabledOnContainer(TestContainerId.SPARK_2_4)
-    public void testKafkaAvroToConsole(TestContainer container)
+    @DisabledOnContainer(value = {TestContainerId.SPARK_2_4})
+    public void testKafkaAvroToAssert(TestContainer container)
             throws IOException, InterruptedException {
         DefaultSeaTunnelRowSerializer serializer =
                 DefaultSeaTunnelRowSerializer.create(
@@ -312,9 +414,48 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                         SEATUNNEL_ROW_TYPE,
                         MessageFormat.AVRO,
                         DEFAULT_FIELD_DELIMITER);
-        generateTestData(row -> serializer.serializeRow(row), 0, 100);
-        Container.ExecResult execResult = 
container.executeJob("/avro/kafka_avro_to_console.conf");
+        int start = 0;
+        int end = 100;
+        generateTestData(row -> serializer.serializeRow(row), start, end);
+        Container.ExecResult execResult = 
container.executeJob("/avro/kafka_avro_to_assert.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+
+        AvroDeserializationSchema avroDeserializationSchema =
+                new AvroDeserializationSchema(SEATUNNEL_ROW_TYPE);
+        List<SeaTunnelRow> kafkaSTRow =
+                getKafkaSTRow(
+                        "test_avro_topic",
+                        value -> {
+                            try {
+                                return 
avroDeserializationSchema.deserialize(value);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        Assertions.assertEquals(100, kafkaSTRow.size());
+        kafkaSTRow.forEach(
+                row -> {
+                    Assertions.assertTrue(
+                            (long) row.getField(0) >= start && (long) 
row.getField(0) < end);
+                    Assertions.assertEquals(
+                            Collections.singletonMap("key", 
Short.parseShort("1")),
+                            (Map<String, Short>) row.getField(1));
+                    Assertions.assertArrayEquals(
+                            new Byte[] {Byte.parseByte("1")}, (Byte[]) 
row.getField(2));
+                    Assertions.assertEquals("string", 
row.getField(3).toString());
+                    Assertions.assertEquals(false, row.getField(4));
+                    Assertions.assertEquals(Byte.parseByte("1"), 
row.getField(5));
+                    Assertions.assertEquals(Short.parseShort("1"), 
row.getField(6));
+                    Assertions.assertEquals(Integer.parseInt("1"), 
row.getField(7));
+                    Assertions.assertEquals(Long.parseLong("1"), 
row.getField(8));
+                    Assertions.assertEquals(Float.parseFloat("1.1"), 
row.getField(9));
+                    Assertions.assertEquals(Double.parseDouble("1.1"), 
row.getField(10));
+                    Assertions.assertEquals(BigDecimal.valueOf(11, 1), 
row.getField(11));
+                    Assertions.assertArrayEquals("test".getBytes(), (byte[]) 
row.getField(12));
+                    Assertions.assertEquals(LocalDate.of(2024, 1, 1), 
row.getField(13));
+                    Assertions.assertEquals(
+                            LocalDateTime.of(2024, 1, 1, 12, 59, 23), 
row.getField(14));
+                });
     }
 
     public void testKafkaLatestToConsole(TestContainer container)
@@ -373,6 +514,22 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         return props;
     }
 
+    private Properties kafkaByteConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
"seatunnel-kafka-sink-group");
+        props.put(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        props.setProperty(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.setProperty(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        return props;
+    }
+
     private void generateTestData(ProducerRecordConverter converter, int 
start, int end) {
         for (int i = start; i < end; i++) {
             SeaTunnelRow row =
@@ -391,8 +548,8 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                                 Double.parseDouble("1.1"),
                                 BigDecimal.valueOf(11, 1),
                                 "test".getBytes(),
-                                LocalDate.now(),
-                                LocalDateTime.now()
+                                LocalDate.of(2024, 1, 1),
+                                LocalDateTime.of(2024, 1, 1, 12, 59, 23)
                             });
             ProducerRecord<byte[], byte[]> producerRecord = 
converter.convert(row);
             producer.send(producerRecord);
@@ -480,7 +637,34 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         return data;
     }
 
+    private List<SeaTunnelRow> getKafkaSTRow(String topicName, 
ConsumerRecordConverter converter) {
+        List<SeaTunnelRow> data = new ArrayList<>();
+        try (KafkaConsumer<byte[], byte[]> consumer =
+                new KafkaConsumer<>(kafkaByteConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new 
TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.add(converter.convert(record.value()));
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        return data;
+    }
+
     interface ProducerRecordConverter {
         ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
     }
+
+    interface ConsumerRecordConverter {
+        SeaTunnelRow convert(byte[] value);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
index 99eac77ef1..fe33a1fc84 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
@@ -29,6 +29,8 @@ env {
 
 source {
   FakeSource {
+    row.num = 90
+    string.template = ["fake_source_avro"]
     schema = {
       fields {
         c_map = "map<string, string>"
@@ -70,7 +72,7 @@ source {
 sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
-    topic = "test_avro_topic"
+    topic = "test_avro_topic_fake_source"
     format = avro
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
similarity index 87%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
index ce5932e744..d357eb6583 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
@@ -82,7 +82,21 @@ sink {
                 rule_value = 99
               }
             ]
-          }
+          },
+          {
+           field_name = c_string
+           field_type = string
+           field_value = [
+            {
+              rule_type = MIN_LENGTH
+              rule_value = 6
+            },
+            {
+              rule_type = MAX_LENGTH
+              rule_value = 6
+            }
+          ]
+         }
         ]
       }
   }
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
index 3d9a828bf7..16afe4b693 100644
--- 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
@@ -52,7 +52,6 @@ public class AvroSerializationSchema implements 
SerializationSchema {
     public byte[] serialize(SeaTunnelRow element) {
         GenericRecord record = converter.convertRowToGenericRecord(element);
         try {
-            out.reset();
             writer.write(record, encoder);
             encoder.flush();
             return out.toByteArray();
@@ -60,6 +59,8 @@ public class AvroSerializationSchema implements 
SerializationSchema {
             throw new SeaTunnelAvroFormatException(
                     AvroFormatErrorCode.SERIALIZATION_ERROR,
                     "Serialization error on record : " + element);
+        } finally {
+            out.reset();
         }
     }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
index 989087ee57..cb353e5ab4 100644
--- 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.format.avro;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -33,7 +34,11 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 
 import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class AvroToRowConverter implements Serializable {
 
@@ -72,31 +77,29 @@ public class AvroToRowConverter implements Serializable {
                 values[i] = null;
                 continue;
             }
-            values[i] =
-                    convertField(
-                            rowType.getFieldType(i),
-                            record.getSchema().getField(fieldNames[i]),
-                            record.get(fieldNames[i]));
+            values[i] = convertField(rowType.getFieldType(i), 
record.get(fieldNames[i]));
         }
         return new SeaTunnelRow(values);
     }
 
-    private Object convertField(SeaTunnelDataType<?> dataType, Schema.Field 
field, Object val) {
+    private Object convertField(SeaTunnelDataType<?> dataType, Object val) {
         switch (dataType.getSqlType()) {
-            case MAP:
             case STRING:
+                return val.toString();
             case BOOLEAN:
-            case SMALLINT:
             case INT:
             case BIGINT:
             case FLOAT:
             case DOUBLE:
             case NULL:
-            case BYTES:
             case DATE:
             case DECIMAL:
             case TIMESTAMP:
                 return val;
+            case BYTES:
+                return ((ByteBuffer) val).array();
+            case SMALLINT:
+                return ((Integer) val).shortValue();
             case TINYINT:
                 Class<?> typeClass = dataType.getTypeClass();
                 if (typeClass == Byte.class) {
@@ -104,6 +107,16 @@ public class AvroToRowConverter implements Serializable {
                     return integer.byteValue();
                 }
                 return val;
+            case MAP:
+                MapType<?, ?> mapType = (MapType<?, ?>) dataType;
+                Map<Object, Object> res = new HashMap<>();
+                Map map = (Map) val;
+                for (Object o : map.entrySet()) {
+                    res.put(
+                            convertField(mapType.getKeyType(), ((Map.Entry) 
o).getKey()),
+                            convertField(mapType.getValueType(), ((Map.Entry) 
o).getValue()));
+                }
+                return res;
             case ARRAY:
                 BasicType<?> basicType = ((ArrayType<?, ?>) 
dataType).getElementType();
                 List<Object> list = (List<Object>) val;
@@ -121,67 +134,15 @@ public class AvroToRowConverter implements Serializable {
         }
     }
 
-    protected static Object convertArray(List<Object> val, 
SeaTunnelDataType<?> dataType) {
+    protected Object convertArray(List<Object> val, SeaTunnelDataType<?> 
dataType) {
         if (val == null) {
             return null;
         }
         int length = val.size();
-        switch (dataType.getSqlType()) {
-            case STRING:
-                String[] strings = new String[length];
-                for (int i = 0; i < strings.length; i++) {
-                    strings[i] = val.get(i).toString();
-                }
-                return strings;
-            case BOOLEAN:
-                Boolean[] booleans = new Boolean[length];
-                for (int i = 0; i < booleans.length; i++) {
-                    booleans[i] = (Boolean) val.get(i);
-                }
-                return booleans;
-            case BYTES:
-                Byte[] bytes = new Byte[length];
-                for (int i = 0; i < bytes.length; i++) {
-                    bytes[i] = (Byte) val.get(i);
-                }
-                return bytes;
-            case SMALLINT:
-                Short[] shorts = new Short[length];
-                for (int i = 0; i < shorts.length; i++) {
-                    shorts[i] = (Short) val.get(i);
-                }
-                return shorts;
-            case INT:
-                Integer[] integers = new Integer[length];
-                for (int i = 0; i < integers.length; i++) {
-                    integers[i] = (Integer) val.get(i);
-                }
-                return integers;
-            case BIGINT:
-                Long[] longs = new Long[length];
-                for (int i = 0; i < longs.length; i++) {
-                    longs[i] = (Long) val.get(i);
-                }
-                return longs;
-            case FLOAT:
-                Float[] floats = new Float[length];
-                for (int i = 0; i < floats.length; i++) {
-                    floats[i] = (Float) val.get(i);
-                }
-                return floats;
-            case DOUBLE:
-                Double[] doubles = new Double[length];
-                for (int i = 0; i < doubles.length; i++) {
-                    doubles[i] = (Double) val.get(i);
-                }
-                return doubles;
-            default:
-                String errorMsg =
-                        String.format(
-                                "SeaTunnel avro array format is not supported 
for this data type [%s]",
-                                dataType.getSqlType());
-                throw new SeaTunnelAvroFormatException(
-                        AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+        Object instance = Array.newInstance(dataType.getTypeClass(), length);
+        for (int i = 0; i < val.size(); i++) {
+            Array.set(instance, i, convertField(dataType, val.get(i)));
         }
+        return instance;
     }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
index f8f0652a26..4a03ddce09 100644
--- 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
@@ -35,9 +35,9 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.DatumWriter;
 
 import java.io.Serializable;
+import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
 
 public class RowToAvroConverter implements Serializable {
 
@@ -111,14 +111,11 @@ public class RowToAvroConverter implements Serializable {
             case BYTES:
                 return ByteBuffer.wrap((byte[]) data);
             case ARRAY:
-                //                BasicType<?> basicType = ((ArrayType<?, ?>)
-                // seaTunnelDataType).getElementType();
-                //                return Util.convertArray((Object[]) data, 
basicType);
                 BasicType<?> basicType = ((ArrayType<?, ?>) 
seaTunnelDataType).getElementType();
-                List<Object> records = new ArrayList<>(((Object[]) 
data).length);
-                for (Object object : (Object[]) data) {
-                    Object resolvedObject = resolveObject(object, basicType);
-                    records.add(resolvedObject);
+                int length = Array.getLength(data);
+                ArrayList<Object> records = new ArrayList<>(length);
+                for (int i = 0; i < length; i++) {
+                    records.add(resolveObject(Array.get(data, i), basicType));
                 }
                 return records;
             case ROW:
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
index 5f505e1ba6..8b2eadaf1e 100644
--- 
a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
@@ -161,9 +161,9 @@ class AvroSerializationSchemaTest {
         SeaTunnelRowType rowType = buildSeaTunnelRowType();
         SeaTunnelRow seaTunnelRow = buildSeaTunnelRow();
         AvroSerializationSchema serializationSchema = new 
AvroSerializationSchema(rowType);
-        byte[] serialize = serializationSchema.serialize(seaTunnelRow);
+        byte[] bytes = serializationSchema.serialize(seaTunnelRow);
         AvroDeserializationSchema deserializationSchema = new 
AvroDeserializationSchema(rowType);
-        SeaTunnelRow deserialize = 
deserializationSchema.deserialize(serialize);
+        SeaTunnelRow deserialize = deserializationSchema.deserialize(bytes);
         String[] strArray1 = (String[]) seaTunnelRow.getField(1);
         String[] strArray2 = (String[]) deserialize.getField(1);
         Assertions.assertArrayEquals(strArray1, strArray2);

Reply via email to