This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 91d39c8099 [Feature][TransForms-V2] Support scala dynamic compile 
(#9625)
91d39c8099 is described below

commit 91d39c8099f02d58737a2af684b1c76278b31cf4
Author: xiaochen <[email protected]>
AuthorDate: Thu Jul 31 09:26:19 2025 +0800

    [Feature][TransForms-V2] Support scala dynamic compile (#9625)
---
 docs/en/transform-v2/dynamic-compile.md            |  44 +++-
 docs/zh/transform-v2/dynamic-compile.md            |   2 +-
 pom.xml                                            |   1 +
 seatunnel-dist/release-docs/LICENSE                |   3 +
 .../e2e/transform/TestDynamicCompileIT.java        |  83 +++++++
 .../conf/mixed_dynamic_all_compile_transform.conf  | 244 +++++++++++++++++++++
 ...ixed_dynamic_groovy_java_compile_transform.conf |   8 +-
 ...xed_dynamic_groovy_scala_compile_transform.conf | 159 ++++++++++++++
 ...mixed_dynamic_java_scala_compile_transform.conf | 162 ++++++++++++++
 .../multiple_dynamic_scala_compile_transform.conf  | 166 ++++++++++++++
 .../single_dynamic_scala_compile_transform.conf    | 101 +++++++++
 .../conf/single_scala_path_compile.conf            |  72 ++++++
 .../dynamic_compile/source_file/ScalaFile          |  44 ++++
 seatunnel-shade/pom.xml                            |   1 +
 seatunnel-shade/seatunnel-scala-compiler/pom.xml   | 108 +++++++++
 seatunnel-transforms-v2/pom.xml                    |   8 +
 .../transform/dynamiccompile/CompileLanguage.java  |   3 +-
 .../dynamiccompile/DynamicCompileTransform.java    |   5 +
 .../ScalaClassParse.java}                          |  11 +-
 .../dynamiccompile/parse/ScalaClassParser.java     |  80 +++++++
 tools/dependencies/known-dependencies.txt          |   6 +
 21 files changed, 1299 insertions(+), 12 deletions(-)

diff --git a/docs/en/transform-v2/dynamic-compile.md 
b/docs/en/transform-v2/dynamic-compile.md
index d5f21f2708..440b1d5056 100644
--- a/docs/en/transform-v2/dynamic-compile.md
+++ b/docs/en/transform-v2/dynamic-compile.md
@@ -31,7 +31,9 @@ Transform plugin common parameters, please refer to 
[Transform Plugin](common-op
 ### compile_language [Enum]
 
 Some syntax in Java may not be supported, please refer 
https://github.com/janino-compiler/janino
-GROOVY,JAVA
+GROOVY,JAVA,SCALA (SCALA is only supported on the Zeta engine)
+
+**Note**: SCALA support uses the Scala REPL for dynamic compilation and 
requires proper Scala syntax.
 
 ### compile_pattern [Enum]
 
@@ -223,6 +225,46 @@ Then the data in result table `java_out` will like this
 | Kin Dom  | 30  | 123  | JAVA             | 
 | Joy Dom  | 30  | 123  | JAVA             |
 
+- use scala
+```hocon
+transform {
+ DynamicCompile {
+    plugin_input = "fake"
+    plugin_output = "scala_out"
+    compile_language="SCALA"
+    compile_pattern="SOURCE_CODE"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+                 import 
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.`type`.BasicType
+                 import java.util.ArrayList
+
+                 class ScalaDemo {
+                   def getInlineOutputColumns(inputCatalogTable: 
CatalogTable): Array[Column] = {
+                     val columns = new ArrayList[Column]()
+                     val destColumn = PhysicalColumn.of(
+                       "compile_language",
+                       BasicType.STRING_TYPE,
+                       10L,
+                       true,
+                       "",
+                       ""
+                     )
+                     columns.add(destColumn)
+                     columns.toArray(new Array[Column](0))
+                   }
+
+                   def getInlineOutputFieldValues(inputRow: 
SeaTunnelRowAccessor): Array[Object] = {
+                     Array[Object]("SCALA")
+                   }
+                 }
+                """
+  }
+}
+```
+
 More complex examples can be referred to
 
https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf
 
diff --git a/docs/zh/transform-v2/dynamic-compile.md 
b/docs/zh/transform-v2/dynamic-compile.md
index c9cc870816..1fa055591a 100644
--- a/docs/zh/transform-v2/dynamic-compile.md
+++ b/docs/zh/transform-v2/dynamic-compile.md
@@ -31,7 +31,7 @@
 ### compile_language [Enum]
 
 Java中的某些语法可能不受支持,请参阅https://github.com/janino-compiler/janino
-GROOVY,JAVA
+GROOVY,JAVA,SCALA(目前仅支持 Zeta)
 
 ### compile_pattern [Enum]
 
diff --git a/pom.xml b/pom.xml
index afddae078c..8e143e0986 100644
--- a/pom.xml
+++ b/pom.xml
@@ -142,6 +142,7 @@
         <jsqlparser.version>4.9</jsqlparser.version>
         <json-path.version>2.7.0</json-path.version>
         <groovy.version>4.0.16</groovy.version>
+        <scala.version>2.12.15</scala.version>
         <jetty.version>9.4.56.v20240826</jetty.version>
         <jakarta.servlet-api>4.0.4</jakarta.servlet-api>
         <!-- Option args -->
diff --git a/seatunnel-dist/release-docs/LICENSE 
b/seatunnel-dist/release-docs/LICENSE
index f68045e4f5..69c9f4477a 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -307,6 +307,7 @@ The text of each license is the standard Apache 2.0 license.
      (Apache-2.0) kerby-pkix (org.apache.kerby:kerby-pkix:1.0.1 - 
https://github.com/apache/directory-kerby)
      (Apache-2.0) kerby-util (org.apache.kerby:kerby-util:1.0.1 - 
https://github.com/apache/directory-kerby)
      (Apache-2.0) kerby-xdr (org.apache.kerby:kerby-xdr:1.0.1 - 
https://github.com/apache/directory-kerby)
+     (Apache-2.0) jna (net.java.dev.jna:jna:5.13.0 - 
https://github.com/java-native-access/jna)
      (Apache-2.0) jna (net.java.dev.jna:jna:5.15.0 - 
https://github.com/java-native-access/jna)
      (Apache-2.0) jna-platform (net.java.dev.jna:jna-platform:5.15.0 - 
https://github.com/java-native-access/jna)
      (Apache-2.0) token-provider (org.apache.kerby:token-provider:1.0.1 - 
https://github.com/apache/directory-kerby)
@@ -359,7 +360,9 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
 
      (New BSD license) Protocol Buffer Java API 
(com.google.protobuf:protobuf-java:2.5.0 - http://code.google.com/p/protobuf)
      (FreeBSD License) stax2-api (org.codehaus.woodstox:stax2-api:3.1.4 - 
https://github.com/FasterXML/stax2-api)
+     (BSD 3-Clause) Scala Compiler (org.scala-lang:scala-compiler:2.13.11 - 
http://www.scala-lang.org/)
      (BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.12.15 - 
http://www.scala-lang.org/)
+     (BSD 3-Clause) Scala Reflect (org.scala-lang:scala-reflect:2.13.11 - 
http://www.scala-lang.org/)
      (BSD 3-Clause) asm (org.ow2.asm:asm:9.1 - 
https://mvnrepository.com/artifact/org.ow2.asm/asm/)
      (BSD 3-Clause) asm (org.ow2.asm:asm:5.0.4 - 
https://mvnrepository.com/artifact/org.ow2.asm/asm/)
 ========================================================================
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
index 757c38dfcf..edded1a4d2 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
@@ -19,7 +19,9 @@ package org.apache.seatunnel.e2e.transform;
 
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 
 import org.junit.jupiter.api.AfterAll;
@@ -182,4 +184,85 @@ public class TestDynamicCompileIT extends TestSuiteBase 
implements TestResource
                 container.executeJob(basePath + 
"single_dynamic_http_compile_transform.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK and FLINK do not support scala 
dynamic compile")
+    @TestTemplate
+    public void testDynamicSingleCompileScala(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(basePath + 
"single_dynamic_scala_compile_transform.conf");
+        Assertions.assertEquals(
+                0,
+                execResult.getExitCode(),
+                "Scala dynamic compilation test failed. Error: " + 
execResult.getStderr());
+    }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK and FLINK do not support scala 
dynamic compile")
+    @TestTemplate
+    public void testDynamicSinglePathScala(TestContainer container)
+            throws IOException, InterruptedException {
+        
container.copyFileToContainer("/dynamic_compile/source_file/ScalaFile", 
"/tmp/ScalaFile");
+        Container.ExecResult execResult =
+                container.executeJob(basePath + 
"single_scala_path_compile.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK and FLINK do not support scala 
dynamic compile")
+    @TestTemplate
+    public void testDynamicMultipleCompileScala(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(basePath + 
"multiple_dynamic_scala_compile_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK and FLINK do not support scala 
dynamic compile ")
+    @TestTemplate
+    public void testDynamicMixedCompileJavaAndScala(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(basePath + 
"mixed_dynamic_java_scala_compile_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK and FLINK do not support scala 
dynamic compile ")
+    @TestTemplate
+    public void testDynamicMixedCompileGroovyAndScala(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        basePath + 
"mixed_dynamic_groovy_scala_compile_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK and FLINK do not support scala 
dynamic compile ")
+    @TestTemplate
+    public void testMixedThreeLanguagesCompile(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(basePath + 
"mixed_dynamic_all_compile_transform.conf");
+        Assertions.assertEquals(
+                0,
+                execResult.getExitCode(),
+                "Mixed three languages (Java + Groovy + Scala) compilation 
test failed. Error: "
+                        + execResult.getStderr());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_all_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_all_compile_transform.conf
new file mode 100644
index 0000000000..f02dd56b65
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_all_compile_transform.conf
@@ -0,0 +1,244 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### Mixed Three Languages (Java + Groovy + Scala) Dynamic Compilation Test
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "source_data"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        value = "double"
+      }
+    }
+  }
+}
+
+transform {
+  DynamicCompile {
+    plugin_input = "source_data"
+    plugin_output = "java_processed"
+    compile_language = "JAVA"
+    compile_pattern = "SOURCE_CODE"
+    source_code = """
+                 import org.apache.seatunnel.api.table.catalog.Column;
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
+                 import org.apache.seatunnel.api.table.catalog.*;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+
+                 public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+                   // Create array directly instead of using ArrayList
+                   Column[] columns = new Column[4];
+
+                   // Add original columns
+                   columns[0] = PhysicalColumn.of("id", BasicType.INT_TYPE, 
10L, true, "", "");
+                   columns[1] = PhysicalColumn.of("name", 
BasicType.STRING_TYPE, 50L, true, "", "");
+                   columns[2] = PhysicalColumn.of("value", 
BasicType.DOUBLE_TYPE, 10L, true, "", "");
+
+                   // Add Java processed column
+                   columns[3] = PhysicalColumn.of("java_processed", 
BasicType.STRING_TYPE, 50L, true, "", "Java processing result");
+
+                   return columns;
+                 }
+
+                 public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+                   Object[] fieldValues = new Object[4];
+                   
+                   // Pass through original values
+                   fieldValues[0] = inputRow.getField(0);
+                   fieldValues[1] = inputRow.getField(1);
+                   fieldValues[2] = inputRow.getField(2);
+                   
+                   // Java processing
+                   String javaResult = "JAVA_STEP";
+                   fieldValues[3] = javaResult;
+                   
+                   return fieldValues;
+                 }
+                """
+  }
+
+  # Second transformation: Groovy
+  DynamicCompile {
+    plugin_input = "java_processed"
+    plugin_output = "groovy_processed"
+    compile_language = "GROOVY"
+    compile_pattern = "SOURCE_CODE"
+    source_code = """
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.catalog.*
+                 import org.apache.seatunnel.api.table.type.*
+
+                 Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+                   Column[] columns = new Column[5]
+                   columns[0] = PhysicalColumn.of("id", BasicType.INT_TYPE, 
10L, true, "", "")
+                   columns[1] = PhysicalColumn.of("name", 
BasicType.STRING_TYPE, 50L, true, "", "")
+                   columns[2] = PhysicalColumn.of("value", 
BasicType.DOUBLE_TYPE, 10L, true, "", "")
+                   columns[3] = PhysicalColumn.of("java_processed", 
BasicType.STRING_TYPE, 50L, true, "", "")
+                   columns[4] = PhysicalColumn.of("groovy_processed", 
BasicType.STRING_TYPE, 50L, true, "", "Groovy processing result")
+
+                   return columns
+                 }
+
+                 Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor 
inputRow) {
+                   def fieldValues = new Object[5]
+                   
+                   // Pass through all previous values
+                   fieldValues[0] = inputRow.getField(0)
+                   fieldValues[1] = inputRow.getField(1)
+                   fieldValues[2] = inputRow.getField(2)
+                   fieldValues[3] = inputRow.getField(3)
+                   def groovyResult = "GROOVY_STEP"
+                   fieldValues[4] = groovyResult
+                   
+                   return fieldValues
+                 }
+                """
+  }
+
+  # Third transformation: Scala
+  DynamicCompile {
+    plugin_input = "groovy_processed"
+    plugin_output = "scala_processed"
+    compile_language = "SCALA"
+    compile_pattern = "SOURCE_CODE"
+    source_code = """
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+                 import 
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.`type`.BasicType
+                 import java.util.ArrayList
+
+                 class ScalaFinalProcessor {
+                   
+                   def getInlineOutputColumns(inputCatalogTable: 
CatalogTable): Array[Column] = {
+                     val columns = new ArrayList[Column]()
+
+                     columns.add(PhysicalColumn.of("id", BasicType.INT_TYPE, 
10L, true, "", ""))
+                     columns.add(PhysicalColumn.of("name", 
BasicType.STRING_TYPE, 50L, true, "", ""))
+                     columns.add(PhysicalColumn.of("value", 
BasicType.DOUBLE_TYPE, 10L, true, "", ""))
+                     columns.add(PhysicalColumn.of("java_processed", 
BasicType.STRING_TYPE, 50L, true, "", ""))
+                     columns.add(PhysicalColumn.of("groovy_processed", 
BasicType.STRING_TYPE, 50L, true, "", ""))
+                     
+                     // Add Scala processed column
+                     columns.add(PhysicalColumn.of("scala_processed", 
BasicType.STRING_TYPE, 100L, true, "", "Scala functional processing result"))
+                     columns.toArray(new Array[Column](0))
+                   }
+
+                   def getInlineOutputFieldValues(inputRow: 
SeaTunnelRowAccessor): Array[Object] = {
+                     val id = 
Option(inputRow.getField(0)).map(_.toString.toInt).getOrElse(0)
+                     val name = 
Option(inputRow.getField(1)).map(_.toString).getOrElse("")
+                     val value = 
Option(inputRow.getField(2)).map(_.toString.toDouble).getOrElse(0.0)
+                     val javaProcessed = 
Option(inputRow.getField(3)).map(_.toString).getOrElse("")
+                     val groovyProcessed = 
Option(inputRow.getField(4)).map(_.toString).getOrElse("")
+                     Array[Object](
+                       id.asInstanceOf[Object],
+                       name,
+                       value.asInstanceOf[Object],
+                       javaProcessed,
+                       groovyProcessed,
+                       "SCALA_STEP"
+                     )
+                   }
+                 }
+                """
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "scala_processed"
+    rules = {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ],
+      field_rules = [
+        {
+          field_name = id
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = value
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = java_processed
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "JAVA_STEP"
+            }
+          ]
+        },
+        {
+          field_name = groovy_processed
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "GROOVY_STEP"
+            }
+          ]
+        },
+        {
+          field_name = scala_processed
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "SCALA_STEP"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
index ae25d50921..cb8bbf12e9 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
@@ -55,17 +55,15 @@ transform {
                      public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
 
                        ArrayList<Column> columns = new ArrayList<Column>();
-                                               PhysicalColumn destColumn =
-                                               PhysicalColumn.of(
+                       PhysicalColumn destColumn =
+                                    PhysicalColumn.of(
                                                "col1",
                                               BasicType.STRING_TYPE,
                                                10,
                                               true,
                                               "",
                                               "");
-                                                 return new Column[]{
-                                                                destColumn
-                                                        };
+                       return new Column[]{destColumn};
 
                      }
                      public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_scala_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_scala_compile_transform.conf
new file mode 100644
index 0000000000..8d26ef1322
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_scala_compile_transform.conf
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### Simple Mixed Groovy + Scala Dynamic Compilation Test
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    row.num = 100
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+  DynamicCompile {
+    plugin_input = "fake"
+    plugin_output = "fake1"
+    compile_language = "GROOVY"
+    compile_pattern = "SOURCE_CODE"
+    source_code = """
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.catalog.*
+                 import org.apache.seatunnel.api.table.type.*
+
+                 Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+                   Column[] columns = new Column[3]
+                   columns[0] = PhysicalColumn.of("name", 
BasicType.STRING_TYPE, 50L, true, "", "")
+                   columns[1] = PhysicalColumn.of("age", BasicType.INT_TYPE, 
10L, true, "", "")
+                   columns[2] = PhysicalColumn.of("groovy_col", 
BasicType.STRING_TYPE, 50L, true, "", "")
+                   return columns
+                 }
+
+                 Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor 
inputRow) {
+                   def fieldValues = new Object[3]
+                   fieldValues[0] = inputRow.getField(0)
+                   fieldValues[1] = inputRow.getField(1)
+                   fieldValues[2] = "GROOVY_VALUE"
+                   return fieldValues
+                 }
+                """
+  }
+
+  # Second transformation: Scala
+  DynamicCompile {
+    plugin_input = "fake1"
+    plugin_output = "fake2"
+    compile_language = "SCALA"
+    compile_pattern = "SOURCE_CODE"
+    source_code = """
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+                 import 
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.`type`.BasicType
+                 import java.util.ArrayList
+
+                 class ScalaSimpleProcessor {
+                   
+                   def getInlineOutputColumns(inputCatalogTable: 
CatalogTable): Array[Column] = {
+                     val columns = new ArrayList[Column]()
+                     columns.add(PhysicalColumn.of("name", 
BasicType.STRING_TYPE, 50L, true, "", ""))
+                     columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, 
10L, true, "", ""))
+                     columns.add(PhysicalColumn.of("groovy_col", 
BasicType.STRING_TYPE, 50L, true, "", ""))
+                     columns.add(PhysicalColumn.of("scala_col", 
BasicType.STRING_TYPE, 50L, true, "", ""))
+                     columns.toArray(new Array[Column](0))
+                   }
+
+                   def getInlineOutputFieldValues(inputRow: 
SeaTunnelRowAccessor): Array[Object] = {
+                     Array[Object](
+                       inputRow.getField(0),
+                       inputRow.getField(1),
+                       inputRow.getField(2),
+                       "SCALA_VALUE"
+                     )
+                   }
+                 }
+                """
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "fake2"
+    rules = {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ],
+      field_rules = [
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = age
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = groovy_col
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "GROOVY_VALUE"
+            }
+          ]
+        },
+        {
+          field_name = scala_col
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "SCALA_VALUE"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_java_scala_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_java_scala_compile_transform.conf
new file mode 100644
index 0000000000..631ad4949c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_java_scala_compile_transform.conf
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is an e2e test: a BATCH job that chains a JAVA and a SCALA DynamicCompile transform
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+      }
+    }
+  }
+}
+
+transform {
+  DynamicCompile {
+    plugin_input = "fake"
+    plugin_output = "fake1"
+    compile_language="JAVA"
+    compile_pattern="SOURCE_CODE"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column;
+                 import 
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
+                 import org.apache.seatunnel.api.table.catalog.*;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+
+                 /**
+                  * Declares the single string column "java_col" that this transform appends.
+                  * The input catalog table is not consulted.
+                  */
+                 public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
+                   PhysicalColumn destColumn =
+                           PhysicalColumn.of(
+                                   "java_col",
+                                   BasicType.STRING_TYPE,
+                                   10,
+                                   true,
+                                   "",
+                                   "");
+                   return new Column[]{destColumn};
+                 }
+
+                 /** Supplies the constant "JAVA_VALUE" as the only appended field of every row. */
+                 public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+                   return new Object[]{"JAVA_VALUE"};
+                 }
+                """
+  }
+
+  DynamicCompile {
+    plugin_input = "fake1"
+    plugin_output = "fake2"
+    compile_language="SCALA"
+    compile_pattern="SOURCE_CODE"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+                 import 
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.`type`.BasicType
+                 import java.util.ArrayList
+
+                 class ScalaDemo {
+                   // Schema side: one string column named "scala_col" is appended.
+                   def getInlineOutputColumns(inputCatalogTable: CatalogTable): Array[Column] = {
+                     val scalaColumn =
+                       PhysicalColumn.of("scala_col", BasicType.STRING_TYPE, 10L, true, "", "")
+                     Array[Column](scalaColumn)
+                   }
+
+                   // Data side: every row gets the constant "SCALA_VALUE" in the appended column.
+                   def getInlineOutputFieldValues(inputRow: SeaTunnelRowAccessor): Array[Object] =
+                     Array[Object]("SCALA_VALUE")
+                 }
+                """
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "fake2"
+    rules = {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ],
+      field_rules = [
+        {
+          field_name = id
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = java_col
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "JAVA_VALUE"
+            }
+          ]
+        },
+        {
+          field_name = scala_col
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "SCALA_VALUE"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_scala_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_scala_compile_transform.conf
new file mode 100644
index 0000000000..8607315ecf
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_scala_compile_transform.conf
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is an e2e test: a BATCH job that chains two SCALA DynamicCompile transforms
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+      }
+    }
+  }
+}
+
+transform {
+  DynamicCompile {
+    plugin_input = "fake"
+    plugin_output = "fake1"
+    compile_language="SCALA"
+    compile_pattern="SOURCE_CODE"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+                 import 
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.`type`.BasicType
+                 import java.util.ArrayList
+
+                 class ScalaDemo1 {
+                   // Schema side: one string column named "scala_aa" is appended.
+                   def getInlineOutputColumns(inputCatalogTable: CatalogTable): Array[Column] = {
+                     val firstColumn =
+                       PhysicalColumn.of("scala_aa", BasicType.STRING_TYPE, 10L, true, "", "")
+                     Array[Column](firstColumn)
+                   }
+
+                   // Data side: every row gets the constant "SCALA_AA" in the appended column.
+                   def getInlineOutputFieldValues(inputRow: SeaTunnelRowAccessor): Array[Object] =
+                     Array[Object]("SCALA_AA")
+                 }
+                """
+  }
+
+  DynamicCompile {
+    plugin_input = "fake1"
+    plugin_output = "fake2"
+    compile_language="SCALA"
+    compile_pattern="SOURCE_CODE"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+                 import 
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.`type`.BasicType
+                 import java.util.ArrayList
+
+                 class ScalaDemo2 {
+                   // Schema side: one string column named "scala_bb" is appended.
+                   def getInlineOutputColumns(inputCatalogTable: CatalogTable): Array[Column] = {
+                     val secondColumn =
+                       PhysicalColumn.of("scala_bb", BasicType.STRING_TYPE, 10L, true, "", "")
+                     Array[Column](secondColumn)
+                   }
+
+                   // Data side: every row gets the constant "SCALA_BB" in the appended column.
+                   def getInlineOutputFieldValues(inputRow: SeaTunnelRowAccessor): Array[Object] =
+                     Array[Object]("SCALA_BB")
+                 }
+                """
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "fake2"
+    rules = {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ],
+      field_rules = [
+        {
+          field_name = id
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = scala_aa
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "SCALA_AA"
+            }
+          ]
+        },
+        {
+          field_name = scala_bb
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "SCALA_BB"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_scala_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_scala_compile_transform.conf
new file mode 100644
index 0000000000..0821b887bd
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_scala_compile_transform.conf
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is an e2e test: a BATCH job with one SCALA DynamicCompile transform given as inline source code
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+      }
+    }
+  }
+}
+
+transform {
+  DynamicCompile {
+    plugin_input = "fake"
+    plugin_output = "fake1"
+    compile_language="SCALA"
+    compile_pattern="SOURCE_CODE"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+                 import 
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.`type`.BasicType
+                 import java.util.ArrayList
+
+                 class ScalaDemo {
+                   // Schema side: one string column named "scala_col1" is appended.
+                   def getInlineOutputColumns(inputCatalogTable: CatalogTable): Array[Column] = {
+                     val appendedColumn =
+                       PhysicalColumn.of("scala_col1", BasicType.STRING_TYPE, 10L, true, "", "")
+                     Array[Column](appendedColumn)
+                   }
+
+                   // Data side: every row gets the constant "SCALA_VALUE1" in the appended column.
+                   def getInlineOutputFieldValues(inputRow: SeaTunnelRowAccessor): Array[Object] =
+                     Array[Object]("SCALA_VALUE1")
+                 }
+                """
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "fake1"
+    rules = {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ],
+      field_rules = [
+        {
+          field_name = scala_col1
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "SCALA_VALUE1"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_scala_path_compile.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_scala_path_compile.conf
new file mode 100644
index 0000000000..c60df8af10
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_scala_path_compile.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is an e2e test: a BATCH job with one SCALA DynamicCompile transform loaded from an absolute path
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+      }
+    }
+  }
+}
+
+transform {
+  DynamicCompile {
+    plugin_input = "fake"
+    plugin_output = "fake1"
+    compile_language="SCALA"
+    compile_pattern="ABSOLUTE_PATH"
+    absolute_path="""/tmp/ScalaFile"""
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "fake1"
+    rules = {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ],
+      field_rules = [
+        {
+          field_name = scala_col
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "SCALA_TEST"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/ScalaFile
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/ScalaFile
new file mode 100644
index 0000000000..af6d6a676d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/ScalaFile
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.seatunnel.api.table.catalog.Column
+import org.apache.seatunnel.api.table.catalog.CatalogTable
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+import org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+import org.apache.seatunnel.api.table.`type`.BasicType
+import java.util.ArrayList
+
+class ScalaDemo {
+  // Schema side: one string column named "scala_col" is appended.
+  def getInlineOutputColumns(inputCatalogTable: CatalogTable): Array[Column] = {
+    val appendedColumn =
+      PhysicalColumn.of("scala_col", BasicType.STRING_TYPE, 10L, true, "", "")
+    Array[Column](appendedColumn)
+  }
+
+  // Data side: every row gets the constant "SCALA_TEST" in the appended column.
+  def getInlineOutputFieldValues(inputRow: SeaTunnelRowAccessor): Array[Object] =
+    Array[Object]("SCALA_TEST")
+}
diff --git a/seatunnel-shade/pom.xml b/seatunnel-shade/pom.xml
index 590f8d3b46..95e6e6d9e5 100644
--- a/seatunnel-shade/pom.xml
+++ b/seatunnel-shade/pom.xml
@@ -33,6 +33,7 @@
         <module>seatunnel-thrift-service</module>
         <module>seatunnel-hazelcast</module>
         <module>seatunnel-janino</module>
+        <module>seatunnel-scala-compiler</module>
         <module>seatunnel-jetty9-9.4.56</module>
         <module>seatunnel-hadoop-aws</module>
         <module>seatunnel-arrow</module>
diff --git a/seatunnel-shade/seatunnel-scala-compiler/pom.xml 
b/seatunnel-shade/seatunnel-scala-compiler/pom.xml
new file mode 100644
index 0000000000..58b8d9bfd2
--- /dev/null
+++ b/seatunnel-shade/seatunnel-scala-compiler/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-shade</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>seatunnel-scala-compiler</artifactId>
+    <name>SeaTunnel : Shade : Scala</name>
+
+    <properties>
+        <scala.version>2.13.11</scala.version>
+        <scala.binary.version>2.13</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <finalName>seatunnel-scala</finalName>
+                            
<createSourcesJar>${enableSourceJarCreation}</createSourcesJar>
+                            <shadeSourcesContent>true</shadeSourcesContent>
+                            
<shadedArtifactAttached>false</shadedArtifactAttached>
+                            
<createDependencyReducedPom>false</createDependencyReducedPom>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <!-- Only shade compiler tools, completely 
avoid scala.reflect -->
+                                <relocation>
+                                    <pattern>scala.tools.nsc</pattern>
+                                    
<shadedPattern>${seatunnel.shade.package}.scala.tools.nsc</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>scala.tools.util</pattern>
+                                    
<shadedPattern>${seatunnel.shade.package}.scala.tools.util</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    
<file>${basedir}/target/seatunnel-scala.jar</file>
+                                    <type>jar</type>
+                                    <classifier>optional</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/seatunnel-transforms-v2/pom.xml b/seatunnel-transforms-v2/pom.xml
index 9288a14531..9bc4c6aa06 100644
--- a/seatunnel-transforms-v2/pom.xml
+++ b/seatunnel-transforms-v2/pom.xml
@@ -84,11 +84,19 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-scala-compiler</artifactId>
+            <version>${project.version}</version>
+            <classifier>optional</classifier>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.groovy</groupId>
             <artifactId>groovy</artifactId>
             <version>${groovy.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-janino</artifactId>
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
index be0e468e4d..ef96243598 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
@@ -19,5 +19,6 @@ package org.apache.seatunnel.transform.dynamiccompile;
 
 public enum CompileLanguage {
     GROOVY,
-    JAVA
+    JAVA,
+    SCALA
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
index 7ff88d85d4..00cb874973 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
@@ -28,6 +28,7 @@ import 
org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
 import org.apache.seatunnel.transform.dynamiccompile.parse.AbstractParse;
 import org.apache.seatunnel.transform.dynamiccompile.parse.GroovyClassParse;
 import org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse;
+import org.apache.seatunnel.transform.dynamiccompile.parse.ScalaClassParse;
 import org.apache.seatunnel.transform.exception.TransformException;
 
 import java.nio.file.Paths;
@@ -59,6 +60,10 @@ public class DynamicCompileTransform extends 
MultipleFieldOutputTransform {
             DynamicCompileParse = new GroovyClassParse();
         } else if (CompileLanguage.JAVA.equals(compileLanguage)) {
             DynamicCompileParse = new JavaClassParse();
+        } else if (CompileLanguage.SCALA.equals(compileLanguage)) {
+            DynamicCompileParse = new ScalaClassParse();
+        } else {
+            throw new IllegalArgumentException("Unsupported compile language: 
" + compileLanguage);
         }
         compilePattern = 
readonlyConfig.get(DynamicCompileTransformConfig.COMPILE_PATTERN);
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParse.java
similarity index 75%
copy from 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
copy to 
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParse.java
index be0e468e4d..78bf36ae92 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParse.java
@@ -15,9 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.transform.dynamiccompile;
+package org.apache.seatunnel.transform.dynamiccompile.parse;
 
-public enum CompileLanguage {
-    GROOVY,
-    JAVA
+public class ScalaClassParse extends AbstractParse {
+    /**
+     * Scala implementation of the source-to-class step: hands the source text to
+     * {@link ScalaClassParser#parseSourceCodeWithCache(String)} and returns its result unchanged.
+     *
+     * @param sourceCode Scala source text to compile
+     * @return the class produced by {@code ScalaClassParser.parseSourceCodeWithCache}
+     */
+    @Override
+    public Class<?> parseClassSourceCode(String sourceCode) {
+        return ScalaClassParser.parseSourceCodeWithCache(sourceCode);
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParser.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParser.java
new file mode 100644
index 0000000000..e50289d903
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParser.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.dynamiccompile.parse;
+
+import org.apache.seatunnel.shade.scala.tools.nsc.Settings;
+import org.apache.seatunnel.shade.scala.tools.nsc.interpreter.IMain;
+import 
org.apache.seatunnel.shade.scala.tools.nsc.interpreter.shell.ReplReporterImpl;
+
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.seatunnel.transform.dynamiccompile.CompileTransformErrorCode.COMPILE_TRANSFORM_ERROR_CODE;
+
+public class ScalaClassParser extends AbstractParser {
+
+    private static final String SCALA_CLASS_NAME_PATTERN = 
"(?:class|object)\\s+(\\w+)";
+    private static final Pattern CLASS_NAME_REGEX = 
Pattern.compile(SCALA_CLASS_NAME_PATTERN);
+    private static IMain scalaInterpreter;
+
+    static {
+        try {
+            Settings settings = new Settings();
+            settings.usejavacp().v_$eq(true);
+            scalaInterpreter = new IMain(settings, new 
ReplReporterImpl(settings));
+        } catch (Exception e) {
+            throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE, 
e.getMessage());
+        }
+    }
+
+    public static Class<?> parseSourceCodeWithCache(String sourceCode) {
+        return classCache.computeIfAbsent(
+                getClassKey(sourceCode),
+                new Function<String, Class<?>>() {
+                    @Override
+                    public Class<?> apply(String classKey) {
+                        String className = extractClassName(sourceCode);
+                        return compileWithREPL(sourceCode, className);
+                    }
+                });
+    }
+
+    /** Extract class name from Scala source code */
+    private static String extractClassName(String sourceCode) {
+        Matcher matcher = CLASS_NAME_REGEX.matcher(sourceCode);
+        if (matcher.find()) {
+            return matcher.group(1);
+        }
+        throw new IllegalArgumentException("Cannot extract class name from 
Scala source code");
+    }
+
+    private static Class<?> compileWithREPL(String sourceCode, String 
className) {
+        try {
+            boolean compileResult = scalaInterpreter.compileString(sourceCode);
+            if (!compileResult) {
+                throw new RuntimeException("Scala REPL compilation failed");
+            }
+            ClassLoader replClassLoader = scalaInterpreter.classLoader();
+            return replClassLoader.loadClass(className);
+        } catch (Exception e) {
+            throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE, 
e.getMessage());
+        }
+    }
+}
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index ab7e75610b..353008ae0a 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -26,6 +26,9 @@ protostuff-collectionschema-1.8.0.jar
 protostuff-core-1.8.0.jar
 protostuff-runtime-1.8.0.jar
 scala-library-2.12.15.jar
+scala-compiler-2.13.11.jar
+scala-reflect-2.13.11.jar
+seatunnel-scala-compiler-2.3.12-SNAPSHOT-optional.jar
 seatunnel-jackson-2.3.12-SNAPSHOT-optional.jar
 seatunnel-guava-2.3.12-SNAPSHOT-optional.jar
 seatunnel-hazelcast-shade-2.3.12-SNAPSHOT-optional.jar
@@ -70,6 +73,7 @@ jetty-util-9.4.56.v20240826.jar
 jetty-util-ajax-9.4.56.v20240826.jar
 javax.servlet-api-3.1.0.jar
 seatunnel-jetty9-9.4.56-2.3.12-SNAPSHOT-optional.jar
+jna-5.13.0.jar
 jna-5.15.0.jar
 jna-platform-5.15.0.jar
 oshi-core-6.6.5.jar
@@ -121,3 +125,5 @@ netty-common-4.1.112.Final.jar
 netty-handler-4.1.112.Final.jar
 netty-resolver-4.1.112.Final.jar
 eventstream-1.0.1.jar
+java-diff-utils-4.12.jar
+jline-3.22.0.jar
\ No newline at end of file

Reply via email to