This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 93a006156d [Feature] support avro format (#5084)
93a006156d is described below
commit 93a006156d704218c61f250dac6cb573f242d9e3
Author: Jarvis <[email protected]>
AuthorDate: Thu Dec 14 13:39:31 2023 +0800
[Feature] support avro format (#5084)
---
docs/en/connector-v2/sink/Kafka.md | 26 +--
docs/en/connector-v2/source/kafka.md | 34 ++--
seatunnel-connectors-v2/connector-kafka/pom.xml | 6 +-
.../seatunnel/kafka/config/MessageFormat.java | 3 +-
.../serialize/DefaultSeaTunnelRowSerializer.java | 3 +
.../seatunnel/kafka/sink/KafkaSinkFactory.java | 3 +-
.../seatunnel/kafka/source/KafkaSourceConfig.java | 3 +
seatunnel-dist/release-docs/LICENSE | 4 +-
seatunnel-dist/release-docs/NOTICE | 29 ++--
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 25 +++
.../avro/fake_source_to_kafka_avro_format.conf | 76 +++++++++
.../test/resources/avro/kafka_avro_to_console.conf | 89 ++++++++++
seatunnel-formats/pom.xml | 1 +
.../{ => seatunnel-format-avro}/pom.xml | 30 ++--
.../format/avro/AvroDeserializationSchema.java | 54 ++++++
.../format/avro/AvroSerializationSchema.java | 65 +++++++
.../seatunnel/format/avro/AvroToRowConverter.java | 187 +++++++++++++++++++++
.../seatunnel/format/avro/RowToAvroConverter.java | 148 ++++++++++++++++
.../SeaTunnelRowTypeToAvroSchemaConverter.java | 113 +++++++++++++
.../format/avro/exception/AvroFormatErrorCode.java | 34 +++-
.../exception/SeaTunnelAvroFormatException.java | 19 ++-
.../seatunnel/format/avro/AvroConverterTest.java | 174 +++++++++++++++++++
.../format/avro/AvroSerializationSchemaTest.java | 177 +++++++++++++++++++
tools/dependencies/known-dependencies.txt | 3 +-
24 files changed, 1232 insertions(+), 74 deletions(-)
diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
index 835344126b..17ff091061 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -30,19 +30,19 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
## Sink Options
-| Name                 | Type   | Required | Default | Description |
-|----------------------|--------|----------|---------|-------------|
-| topic                | String | Yes      | -       | When the table is used as sink, the topic name is the topic to write data to. |
-| bootstrap.servers    | String | Yes      | -       | Comma separated list of Kafka brokers. |
-| kafka.config         | Map    | No       | -       | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). |
-| semantics            | String | No       | NON     | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
-| partition_key_fields | Array  | No       | -       | Configure which fields are used as the key of the kafka message. |
-| partition            | Int    | No       | -       | We can specify the partition, all messages will be sent to this partition. |
-| assign_partitions    | Array  | No       | -       | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. |
-| transaction_prefix   | String | No       | -       | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. |
-| format               | String | No       | json    | Data format. The default format is json. Optional text format, canal-json and debezium-json.If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
-| field_delimiter      | String | No       | ,       | Customize the field delimiter for data format. |
-| common-options       |        | No       | -       | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |
+| Name                 | Type   | Required | Default | Description |
+|----------------------|--------|----------|---------|-------------|
+| topic                | String | Yes      | -       | When the table is used as sink, the topic name is the topic to write data to. |
+| bootstrap.servers    | String | Yes      | -       | Comma separated list of Kafka brokers. |
+| kafka.config         | Map    | No       | -       | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). |
+| semantics            | String | No       | NON     | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
+| partition_key_fields | Array  | No       | -       | Configure which fields are used as the key of the kafka message. |
+| partition            | Int    | No       | -       | We can specify the partition, all messages will be sent to this partition. |
+| assign_partitions    | Array  | No       | -       | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. |
+| transaction_prefix   | String | No       | -       | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. |
+| format               | String | No       | json    | Data format. The default format is json. Optional text format, canal-json, debezium-json and avro.If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
+| field_delimiter      | String | No       | ,       | Customize the field delimiter for data format. |
+| common-options       |        | No       | -       | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |
## Parameter Interpretation
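As a quick orientation for readers of this notification, the new sink-side option boils down to setting `format = avro`. A minimal sink block, mirroring the e2e config this commit adds (the broker address and topic name here are placeholders, not values your cluster will have):

```hocon
sink {
  Kafka {
    # Placeholder broker address; point this at your own cluster
    bootstrap.servers = "localhost:9092"
    topic = "test_avro_topic"
    # New in this commit: serialize rows via seatunnel-format-avro
    format = avro
  }
}
```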
diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 16b9c5420b..6b69e8e931 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -32,23 +32,23 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
## Source Options
-| Name                                | Type                                                                        | Required | Default                  | Description [...]
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|------------- [...]
-| topic                               | String                                                                      | Yes      | -                        | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. [...]
-| bootstrap.servers                   | String                                                                      | Yes      | -                        | Comma separated list of Kafka brokers. [...]
-| pattern                             | Boolean                                                                     | No       | false                    | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. [...]
-| consumer.group                      | String                                                                      | No       | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. [...]
-| commit_on_checkpoint                | Boolean                                                                     | No       | true                     | If true the consumer's offset will be periodically committed in the background. [...]
-| kafka.config                        | Map                                                                         | No       | -                        | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). [...]
-| schema                              | Config                                                                      | No       | -                        | The structure of the data, including field names and field types. [...]
-| format                              | String                                                                      | No       | json                     | Data format. The default format is json. Optional text format, canal-json and debezium-json.If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debeziu [...]
-| format_error_handle_way             | String                                                                      | No       | fail                     | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. [...]
-| field_delimiter                     | String                                                                      | No       | ,                        | Customize the field delimiter for data format. [...]
-| start_mode                          | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No       | group_offsets            | The initial consumption pattern of consumers. [...]
-| start_mode.offsets                  | Config                                                                      | No       | -                        | The offset required for consumption mode to be specific_offsets. [...]
-| start_mode.timestamp                | Long                                                                        | No       | -                        | The time required for consumption mode to be "timestamp". [...]
-| partition-discovery.interval-millis | Long                                                                        | No       | -1                       | The interval for dynamically discovering topics and partitions. [...]
-| common-options                      |                                                                             | No       | -                        | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details [...]
+| Name                                | Type                                                                        | Required | Default                  | Description [...]
+|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|------------- [...]
+| topic                               | String                                                                      | Yes      | -                        | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. [...]
+| bootstrap.servers                   | String                                                                      | Yes      | -                        | Comma separated list of Kafka brokers. [...]
+| pattern                             | Boolean                                                                     | No       | false                    | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. [...]
+| consumer.group                      | String                                                                      | No       | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. [...]
+| commit_on_checkpoint                | Boolean                                                                     | No       | true                     | If true the consumer's offset will be periodically committed in the background. [...]
+| kafka.config                        | Map                                                                         | No       | -                        | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). [...]
+| schema                              | Config                                                                      | No       | -                        | The structure of the data, including field names and field types. [...]
+| format                              | String                                                                      | No       | json                     | Data format. The default format is json. Optional text format, canal-json, debezium-json and avro.If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use d [...]
+| format_error_handle_way             | String                                                                      | No       | fail                     | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. [...]
+| field_delimiter                     | String                                                                      | No       | ,                        | Customize the field delimiter for data format. [...]
+| start_mode                          | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No       | group_offsets            | The initial consumption pattern of consumers. [...]
+| start_mode.offsets                  | Config                                                                      | No       | -                        | The offset required for consumption mode to be specific_offsets. [...]
+| start_mode.timestamp                | Long                                                                        | No       | -                        | The time required for consumption mode to be "timestamp". [...]
+| partition-discovery.interval-millis | Long                                                                        | No       | -1                       | The interval for dynamically discovering topics and partitions. [...]
+| common-options                      |                                                                             | No       | -                        | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details [...]
## Task Example
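On the source side, reading an Avro topic back requires declaring the row schema so the deserializer knows the field types. A minimal sketch, condensed from the kafka_avro_to_console.conf this commit adds (broker address and field list are illustrative):

```hocon
source {
  Kafka {
    # Placeholder broker address
    bootstrap.servers = "localhost:9092"
    topic = "test_avro_topic"
    # New in this commit: deserialize Avro records into SeaTunnel rows
    format = avro
    # Skip records that fail to deserialize instead of failing the job
    format_error_handle_way = skip
    schema = {
      fields {
        id = bigint
        c_string = string
      }
    }
  }
}
```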
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 7955ab3f54..6e470f12ba 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -72,7 +72,11 @@
<artifactId>connect-json</artifactId>
<version>${kafka.client.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-avro</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index 491990fd33..18e466e41c 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -24,5 +24,6 @@ public enum MessageFormat {
DEBEZIUM_JSON,
COMPATIBLE_DEBEZIUM_JSON,
COMPATIBLE_KAFKA_CONNECT_JSON,
- OGG_JSON
+ OGG_JSON,
+ AVRO
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 0f8ae50f01..51d491002c 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+import org.apache.seatunnel.format.avro.AvroSerializationSchema;
 import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
 import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
@@ -227,6 +228,8 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
return new DebeziumJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
                return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
+ case AVRO:
+ return new AvroSerializationSchema(rowType);
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index f1a7bd85b8..3fbf6bb99b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -46,7 +46,8 @@ public class KafkaSinkFactory implements TableSinkFactory {
MessageFormat.JSON,
MessageFormat.CANAL_JSON,
MessageFormat.TEXT,
- MessageFormat.OGG_JSON),
+ MessageFormat.OGG_JSON,
+ MessageFormat.AVRO),
Config.TOPIC)
.optional(
Config.KAFKA_CONFIG,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 45ae694d54..8c449c6843 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
 import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
 import org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJsonFormatOptions;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
@@ -235,6 +236,8 @@ public class KafkaSourceConfig implements Serializable {
case DEBEZIUM_JSON:
                boolean includeSchema = readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
                return new DebeziumJsonDeserializationSchema(seaTunnelRowType, true, includeSchema);
+ case AVRO:
+ return new AvroDeserializationSchema(seaTunnelRowType);
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 310d061833..5bbf86c50c 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -219,8 +219,8 @@ The text of each license is the standard Apache 2.0 license.
 (Apache License 2.0) aircompressor (io.airlift:aircompressor:0.10 - http://github.com/airlift/aircompressor)
 (Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.11.0 - https://yetus.apache.org/audience-annotations)
- (The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.8.2 - http://avro.apache.org)
- (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
+ (The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.11.1 - http://avro.apache.org)
+ (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
 (Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/)
 (Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.20 - https://commons.apache.org/proper/commons-compress/)
 (The Apache Software License, Version 2.0) Commons Lang (commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/)
diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE
index 4bed2d4be1..1c6a43afe3 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -49,16 +49,16 @@ Apache Avro NOTICE
========================================================================
Apache Avro
-Copyright 2010-2015 The Apache Software Foundation
+Copyright 2010-2019 The Apache Software Foundation
This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
+The Apache Software Foundation (https://www.apache.org/).
NUnit license acknowledgement:
| Portions Copyright © 2002-2012 Charlie Poole or Copyright © 2002-2004 James
| W. Newkirk, Michael C. Two, Alexei A. Vorontsov or Copyright © 2000-2002
-| Philip A. Craig
+| Philip A. Craig
Based upon the representations of upstream licensors, it is understood that
portions of the mapreduce API included in the Java implementation are licensed
@@ -74,7 +74,7 @@ is:
| (the "License"); you may not use this file except in compliance
| with the License. You may obtain a copy of the License at
|
-| http://www.apache.org/licenses/LICENSE-2.0
+| https://www.apache.org/licenses/LICENSE-2.0
|
| Unless required by applicable law or agreed to in writing, software
| distributed under the License is distributed on an "AS IS" BASIS,
@@ -85,7 +85,7 @@ is:
The Odiago NOTICE at the time of the contribution:
| This product includes software developed by Odiago, Inc.
-| (http://www.wibidata.com).
+| (https://www.wibidata.com).
Apache Ivy includes the following in its NOTICE file:
@@ -93,18 +93,18 @@ Apache Ivy includes the following in its NOTICE file:
| Copyright 2007-2010 The Apache Software Foundation
|
| This product includes software developed by
-| The Apache Software Foundation (http://www.apache.org/).
+| The Apache Software Foundation (https://www.apache.org/).
|
| Portions of Ivy were originally developed by
| Jayasoft SARL (http://www.jayasoft.fr/)
| and are licensed to the Apache Software Foundation under the
| "Software Grant License Agreement"
|
-| SSH and SFTP support is provided by the JCraft JSch package,
+| SSH and SFTP support is provided by the JCraft JSch package,
| which is open source software, available under
-| the terms of a BSD style license.
+| the terms of a BSD style license.
| The original software and related information is available
-| at http://www.jcraft.com/jsch/.
+| at http://www.jcraft.com/jsch/.
Apache Log4Net includes the following in its NOTICE file:
@@ -112,7 +112,16 @@ Apache Log4Net includes the following in its NOTICE file:
| Copyright 2004-2015 The Apache Software Foundation
|
| This product includes software developed at
-| The Apache Software Foundation (http://www.apache.org/).
+| The Apache Software Foundation (https://www.apache.org/).
+
+csharp reflect serializers were contributed by Pitney Bowes Inc.
+
+| Copyright 2019 Pitney Bowes Inc.
+| Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
+| You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0.
+| Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and limitations under the License.
========================================================================
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index b75c55153e..0bf222bf87 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunne
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -292,6 +293,30 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
testKafkaGroupOffsetsToConsole(container);
}
+ @TestTemplate
+ @DisabledOnContainer(TestContainerId.SPARK_2_4)
+ public void testFakeSourceToKafkaAvroFormat(TestContainer container)
+ throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/avro/fake_source_to_kafka_avro_format.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(TestContainerId.SPARK_2_4)
+ public void testKafkaAvroToConsole(TestContainer container)
+ throws IOException, InterruptedException {
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ "test_avro_topic",
+ SEATUNNEL_ROW_TYPE,
+ MessageFormat.AVRO,
+ DEFAULT_FIELD_DELIMITER);
+ generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        Container.ExecResult execResult =
+                container.executeJob("/avro/kafka_avro_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
public void testKafkaLatestToConsole(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
new file mode 100644
index 0000000000..c6f5d6944f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic"
+ format = avro
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
new file mode 100644
index 0000000000..3fc1a57c3d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ format = avro
+ format_error_handle_way = skip
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "kafka_table"
+ }
+ Assert {
+ source_table_name = "kafka_table"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml
index 7fc09b356a..a330e9b0e0 100644
--- a/seatunnel-formats/pom.xml
+++ b/seatunnel-formats/pom.xml
@@ -31,6 +31,7 @@
<module>seatunnel-format-text</module>
<module>seatunnel-format-compatible-debezium-json</module>
<module>seatunnel-format-compatible-connect-json</module>
+ <module>seatunnel-format-avro</module>
</modules>
</project>
diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/seatunnel-format-avro/pom.xml
similarity index 64%
copy from seatunnel-formats/pom.xml
copy to seatunnel-formats/seatunnel-format-avro/pom.xml
index 7fc09b356a..513b0fa0f4 100644
--- a/seatunnel-formats/pom.xml
+++ b/seatunnel-formats/seatunnel-format-avro/pom.xml
@@ -18,19 +18,29 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel</artifactId>
+ <artifactId>seatunnel-formats</artifactId>
<version>${revision}</version>
</parent>
- <artifactId>seatunnel-formats</artifactId>
- <packaging>pom</packaging>
- <name>SeaTunnel : Formats :</name>
+ <artifactId>seatunnel-format-avro</artifactId>
+ <name>SeaTunnel : Formats : Avro</name>
- <modules>
- <module>seatunnel-format-json</module>
- <module>seatunnel-format-text</module>
- <module>seatunnel-format-compatible-debezium-json</module>
- <module>seatunnel-format-compatible-connect-json</module>
- </modules>
+ <properties>
+ <avro.version>1.11.1</avro.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+ </dependencies>
</project>
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java
new file mode 100644
index 0000000000..b682a8e643
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+
+import java.io.IOException;
+
+public class AvroDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+ private static final long serialVersionUID = -7907358485475741366L;
+
+ private final SeaTunnelRowType rowType;
+ private final AvroToRowConverter converter;
+
+ public AvroDeserializationSchema(SeaTunnelRowType rowType) {
+ this.rowType = rowType;
+ this.converter = new AvroToRowConverter(rowType);
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null);
+ GenericRecord record = this.converter.getReader().read(null, decoder);
+ return converter.converter(record, rowType);
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return this.rowType;
+ }
+}
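The BinaryDecoder used above reads Avro's binary encoding, in which every int/long field is a zigzag-mapped base-128 varint. A minimal Python sketch of that decode step (for illustration only, not SeaTunnel code):

```python
def zigzag_varint_decode(buf: bytes, pos: int = 0):
    """Decode one Avro int/long: base-128 varint, then zigzag -> signed."""
    shift = 0
    acc = 0
    while True:
        b = buf[pos]
        pos += 1
        acc |= (b & 0x7F) << shift  # low 7 bits carry payload
        if not (b & 0x80):          # high bit clear ends the varint
            break
        shift += 7
    # zigzag: even codes are non-negative, odd codes are negative
    return (acc >> 1) ^ -(acc & 1), pos

print(zigzag_varint_decode(b"\x80\x01"))  # (64, 2)
```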
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
new file mode 100644
index 0000000000..3d9a828bf7
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class AvroSerializationSchema implements SerializationSchema {
+
+ private static final long serialVersionUID = 4438784443025715370L;
+
+ private final ByteArrayOutputStream out;
+ private final BinaryEncoder encoder;
+ private final RowToAvroConverter converter;
+ private final DatumWriter<GenericRecord> writer;
+
+ public AvroSerializationSchema(SeaTunnelRowType rowType) {
+ this.out = new ByteArrayOutputStream();
+ this.encoder = EncoderFactory.get().binaryEncoder(out, null);
+ this.converter = new RowToAvroConverter(rowType);
+ this.writer = this.converter.getWriter();
+ }
+
+ @Override
+ public byte[] serialize(SeaTunnelRow element) {
+ GenericRecord record = converter.convertRowToGenericRecord(element);
+ try {
+ out.reset();
+ writer.write(record, encoder);
+ encoder.flush();
+ return out.toByteArray();
+ } catch (IOException e) {
+ throw new SeaTunnelAvroFormatException(
+ AvroFormatErrorCode.SERIALIZATION_ERROR,
+ "Serialization error on record : " + element);
+ }
+ }
+}
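On the write side, the BinaryEncoder flushed above emits each int/long as a zigzag value in base-128 varint form, which is why small magnitudes of either sign stay compact. A Python sketch of that encoding (an illustration of the Avro wire format, not SeaTunnel code):

```python
def zigzag_varint_encode(n: int) -> bytes:
    """Encode one signed 64-bit value as Avro does: zigzag, then varint."""
    z = (n << 1) ^ (n >> 63)   # zigzag map; Python's >> is arithmetic
    z &= (1 << 64) - 1         # view as unsigned 64-bit
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)  # set continuation bit
        else:
            out.append(b)
            return bytes(out)

print(zigzag_varint_encode(-1))  # b'\x01'
```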
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
new file mode 100644
index 0000000000..989087ee57
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class AvroToRowConverter implements Serializable {
+
+ private static final long serialVersionUID = 8177020083886379563L;
+
+ private DatumReader<GenericRecord> reader = null;
+ private Schema schema;
+
+ public AvroToRowConverter(SeaTunnelRowType rowType) {
+ schema = SeaTunnelRowTypeToAvroSchemaConverter.buildAvroSchemaWithRowType(rowType);
+ }
+
+ public DatumReader<GenericRecord> getReader() {
+ if (reader == null) {
+ reader = createReader();
+ }
+ return reader;
+ }
+
+ private DatumReader<GenericRecord> createReader() {
+ GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema, schema);
+ datumReader.getData().addLogicalTypeConversion(new Conversions.DecimalConversion());
+ datumReader.getData().addLogicalTypeConversion(new TimeConversions.DateConversion());
+ datumReader.getData().addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
+ return datumReader;
+ }
+
+ public SeaTunnelRow converter(GenericRecord record, SeaTunnelRowType rowType) {
+ String[] fieldNames = rowType.getFieldNames();
+
+ Object[] values = new Object[fieldNames.length];
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (record.getSchema().getField(fieldNames[i]) == null) {
+ values[i] = null;
+ continue;
+ }
+ values[i] =
+ convertField(
+ rowType.getFieldType(i),
+ record.getSchema().getField(fieldNames[i]),
+ record.get(fieldNames[i]));
+ }
+ return new SeaTunnelRow(values);
+ }
+
+ private Object convertField(SeaTunnelDataType<?> dataType, Schema.Field field, Object val) {
+ switch (dataType.getSqlType()) {
+ case MAP:
+ case STRING:
+ case BOOLEAN:
+ case SMALLINT:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case NULL:
+ case BYTES:
+ case DATE:
+ case DECIMAL:
+ case TIMESTAMP:
+ return val;
+ case TINYINT:
+ Class<?> typeClass = dataType.getTypeClass();
+ if (typeClass == Byte.class) {
+ Integer integer = (Integer) val;
+ return integer.byteValue();
+ }
+ return val;
+ case ARRAY:
+ BasicType<?> basicType = ((ArrayType<?, ?>) dataType).getElementType();
+ List<Object> list = (List<Object>) val;
+ return convertArray(list, basicType);
+ case ROW:
+ SeaTunnelRowType subRow = (SeaTunnelRowType) dataType;
+ return converter((GenericRecord) val, subRow);
+ default:
+ String errorMsg =
+ String.format(
+ "SeaTunnel avro format is not supported for this data type [%s]",
+ dataType.getSqlType());
+ throw new SeaTunnelAvroFormatException(
+ AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+ }
+ }
+
+ protected static Object convertArray(List<Object> val, SeaTunnelDataType<?> dataType) {
+ if (val == null) {
+ return null;
+ }
+ int length = val.size();
+ switch (dataType.getSqlType()) {
+ case STRING:
+ String[] strings = new String[length];
+ for (int i = 0; i < strings.length; i++) {
+ strings[i] = val.get(i).toString();
+ }
+ return strings;
+ case BOOLEAN:
+ Boolean[] booleans = new Boolean[length];
+ for (int i = 0; i < booleans.length; i++) {
+ booleans[i] = (Boolean) val.get(i);
+ }
+ return booleans;
+ case BYTES:
+ Byte[] bytes = new Byte[length];
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] = (Byte) val.get(i);
+ }
+ return bytes;
+ case SMALLINT:
+ Short[] shorts = new Short[length];
+ for (int i = 0; i < shorts.length; i++) {
+ shorts[i] = (Short) val.get(i);
+ }
+ return shorts;
+ case INT:
+ Integer[] integers = new Integer[length];
+ for (int i = 0; i < integers.length; i++) {
+ integers[i] = (Integer) val.get(i);
+ }
+ return integers;
+ case BIGINT:
+ Long[] longs = new Long[length];
+ for (int i = 0; i < longs.length; i++) {
+ longs[i] = (Long) val.get(i);
+ }
+ return longs;
+ case FLOAT:
+ Float[] floats = new Float[length];
+ for (int i = 0; i < floats.length; i++) {
+ floats[i] = (Float) val.get(i);
+ }
+ return floats;
+ case DOUBLE:
+ Double[] doubles = new Double[length];
+ for (int i = 0; i < doubles.length; i++) {
+ doubles[i] = (Double) val.get(i);
+ }
+ return doubles;
+ default:
+ String errorMsg =
+ String.format(
+ "SeaTunnel avro array format is not supported for this data type [%s]",
+ dataType.getSqlType());
+ throw new SeaTunnelAvroFormatException(
+ AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+ }
+ }
+}
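Avro has no 8-bit integer type, so the TINYINT branch above widens a byte to Avro's int on write (Byte.toUnsignedInt) and narrows it back with Integer.byteValue on read; two's-complement wrap-around makes the round trip lossless even for negative bytes. A Python sketch of the two halves (illustration only):

```python
def byte_to_avro_int(b: int) -> int:
    # write side, like Byte.toUnsignedInt: -128..127 -> 0..255
    return b & 0xFF

def avro_int_to_byte(v: int) -> int:
    # read side, like Integer.byteValue: two's-complement narrow to -128..127
    return ((v & 0xFF) ^ 0x80) - 0x80

# every byte value survives the round trip
assert all(avro_int_to_byte(byte_to_avro_int(b)) == b for b in range(-128, 128))
```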
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
new file mode 100644
index 0000000000..f8f0652a26
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RowToAvroConverter implements Serializable {
+
+ private static final long serialVersionUID = -576124379280229724L;
+
+ private final Schema schema;
+ private final SeaTunnelRowType rowType;
+ private final DatumWriter<GenericRecord> writer;
+
+ public RowToAvroConverter(SeaTunnelRowType rowType) {
+ this.schema = SeaTunnelRowTypeToAvroSchemaConverter.buildAvroSchemaWithRowType(rowType);
+ this.rowType = rowType;
+ this.writer = createWriter();
+ }
+
+ private DatumWriter<GenericRecord> createWriter() {
+ GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ datumWriter.getData().addLogicalTypeConversion(new Conversions.DecimalConversion());
+ datumWriter.getData().addLogicalTypeConversion(new TimeConversions.DateConversion());
+ datumWriter.getData().addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
+ return datumWriter;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public DatumWriter<GenericRecord> getWriter() {
+ return writer;
+ }
+
+ public GenericRecord convertRowToGenericRecord(SeaTunnelRow element) {
+ GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+ String[] fieldNames = rowType.getFieldNames();
+ for (int i = 0; i < fieldNames.length; i++) {
+ String fieldName = rowType.getFieldName(i);
+ Object value = element.getField(i);
+ builder.set(fieldName.toLowerCase(), resolveObject(value, rowType.getFieldType(i)));
+ }
+ return builder.build();
+ }
+
+ private Object resolveObject(Object data, SeaTunnelDataType<?> seaTunnelDataType) {
+ if (data == null) {
+ return null;
+ }
+ switch (seaTunnelDataType.getSqlType()) {
+ case STRING:
+ case SMALLINT:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case MAP:
+ case DECIMAL:
+ case DATE:
+ case TIMESTAMP:
+ return data;
+ case TINYINT:
+ Class<?> typeClass = seaTunnelDataType.getTypeClass();
+ if (typeClass == Byte.class) {
+ if (data instanceof Byte) {
+ Byte aByte = (Byte) data;
+ return Byte.toUnsignedInt(aByte);
+ }
+ }
+ return data;
+ case BYTES:
+ return ByteBuffer.wrap((byte[]) data);
+ case ARRAY:
+ BasicType<?> basicType = ((ArrayType<?, ?>) seaTunnelDataType).getElementType();
+ List<Object> records = new ArrayList<>(((Object[]) data).length);
+ for (Object object : (Object[]) data) {
+ Object resolvedObject = resolveObject(object, basicType);
+ records.add(resolvedObject);
+ }
+ return records;
+ case ROW:
+ SeaTunnelRow seaTunnelRow = (SeaTunnelRow) data;
+ SeaTunnelDataType<?>[] fieldTypes =
+ ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
+ String[] fieldNames = ((SeaTunnelRowType) seaTunnelDataType).getFieldNames();
+ Schema recordSchema =
+ SeaTunnelRowTypeToAvroSchemaConverter.buildAvroSchemaWithRowType(
+ (SeaTunnelRowType) seaTunnelDataType);
+ GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema);
+ for (int i = 0; i < fieldNames.length; i++) {
+ recordBuilder.set(
+ fieldNames[i].toLowerCase(),
+ resolveObject(seaTunnelRow.getField(i), fieldTypes[i]));
+ }
+ return recordBuilder.build();
+ default:
+ String errorMsg =
+ String.format(
+ "SeaTunnel avro format is not supported for this data type [%s]",
+ seaTunnelDataType.getSqlType());
+ throw new SeaTunnelAvroFormatException(
+ AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+ }
+ }
+}
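The DecimalConversion registered in createWriter maps a BigDecimal's unscaled value to big-endian two's-complement bytes, which is how Avro's decimal logical type is defined over the bytes type. A Python sketch of both directions (illustration of the Avro convention, not SeaTunnel code):

```python
from decimal import Decimal

def decimal_to_avro_bytes(d: Decimal, scale: int) -> bytes:
    """Unscaled value -> minimal big-endian two's-complement bytes."""
    unscaled = int(d.scaleb(scale))
    length = max(1, (unscaled.bit_length() + 8) // 8)  # leave room for sign bit
    return unscaled.to_bytes(length, "big", signed=True)

def avro_bytes_to_decimal(raw: bytes, scale: int) -> Decimal:
    """Inverse mapping: bytes -> unscaled int -> Decimal at the given scale."""
    return Decimal(int.from_bytes(raw, "big", signed=True)).scaleb(-scale)

print(decimal_to_avro_bytes(Decimal("1.5"), 1))  # b'\x0f'
```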
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
new file mode 100644
index 0000000000..195ff8004c
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SeaTunnelRowTypeToAvroSchemaConverter {
+
+ public static Schema buildAvroSchemaWithRowType(SeaTunnelRowType seaTunnelRowType) {
+ List<Schema.Field> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ for (int i = 0; i < fieldNames.length; i++) {
+ fields.add(generateField(fieldNames[i], fieldTypes[i]));
+ }
+ return Schema.createRecord("SeaTunnelRecord", null, null, false, fields);
+ }
+
+ private static Schema.Field generateField(
+ String fieldName, SeaTunnelDataType<?> seaTunnelDataType) {
+ return new Schema.Field(
+ fieldName,
+ seaTunnelDataType2AvroDataType(fieldName, seaTunnelDataType),
+ null,
+ null);
+ }
+
+ private static Schema seaTunnelDataType2AvroDataType(
+ String fieldName, SeaTunnelDataType<?> seaTunnelDataType) {
+
+ switch (seaTunnelDataType.getSqlType()) {
+ case STRING:
+ return Schema.create(Schema.Type.STRING);
+ case BYTES:
+ return Schema.create(Schema.Type.BYTES);
+ case TINYINT:
+ case SMALLINT:
+ case INT:
+ return Schema.create(Schema.Type.INT);
+ case BIGINT:
+ return Schema.create(Schema.Type.LONG);
+ case FLOAT:
+ return Schema.create(Schema.Type.FLOAT);
+ case DOUBLE:
+ return Schema.create(Schema.Type.DOUBLE);
+ case BOOLEAN:
+ return Schema.create(Schema.Type.BOOLEAN);
+ case MAP:
+ SeaTunnelDataType<?> valueType = ((MapType<?, ?>) seaTunnelDataType).getValueType();
+ return Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType));
+ case ARRAY:
+ BasicType<?> elementType = ((ArrayType<?, ?>) seaTunnelDataType).getElementType();
+ return Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType));
+ case ROW:
+ SeaTunnelDataType<?>[] fieldTypes =
+ ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
+ String[] fieldNames = ((SeaTunnelRowType) seaTunnelDataType).getFieldNames();
+ List<Schema.Field> subField = new ArrayList<>();
+ for (int i = 0; i < fieldNames.length; i++) {
+ subField.add(generateField(fieldNames[i], fieldTypes[i]));
+ }
+ return Schema.createRecord(fieldName, null, null, false, subField);
+ case DECIMAL:
+ int precision = ((DecimalType) seaTunnelDataType).getPrecision();
+ int scale = ((DecimalType) seaTunnelDataType).getScale();
+ LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale);
+ return decimal.addToSchema(Schema.create(Schema.Type.BYTES));
+ case TIMESTAMP:
+ return LogicalTypes.localTimestampMillis()
+ .addToSchema(Schema.create(Schema.Type.LONG));
+ case DATE:
+ return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+ case NULL:
+ return Schema.create(Schema.Type.NULL);
+ default:
+ String errorMsg =
+ String.format(
+ "SeaTunnel avro format is not supported for this data type [%s]",
+ seaTunnelDataType.getSqlType());
+ throw new SeaTunnelAvroFormatException(
+ AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+ }
+ }
+}
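The converter above is ultimately producing an Avro schema, which serializes to JSON: primitives map directly (bigint to long, etc.) while date, timestamp, and decimal become logical-type annotations on int, long, and bytes. A Python sketch of the resulting JSON for a hypothetical subset of the mapping (field names and the subset chosen here are illustrative):

```python
import json

# hypothetical subset of the SeaTunnel -> Avro type mapping
TYPE_MAP = {
    "string": "string",
    "bigint": "long",
    "date": {"type": "int", "logicalType": "date"},
    "timestamp": {"type": "long", "logicalType": "local-timestamp-millis"},
    "decimal(2,1)": {"type": "bytes", "logicalType": "decimal",
                     "precision": 2, "scale": 1},
}

def build_record_schema(fields: dict) -> dict:
    """Mirror buildAvroSchemaWithRowType: one record wrapping all fields."""
    return {
        "type": "record",
        "name": "SeaTunnelRecord",
        "fields": [{"name": n, "type": TYPE_MAP[t]} for n, t in fields.items()],
    }

schema = build_record_schema({"id": "bigint", "c_date": "date"})
print(json.dumps(schema, indent=2))
```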
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java
similarity index 52%
copy from seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
copy to seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java
index 491990fd33..6f60a47932 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java
@@ -15,14 +15,30 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+package org.apache.seatunnel.format.avro.exception;
-public enum MessageFormat {
- JSON,
- TEXT,
- CANAL_JSON,
- DEBEZIUM_JSON,
- COMPATIBLE_DEBEZIUM_JSON,
- COMPATIBLE_KAFKA_CONNECT_JSON,
- OGG_JSON
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum AvroFormatErrorCode implements SeaTunnelErrorCode {
+ UNSUPPORTED_DATA_TYPE("AVRO-01", "Unsupported data type."),
+ SERIALIZATION_ERROR("AVRO-02", "Serialization error."),
+ FILED_NOT_EXIST("AVRO-03", "Field does not exist.");
+
+ private final String code;
+ private final String description;
+
+ AvroFormatErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java
similarity index 64%
copy from seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
copy to seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java
index 491990fd33..93c45323b4 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+package org.apache.seatunnel.format.avro.exception;
-public enum MessageFormat {
- JSON,
- TEXT,
- CANAL_JSON,
- DEBEZIUM_JSON,
- COMPATIBLE_DEBEZIUM_JSON,
- COMPATIBLE_KAFKA_CONNECT_JSON,
- OGG_JSON
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class SeaTunnelAvroFormatException extends SeaTunnelRuntimeException {
+
+ public SeaTunnelAvroFormatException(
+ SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
}
diff --git a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java
new file mode 100644
index 0000000000..fb45a0b537
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.generic.GenericRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+class AvroConverterTest {
+
+ private SeaTunnelRow buildSeaTunnelRow() {
+ SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14);
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("k1", "v1");
+ map.put("k2", "v2");
+ String[] strArray = new String[] {"l1", "l2"};
+ byte byteVal = 100;
+ LocalDate localDate = LocalDate.of(2023, 1, 1);
+
+ BigDecimal bigDecimal = new BigDecimal("61592600349703735722.724745739637773662");
+ LocalDateTime localDateTime = LocalDateTime.of(2023, 1, 1, 6, 30, 40);
+
+ subSeaTunnelRow.setField(0, map);
+ subSeaTunnelRow.setField(1, strArray);
+ subSeaTunnelRow.setField(2, "strVal");
+ subSeaTunnelRow.setField(3, true);
+ subSeaTunnelRow.setField(4, 1);
+ subSeaTunnelRow.setField(5, 2);
+ subSeaTunnelRow.setField(6, 3);
+ subSeaTunnelRow.setField(7, Long.MAX_VALUE - 1);
+ subSeaTunnelRow.setField(8, 33.333F);
+ subSeaTunnelRow.setField(9, 123.456);
+ subSeaTunnelRow.setField(10, byteVal);
+ subSeaTunnelRow.setField(11, localDate);
+ subSeaTunnelRow.setField(12, bigDecimal);
+ subSeaTunnelRow.setField(13, localDateTime);
+
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15);
+ seaTunnelRow.setField(0, map);
+ seaTunnelRow.setField(1, strArray);
+ seaTunnelRow.setField(2, "strVal");
+ seaTunnelRow.setField(3, true);
+ seaTunnelRow.setField(4, 1);
+ seaTunnelRow.setField(5, 2);
+ seaTunnelRow.setField(6, 3);
+ seaTunnelRow.setField(7, Long.MAX_VALUE - 1);
+ seaTunnelRow.setField(8, 33.333F);
+ seaTunnelRow.setField(9, 123.456);
+ seaTunnelRow.setField(10, byteVal);
+ seaTunnelRow.setField(11, localDate);
+ seaTunnelRow.setField(12, bigDecimal);
+ seaTunnelRow.setField(13, localDateTime);
+ seaTunnelRow.setField(14, subSeaTunnelRow);
+ return seaTunnelRow;
+ }
+
+ private SeaTunnelRowType buildSeaTunnelRowType() {
+ String[] subField = {
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_bytes",
+ "c_date",
+ "c_decimal",
+ "c_timestamp"
+ };
+ SeaTunnelDataType<?>[] subFieldTypes = {
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ ArrayType.STRING_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.BYTE_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ new DecimalType(38, 18),
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ };
+ SeaTunnelRowType subRow = new SeaTunnelRowType(subField, subFieldTypes);
+
+ String[] fieldNames = {
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_bytes",
+ "c_date",
+ "c_decimal",
+ "c_timestamp",
+ "c_row"
+ };
+ SeaTunnelDataType<?>[] fieldTypes = {
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ ArrayType.STRING_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.BYTE_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ new DecimalType(38, 18),
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ subRow
+ };
+ SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, fieldTypes);
+ return rowType;
+ }
+
+ @Test
+ public void testConverter() {
+
+ SeaTunnelRowType rowType = buildSeaTunnelRowType();
+ SeaTunnelRow seaTunnelRow = buildSeaTunnelRow();
+ RowToAvroConverter rowToAvroConverter = new RowToAvroConverter(rowType);
+ GenericRecord record = rowToAvroConverter.convertRowToGenericRecord(seaTunnelRow);
+
+ AvroToRowConverter avroToRowConverter = new AvroToRowConverter(rowType);
+ SeaTunnelRow converterRow = avroToRowConverter.converter(record, rowType);
+
+ Assertions.assertEquals(converterRow, seaTunnelRow);
+ }
+}
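The converter test above round-trips every SeaTunnel type, including `LocalDateTime`, through Avro. As a side note on why that equality check can hold: Avro's `timestamp-millis` logical type is just a `long` of epoch milliseconds, so a timestamp survives a round trip only down to millisecond precision. The sketch below illustrates that constraint with plain `java.time`; the `TimestampRoundTrip` class and the fixed UTC offset are illustrative assumptions, not SeaTunnel's actual converter code.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampRoundTrip {
    // Avro's timestamp-millis logical type is a long of epoch milliseconds.
    // A fixed offset (UTC here, as an assumption) makes the mapping reversible.
    static long toMillis(LocalDateTime ldt) {
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    static LocalDateTime fromMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Same instant the test uses: whole seconds, no sub-millisecond part,
        // so the round trip loses nothing and equals() holds.
        LocalDateTime original = LocalDateTime.of(2023, 1, 1, 6, 30, 40);
        LocalDateTime restored = fromMillis(toMillis(original));
        System.out.println(restored.equals(original));
    }
}
```

A value with nanosecond precision would come back truncated to milliseconds, which is why test fixtures like the one above stick to whole seconds.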
diff --git a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
new file mode 100644
index 0000000000..5f505e1ba6
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+class AvroSerializationSchemaTest {
+
+ private LocalDate localDate = LocalDate.of(2023, 1, 1);
+ private BigDecimal bigDecimal = new BigDecimal("61592600349703735722.724745739637773662");
+ private LocalDateTime localDateTime = LocalDateTime.of(2023, 1, 1, 6, 30, 40);
+
+ private SeaTunnelRow buildSeaTunnelRow() {
+ SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14);
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("k1", "v1");
+ map.put("k2", "v2");
+ String[] strArray = new String[] {"l1", "l2"};
+ byte byteVal = 100;
+ subSeaTunnelRow.setField(0, map);
+ subSeaTunnelRow.setField(1, strArray);
+ subSeaTunnelRow.setField(2, "strVal");
+ subSeaTunnelRow.setField(3, true);
+ subSeaTunnelRow.setField(4, 1);
+ subSeaTunnelRow.setField(5, 2);
+ subSeaTunnelRow.setField(6, 3);
+ subSeaTunnelRow.setField(7, Long.MAX_VALUE - 1);
+ subSeaTunnelRow.setField(8, 33.333F);
+ subSeaTunnelRow.setField(9, 123.456);
+ subSeaTunnelRow.setField(10, byteVal);
+ subSeaTunnelRow.setField(11, localDate);
+ subSeaTunnelRow.setField(12, bigDecimal);
+ subSeaTunnelRow.setField(13, localDateTime);
+
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15);
+ seaTunnelRow.setField(0, map);
+ seaTunnelRow.setField(1, strArray);
+ seaTunnelRow.setField(2, "strVal");
+ seaTunnelRow.setField(3, true);
+ seaTunnelRow.setField(4, 1);
+ seaTunnelRow.setField(5, 2);
+ seaTunnelRow.setField(6, 3);
+ seaTunnelRow.setField(7, Long.MAX_VALUE - 1);
+ seaTunnelRow.setField(8, 33.333F);
+ seaTunnelRow.setField(9, 123.456);
+ seaTunnelRow.setField(10, byteVal);
+ seaTunnelRow.setField(11, localDate);
+ seaTunnelRow.setField(12, bigDecimal);
+ seaTunnelRow.setField(13, localDateTime);
+ seaTunnelRow.setField(14, subSeaTunnelRow);
+ return seaTunnelRow;
+ }
+
+ private SeaTunnelRowType buildSeaTunnelRowType() {
+ String[] subField = {
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_bytes",
+ "c_date",
+ "c_decimal",
+ "c_timestamp"
+ };
+ SeaTunnelDataType<?>[] subFieldTypes = {
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ ArrayType.STRING_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.BYTE_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ new DecimalType(38, 18),
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ };
+ SeaTunnelRowType subRow = new SeaTunnelRowType(subField, subFieldTypes);
+
+ String[] fieldNames = {
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_bytes",
+ "c_date",
+ "c_decimal",
+ "c_timestamp",
+ "c_row"
+ };
+ SeaTunnelDataType<?>[] fieldTypes = {
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ ArrayType.STRING_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.BYTE_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ new DecimalType(38, 18),
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ subRow
+ };
+ SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, fieldTypes);
+ return rowType;
+ }
+
+ @Test
+ public void testSerialization() throws IOException {
+ SeaTunnelRowType rowType = buildSeaTunnelRowType();
+ SeaTunnelRow seaTunnelRow = buildSeaTunnelRow();
+ AvroSerializationSchema serializationSchema = new AvroSerializationSchema(rowType);
+ byte[] serialize = serializationSchema.serialize(seaTunnelRow);
+ AvroDeserializationSchema deserializationSchema = new AvroDeserializationSchema(rowType);
+ SeaTunnelRow deserialize = deserializationSchema.deserialize(serialize);
+ String[] strArray1 = (String[]) seaTunnelRow.getField(1);
+ String[] strArray2 = (String[]) deserialize.getField(1);
+ Assertions.assertArrayEquals(strArray1, strArray2);
+ SeaTunnelRow subRow = (SeaTunnelRow) deserialize.getField(14);
+ Assertions.assertEquals((double) subRow.getField(9), 123.456);
+ BigDecimal bigDecimal1 = (BigDecimal) subRow.getField(12);
+ Assertions.assertEquals(bigDecimal1.compareTo(bigDecimal), 0);
+ LocalDateTime localDateTime1 = (LocalDateTime) subRow.getField(13);
+ Assertions.assertEquals(localDateTime1.compareTo(localDateTime), 0);
+ }
+}
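Note that the serialization test above compares the round-tripped `BigDecimal` with `compareTo`, not `equals`. That choice matters: Avro's `decimal` logical type stores only the unscaled two's-complement bytes at the scale declared in the schema (here `DecimalType(38, 18)`), so deserialization normalizes the scale to 18 and `BigDecimal.equals`, which is scale-sensitive, could fail even when the values are numerically equal. The sketch below shows that encoding with only `java.math`; the `DecimalRoundTrip` class and method names are illustrative, not SeaTunnel's converter API.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;

public class DecimalRoundTrip {
    // Avro's decimal logical type: unscaled two's-complement bytes,
    // interpreted at the scale fixed in the schema.
    static byte[] encode(BigDecimal value, int scale) {
        return value.setScale(scale, RoundingMode.UNNECESSARY)
                    .unscaledValue()
                    .toByteArray();
    }

    static BigDecimal decode(byte[] bytes, int scale) {
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        // Same value as the test fixture: 20 integer digits, 18 fractional digits.
        BigDecimal original = new BigDecimal("61592600349703735722.724745739637773662");
        BigDecimal decoded = decode(encode(original, 18), 18);
        // Scale is normalized by the schema, so compareTo is the right check.
        System.out.println(decoded.compareTo(original) == 0);
    }
}
```

The same reasoning applies to any fixed-scale decimal column: compare numerically after a round trip, or normalize scales before using `equals`.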
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 52205f52e4..837a7fcc18 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -39,4 +39,5 @@ listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
json-path-2.7.0.jar
json-smart-2.4.7.jar
accessors-smart-2.4.7.jar
-asm-9.1.jar
\ No newline at end of file
+asm-9.1.jar
+avro-1.11.1.jar
\ No newline at end of file