This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 93a006156d [Feature] support avro format (#5084)
93a006156d is described below

commit 93a006156d704218c61f250dac6cb573f242d9e3
Author: Jarvis <[email protected]>
AuthorDate: Thu Dec 14 13:39:31 2023 +0800

    [Feature] support avro format (#5084)
---
 docs/en/connector-v2/sink/Kafka.md                 |  26 +--
 docs/en/connector-v2/source/kafka.md               |  34 ++--
 seatunnel-connectors-v2/connector-kafka/pom.xml    |   6 +-
 .../seatunnel/kafka/config/MessageFormat.java      |   3 +-
 .../serialize/DefaultSeaTunnelRowSerializer.java   |   3 +
 .../seatunnel/kafka/sink/KafkaSinkFactory.java     |   3 +-
 .../seatunnel/kafka/source/KafkaSourceConfig.java  |   3 +
 seatunnel-dist/release-docs/LICENSE                |   4 +-
 seatunnel-dist/release-docs/NOTICE                 |  29 ++--
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     |  25 +++
 .../avro/fake_source_to_kafka_avro_format.conf     |  76 +++++++++
 .../test/resources/avro/kafka_avro_to_console.conf |  89 ++++++++++
 seatunnel-formats/pom.xml                          |   1 +
 .../{ => seatunnel-format-avro}/pom.xml            |  30 ++--
 .../format/avro/AvroDeserializationSchema.java     |  54 ++++++
 .../format/avro/AvroSerializationSchema.java       |  65 +++++++
 .../seatunnel/format/avro/AvroToRowConverter.java  | 187 +++++++++++++++++++++
 .../seatunnel/format/avro/RowToAvroConverter.java  | 148 ++++++++++++++++
 .../SeaTunnelRowTypeToAvroSchemaConverter.java     | 113 +++++++++++++
 .../format/avro/exception/AvroFormatErrorCode.java |  34 +++-
 .../exception/SeaTunnelAvroFormatException.java    |  19 ++-
 .../seatunnel/format/avro/AvroConverterTest.java   | 174 +++++++++++++++++++
 .../format/avro/AvroSerializationSchemaTest.java   | 177 +++++++++++++++++++
 tools/dependencies/known-dependencies.txt          |   3 +-
 24 files changed, 1232 insertions(+), 74 deletions(-)

diff --git a/docs/en/connector-v2/sink/Kafka.md 
b/docs/en/connector-v2/sink/Kafka.md
index 835344126b..17ff091061 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -30,19 +30,19 @@ They can be downloaded via install-plugin.sh or from the 
Maven central repositor
 
 ## Sink Options
 
-|         Name         |  Type  | Required | Default |                         
                                                                                
                                                                                
                    Description                                                 
                                                                                
                                                                            |
-|----------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topic                | String | Yes      | -       | When the table is used 
as sink, the topic name is the topic to write data to.                          
                                                                                
                                                                                
                                                                                
                                                                             |
-| bootstrap.servers    | String | Yes      | -       | Comma separated list of 
Kafka brokers.                                                                  
                                                                                
                                                                                
                                                                                
                                                                            |
-| kafka.config         | Map    | No       | -       | In addition to the 
above parameters that must be specified by the `Kafka producer` client, the 
user can also specify multiple non-mandatory parameters for the `producer` 
client, covering [all the producer parameters specified in the official Kafka 
document](https://kafka.apache.org/documentation.html#producerconfigs).         
                                                                                
            |
-| semantics            | String | No       | NON     | Semantics that can be 
chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.                             
                                                                                
                                                                                
                                                                                
                                                                              |
-| partition_key_fields | Array  | No       | -       | Configure which fields 
are used as the key of the kafka message.                                       
                                                                                
                                                                                
                                                                                
                                                                             |
-| partition            | Int    | No       | -       | We can specify the 
partition, all messages will be sent to this partition.                         
                                                                                
                                                                                
                                                                                
                                                                                
 |
-| assign_partitions    | Array  | No       | -       | We can decide which 
partition to send based on the content of the message. The function of this 
parameter is to distribute information.                                         
                                                                                
                                                                                
                                                                                
    |
-| transaction_prefix   | String | No       | -       | If semantic is 
specified as EXACTLY_ONCE, the producer will write all messages in a Kafka 
transaction,kafka distinguishes different transactions by different 
transactionId. This parameter is prefix of  kafka  transactionId, make sure 
different job use different prefix.                                             
                                                                                
                          |
-| format               | String | No       | json    | Data format. The 
default format is json. Optional text format, canal-json and debezium-json.If 
you use json or text format. The default field separator is ", ". If you 
customize the delimiter, add the "field_delimiter" option.If you use canal 
format, please refer to [canal-json](../formats/canal-json.md) for details.If 
you use debezium format, please refer to 
[debezium-json](../formats/debezium-json.md) for details. |
-| field_delimiter      | String | No       | ,       | Customize the field 
delimiter for data format.                                                      
                                                                                
                                                                                
                                                                                
                                                                                
|
-| common-options       |        | No       | -       | Source plugin common 
parameters, please refer to [Source Common Options](common-options.md) for 
details                                                                         
                                                                                
                                                                                
                                                                                
    |
+|         Name         |  Type  | Required | Default |                         
                                                                                
                                                                                
                       Description                                              
                                                                                
                                                                                
  |
+|----------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topic                | String | Yes      | -       | When the table is used 
as sink, the topic name is the topic to write data to.                          
                                                                                
                                                                                
                                                                                
                                                                                
   |
+| bootstrap.servers    | String | Yes      | -       | Comma separated list of 
Kafka brokers.                                                                  
                                                                                
                                                                                
                                                                                
                                                                                
  |
+| kafka.config         | Map    | No       | -       | In addition to the 
above parameters that must be specified by the `Kafka producer` client, the 
user can also specify multiple non-mandatory parameters for the `producer` 
client, covering [all the producer parameters specified in the official Kafka 
document](https://kafka.apache.org/documentation.html#producerconfigs).         
                                                                                
                  |
+| semantics            | String | No       | NON     | Semantics that can be 
chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.                             
                                                                                
                                                                                
                                                                                
                                                                                
    |
+| partition_key_fields | Array  | No       | -       | Configure which fields 
are used as the key of the kafka message.                                       
                                                                                
                                                                                
                                                                                
                                                                                
   |
+| partition            | Int    | No       | -       | We can specify the 
partition, all messages will be sent to this partition.                         
                                                                                
                                                                                
                                                                                
                                                                                
       |
+| assign_partitions    | Array  | No       | -       | We can decide which 
partition to send based on the content of the message. The function of this 
parameter is to distribute information.                                         
                                                                                
                                                                                
                                                                                
          |
+| transaction_prefix   | String | No       | -       | If semantic is 
specified as EXACTLY_ONCE, the producer will write all messages in a Kafka 
transaction,kafka distinguishes different transactions by different 
transactionId. This parameter is prefix of  kafka  transactionId, make sure 
different job use different prefix.                                             
                                                                                
                                |
+| format               | String | No       | json    | Data format. The 
default format is json. Optional text format, canal-json, debezium-json and 
avro.If you use json or text format. The default field separator is ", ". If 
you customize the delimiter, add the "field_delimiter" option.If you use canal 
format, please refer to [canal-json](../formats/canal-json.md) for details.If 
you use debezium format, please refer to 
[debezium-json](../formats/debezium-json.md) for details. |
+| field_delimiter      | String | No       | ,       | Customize the field 
delimiter for data format.                                                      
                                                                                
                                                                                
                                                                                
                                                                                
      |
+| common-options       |        | No       | -       | Source plugin common 
parameters, please refer to [Source Common Options](common-options.md) for 
details                                                                         
                                                                                
                                                                                
                                                                                
          |
 
 ## Parameter Interpretation
 
diff --git a/docs/en/connector-v2/source/kafka.md 
b/docs/en/connector-v2/source/kafka.md
index 16b9c5420b..6b69e8e931 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -32,23 +32,23 @@ They can be downloaded via install-plugin.sh or from the 
Maven central repositor
 
 ## Source Options
 
-|                Name                 |                                    
Type                                     | Required |         Default          
|                                                                               
                                                                                
                                              Description                       
                                                                                
                   [...]
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| topic                               | String                                 
                                     | Yes      | -                        | 
Topic name(s) to read data from when the table is used as source. It also 
supports topic list for source by separating topic by comma like 
'topic-1,topic-2'.                                                              
                                                                                
                                      [...]
-| bootstrap.servers                   | String                                 
                                     | Yes      | -                        | 
Comma separated list of Kafka brokers.                                          
                                                                                
                                                                                
                                                                                
                 [...]
-| pattern                             | Boolean                                
                                     | No       | false                    | If 
`pattern` is set to `true`,the regular expression for a pattern of topic names 
to read from. All topics in clients with names that match the specified regular 
expression will be subscribed by the consumer.                                  
                                                                                
               [...]
-| consumer.group                      | String                                 
                                     | No       | SeaTunnel-Consumer-Group | 
`Kafka consumer group id`, used to distinguish different consumer groups.       
                                                                                
                                                                                
                                                                                
                 [...]
-| commit_on_checkpoint                | Boolean                                
                                     | No       | true                     | If 
true the consumer's offset will be periodically committed in the background.    
                                                                                
                                                                                
                                                                                
              [...]
-| kafka.config                        | Map                                    
                                     | No       | -                        | In 
addition to the above necessary parameters that must be specified by the `Kafka 
consumer` client, users can also specify multiple `consumer` client 
non-mandatory parameters, covering [all consumer parameters specified in the 
official Kafka 
document](https://kafka.apache.org/documentation.html#consumerconfigs).         
              [...]
-| schema                              | Config                                 
                                     | No       | -                        | 
The structure of the data, including field names and field types.               
                                                                                
                                                                                
                                                                                
                 [...]
-| format                              | String                                 
                                     | No       | json                     | 
Data format. The default format is json. Optional text format, canal-json and 
debezium-json.If you use json or text format. The default field separator is ", 
". If you customize the delimiter, add the "field_delimiter" option.If you use 
canal format, please refer to [canal-json](../formats/canal-json.md) for 
details.If you use debeziu [...]
-| format_error_handle_way             | String                                 
                                     | No       | fail                     | 
The processing method of data format error. The default value is fail, and the 
optional value is (fail, skip). When fail is selected, data format error will 
block and an exception will be thrown. When skip is selected, data format error 
will skip this line data.                                                       
                    [...]
-| field_delimiter                     | String                                 
                                     | No       | ,                        | 
Customize the field delimiter for data format.                                  
                                                                                
                                                                                
                                                                                
                 [...]
-| start_mode                          | 
StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | 
No       | group_offsets            | The initial consumption pattern of 
consumers.                                                                      
                                                                                
                                                                                
                                                              [...]
-| start_mode.offsets                  | Config                                 
                                     | No       | -                        | 
The offset required for consumption mode to be specific_offsets.                
                                                                                
                                                                                
                                                                                
                 [...]
-| start_mode.timestamp                | Long                                   
                                     | No       | -                        | 
The time required for consumption mode to be "timestamp".                       
                                                                                
                                                                                
                                                                                
                 [...]
-| partition-discovery.interval-millis | Long                                   
                                     | No       | -1                       | 
The interval for dynamically discovering topics and partitions.                 
                                                                                
                                                                                
                                                                                
                 [...]
-| common-options                      |                                        
                                     | No       | -                        | 
Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                                                                
                                 [...]
+|                Name                 |                                    
Type                                     | Required |         Default          
|                                                                               
                                                                                
                                                 Description                    
                                                                                
                   [...]
+|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| topic                               | String                                 
                                     | Yes      | -                        | 
Topic name(s) to read data from when the table is used as source. It also 
supports topic list for source by separating topic by comma like 
'topic-1,topic-2'.                                                              
                                                                                
                                      [...]
+| bootstrap.servers                   | String                                 
                                     | Yes      | -                        | 
Comma separated list of Kafka brokers.                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| pattern                             | Boolean                                
                                     | No       | false                    | If 
`pattern` is set to `true`,the regular expression for a pattern of topic names 
to read from. All topics in clients with names that match the specified regular 
expression will be subscribed by the consumer.                                  
                                                                                
               [...]
+| consumer.group                      | String                                 
                                     | No       | SeaTunnel-Consumer-Group | 
`Kafka consumer group id`, used to distinguish different consumer groups.       
                                                                                
                                                                                
                                                                                
                 [...]
+| commit_on_checkpoint                | Boolean                                
                                     | No       | true                     | If 
true the consumer's offset will be periodically committed in the background.    
                                                                                
                                                                                
                                                                                
              [...]
+| kafka.config                        | Map                                    
                                     | No       | -                        | In 
addition to the above necessary parameters that must be specified by the `Kafka 
consumer` client, users can also specify multiple `consumer` client 
non-mandatory parameters, covering [all consumer parameters specified in the 
official Kafka 
document](https://kafka.apache.org/documentation.html#consumerconfigs).         
              [...]
+| schema                              | Config                                 
                                     | No       | -                        | 
The structure of the data, including field names and field types.               
                                                                                
                                                                                
                                                                                
                 [...]
+| format                              | String                                 
                                     | No       | json                     | 
Data format. The default format is json. Optional text format, canal-json, 
debezium-json and avro.If you use json or text format. The default field 
separator is ", ". If you customize the delimiter, add the "field_delimiter" 
option.If you use canal format, please refer to 
[canal-json](../formats/canal-json.md) for details.If you use d [...]
+| format_error_handle_way             | String                                 
                                     | No       | fail                     | 
The processing method of data format error. The default value is fail, and the 
optional value is (fail, skip). When fail is selected, data format error will 
block and an exception will be thrown. When skip is selected, data format error 
will skip this line data.                                                       
                    [...]
+| field_delimiter                     | String                                 
                                     | No       | ,                        | 
Customize the field delimiter for data format.                                  
                                                                                
                                                                                
                                                                                
                 [...]
+| start_mode                          | 
StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | 
No       | group_offsets            | The initial consumption pattern of 
consumers.                                                                      
                                                                                
                                                                                
                                                              [...]
+| start_mode.offsets                  | Config                                 
                                     | No       | -                        | 
The offset required for consumption mode to be specific_offsets.                
                                                                                
                                                                                
                                                                                
                 [...]
+| start_mode.timestamp                | Long                                   
                                     | No       | -                        | 
The time required for consumption mode to be "timestamp".                       
                                                                                
                                                                                
                                                                                
                 [...]
+| partition-discovery.interval-millis | Long                                   
                                     | No       | -1                       | 
The interval for dynamically discovering topics and partitions.                 
                                                                                
                                                                                
                                                                                
                 [...]
+| common-options                      |                                        
                                     | No       | -                        | 
Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                                                                
                                 [...]
 
 ## Task Example
 
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml 
b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 7955ab3f54..6e470f12ba 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -72,7 +72,11 @@
             <artifactId>connect-json</artifactId>
             <version>${kafka.client.version}</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-avro</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index 491990fd33..18e466e41c 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -24,5 +24,6 @@ public enum MessageFormat {
     DEBEZIUM_JSON,
     COMPATIBLE_DEBEZIUM_JSON,
     COMPATIBLE_KAFKA_CONNECT_JSON,
-    OGG_JSON
+    OGG_JSON,
+    AVRO
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 0f8ae50f01..51d491002c 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+import org.apache.seatunnel.format.avro.AvroSerializationSchema;
 import 
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
 import 
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
@@ -227,6 +228,8 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
                 return new DebeziumJsonSerializationSchema(rowType);
             case COMPATIBLE_DEBEZIUM_JSON:
                 return new CompatibleDebeziumJsonSerializationSchema(rowType, 
isKey);
+            case AVRO:
+                return new AvroSerializationSchema(rowType);
             default:
                 throw new SeaTunnelJsonFormatException(
                         CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index f1a7bd85b8..3fbf6bb99b 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -46,7 +46,8 @@ public class KafkaSinkFactory implements TableSinkFactory {
                                 MessageFormat.JSON,
                                 MessageFormat.CANAL_JSON,
                                 MessageFormat.TEXT,
-                                MessageFormat.OGG_JSON),
+                                MessageFormat.OGG_JSON,
+                                MessageFormat.AVRO),
                         Config.TOPIC)
                 .optional(
                         Config.KAFKA_CONFIG,
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 45ae694d54..8c449c6843 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -33,6 +33,7 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
 import 
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
 import 
org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJsonFormatOptions;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
@@ -235,6 +236,8 @@ public class KafkaSourceConfig implements Serializable {
             case DEBEZIUM_JSON:
                 boolean includeSchema = 
readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
                 return new DebeziumJsonDeserializationSchema(seaTunnelRowType, 
true, includeSchema);
+            case AVRO:
+                return new AvroDeserializationSchema(seaTunnelRowType);
             default:
                 throw new SeaTunnelJsonFormatException(
                         CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
diff --git a/seatunnel-dist/release-docs/LICENSE 
b/seatunnel-dist/release-docs/LICENSE
index 310d061833..5bbf86c50c 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -219,8 +219,8 @@ The text of each license is the standard Apache 2.0 license.
 
      (Apache License 2.0) aircompressor (io.airlift:aircompressor:0.10 - 
http://github.com/airlift/aircompressor)
      (Apache License, Version 2.0) Apache Yetus - Audience Annotations 
(org.apache.yetus:audience-annotations:0.11.0 - 
https://yetus.apache.org/audience-annotations)
-     (The Apache Software License, Version 2.0) Apache Avro 
(org.apache.avro:avro:1.8.2 - http://avro.apache.org)
-     (Apache License, Version 2.0) Apache Commons Codec 
(commons-codec:commons-codec:1.13 - 
https://commons.apache.org/proper/commons-codec/)    
+     (The Apache Software License, Version 2.0) Apache Avro 
(org.apache.avro:avro:1.11.1 - http://avro.apache.org)
+     (Apache License, Version 2.0) Apache Commons Codec 
(commons-codec:commons-codec:1.13 - 
https://commons.apache.org/proper/commons-codec/)
      (Apache License, Version 2.0) Apache Commons Collections 
(org.apache.commons:commons-collections4:4.4 - 
https://commons.apache.org/proper/commons-collections/)
      (Apache License, Version 2.0) Apache Commons Compress 
(org.apache.commons:commons-compress:1.20 - 
https://commons.apache.org/proper/commons-compress/)
      (The Apache Software License, Version 2.0) Commons Lang 
(commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/)
diff --git a/seatunnel-dist/release-docs/NOTICE 
b/seatunnel-dist/release-docs/NOTICE
index 4bed2d4be1..1c6a43afe3 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -49,16 +49,16 @@ Apache Avro NOTICE
 
 ========================================================================
 Apache Avro
-Copyright 2010-2015 The Apache Software Foundation
+Copyright 2010-2019 The Apache Software Foundation
 
 This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
+The Apache Software Foundation (https://www.apache.org/).
 
 NUnit license acknowledgement:
 
 | Portions Copyright © 2002-2012 Charlie Poole or Copyright © 2002-2004 James
 | W. Newkirk, Michael C. Two, Alexei A. Vorontsov or Copyright © 2000-2002
-| Philip A. Craig 
+| Philip A. Craig
 
 Based upon the representations of upstream licensors, it is understood that
 portions of the mapreduce API included in the Java implementation are licensed
@@ -74,7 +74,7 @@ is:
 | (the "License"); you may not use this file except in compliance
 | with the License.  You may obtain a copy of the License at
 |
-|     http://www.apache.org/licenses/LICENSE-2.0
+|     https://www.apache.org/licenses/LICENSE-2.0
 |
 | Unless required by applicable law or agreed to in writing, software
 | distributed under the License is distributed on an "AS IS" BASIS,
@@ -85,7 +85,7 @@ is:
 The Odiago NOTICE at the time of the contribution:
 
 | This product includes software developed by Odiago, Inc.
-| (http://www.wibidata.com).
+| (https://www.wibidata.com).
 
 Apache Ivy includes the following in its NOTICE file:
 
@@ -93,18 +93,18 @@ Apache Ivy includes the following in its NOTICE file:
 | Copyright 2007-2010 The Apache Software Foundation
 |
 | This product includes software developed by
-| The Apache Software Foundation (http://www.apache.org/).
+| The Apache Software Foundation (https://www.apache.org/).
 |
 | Portions of Ivy were originally developed by
 | Jayasoft SARL (http://www.jayasoft.fr/)
 | and are licensed to the Apache Software Foundation under the
 | "Software Grant License Agreement"
 |
-| SSH and SFTP support is provided by the JCraft JSch package, 
+| SSH and SFTP support is provided by the JCraft JSch package,
 | which is open source software, available under
-| the terms of a BSD style license.  
+| the terms of a BSD style license.
 | The original software and related information is available
-| at http://www.jcraft.com/jsch/. 
+| at http://www.jcraft.com/jsch/.
 
 Apache Log4Net includes the following in its NOTICE file:
 
@@ -112,7 +112,16 @@ Apache Log4Net includes the following in its NOTICE file:
 | Copyright 2004-2015 The Apache Software Foundation
 |
 | This product includes software developed at
-| The Apache Software Foundation (http://www.apache.org/).
+| The Apache Software Foundation (https://www.apache.org/).
+
+csharp reflect serializers were contributed by Pitney Bowes Inc.
+
+| Copyright 2019 Pitney Bowes Inc.
+| Licensed under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the License.
+| You may obtain a copy of the License at 
https://www.apache.org/licenses/LICENSE-2.0.
+| Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and 
limitations under the License.
 
 ========================================================================
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index b75c55153e..0bf222bf87 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -34,6 +34,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunne
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
@@ -292,6 +293,30 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         testKafkaGroupOffsetsToConsole(container);
     }
 
+    @TestTemplate
+    @DisabledOnContainer(TestContainerId.SPARK_2_4)
+    public void testFakeSourceToKafkaAvroFormat(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/avro/fake_source_to_kafka_avro_format.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(TestContainerId.SPARK_2_4)
+    public void testKafkaAvroToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        "test_avro_topic",
+                        SEATUNNEL_ROW_TYPE,
+                        MessageFormat.AVRO,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        Container.ExecResult execResult = 
container.executeJob("/avro/kafka_avro_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
     public void testKafkaLatestToConsole(TestContainer container)
             throws IOException, InterruptedException {
         Container.ExecResult execResult =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
new file mode 100644
index 0000000000..c6f5d6944f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_avro_topic"
+    format = avro
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
new file mode 100644
index 0000000000..3fc1a57c3d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_avro_topic"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
+    format = avro
+    format_error_handle_way = skip
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "kafka_table"
+  }
+  Assert {
+    source_table_name = "kafka_table"
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              },
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml
index 7fc09b356a..a330e9b0e0 100644
--- a/seatunnel-formats/pom.xml
+++ b/seatunnel-formats/pom.xml
@@ -31,6 +31,7 @@
         <module>seatunnel-format-text</module>
         <module>seatunnel-format-compatible-debezium-json</module>
         <module>seatunnel-format-compatible-connect-json</module>
+        <module>seatunnel-format-avro</module>
     </modules>
 
 </project>
diff --git a/seatunnel-formats/pom.xml 
b/seatunnel-formats/seatunnel-format-avro/pom.xml
similarity index 64%
copy from seatunnel-formats/pom.xml
copy to seatunnel-formats/seatunnel-format-avro/pom.xml
index 7fc09b356a..513b0fa0f4 100644
--- a/seatunnel-formats/pom.xml
+++ b/seatunnel-formats/seatunnel-format-avro/pom.xml
@@ -18,19 +18,29 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel</artifactId>
+        <artifactId>seatunnel-formats</artifactId>
         <version>${revision}</version>
     </parent>
 
-    <artifactId>seatunnel-formats</artifactId>
-    <packaging>pom</packaging>
-    <name>SeaTunnel : Formats :</name>
+    <artifactId>seatunnel-format-avro</artifactId>
+    <name>SeaTunnel : Formats : Avro</name>
 
-    <modules>
-        <module>seatunnel-format-json</module>
-        <module>seatunnel-format-text</module>
-        <module>seatunnel-format-compatible-debezium-json</module>
-        <module>seatunnel-format-compatible-connect-json</module>
-    </modules>
+    <properties>
+        <avro.version>1.11.1</avro.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java
new file mode 100644
index 0000000000..b682a8e643
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+
+import java.io.IOException;
+
+public class AvroDeserializationSchema implements 
DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = -7907358485475741366L;
+
+    private final SeaTunnelRowType rowType;
+    private final AvroToRowConverter converter;
+
+    public AvroDeserializationSchema(SeaTunnelRowType rowType) {
+        this.rowType = rowType;
+        this.converter = new AvroToRowConverter(rowType);
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, 
null);
+        GenericRecord record = this.converter.getReader().read(null, decoder);
+        return converter.converter(record, rowType);
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.rowType;
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
new file mode 100644
index 0000000000..3d9a828bf7
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class AvroSerializationSchema implements SerializationSchema {
+
+    private static final long serialVersionUID = 4438784443025715370L;
+
+    private final ByteArrayOutputStream out;
+    private final BinaryEncoder encoder;
+    private final RowToAvroConverter converter;
+    private final DatumWriter<GenericRecord> writer;
+
+    public AvroSerializationSchema(SeaTunnelRowType rowType) {
+        this.out = new ByteArrayOutputStream();
+        this.encoder = EncoderFactory.get().binaryEncoder(out, null);
+        this.converter = new RowToAvroConverter(rowType);
+        this.writer = this.converter.getWriter();
+    }
+
+    @Override
+    public byte[] serialize(SeaTunnelRow element) {
+        GenericRecord record = converter.convertRowToGenericRecord(element);
+        try {
+            out.reset();
+            writer.write(record, encoder);
+            encoder.flush();
+            return out.toByteArray();
+        } catch (IOException e) {
+            throw new SeaTunnelAvroFormatException(
+                    AvroFormatErrorCode.SERIALIZATION_ERROR,
+                    "Serialization error on record : " + element);
+        }
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
new file mode 100644
index 0000000000..989087ee57
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class AvroToRowConverter implements Serializable {
+
+    private static final long serialVersionUID = 8177020083886379563L;
+
+    private DatumReader<GenericRecord> reader = null;
+    private Schema schema;
+
+    public AvroToRowConverter(SeaTunnelRowType rowType) {
+        schema = 
SeaTunnelRowTypeToAvroSchemaConverter.buildAvroSchemaWithRowType(rowType);
+    }
+
+    public DatumReader<GenericRecord> getReader() {
+        if (reader == null) {
+            reader = createReader();
+        }
+        return reader;
+    }
+
+    private DatumReader<GenericRecord> createReader() {
+        GenericDatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>(schema, schema);
+        datumReader.getData().addLogicalTypeConversion(new 
Conversions.DecimalConversion());
+        datumReader.getData().addLogicalTypeConversion(new 
TimeConversions.DateConversion());
+        datumReader
+                .getData()
+                .addLogicalTypeConversion(new 
TimeConversions.LocalTimestampMillisConversion());
+        return datumReader;
+    }
+
+    public SeaTunnelRow converter(GenericRecord record, SeaTunnelRowType 
rowType) {
+        String[] fieldNames = rowType.getFieldNames();
+
+        Object[] values = new Object[fieldNames.length];
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (record.getSchema().getField(fieldNames[i]) == null) {
+                values[i] = null;
+                continue;
+            }
+            values[i] =
+                    convertField(
+                            rowType.getFieldType(i),
+                            record.getSchema().getField(fieldNames[i]),
+                            record.get(fieldNames[i]));
+        }
+        return new SeaTunnelRow(values);
+    }
+
+    private Object convertField(SeaTunnelDataType<?> dataType, Schema.Field 
field, Object val) {
+        switch (dataType.getSqlType()) {
+            case MAP:
+            case STRING:
+            case BOOLEAN:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case NULL:
+            case BYTES:
+            case DATE:
+            case DECIMAL:
+            case TIMESTAMP:
+                return val;
+            case TINYINT:
+                Class<?> typeClass = dataType.getTypeClass();
+                if (typeClass == Byte.class) {
+                    Integer integer = (Integer) val;
+                    return integer.byteValue();
+                }
+                return val;
+            case ARRAY:
+                BasicType<?> basicType = ((ArrayType<?, ?>) 
dataType).getElementType();
+                List<Object> list = (List<Object>) val;
+                return convertArray(list, basicType);
+            case ROW:
+                SeaTunnelRowType subRow = (SeaTunnelRowType) dataType;
+                return converter((GenericRecord) val, subRow);
+            default:
+                String errorMsg =
+                        String.format(
+                                "SeaTunnel avro format is not supported for 
this data type [%s]",
+                                dataType.getSqlType());
+                throw new SeaTunnelAvroFormatException(
+                        AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+        }
+    }
+
+    protected static Object convertArray(List<Object> val, 
SeaTunnelDataType<?> dataType) {
+        if (val == null) {
+            return null;
+        }
+        int length = val.size();
+        switch (dataType.getSqlType()) {
+            case STRING:
+                String[] strings = new String[length];
+                for (int i = 0; i < strings.length; i++) {
+                    strings[i] = val.get(i).toString();
+                }
+                return strings;
+            case BOOLEAN:
+                Boolean[] booleans = new Boolean[length];
+                for (int i = 0; i < booleans.length; i++) {
+                    booleans[i] = (Boolean) val.get(i);
+                }
+                return booleans;
+            case BYTES:
+                Byte[] bytes = new Byte[length];
+                for (int i = 0; i < bytes.length; i++) {
+                    bytes[i] = (Byte) val.get(i);
+                }
+                return bytes;
+            case SMALLINT:
+                Short[] shorts = new Short[length];
+                for (int i = 0; i < shorts.length; i++) {
+                    shorts[i] = (Short) val.get(i);
+                }
+                return shorts;
+            case INT:
+                Integer[] integers = new Integer[length];
+                for (int i = 0; i < integers.length; i++) {
+                    integers[i] = (Integer) val.get(i);
+                }
+                return integers;
+            case BIGINT:
+                Long[] longs = new Long[length];
+                for (int i = 0; i < longs.length; i++) {
+                    longs[i] = (Long) val.get(i);
+                }
+                return longs;
+            case FLOAT:
+                Float[] floats = new Float[length];
+                for (int i = 0; i < floats.length; i++) {
+                    floats[i] = (Float) val.get(i);
+                }
+                return floats;
+            case DOUBLE:
+                Double[] doubles = new Double[length];
+                for (int i = 0; i < doubles.length; i++) {
+                    doubles[i] = (Double) val.get(i);
+                }
+                return doubles;
+            default:
+                String errorMsg =
+                        String.format(
+                                "SeaTunnel avro array format is not supported 
for this data type [%s]",
+                                dataType.getSqlType());
+                throw new SeaTunnelAvroFormatException(
+                        AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+        }
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
new file mode 100644
index 0000000000..f8f0652a26
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RowToAvroConverter implements Serializable {
+
+    private static final long serialVersionUID = -576124379280229724L;
+
+    private final Schema schema;
+    private final SeaTunnelRowType rowType;
+    private final DatumWriter<GenericRecord> writer;
+
+    public RowToAvroConverter(SeaTunnelRowType rowType) {
+        this.schema = 
SeaTunnelRowTypeToAvroSchemaConverter.buildAvroSchemaWithRowType(rowType);
+        this.rowType = rowType;
+        this.writer = createWriter();
+    }
+
+    private DatumWriter<GenericRecord> createWriter() {
+        GenericDatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
+        datumWriter.getData().addLogicalTypeConversion(new 
Conversions.DecimalConversion());
+        datumWriter.getData().addLogicalTypeConversion(new 
TimeConversions.DateConversion());
+        datumWriter
+                .getData()
+                .addLogicalTypeConversion(new 
TimeConversions.LocalTimestampMillisConversion());
+        return datumWriter;
+    }
+
+    public Schema getSchema() {
+        return schema;
+    }
+
+    public DatumWriter<GenericRecord> getWriter() {
+        return writer;
+    }
+
+    public GenericRecord convertRowToGenericRecord(SeaTunnelRow element) {
+        GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+        String[] fieldNames = rowType.getFieldNames();
+        for (int i = 0; i < fieldNames.length; i++) {
+            String fieldName = rowType.getFieldName(i);
+            Object value = element.getField(i);
+            builder.set(fieldName.toLowerCase(), resolveObject(value, 
rowType.getFieldType(i)));
+        }
+        return builder.build();
+    }
+
+    private Object resolveObject(Object data, SeaTunnelDataType<?> 
seaTunnelDataType) {
+        if (data == null) {
+            return null;
+        }
+        switch (seaTunnelDataType.getSqlType()) {
+            case STRING:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case BOOLEAN:
+            case MAP:
+            case DECIMAL:
+            case DATE:
+            case TIMESTAMP:
+                return data;
+            case TINYINT:
+                Class<?> typeClass = seaTunnelDataType.getTypeClass();
+                if (typeClass == Byte.class) {
+                    if (data instanceof Byte) {
+                        Byte aByte = (Byte) data;
+                        return Byte.toUnsignedInt(aByte);
+                    }
+                }
+                return data;
+            case BYTES:
+                return ByteBuffer.wrap((byte[]) data);
+            case ARRAY:
+                //                BasicType<?> basicType = ((ArrayType<?, ?>)
+                // seaTunnelDataType).getElementType();
+                //                return Util.convertArray((Object[]) data, 
basicType);
+                BasicType<?> basicType = ((ArrayType<?, ?>) 
seaTunnelDataType).getElementType();
+                List<Object> records = new ArrayList<>(((Object[]) 
data).length);
+                for (Object object : (Object[]) data) {
+                    Object resolvedObject = resolveObject(object, basicType);
+                    records.add(resolvedObject);
+                }
+                return records;
+            case ROW:
+                SeaTunnelRow seaTunnelRow = (SeaTunnelRow) data;
+                SeaTunnelDataType<?>[] fieldTypes =
+                        ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
+                String[] fieldNames = ((SeaTunnelRowType) 
seaTunnelDataType).getFieldNames();
+                Schema recordSchema =
+                        
SeaTunnelRowTypeToAvroSchemaConverter.buildAvroSchemaWithRowType(
+                                (SeaTunnelRowType) seaTunnelDataType);
+                GenericRecordBuilder recordBuilder = new 
GenericRecordBuilder(recordSchema);
+                for (int i = 0; i < fieldNames.length; i++) {
+                    recordBuilder.set(
+                            fieldNames[i].toLowerCase(),
+                            resolveObject(seaTunnelRow.getField(i), 
fieldTypes[i]));
+                }
+                return recordBuilder.build();
+            default:
+                String errorMsg =
+                        String.format(
+                                "SeaTunnel avro format is not supported for 
this data type [%s]",
+                                seaTunnelDataType.getSqlType());
+                throw new SeaTunnelAvroFormatException(
+                        AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+        }
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
new file mode 100644
index 0000000000..195ff8004c
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SeaTunnelRowTypeToAvroSchemaConverter {
+
+    public static Schema buildAvroSchemaWithRowType(SeaTunnelRowType 
seaTunnelRowType) {
+        List<Schema.Field> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        for (int i = 0; i < fieldNames.length; i++) {
+            fields.add(generateField(fieldNames[i], fieldTypes[i]));
+        }
+        return Schema.createRecord("SeaTunnelRecord", null, null, false, 
fields);
+    }
+
+    private static Schema.Field generateField(
+            String fieldName, SeaTunnelDataType<?> seaTunnelDataType) {
+        return new Schema.Field(
+                fieldName,
+                seaTunnelDataType2AvroDataType(fieldName, seaTunnelDataType),
+                null,
+                null);
+    }
+
+    private static Schema seaTunnelDataType2AvroDataType(
+            String fieldName, SeaTunnelDataType<?> seaTunnelDataType) {
+
+        switch (seaTunnelDataType.getSqlType()) {
+            case STRING:
+                return Schema.create(Schema.Type.STRING);
+            case BYTES:
+                return Schema.create(Schema.Type.BYTES);
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+                return Schema.create(Schema.Type.INT);
+            case BIGINT:
+                return Schema.create(Schema.Type.LONG);
+            case FLOAT:
+                return Schema.create(Schema.Type.FLOAT);
+            case DOUBLE:
+                return Schema.create(Schema.Type.DOUBLE);
+            case BOOLEAN:
+                return Schema.create(Schema.Type.BOOLEAN);
+            case MAP:
+                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) 
seaTunnelDataType).getValueType();
+                return 
Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType));
+            case ARRAY:
+                BasicType<?> elementType = ((ArrayType<?, ?>) 
seaTunnelDataType).getElementType();
+                return 
Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType));
+            case ROW:
+                SeaTunnelDataType<?>[] fieldTypes =
+                        ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
+                String[] fieldNames = ((SeaTunnelRowType) 
seaTunnelDataType).getFieldNames();
+                List<Schema.Field> subField = new ArrayList<>();
+                for (int i = 0; i < fieldNames.length; i++) {
+                    subField.add(generateField(fieldNames[i], fieldTypes[i]));
+                }
+                return Schema.createRecord(fieldName, null, null, false, 
subField);
+            case DECIMAL:
+                int precision = ((DecimalType) 
seaTunnelDataType).getPrecision();
+                int scale = ((DecimalType) seaTunnelDataType).getScale();
+                LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, 
scale);
+                return decimal.addToSchema(Schema.create(Schema.Type.BYTES));
+            case TIMESTAMP:
+                return LogicalTypes.localTimestampMillis()
+                        .addToSchema(Schema.create(Schema.Type.LONG));
+            case DATE:
+                return 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+            case NULL:
+                return Schema.create(Schema.Type.NULL);
+            default:
+                String errorMsg =
+                        String.format(
+                                "SeaTunnel avro format is not supported for 
this data type [%s]",
+                                seaTunnelDataType.getSqlType());
+                throw new SeaTunnelAvroFormatException(
+                        AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java
similarity index 52%
copy from 
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
copy to 
seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java
index 491990fd33..6f60a47932 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java
@@ -15,14 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+package org.apache.seatunnel.format.avro.exception;
 
-public enum MessageFormat {
-    JSON,
-    TEXT,
-    CANAL_JSON,
-    DEBEZIUM_JSON,
-    COMPATIBLE_DEBEZIUM_JSON,
-    COMPATIBLE_KAFKA_CONNECT_JSON,
-    OGG_JSON
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum AvroFormatErrorCode implements SeaTunnelErrorCode {
+    UNSUPPORTED_DATA_TYPE("AVRO-01", "Unsupported data type."),
+    SERIALIZATION_ERROR("AVRO-02", "serialize error."),
+    FILED_NOT_EXIST("AVRO-03", "Field not exist.");
+
+    private final String code;
+    private final String description;
+
+    AvroFormatErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java
similarity index 64%
copy from 
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
copy to 
seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java
index 491990fd33..93c45323b4 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+package org.apache.seatunnel.format.avro.exception;
 
-public enum MessageFormat {
-    JSON,
-    TEXT,
-    CANAL_JSON,
-    DEBEZIUM_JSON,
-    COMPATIBLE_DEBEZIUM_JSON,
-    COMPATIBLE_KAFKA_CONNECT_JSON,
-    OGG_JSON
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class SeaTunnelAvroFormatException extends SeaTunnelRuntimeException {
+
+    public SeaTunnelAvroFormatException(
+            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java
 
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java
new file mode 100644
index 0000000000..fb45a0b537
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.generic.GenericRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+class AvroConverterTest {
+
+    private SeaTunnelRow buildSeaTunnelRow() {
+        SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14);
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("k1", "v1");
+        map.put("k2", "v2");
+        String[] strArray = new String[] {"l1", "l2"};
+        byte byteVal = 100;
+        LocalDate localDate = LocalDate.of(2023, 1, 1);
+
+        BigDecimal bigDecimal = new 
BigDecimal("61592600349703735722.724745739637773662");
+        LocalDateTime localDateTime = LocalDateTime.of(2023, 1, 1, 6, 30, 40);
+
+        subSeaTunnelRow.setField(0, map);
+        subSeaTunnelRow.setField(1, strArray);
+        subSeaTunnelRow.setField(2, "strVal");
+        subSeaTunnelRow.setField(3, true);
+        subSeaTunnelRow.setField(4, 1);
+        subSeaTunnelRow.setField(5, 2);
+        subSeaTunnelRow.setField(6, 3);
+        subSeaTunnelRow.setField(7, Long.MAX_VALUE - 1);
+        subSeaTunnelRow.setField(8, 33.333F);
+        subSeaTunnelRow.setField(9, 123.456);
+        subSeaTunnelRow.setField(10, byteVal);
+        subSeaTunnelRow.setField(11, localDate);
+        subSeaTunnelRow.setField(12, bigDecimal);
+        subSeaTunnelRow.setField(13, localDateTime);
+
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15);
+        seaTunnelRow.setField(0, map);
+        seaTunnelRow.setField(1, strArray);
+        seaTunnelRow.setField(2, "strVal");
+        seaTunnelRow.setField(3, true);
+        seaTunnelRow.setField(4, 1);
+        seaTunnelRow.setField(5, 2);
+        seaTunnelRow.setField(6, 3);
+        seaTunnelRow.setField(7, Long.MAX_VALUE - 1);
+        seaTunnelRow.setField(8, 33.333F);
+        seaTunnelRow.setField(9, 123.456);
+        seaTunnelRow.setField(10, byteVal);
+        seaTunnelRow.setField(11, localDate);
+        seaTunnelRow.setField(12, bigDecimal);
+        seaTunnelRow.setField(13, localDateTime);
+        seaTunnelRow.setField(14, subSeaTunnelRow);
+        return seaTunnelRow;
+    }
+
+    private SeaTunnelRowType buildSeaTunnelRowType() {
+        String[] subField = {
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_bytes",
+            "c_date",
+            "c_decimal",
+            "c_timestamp"
+        };
+        SeaTunnelDataType<?>[] subFieldTypes = {
+            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+            ArrayType.STRING_ARRAY_TYPE,
+            BasicType.STRING_TYPE,
+            BasicType.BOOLEAN_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.LONG_TYPE,
+            BasicType.FLOAT_TYPE,
+            BasicType.DOUBLE_TYPE,
+            BasicType.BYTE_TYPE,
+            LocalTimeType.LOCAL_DATE_TYPE,
+            new DecimalType(38, 18),
+            LocalTimeType.LOCAL_DATE_TIME_TYPE
+        };
+        SeaTunnelRowType subRow = new SeaTunnelRowType(subField, 
subFieldTypes);
+
+        String[] fieldNames = {
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_bytes",
+            "c_date",
+            "c_decimal",
+            "c_timestamp",
+            "c_row"
+        };
+        SeaTunnelDataType<?>[] fieldTypes = {
+            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+            ArrayType.STRING_ARRAY_TYPE,
+            BasicType.STRING_TYPE,
+            BasicType.BOOLEAN_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.LONG_TYPE,
+            BasicType.FLOAT_TYPE,
+            BasicType.DOUBLE_TYPE,
+            BasicType.BYTE_TYPE,
+            LocalTimeType.LOCAL_DATE_TYPE,
+            new DecimalType(38, 18),
+            LocalTimeType.LOCAL_DATE_TIME_TYPE,
+            subRow
+        };
+        SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, 
fieldTypes);
+        return rowType;
+    }
+
+    @Test
+    public void testConverter() {
+
+        SeaTunnelRowType rowType = buildSeaTunnelRowType();
+        SeaTunnelRow seaTunnelRow = buildSeaTunnelRow();
+        RowToAvroConverter rowToAvroConverter = new 
RowToAvroConverter(rowType);
+        GenericRecord record = 
rowToAvroConverter.convertRowToGenericRecord(seaTunnelRow);
+
+        AvroToRowConverter avroToRowConverter = new 
AvroToRowConverter(rowType);
+        SeaTunnelRow converterRow = avroToRowConverter.converter(record, 
rowType);
+
+        Assertions.assertEquals(converterRow, seaTunnelRow);
+    }
+}
diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
new file mode 100644
index 0000000000..5f505e1ba6
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+class AvroSerializationSchemaTest {
+
+    private LocalDate localDate = LocalDate.of(2023, 1, 1);
+    private BigDecimal bigDecimal = new 
BigDecimal("61592600349703735722.724745739637773662");
+    private LocalDateTime localDateTime = LocalDateTime.of(2023, 1, 1, 6, 30, 
40);
+
+    private SeaTunnelRow buildSeaTunnelRow() {
+        SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14);
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("k1", "v1");
+        map.put("k2", "v2");
+        String[] strArray = new String[] {"l1", "l2"};
+        byte byteVal = 100;
+        subSeaTunnelRow.setField(0, map);
+        subSeaTunnelRow.setField(1, strArray);
+        subSeaTunnelRow.setField(2, "strVal");
+        subSeaTunnelRow.setField(3, true);
+        subSeaTunnelRow.setField(4, 1);
+        subSeaTunnelRow.setField(5, 2);
+        subSeaTunnelRow.setField(6, 3);
+        subSeaTunnelRow.setField(7, Long.MAX_VALUE - 1);
+        subSeaTunnelRow.setField(8, 33.333F);
+        subSeaTunnelRow.setField(9, 123.456);
+        subSeaTunnelRow.setField(10, byteVal);
+        subSeaTunnelRow.setField(11, localDate);
+        subSeaTunnelRow.setField(12, bigDecimal);
+        subSeaTunnelRow.setField(13, localDateTime);
+
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15);
+        seaTunnelRow.setField(0, map);
+        seaTunnelRow.setField(1, strArray);
+        seaTunnelRow.setField(2, "strVal");
+        seaTunnelRow.setField(3, true);
+        seaTunnelRow.setField(4, 1);
+        seaTunnelRow.setField(5, 2);
+        seaTunnelRow.setField(6, 3);
+        seaTunnelRow.setField(7, Long.MAX_VALUE - 1);
+        seaTunnelRow.setField(8, 33.333F);
+        seaTunnelRow.setField(9, 123.456);
+        seaTunnelRow.setField(10, byteVal);
+        seaTunnelRow.setField(11, localDate);
+        seaTunnelRow.setField(12, bigDecimal);
+        seaTunnelRow.setField(13, localDateTime);
+        seaTunnelRow.setField(14, subSeaTunnelRow);
+        return seaTunnelRow;
+    }
+
+    private SeaTunnelRowType buildSeaTunnelRowType() {
+        String[] subField = {
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_bytes",
+            "c_date",
+            "c_decimal",
+            "c_timestamp"
+        };
+        SeaTunnelDataType<?>[] subFieldTypes = {
+            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+            ArrayType.STRING_ARRAY_TYPE,
+            BasicType.STRING_TYPE,
+            BasicType.BOOLEAN_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.LONG_TYPE,
+            BasicType.FLOAT_TYPE,
+            BasicType.DOUBLE_TYPE,
+            BasicType.BYTE_TYPE,
+            LocalTimeType.LOCAL_DATE_TYPE,
+            new DecimalType(38, 18),
+            LocalTimeType.LOCAL_DATE_TIME_TYPE
+        };
+        SeaTunnelRowType subRow = new SeaTunnelRowType(subField, 
subFieldTypes);
+
+        String[] fieldNames = {
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_bytes",
+            "c_date",
+            "c_decimal",
+            "c_timestamp",
+            "c_row"
+        };
+        SeaTunnelDataType<?>[] fieldTypes = {
+            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+            ArrayType.STRING_ARRAY_TYPE,
+            BasicType.STRING_TYPE,
+            BasicType.BOOLEAN_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.LONG_TYPE,
+            BasicType.FLOAT_TYPE,
+            BasicType.DOUBLE_TYPE,
+            BasicType.BYTE_TYPE,
+            LocalTimeType.LOCAL_DATE_TYPE,
+            new DecimalType(38, 18),
+            LocalTimeType.LOCAL_DATE_TIME_TYPE,
+            subRow
+        };
+        SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, 
fieldTypes);
+        return rowType;
+    }
+
+    @Test
+    public void testSerialization() throws IOException {
+        SeaTunnelRowType rowType = buildSeaTunnelRowType();
+        SeaTunnelRow seaTunnelRow = buildSeaTunnelRow();
+        AvroSerializationSchema serializationSchema = new 
AvroSerializationSchema(rowType);
+        byte[] serialize = serializationSchema.serialize(seaTunnelRow);
+        AvroDeserializationSchema deserializationSchema = new 
AvroDeserializationSchema(rowType);
+        SeaTunnelRow deserialize = 
deserializationSchema.deserialize(serialize);
+        String[] strArray1 = (String[]) seaTunnelRow.getField(1);
+        String[] strArray2 = (String[]) deserialize.getField(1);
+        Assertions.assertArrayEquals(strArray1, strArray2);
+        SeaTunnelRow subRow = (SeaTunnelRow) deserialize.getField(14);
+        Assertions.assertEquals((double) subRow.getField(9), 123.456);
+        BigDecimal bigDecimal1 = (BigDecimal) subRow.getField(12);
+        Assertions.assertEquals(bigDecimal1.compareTo(bigDecimal), 0);
+        LocalDateTime localDateTime1 = (LocalDateTime) subRow.getField(13);
+        Assertions.assertEquals(localDateTime1.compareTo(localDateTime), 0);
+    }
+}
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 52205f52e4..837a7fcc18 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -39,4 +39,5 @@ listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
 json-path-2.7.0.jar
 json-smart-2.4.7.jar
 accessors-smart-2.4.7.jar
-asm-9.1.jar
\ No newline at end of file
+asm-9.1.jar
+avro-1.11.1.jar
\ No newline at end of file

Reply via email to