This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fd971b92ed [Improve] improve avro format convert (#6082)
fd971b92ed is described below
commit fd971b92edbac0d304049e7d145073dd2f34eadc
Author: Jarvis <[email protected]>
AuthorDate: Fri Jan 5 18:00:34 2024 +0800
[Improve] improve avro format convert (#6082)
---
docs/en/connector-v2/formats/avro.md | 111 ++++++++++++
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 198 ++++++++++++++++++++-
.../avro/fake_source_to_kafka_avro_format.conf | 4 +-
...o_to_console.conf => kafka_avro_to_assert.conf} | 16 +-
.../format/avro/AvroSerializationSchema.java | 3 +-
.../seatunnel/format/avro/AvroToRowConverter.java | 93 +++-------
.../seatunnel/format/avro/RowToAvroConverter.java | 13 +-
.../format/avro/AvroSerializationSchemaTest.java | 4 +-
8 files changed, 356 insertions(+), 86 deletions(-)
diff --git a/docs/en/connector-v2/formats/avro.md
b/docs/en/connector-v2/formats/avro.md
new file mode 100644
index 0000000000..b9ee961daf
--- /dev/null
+++ b/docs/en/connector-v2/formats/avro.md
@@ -0,0 +1,111 @@
+# Avro format
+
+Avro is very popular in streaming data pipelines. Now SeaTunnel supports Avro
format in the Kafka connector.
+
+# How to use Avro format
+
+## Kafka usage example
+
+- This is an example of generating data from a fake source and sinking it to
Kafka in Avro format.
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 90
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic_fake_source"
+ format = avro
+ }
+}
+```
+
+- This is an example of reading data from Kafka in Avro format and printing it
to the console.
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ format = avro
+ format_error_handle_way = skip
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "kafka_table"
+ }
+}
+```
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 0bf222bf87..d86aa89186 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -36,6 +36,7 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -47,6 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -294,17 +296,117 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
}
@TestTemplate
- @DisabledOnContainer(TestContainerId.SPARK_2_4)
+ @DisabledOnContainer(value = {TestContainerId.SPARK_2_4})
public void testFakeSourceToKafkaAvroFormat(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/avro/fake_source_to_kafka_avro_format.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ String[] subField = {
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_bytes",
+ "c_date",
+ "c_decimal",
+ "c_timestamp"
+ };
+ SeaTunnelDataType<?>[] subFieldTypes = {
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ ArrayType.INT_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ new DecimalType(38, 18),
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ };
+ SeaTunnelRowType subRow = new SeaTunnelRowType(subField,
subFieldTypes);
+ String[] fieldNames = {
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_bytes",
+ "c_date",
+ "c_decimal",
+ "c_timestamp",
+ "c_row"
+ };
+ SeaTunnelDataType<?>[] fieldTypes = {
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ ArrayType.INT_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ new DecimalType(38, 18),
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ subRow
+ };
+ SeaTunnelRowType fake_source_row_type = new
SeaTunnelRowType(fieldNames, fieldTypes);
+ AvroDeserializationSchema avroDeserializationSchema =
+ new AvroDeserializationSchema(fake_source_row_type);
+ List<SeaTunnelRow> kafkaSTRow =
+ getKafkaSTRow(
+ "test_avro_topic_fake_source",
+ value -> {
+ try {
+ return
avroDeserializationSchema.deserialize(value);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Assertions.assertEquals(90, kafkaSTRow.size());
+ kafkaSTRow.forEach(
+ row -> {
+ Assertions.assertInstanceOf(Map.class, row.getField(0));
+ Assertions.assertInstanceOf(Integer[].class,
row.getField(1));
+ Assertions.assertInstanceOf(String.class, row.getField(2));
+ Assertions.assertEquals("fake_source_avro",
row.getField(2).toString());
+ Assertions.assertInstanceOf(Boolean.class,
row.getField(3));
+ Assertions.assertInstanceOf(Byte.class, row.getField(4));
+ Assertions.assertInstanceOf(Short.class, row.getField(5));
+ Assertions.assertInstanceOf(Integer.class,
row.getField(6));
+ Assertions.assertInstanceOf(Long.class, row.getField(7));
+ Assertions.assertInstanceOf(Float.class, row.getField(8));
+ Assertions.assertInstanceOf(Double.class, row.getField(9));
+ Assertions.assertInstanceOf(byte[].class,
row.getField(10));
+ Assertions.assertInstanceOf(LocalDate.class,
row.getField(11));
+ Assertions.assertInstanceOf(BigDecimal.class,
row.getField(12));
+ Assertions.assertInstanceOf(LocalDateTime.class,
row.getField(13));
+ Assertions.assertInstanceOf(SeaTunnelRow.class,
row.getField(14));
+ });
}
@TestTemplate
- @DisabledOnContainer(TestContainerId.SPARK_2_4)
- public void testKafkaAvroToConsole(TestContainer container)
+ @DisabledOnContainer(value = {TestContainerId.SPARK_2_4})
+ public void testKafkaAvroToAssert(TestContainer container)
throws IOException, InterruptedException {
DefaultSeaTunnelRowSerializer serializer =
DefaultSeaTunnelRowSerializer.create(
@@ -312,9 +414,48 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
SEATUNNEL_ROW_TYPE,
MessageFormat.AVRO,
DEFAULT_FIELD_DELIMITER);
- generateTestData(row -> serializer.serializeRow(row), 0, 100);
- Container.ExecResult execResult =
container.executeJob("/avro/kafka_avro_to_console.conf");
+ int start = 0;
+ int end = 100;
+ generateTestData(row -> serializer.serializeRow(row), start, end);
+ Container.ExecResult execResult =
container.executeJob("/avro/kafka_avro_to_assert.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+
+ AvroDeserializationSchema avroDeserializationSchema =
+ new AvroDeserializationSchema(SEATUNNEL_ROW_TYPE);
+ List<SeaTunnelRow> kafkaSTRow =
+ getKafkaSTRow(
+ "test_avro_topic",
+ value -> {
+ try {
+ return
avroDeserializationSchema.deserialize(value);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Assertions.assertEquals(100, kafkaSTRow.size());
+ kafkaSTRow.forEach(
+ row -> {
+ Assertions.assertTrue(
+ (long) row.getField(0) >= start && (long)
row.getField(0) < end);
+ Assertions.assertEquals(
+ Collections.singletonMap("key",
Short.parseShort("1")),
+ (Map<String, Short>) row.getField(1));
+ Assertions.assertArrayEquals(
+ new Byte[] {Byte.parseByte("1")}, (Byte[])
row.getField(2));
+ Assertions.assertEquals("string",
row.getField(3).toString());
+ Assertions.assertEquals(false, row.getField(4));
+ Assertions.assertEquals(Byte.parseByte("1"),
row.getField(5));
+ Assertions.assertEquals(Short.parseShort("1"),
row.getField(6));
+ Assertions.assertEquals(Integer.parseInt("1"),
row.getField(7));
+ Assertions.assertEquals(Long.parseLong("1"),
row.getField(8));
+ Assertions.assertEquals(Float.parseFloat("1.1"),
row.getField(9));
+ Assertions.assertEquals(Double.parseDouble("1.1"),
row.getField(10));
+ Assertions.assertEquals(BigDecimal.valueOf(11, 1),
row.getField(11));
+ Assertions.assertArrayEquals("test".getBytes(), (byte[])
row.getField(12));
+ Assertions.assertEquals(LocalDate.of(2024, 1, 1),
row.getField(13));
+ Assertions.assertEquals(
+ LocalDateTime.of(2024, 1, 1, 12, 59, 23),
row.getField(14));
+ });
}
public void testKafkaLatestToConsole(TestContainer container)
@@ -373,6 +514,22 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
return props;
}
+ private Properties kafkaByteConsumerConfig() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG,
"seatunnel-kafka-sink-group");
+ props.put(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+ props.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ return props;
+ }
+
private void generateTestData(ProducerRecordConverter converter, int
start, int end) {
for (int i = start; i < end; i++) {
SeaTunnelRow row =
@@ -391,8 +548,8 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
Double.parseDouble("1.1"),
BigDecimal.valueOf(11, 1),
"test".getBytes(),
- LocalDate.now(),
- LocalDateTime.now()
+ LocalDate.of(2024, 1, 1),
+ LocalDateTime.of(2024, 1, 1, 12, 59, 23)
});
ProducerRecord<byte[], byte[]> producerRecord =
converter.convert(row);
producer.send(producerRecord);
@@ -480,7 +637,34 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
return data;
}
+ private List<SeaTunnelRow> getKafkaSTRow(String topicName,
ConsumerRecordConverter converter) {
+ List<SeaTunnelRow> data = new ArrayList<>();
+ try (KafkaConsumer<byte[], byte[]> consumer =
+ new KafkaConsumer<>(kafkaByteConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(topicName));
+ Map<TopicPartition, Long> offsets =
+ consumer.endOffsets(Arrays.asList(new
TopicPartition(topicName, 0)));
+ Long endOffset = offsets.entrySet().iterator().next().getValue();
+ Long lastProcessedOffset = -1L;
+
+ do {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+ data.add(converter.convert(record.value()));
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ return data;
+ }
+
interface ProducerRecordConverter {
ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
}
+
+ interface ConsumerRecordConverter {
+ SeaTunnelRow convert(byte[] value);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
index 99eac77ef1..fe33a1fc84 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
@@ -29,6 +29,8 @@ env {
source {
FakeSource {
+ row.num = 90
+ string.template = ["fake_source_avro"]
schema = {
fields {
c_map = "map<string, string>"
@@ -70,7 +72,7 @@ source {
sink {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
- topic = "test_avro_topic"
+ topic = "test_avro_topic_fake_source"
format = avro
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
similarity index 87%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
index ce5932e744..d357eb6583 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
@@ -82,7 +82,21 @@ sink {
rule_value = 99
}
]
- }
+ },
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 6
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 6
+ }
+ ]
+ }
]
}
}
diff --git
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
index 3d9a828bf7..16afe4b693 100644
---
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
@@ -52,7 +52,6 @@ public class AvroSerializationSchema implements
SerializationSchema {
public byte[] serialize(SeaTunnelRow element) {
GenericRecord record = converter.convertRowToGenericRecord(element);
try {
- out.reset();
writer.write(record, encoder);
encoder.flush();
return out.toByteArray();
@@ -60,6 +59,8 @@ public class AvroSerializationSchema implements
SerializationSchema {
throw new SeaTunnelAvroFormatException(
AvroFormatErrorCode.SERIALIZATION_ERROR,
"Serialization error on record : " + element);
+ } finally {
+ out.reset();
}
}
}
diff --git
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
index 989087ee57..cb353e5ab4 100644
---
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
+++
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.format.avro;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -33,7 +34,11 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class AvroToRowConverter implements Serializable {
@@ -72,31 +77,29 @@ public class AvroToRowConverter implements Serializable {
values[i] = null;
continue;
}
- values[i] =
- convertField(
- rowType.getFieldType(i),
- record.getSchema().getField(fieldNames[i]),
- record.get(fieldNames[i]));
+ values[i] = convertField(rowType.getFieldType(i),
record.get(fieldNames[i]));
}
return new SeaTunnelRow(values);
}
- private Object convertField(SeaTunnelDataType<?> dataType, Schema.Field
field, Object val) {
+ private Object convertField(SeaTunnelDataType<?> dataType, Object val) {
switch (dataType.getSqlType()) {
- case MAP:
case STRING:
+ return val.toString();
case BOOLEAN:
- case SMALLINT:
case INT:
case BIGINT:
case FLOAT:
case DOUBLE:
case NULL:
- case BYTES:
case DATE:
case DECIMAL:
case TIMESTAMP:
return val;
+ case BYTES:
+ return ((ByteBuffer) val).array();
+ case SMALLINT:
+ return ((Integer) val).shortValue();
case TINYINT:
Class<?> typeClass = dataType.getTypeClass();
if (typeClass == Byte.class) {
@@ -104,6 +107,16 @@ public class AvroToRowConverter implements Serializable {
return integer.byteValue();
}
return val;
+ case MAP:
+ MapType<?, ?> mapType = (MapType<?, ?>) dataType;
+ Map<Object, Object> res = new HashMap<>();
+ Map map = (Map) val;
+ for (Object o : map.entrySet()) {
+ res.put(
+ convertField(mapType.getKeyType(), ((Map.Entry)
o).getKey()),
+ convertField(mapType.getValueType(), ((Map.Entry)
o).getValue()));
+ }
+ return res;
case ARRAY:
BasicType<?> basicType = ((ArrayType<?, ?>)
dataType).getElementType();
List<Object> list = (List<Object>) val;
@@ -121,67 +134,15 @@ public class AvroToRowConverter implements Serializable {
}
}
- protected static Object convertArray(List<Object> val,
SeaTunnelDataType<?> dataType) {
+ protected Object convertArray(List<Object> val, SeaTunnelDataType<?>
dataType) {
if (val == null) {
return null;
}
int length = val.size();
- switch (dataType.getSqlType()) {
- case STRING:
- String[] strings = new String[length];
- for (int i = 0; i < strings.length; i++) {
- strings[i] = val.get(i).toString();
- }
- return strings;
- case BOOLEAN:
- Boolean[] booleans = new Boolean[length];
- for (int i = 0; i < booleans.length; i++) {
- booleans[i] = (Boolean) val.get(i);
- }
- return booleans;
- case BYTES:
- Byte[] bytes = new Byte[length];
- for (int i = 0; i < bytes.length; i++) {
- bytes[i] = (Byte) val.get(i);
- }
- return bytes;
- case SMALLINT:
- Short[] shorts = new Short[length];
- for (int i = 0; i < shorts.length; i++) {
- shorts[i] = (Short) val.get(i);
- }
- return shorts;
- case INT:
- Integer[] integers = new Integer[length];
- for (int i = 0; i < integers.length; i++) {
- integers[i] = (Integer) val.get(i);
- }
- return integers;
- case BIGINT:
- Long[] longs = new Long[length];
- for (int i = 0; i < longs.length; i++) {
- longs[i] = (Long) val.get(i);
- }
- return longs;
- case FLOAT:
- Float[] floats = new Float[length];
- for (int i = 0; i < floats.length; i++) {
- floats[i] = (Float) val.get(i);
- }
- return floats;
- case DOUBLE:
- Double[] doubles = new Double[length];
- for (int i = 0; i < doubles.length; i++) {
- doubles[i] = (Double) val.get(i);
- }
- return doubles;
- default:
- String errorMsg =
- String.format(
- "SeaTunnel avro array format is not supported
for this data type [%s]",
- dataType.getSqlType());
- throw new SeaTunnelAvroFormatException(
- AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+ Object instance = Array.newInstance(dataType.getTypeClass(), length);
+ for (int i = 0; i < val.size(); i++) {
+ Array.set(instance, i, convertField(dataType, val.get(i)));
}
+ return instance;
}
}
diff --git
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
index f8f0652a26..4a03ddce09 100644
---
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
+++
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
@@ -35,9 +35,9 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import java.io.Serializable;
+import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.List;
public class RowToAvroConverter implements Serializable {
@@ -111,14 +111,11 @@ public class RowToAvroConverter implements Serializable {
case BYTES:
return ByteBuffer.wrap((byte[]) data);
case ARRAY:
- // BasicType<?> basicType = ((ArrayType<?, ?>)
- // seaTunnelDataType).getElementType();
- // return Util.convertArray((Object[]) data,
basicType);
BasicType<?> basicType = ((ArrayType<?, ?>)
seaTunnelDataType).getElementType();
- List<Object> records = new ArrayList<>(((Object[])
data).length);
- for (Object object : (Object[]) data) {
- Object resolvedObject = resolveObject(object, basicType);
- records.add(resolvedObject);
+ int length = Array.getLength(data);
+ ArrayList<Object> records = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ records.add(resolveObject(Array.get(data, i), basicType));
}
return records;
case ROW:
diff --git
a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
index 5f505e1ba6..8b2eadaf1e 100644
---
a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
@@ -161,9 +161,9 @@ class AvroSerializationSchemaTest {
SeaTunnelRowType rowType = buildSeaTunnelRowType();
SeaTunnelRow seaTunnelRow = buildSeaTunnelRow();
AvroSerializationSchema serializationSchema = new
AvroSerializationSchema(rowType);
- byte[] serialize = serializationSchema.serialize(seaTunnelRow);
+ byte[] bytes = serializationSchema.serialize(seaTunnelRow);
AvroDeserializationSchema deserializationSchema = new
AvroDeserializationSchema(rowType);
- SeaTunnelRow deserialize =
deserializationSchema.deserialize(serialize);
+ SeaTunnelRow deserialize = deserializationSchema.deserialize(bytes);
String[] strArray1 = (String[]) seaTunnelRow.getField(1);
String[] strArray2 = (String[]) deserialize.getField(1);
Assertions.assertArrayEquals(strArray1, strArray2);