This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e0c11e6c78 [Test][Spark] Make sure the value in spark not be reused. 
(#5767)
e0c11e6c78 is described below

commit e0c11e6c7884880b1227d5ee7a64aa223e9a7fe0
Author: Jia Fan <[email protected]>
AuthorDate: Fri Nov 3 10:27:06 2023 +0800

    [Test][Spark] Make sure the value in spark not be reused. (#5767)
---
 pom.xml                                            |   8 +-
 seatunnel-dist/release-docs/LICENSE                |  12 +-
 .../spark/sink/SeaTunnelSinkWithBuffer.java        | 144 ++++++++++
 .../spark/sink/SeaTunnelSinkWithBufferWriter.java  | 252 +++++++++++++++++
 .../translation/spark/sink/SparkSinkTest.java      | 298 +++++++++++++++++++++
 tools/dependencies/known-dependencies.txt          |  12 +-
 6 files changed, 710 insertions(+), 16 deletions(-)

diff --git a/pom.xml b/pom.xml
index e744b342dd..41afca5217 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,8 +59,8 @@
         <seatunnel.config.shade.version>2.1.1</seatunnel.config.shade.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <java.version>1.8</java.version>
-        <scala.version>2.11.12</scala.version>
-        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.12.15</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
         <maven.compiler.source>${java.version}</maven.compiler.source>
         <maven.compiler.target>${java.version}</maven.compiler.target>
 
@@ -84,7 +84,7 @@
         <jersey.version>1.19</jersey.version>
         <javax.servlet.jap.version>2.1</javax.servlet.jap.version>
         <hadoop.binary.version>2.7</hadoop.binary.version>
-        <jackson.version>2.12.6</jackson.version>
+        <jackson.version>2.13.3</jackson.version>
         <lombok.version>1.18.24</lombok.version>
         <commons-compress.version>1.20</commons-compress.version>
         <skip.pmd.check>false</skip.pmd.check>
@@ -101,7 +101,7 @@
         <elasticsearch6.client.version>6.3.1</elasticsearch6.client.version>
         <elasticsearch7.client.version>7.5.1</elasticsearch7.client.version>
         
<flink-shaded-hadoop-2.version>2.7.5-7.0</flink-shaded-hadoop-2.version>
-        <commons-lang3.version>3.4</commons-lang3.version>
+        <commons-lang3.version>3.5</commons-lang3.version>
         <commons-io.version>2.11.0</commons-io.version>
         <commons-collections4.version>4.4</commons-collections4.version>
         <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
diff --git a/seatunnel-dist/release-docs/LICENSE 
b/seatunnel-dist/release-docs/LICENSE
index 5d19a35a87..08f41da2a5 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -225,7 +225,7 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Apache Commons Compress 
(org.apache.commons:commons-compress:1.20 - 
https://commons.apache.org/proper/commons-compress/)
      (The Apache Software License, Version 2.0) Commons Lang 
(commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/)
      (Apache License, Version 2.0) Apache Commons IO 
(commons-io:commons-io:2.11.0 - http://commons.apache.org/proper/commons-io/)   
 
-     (Apache License, Version 2.0) Apache Commons Lang 
(org.apache.commons:commons-lang3:3.4 - 
http://commons.apache.org/proper/commons-lang/)
+     (Apache License, Version 2.0) Apache Commons Lang 
(org.apache.commons:commons-lang3:3.5 - 
http://commons.apache.org/proper/commons-lang/)
      (The Apache Software License, Version 2.0) Commons Pool 
(commons-pool:commons-pool:1.6 - http://commons.apache.org/pool/)
      (Apache License, Version 2.0) config (com.typesafe:config:1.3.3 - 
https://github.com/lightbend/config)
      (The Apache Software License, Version 2.0) Flink : Formats : Avro 
(org.apache.flink:flink-avro:1.13.6 - 
https://flink.apache.org/flink-formats/flink-avro)
@@ -236,11 +236,11 @@ The text of each license is the standard Apache 2.0 
license.
      (Apache License, Version 2.0) Flink : Tools : Force Shading 
(org.apache.flink:force-shading:1.13.6 - https://www.apache.org/force-shading/)
      (The Apache Software License, Version 2.0) Guava: Google Core Libraries 
for Java (com.google.guava:guava:27.0-jre - 
https://github.com/google/guava/guava)
      (Apache License, Version 2.0) Hive Storage API 
(org.apache.hive:hive-storage-api:2.6.0 - 
https://www.apache.org/hive-storage-api/)
-     (The Apache Software License, Version 2.0) Jackson-annotations 
(com.fasterxml.jackson.core:jackson-annotations:2.12.6 - 
http://github.com/FasterXML/jackson)
-     (The Apache Software License, Version 2.0) Jackson-core 
(com.fasterxml.jackson.core:jackson-core:2.12.6 - 
https://github.com/FasterXML/jackson-core)
+     (The Apache Software License, Version 2.0) Jackson-annotations 
(com.fasterxml.jackson.core:jackson-annotations:2.13.3 - 
http://github.com/FasterXML/jackson)
+     (The Apache Software License, Version 2.0) Jackson-core 
(com.fasterxml.jackson.core:jackson-core:2.13.3 - 
https://github.com/FasterXML/jackson-core)
      (The Apache Software License, Version 2.0) Jackson 
(org.codehaus.jackson:jackson-core-asl:1.9.13 - http://jackson.codehaus.org)
-     (The Apache Software License, Version 2.0) jackson-databind 
(com.fasterxml.jackson.core:jackson-databind:2.12.6  - 
http://github.com/FasterXML/jackson)
-     (The Apache Software License, Version 2.0) Jackson-dataformat-properties 
(com.fasterxml.jackson.dataformat:jackson-dataformat-properties:2.12.6 - 
https://github.com/FasterXML/jackson-dataformats-text)
+     (The Apache Software License, Version 2.0) jackson-databind 
(com.fasterxml.jackson.core:jackson-databind:2.13.3  - 
http://github.com/FasterXML/jackson)
+     (The Apache Software License, Version 2.0) Jackson-dataformat-properties 
(com.fasterxml.jackson.dataformat:jackson-dataformat-properties:2.13.3 - 
https://github.com/FasterXML/jackson-dataformats-text)
      (The Apache Software License, Version 2.0) Data Mapper for Jackson 
(org.codehaus.jackson:jackson-mapper-asl:1.9.13 - http://jackson.codehaus.org)
      (Apache License, Version 2.0) jcommander (com.beust:jcommander:1.81 - 
https://jcommander.org)
      (The Apache Software License, Version 2.0) FindBugs-jsr305 
(com.google.code.findbugs:jsr305:1.3.9 - http://findbugs.sourceforge.net/)
@@ -292,7 +292,7 @@ The following components are provided under a BSD license. 
See project link for
 The text of each license is also included at licenses/LICENSE-[project].txt.  
     
      (New BSD license) Protocol Buffer Java API 
(com.google.protobuf:protobuf-java:2.5.0 - http://code.google.com/p/protobuf)
-     (BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.11.12 - 
http://www.scala-lang.org/)
+     (BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.12.15 - 
http://www.scala-lang.org/)
      (BSD 3-Clause) Scala Library (org.ow2.asm:asm:9.1 - 
https://mvnrepository.com/artifact/org.ow2.asm/asm/)
 ========================================================================
 CDDL License
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
new file mode 100644
index 0000000000..e16a29466b
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.io.IOException;
+
+public class SeaTunnelSinkWithBuffer implements SeaTunnelSink<SeaTunnelRow, 
Void, Void, Void> {
+
+    @Override
+    public String getPluginName() {
+        return "SeaTunnelSinkWithBuffer";
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return new SeaTunnelRowType(
+                new String[] {
+                    "int",
+                    "string",
+                    "boolean",
+                    "float",
+                    "double",
+                    "byte",
+                    "short",
+                    "long",
+                    "decimal",
+                    "date",
+                    "timestamp",
+                    "null",
+                    "array_string",
+                    "array_boolean",
+                    "array_byte",
+                    "array_short",
+                    "array_int",
+                    "array_long",
+                    "array_float",
+                    "array_double",
+                    "map",
+                    "row"
+                },
+                new SeaTunnelDataType[] {
+                    BasicType.INT_TYPE,
+                    BasicType.STRING_TYPE,
+                    BasicType.BOOLEAN_TYPE,
+                    BasicType.FLOAT_TYPE,
+                    BasicType.DOUBLE_TYPE,
+                    BasicType.BYTE_TYPE,
+                    BasicType.SHORT_TYPE,
+                    BasicType.LONG_TYPE,
+                    new DecimalType(10, 2),
+                    LocalTimeType.LOCAL_DATE_TYPE,
+                    LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                    BasicType.VOID_TYPE,
+                    ArrayType.STRING_ARRAY_TYPE,
+                    ArrayType.BOOLEAN_ARRAY_TYPE,
+                    ArrayType.BYTE_ARRAY_TYPE,
+                    ArrayType.SHORT_ARRAY_TYPE,
+                    ArrayType.INT_ARRAY_TYPE,
+                    ArrayType.LONG_ARRAY_TYPE,
+                    ArrayType.FLOAT_ARRAY_TYPE,
+                    ArrayType.DOUBLE_ARRAY_TYPE,
+                    new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE),
+                    new SeaTunnelRowType(
+                            new String[] {
+                                "int",
+                                "string",
+                                "boolean",
+                                "float",
+                                "double",
+                                "byte",
+                                "short",
+                                "long",
+                                "decimal",
+                                "date",
+                                "timestamp",
+                                "null",
+                                "array_string",
+                                "array_boolean",
+                                "array_byte",
+                                "array_short",
+                                "array_int",
+                                "array_long",
+                                "array_float",
+                                "array_double",
+                                "map"
+                            },
+                            new SeaTunnelDataType[] {
+                                BasicType.INT_TYPE,
+                                BasicType.STRING_TYPE,
+                                BasicType.BOOLEAN_TYPE,
+                                BasicType.FLOAT_TYPE,
+                                BasicType.DOUBLE_TYPE,
+                                BasicType.BYTE_TYPE,
+                                BasicType.SHORT_TYPE,
+                                BasicType.LONG_TYPE,
+                                new DecimalType(10, 2),
+                                LocalTimeType.LOCAL_DATE_TYPE,
+                                LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                BasicType.VOID_TYPE,
+                                ArrayType.STRING_ARRAY_TYPE,
+                                ArrayType.BOOLEAN_ARRAY_TYPE,
+                                ArrayType.BYTE_ARRAY_TYPE,
+                                ArrayType.SHORT_ARRAY_TYPE,
+                                ArrayType.INT_ARRAY_TYPE,
+                                ArrayType.LONG_ARRAY_TYPE,
+                                ArrayType.FLOAT_ARRAY_TYPE,
+                                ArrayType.DOUBLE_ARRAY_TYPE,
+                                new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE)
+                            })
+                });
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, Void, Void> 
createWriter(SinkWriter.Context context)
+            throws IOException {
+        return new SeaTunnelSinkWithBufferWriter();
+    }
+}
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBufferWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBufferWriter.java
new file mode 100644
index 0000000000..ca9aa86b66
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBufferWriter.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+public class SeaTunnelSinkWithBufferWriter implements SinkWriter<SeaTunnelRow, 
Void, Void> {
+
+    private final List<Object[]> valueBuffer;
+
+    public SeaTunnelSinkWithBufferWriter() {
+        this.valueBuffer = new ArrayList<>();
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        valueBuffer.add(element.getFields());
+        if (valueBuffer.size() == 3) {
+            List<Object[]> expected =
+                    Arrays.asList(
+                            new Object[] {
+                                42,
+                                "string1",
+                                true,
+                                1.1f,
+                                33.33,
+                                (byte) 1,
+                                (short) 2,
+                                Long.MAX_VALUE,
+                                new BigDecimal("55.55"),
+                                LocalDate.parse("2021-01-01"),
+                                LocalDateTime.parse("2021-01-01T00:00:00"),
+                                null,
+                                new Object[] {"string1", "string2", "string3"},
+                                new Object[] {true, false, true},
+                                new Object[] {(byte) 1, (byte) 2, (byte) 3},
+                                new Object[] {(short) 1, (short) 2, (short) 3},
+                                new Object[] {1, 2, 3},
+                                new Object[] {1L, 2L, 3L},
+                                new Object[] {1.1f, 2.2f, 3.3f},
+                                new Object[] {11.11, 22.22, 33.33},
+                                new HashMap<String, String>() {
+                                    {
+                                        put("key1", "value1");
+                                        put("key2", "value2");
+                                        put("key3", "value3");
+                                    }
+                                },
+                                new SeaTunnelRow(
+                                        new Object[] {
+                                            42,
+                                            "string1",
+                                            true,
+                                            1.1f,
+                                            33.33,
+                                            (byte) 1,
+                                            (short) 2,
+                                            Long.MAX_VALUE,
+                                            new BigDecimal("55.55"),
+                                            LocalDate.parse("2021-01-01"),
+                                            
LocalDateTime.parse("2021-01-01T00:00:00"),
+                                            null,
+                                            new Object[] {"string1", 
"string2", "string3"},
+                                            new Object[] {true, false, true},
+                                            new Object[] {(byte) 1, (byte) 2, 
(byte) 3},
+                                            new Object[] {(short) 1, (short) 
2, (short) 3},
+                                            new Object[] {1, 2, 3},
+                                            new Object[] {1L, 2L, 3L},
+                                            new Object[] {1.1f, 2.2f, 3.3f},
+                                            new Object[] {11.11, 22.22, 33.33},
+                                            new HashMap<String, String>() {
+                                                {
+                                                    put("key1", "value1");
+                                                    put("key2", "value2");
+                                                    put("key3", "value3");
+                                                }
+                                            }
+                                        })
+                            },
+                            new Object[] {
+                                12,
+                                "string2",
+                                false,
+                                2.2f,
+                                43.33,
+                                (byte) 5,
+                                (short) 42,
+                                Long.MAX_VALUE - 1,
+                                new BigDecimal("25.55"),
+                                LocalDate.parse("2011-01-01"),
+                                LocalDateTime.parse("2020-01-01T00:00:00"),
+                                null,
+                                new Object[] {"string3", "string2", "string1"},
+                                new Object[] {true, false, false},
+                                new Object[] {(byte) 3, (byte) 4, (byte) 5},
+                                new Object[] {(short) 2, (short) 6, (short) 8},
+                                new Object[] {2, 4, 6},
+                                new Object[] {643634L, 421412L, 543543L},
+                                new Object[] {1.24f, 21.2f, 32.3f},
+                                new Object[] {421.11, 5322.22, 323.33},
+                                new HashMap<String, String>() {
+                                    {
+                                        put("key2", "value534");
+                                        put("key3", "value3");
+                                        put("key4", "value43");
+                                    }
+                                },
+                                new SeaTunnelRow(
+                                        new Object[] {
+                                            12,
+                                            "string2",
+                                            false,
+                                            2.2f,
+                                            43.33,
+                                            (byte) 5,
+                                            (short) 42,
+                                            Long.MAX_VALUE - 1,
+                                            new BigDecimal("25.55"),
+                                            LocalDate.parse("2011-01-01"),
+                                            
LocalDateTime.parse("2020-01-01T00:00:00"),
+                                            null,
+                                            new Object[] {"string3", 
"string2", "string1"},
+                                            new Object[] {true, false, false},
+                                            new Object[] {(byte) 3, (byte) 4, 
(byte) 5},
+                                            new Object[] {(short) 2, (short) 
6, (short) 8},
+                                            new Object[] {2, 4, 6},
+                                            new Object[] {643634L, 421412L, 
543543L},
+                                            new Object[] {1.24f, 21.2f, 32.3f},
+                                            new Object[] {421.11, 5322.22, 
323.33},
+                                            new HashMap<String, String>() {
+                                                {
+                                                    put("key2", "value534");
+                                                    put("key3", "value3");
+                                                    put("key4", "value43");
+                                                }
+                                            }
+                                        })
+                            },
+                            new Object[] {
+                                233,
+                                "string3",
+                                true,
+                                231.1f,
+                                3533.33,
+                                (byte) 7,
+                                (short) 2,
+                                Long.MAX_VALUE - 2,
+                                new BigDecimal("65.55"),
+                                LocalDate.parse("2001-01-01"),
+                                LocalDateTime.parse("2031-01-01T00:00:00"),
+                                null,
+                                new Object[] {"string1fsa", "stringdsa2", 
"strfdsaing3"},
+                                new Object[] {false, true, true},
+                                new Object[] {(byte) 6, (byte) 2, (byte) 1},
+                                new Object[] {(short) 7, (short) 8, (short) 9},
+                                new Object[] {3, 77, 22},
+                                new Object[] {143L, 642L, 533L},
+                                new Object[] {24.1f, 54.2f, 1.3f},
+                                new Object[] {431.11, 2422.22, 3243.33},
+                                new HashMap<String, String>() {
+                                    {
+                                        put("keyfs1", "valfdsue1");
+                                        put("kedfasy2", "vafdslue2");
+                                        put("kefdsay3", "vfdasalue3");
+                                    }
+                                },
+                                new SeaTunnelRow(
+                                        new Object[] {
+                                            233,
+                                            "string3",
+                                            true,
+                                            231.1f,
+                                            3533.33,
+                                            (byte) 7,
+                                            (short) 2,
+                                            Long.MAX_VALUE - 2,
+                                            new BigDecimal("65.55"),
+                                            LocalDate.parse("2001-01-01"),
+                                            
LocalDateTime.parse("2031-01-01T00:00:00"),
+                                            null,
+                                            new Object[] {
+                                                "string1fsa", "stringdsa2", 
"strfdsaing3"
+                                            },
+                                            new Object[] {false, true, true},
+                                            new Object[] {(byte) 6, (byte) 2, 
(byte) 1},
+                                            new Object[] {(short) 7, (short) 
8, (short) 9},
+                                            new Object[] {3, 77, 22},
+                                            new Object[] {143L, 642L, 533L},
+                                            new Object[] {24.1f, 54.2f, 1.3f},
+                                            new Object[] {431.11, 2422.22, 
3243.33},
+                                            new HashMap<String, String>() {
+                                                {
+                                                    put("keyfs1", "valfdsue1");
+                                                    put("kedfasy2", 
"vafdslue2");
+                                                    put("kefdsay3", 
"vfdasalue3");
+                                                }
+                                            }
+                                        })
+                            });
+            for (int i = 0; i < expected.size(); i++) {
+                Object[] values = expected.get(i);
+                Object[] actual = valueBuffer.get(i);
+                for (int v = 0; v < values.length; v++) {
+                    if (values[v] instanceof Object[]) {
+                        Assertions.assertArrayEquals((Object[]) values[v], 
(Object[]) actual[v]);
+                    } else {
+                        Assertions.assertEquals(values[v], actual[v]);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public Optional<Void> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortPrepare() {}
+
+    @Override
+    public void close() throws IOException {}
+}
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
new file mode 100644
index 0000000000..d22789e1be
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.StructType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnJre;
+import org.junit.jupiter.api.condition.JRE;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.ByteType;
+import static org.apache.spark.sql.types.DataTypes.DateType;
+import static org.apache.spark.sql.types.DataTypes.DoubleType;
+import static org.apache.spark.sql.types.DataTypes.FloatType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.NullType;
+import static org.apache.spark.sql.types.DataTypes.ShortType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.TimestampType;
+
+public class SparkSinkTest {
+
+    @Test
+    @DisabledOnJre(
+            value = JRE.JAVA_11,
+            disabledReason =
+                    "We should update apache common lang3 version to 3.8 to 
avoid NPE, "
+                            + "see 
https://github.com/apache/commons-lang/commit/50ce8c44e1601acffa39f5568f0fc140aade0564";)
+    public void testSparkSinkWriteDataWithCopy() {
+        // We should make sure that the data is written to the sink with copy.
+        SparkSession spark =
+                SparkSession.builder()
+                        .master("local")
+                        .appName("testSparkSinkWriteDataWithCopy")
+                        .getOrCreate();
+        StructType structType =
+                new StructType()
+                        .add("int", IntegerType)
+                        .add("string", StringType)
+                        .add("boolean", BooleanType)
+                        .add("float", FloatType)
+                        .add("double", DoubleType)
+                        .add("byte", ByteType)
+                        .add("short", ShortType)
+                        .add("long", LongType)
+                        .add("decimal", new DecimalType(10, 2))
+                        .add("date", DateType)
+                        // .add("time", TimeType) unsupported time type in 
Spark 3.3.0. Please trace
+                        // https://issues.apache.org/jira/browse/SPARK-41549
+                        .add("timestamp", TimestampType)
+                        .add("null", NullType)
+                        .add("array_string", new ArrayType(StringType, true))
+                        .add("array_boolean", new ArrayType(BooleanType, true))
+                        .add("array_byte", new ArrayType(ByteType, true))
+                        .add("array_short", new ArrayType(ShortType, true))
+                        .add("array_int", new ArrayType(IntegerType, true))
+                        .add("array_long", new ArrayType(LongType, true))
+                        .add("array_float", new ArrayType(FloatType, true))
+                        .add("array_double", new ArrayType(DoubleType, true))
+                        .add("map", new MapType(StringType, StringType, true));
+
+        GenericRow row1 =
+                new GenericRow(
+                        new Object[] {
+                            42,
+                            "string1",
+                            true,
+                            1.1f,
+                            33.33,
+                            (byte) 1,
+                            (short) 2,
+                            Long.MAX_VALUE,
+                            new BigDecimal("55.55"),
+                            LocalDate.parse("2021-01-01"),
+                            Timestamp.valueOf("2021-01-01 00:00:00"),
+                            null,
+                            Arrays.asList("string1", "string2", "string3"),
+                            Arrays.asList(true, false, true),
+                            Arrays.asList((byte) 1, (byte) 2, (byte) 3),
+                            Arrays.asList((short) 1, (short) 2, (short) 3),
+                            Arrays.asList(1, 2, 3),
+                            Arrays.asList(1L, 2L, 3L),
+                            Arrays.asList(1.1f, 2.2f, 3.3f),
+                            Arrays.asList(11.11, 22.22, 33.33),
+                            new HashMap<String, String>() {
+                                {
+                                    put("key1", "value1");
+                                    put("key2", "value2");
+                                    put("key3", "value3");
+                                }
+                            }
+                        });
+
+        GenericRow row1WithRow =
+                new GenericRow(
+                        new Object[] {
+                            42,
+                            "string1",
+                            true,
+                            1.1f,
+                            33.33,
+                            (byte) 1,
+                            (short) 2,
+                            Long.MAX_VALUE,
+                            new BigDecimal("55.55"),
+                            LocalDate.parse("2021-01-01"),
+                            Timestamp.valueOf("2021-01-01 00:00:00"),
+                            null,
+                            Arrays.asList("string1", "string2", "string3"),
+                            Arrays.asList(true, false, true),
+                            Arrays.asList((byte) 1, (byte) 2, (byte) 3),
+                            Arrays.asList((short) 1, (short) 2, (short) 3),
+                            Arrays.asList(1, 2, 3),
+                            Arrays.asList(1L, 2L, 3L),
+                            Arrays.asList(1.1f, 2.2f, 3.3f),
+                            Arrays.asList(11.11, 22.22, 33.33),
+                            new HashMap<String, String>() {
+                                {
+                                    put("key1", "value1");
+                                    put("key2", "value2");
+                                    put("key3", "value3");
+                                }
+                            },
+                            row1
+                        });
+
+        GenericRow row2 =
+                new GenericRow(
+                        new Object[] {
+                            12,
+                            "string2",
+                            false,
+                            2.2f,
+                            43.33,
+                            (byte) 5,
+                            (short) 42,
+                            Long.MAX_VALUE - 1,
+                            new BigDecimal("25.55"),
+                            LocalDate.parse("2011-01-01"),
+                            Timestamp.valueOf("2020-01-01 00:00:00"),
+                            null,
+                            Arrays.asList("string3", "string2", "string1"),
+                            Arrays.asList(true, false, false),
+                            Arrays.asList((byte) 3, (byte) 4, (byte) 5),
+                            Arrays.asList((short) 2, (short) 6, (short) 8),
+                            Arrays.asList(2, 4, 6),
+                            Arrays.asList(643634L, 421412L, 543543L),
+                            Arrays.asList(1.24f, 21.2f, 32.3f),
+                            Arrays.asList(421.11, 5322.22, 323.33),
+                            new HashMap<String, String>() {
+                                {
+                                    put("key2", "value534");
+                                    put("key3", "value3");
+                                    put("key4", "value43");
+                                }
+                            }
+                        });
+
+        GenericRow row2WithRow =
+                new GenericRow(
+                        new Object[] {
+                            12,
+                            "string2",
+                            false,
+                            2.2f,
+                            43.33,
+                            (byte) 5,
+                            (short) 42,
+                            Long.MAX_VALUE - 1,
+                            new BigDecimal("25.55"),
+                            LocalDate.parse("2011-01-01"),
+                            Timestamp.valueOf("2020-01-01 00:00:00"),
+                            null,
+                            Arrays.asList("string3", "string2", "string1"),
+                            Arrays.asList(true, false, false),
+                            Arrays.asList((byte) 3, (byte) 4, (byte) 5),
+                            Arrays.asList((short) 2, (short) 6, (short) 8),
+                            Arrays.asList(2, 4, 6),
+                            Arrays.asList(643634L, 421412L, 543543L),
+                            Arrays.asList(1.24f, 21.2f, 32.3f),
+                            Arrays.asList(421.11, 5322.22, 323.33),
+                            new HashMap<String, String>() {
+                                {
+                                    put("key2", "value534");
+                                    put("key3", "value3");
+                                    put("key4", "value43");
+                                }
+                            },
+                            row2
+                        });
+
+        GenericRow row3 =
+                new GenericRow(
+                        new Object[] {
+                            233,
+                            "string3",
+                            true,
+                            231.1f,
+                            3533.33,
+                            (byte) 7,
+                            (short) 2,
+                            Long.MAX_VALUE - 2,
+                            new BigDecimal("65.55"),
+                            LocalDate.parse("2001-01-01"),
+                            Timestamp.valueOf("2031-01-01 00:00:00"),
+                            null,
+                            Arrays.asList("string1fsa", "stringdsa2", 
"strfdsaing3"),
+                            Arrays.asList(false, true, true),
+                            Arrays.asList((byte) 6, (byte) 2, (byte) 1),
+                            Arrays.asList((short) 7, (short) 8, (short) 9),
+                            Arrays.asList(3, 77, 22),
+                            Arrays.asList(143L, 642L, 533L),
+                            Arrays.asList(24.1f, 54.2f, 1.3f),
+                            Arrays.asList(431.11, 2422.22, 3243.33),
+                            new HashMap<String, String>() {
+                                {
+                                    put("keyfs1", "valfdsue1");
+                                    put("kedfasy2", "vafdslue2");
+                                    put("kefdsay3", "vfdasalue3");
+                                }
+                            }
+                        });
+
+        GenericRow row3WithRow =
+                new GenericRow(
+                        new Object[] {
+                            233,
+                            "string3",
+                            true,
+                            231.1f,
+                            3533.33,
+                            (byte) 7,
+                            (short) 2,
+                            Long.MAX_VALUE - 2,
+                            new BigDecimal("65.55"),
+                            LocalDate.parse("2001-01-01"),
+                            Timestamp.valueOf("2031-01-01 00:00:00"),
+                            null,
+                            Arrays.asList("string1fsa", "stringdsa2", 
"strfdsaing3"),
+                            Arrays.asList(false, true, true),
+                            Arrays.asList((byte) 6, (byte) 2, (byte) 1),
+                            Arrays.asList((short) 7, (short) 8, (short) 9),
+                            Arrays.asList(3, 77, 22),
+                            Arrays.asList(143L, 642L, 533L),
+                            Arrays.asList(24.1f, 54.2f, 1.3f),
+                            Arrays.asList(431.11, 2422.22, 3243.33),
+                            new HashMap<String, String>() {
+                                {
+                                    put("keyfs1", "valfdsue1");
+                                    put("kedfasy2", "vafdslue2");
+                                    put("kefdsay3", "vfdasalue3");
+                                }
+                            },
+                            row3
+                        });
+
+        Dataset<Row> dataset =
+                spark.createDataFrame(
+                        Arrays.asList(row1WithRow, row2WithRow, row3WithRow),
+                        structType.add("row", structType));
+        SparkSinkInjector.inject(dataset.write(), new 
SeaTunnelSinkWithBuffer())
+                .option("checkpointLocation", "/tmp")
+                .mode(SaveMode.Append)
+                .save();
+        spark.close();
+    }
+}
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 024961241b..60aea7fa79 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -2,15 +2,15 @@ commons-codec-1.13.jar
 commons-collections4-4.4.jar
 commons-compress-1.20.jar
 commons-io-2.11.0.jar
-commons-lang3-3.4.jar
+commons-lang3-3.5.jar
 config-1.3.3.jar
 disruptor-3.4.4.jar
 guava-27.0-jre.jar
 hazelcast-5.1.jar
-jackson-annotations-2.12.6.jar
-jackson-core-2.12.6.jar
-jackson-databind-2.12.6.jar
-jackson-dataformat-properties-2.12.6.jar
+jackson-annotations-2.13.3.jar
+jackson-core-2.13.3.jar
+jackson-databind-2.13.3.jar
+jackson-dataformat-properties-2.13.3.jar
 jcl-over-slf4j-1.7.25.jar
 jcommander-1.81.jar
 log4j-api-2.17.1.jar
@@ -21,7 +21,7 @@ protostuff-api-1.8.0.jar
 protostuff-collectionschema-1.8.0.jar
 protostuff-core-1.8.0.jar
 protostuff-runtime-1.8.0.jar
-scala-library-2.11.12.jar
+scala-library-2.12.15.jar
 seatunnel-jackson-2.3.4-SNAPSHOT-optional.jar
 seatunnel-guava-2.3.4-SNAPSHOT-optional.jar
 slf4j-api-1.7.25.jar


Reply via email to