This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 91d39c8099 [Feature][TransForms-V2] Support scala dynamic compile
(#9625)
91d39c8099 is described below
commit 91d39c8099f02d58737a2af684b1c76278b31cf4
Author: xiaochen <[email protected]>
AuthorDate: Thu Jul 31 09:26:19 2025 +0800
[Feature][TransForms-V2] Support scala dynamic compile (#9625)
---
docs/en/transform-v2/dynamic-compile.md | 44 +++-
docs/zh/transform-v2/dynamic-compile.md | 2 +-
pom.xml | 1 +
seatunnel-dist/release-docs/LICENSE | 3 +
.../e2e/transform/TestDynamicCompileIT.java | 83 +++++++
.../conf/mixed_dynamic_all_compile_transform.conf | 244 +++++++++++++++++++++
...ixed_dynamic_groovy_java_compile_transform.conf | 8 +-
...xed_dynamic_groovy_scala_compile_transform.conf | 159 ++++++++++++++
...mixed_dynamic_java_scala_compile_transform.conf | 162 ++++++++++++++
.../multiple_dynamic_scala_compile_transform.conf | 166 ++++++++++++++
.../single_dynamic_scala_compile_transform.conf | 101 +++++++++
.../conf/single_scala_path_compile.conf | 72 ++++++
.../dynamic_compile/source_file/ScalaFile | 44 ++++
seatunnel-shade/pom.xml | 1 +
seatunnel-shade/seatunnel-scala-compiler/pom.xml | 108 +++++++++
seatunnel-transforms-v2/pom.xml | 8 +
.../transform/dynamiccompile/CompileLanguage.java | 3 +-
.../dynamiccompile/DynamicCompileTransform.java | 5 +
.../ScalaClassParse.java} | 11 +-
.../dynamiccompile/parse/ScalaClassParser.java | 80 +++++++
tools/dependencies/known-dependencies.txt | 6 +
21 files changed, 1299 insertions(+), 12 deletions(-)
diff --git a/docs/en/transform-v2/dynamic-compile.md
b/docs/en/transform-v2/dynamic-compile.md
index d5f21f2708..440b1d5056 100644
--- a/docs/en/transform-v2/dynamic-compile.md
+++ b/docs/en/transform-v2/dynamic-compile.md
@@ -31,7 +31,9 @@ Transform plugin common parameters, please refer to
[Transform Plugin](common-op
### compile_language [Enum]
Some syntax in Java may not be supported, please refer
https://github.com/janino-compiler/janino
-GROOVY,JAVA
+GROOVY,JAVA,SCALA(Only Support Zeta)
+
+**Note**: SCALA support uses the Scala REPL for dynamic compilation and
requires proper Scala syntax.
### compile_pattern [Enum]
@@ -223,6 +225,46 @@ Then the data in result table `java_out` will like this
| Kin Dom | 30 | 123 | JAVA |
| Joy Dom | 30 | 123 | JAVA |
+- use scala
+```hacon
+transform {
+ DynamicCompile {
+ plugin_input = "fake"
+ plugin_output = "scala_out"
+ compile_language="SCALA"
+ compile_pattern="SOURCE_CODE"
+ source_code="""
+ import org.apache.seatunnel.api.table.catalog.Column
+ import org.apache.seatunnel.api.table.catalog.CatalogTable
+ import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+ import
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+ import org.apache.seatunnel.api.table.`type`.BasicType
+ import java.util.ArrayList
+
+ class ScalaDemo {
+ def getInlineOutputColumns(inputCatalogTable:
CatalogTable): Array[Column] = {
+ val columns = new ArrayList[Column]()
+ val destColumn = PhysicalColumn.of(
+ "compile_language",
+ BasicType.STRING_TYPE,
+ 10L,
+ true,
+ "",
+ ""
+ )
+ columns.add(destColumn)
+ columns.toArray(new Array[Column](0))
+ }
+
+ def getInlineOutputFieldValues(inputRow:
SeaTunnelRowAccessor): Array[Object] = {
+ Array[Object]("SCALA")
+ }
+ }
+ """
+ }
+}
+```
+
More complex examples can be referred to
https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf
diff --git a/docs/zh/transform-v2/dynamic-compile.md
b/docs/zh/transform-v2/dynamic-compile.md
index c9cc870816..1fa055591a 100644
--- a/docs/zh/transform-v2/dynamic-compile.md
+++ b/docs/zh/transform-v2/dynamic-compile.md
@@ -31,7 +31,7 @@
### compile_language [Enum]
Java中的某些语法可能不受支持,请参阅https://github.com/janino-compiler/janino
-GROOVY,JAVA
+GROOVY,JAVA,SCALA(目前支持 Zeta)
### compile_pattern [Enum]
diff --git a/pom.xml b/pom.xml
index afddae078c..8e143e0986 100644
--- a/pom.xml
+++ b/pom.xml
@@ -142,6 +142,7 @@
<jsqlparser.version>4.9</jsqlparser.version>
<json-path.version>2.7.0</json-path.version>
<groovy.version>4.0.16</groovy.version>
+ <scala.version>2.12.15</scala.version>
<jetty.version>9.4.56.v20240826</jetty.version>
<jakarta.servlet-api>4.0.4</jakarta.servlet-api>
<!-- Option args -->
diff --git a/seatunnel-dist/release-docs/LICENSE
b/seatunnel-dist/release-docs/LICENSE
index f68045e4f5..69c9f4477a 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -307,6 +307,7 @@ The text of each license is the standard Apache 2.0 license.
(Apache-2.0) kerby-pkix (org.apache.kerby:kerby-pkix:1.0.1 -
https://github.com/apache/directory-kerby)
(Apache-2.0) kerby-util (org.apache.kerby:kerby-util:1.0.1 -
https://github.com/apache/directory-kerby)
(Apache-2.0) kerby-xdr (org.apache.kerby:kerby-xdr:1.0.1 -
https://github.com/apache/directory-kerby)
+ (Apache-2.0) jna (net.java.dev.jna:jna:5.13.0 -
https://github.com/java-native-access/jna)
(Apache-2.0) jna (net.java.dev.jna:jna:5.15.0 -
https://github.com/java-native-access/jna)
(Apache-2.0) jna-platform (net.java.dev.jna:jna-platform:5.15.0 -
https://github.com/java-native-access/jna)
(Apache-2.0) token-provider (org.apache.kerby:token-provider:1.0.1 -
https://github.com/apache/directory-kerby)
@@ -359,7 +360,9 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API
(com.google.protobuf:protobuf-java:2.5.0 - http://code.google.com/p/protobuf)
(FreeBSD License) stax2-api (org.codehaus.woodstox:stax2-api:3.1.4 -
https://github.com/FasterXML/stax2-api)
+ (BSD 3-Clause) Scala Library (org.scala-lang:scala-compiler:2.13.11 -
http://www.scala-lang.org/)
(BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.12.15 -
http://www.scala-lang.org/)
+ (BSD 3-Clause) Scala Reflect (org.scala-lang:scala-reflect:2.13.11 -
http://www.scala-lang.org/)
(BSD 3-Clause) asm (org.ow2.asm:asm:9.1 -
https://mvnrepository.com/artifact/org.ow2.asm/asm/)
(BSD 3-Clause) asm (org.ow2.asm:asm:5.0.4 -
https://mvnrepository.com/artifact/org.ow2.asm/asm/)
========================================================================
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
index 757c38dfcf..edded1a4d2 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
@@ -19,7 +19,9 @@ package org.apache.seatunnel.e2e.transform;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.junit.jupiter.api.AfterAll;
@@ -182,4 +184,85 @@ public class TestDynamicCompileIT extends TestSuiteBase
implements TestResource
container.executeJob(basePath +
"single_dynamic_http_compile_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support scala
dynamic compile")
+ @TestTemplate
+ public void testDynamicSingleCompileScala(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob(basePath +
"single_dynamic_scala_compile_transform.conf");
+ Assertions.assertEquals(
+ 0,
+ execResult.getExitCode(),
+ "Scala dynamic compilation test failed. Error: " +
execResult.getStderr());
+ }
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support scala
dynamic compile")
+ @TestTemplate
+ public void testDynamicSinglePathScala(TestContainer container)
+ throws IOException, InterruptedException {
+
container.copyFileToContainer("/dynamic_compile/source_file/ScalaFile",
"/tmp/ScalaFile");
+ Container.ExecResult execResult =
+ container.executeJob(basePath +
"single_scala_path_compile.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support scala
dynamic compile")
+ @TestTemplate
+ public void testDynamicMultipleCompileScala(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob(basePath +
"multiple_dynamic_scala_compile_transform.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support scala
dynamic compile ")
+ @TestTemplate
+ public void testDynamicMixedCompileJavaAndScala(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob(basePath +
"mixed_dynamic_java_scala_compile_transform.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support scala
dynamic compile ")
+ @TestTemplate
+ public void testDynamicMixedCompileGroovyAndScala(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob(
+ basePath +
"mixed_dynamic_groovy_scala_compile_transform.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support scala
dynamic compile ")
+ @TestTemplate
+ public void testMixedThreeLanguagesCompile(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob(basePath +
"mixed_dynamic_all_compile_transform.conf");
+ Assertions.assertEquals(
+ 0,
+ execResult.getExitCode(),
+ "Mixed three languages (Java + Groovy + Scala) compilation
test failed. Error: "
+ + execResult.getStderr());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_all_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_all_compile_transform.conf
new file mode 100644
index 0000000000..f02dd56b65
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_all_compile_transform.conf
@@ -0,0 +1,244 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### Mixed Three Languages (Java + Groovy + Scala) Dynamic Compilation Test
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "source_data"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ value = "double"
+ }
+ }
+ }
+}
+
+transform {
+ DynamicCompile {
+ plugin_input = "source_data"
+ plugin_output = "java_processed"
+ compile_language = "JAVA"
+ compile_pattern = "SOURCE_CODE"
+ source_code = """
+ import org.apache.seatunnel.api.table.catalog.Column;
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
+ import org.apache.seatunnel.api.table.catalog.*;
+ import org.apache.seatunnel.api.table.type.*;
+ import java.util.ArrayList;
+
+ public Column[] getInlineOutputColumns(CatalogTable
inputCatalogTable) {
+ // Create array directly instead of using ArrayList
+ Column[] columns = new Column[4];
+
+ // Add original columns
+ columns[0] = PhysicalColumn.of("id", BasicType.INT_TYPE,
10L, true, "", "");
+ columns[1] = PhysicalColumn.of("name",
BasicType.STRING_TYPE, 50L, true, "", "");
+ columns[2] = PhysicalColumn.of("value",
BasicType.DOUBLE_TYPE, 10L, true, "", "");
+
+ // Add Java processed column
+ columns[3] = PhysicalColumn.of("java_processed",
BasicType.STRING_TYPE, 50L, true, "", "Java processing result");
+
+ return columns;
+ }
+
+ public Object[]
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+ Object[] fieldValues = new Object[4];
+
+ // Pass through original values
+ fieldValues[0] = inputRow.getField(0);
+ fieldValues[1] = inputRow.getField(1);
+ fieldValues[2] = inputRow.getField(2);
+
+ // Java processing
+ String javaResult = "JAVA_STEP";
+ fieldValues[3] = javaResult;
+
+ return fieldValues;
+ }
+ """
+ }
+
+ # Second transformation: Groovy
+ DynamicCompile {
+ plugin_input = "java_processed"
+ plugin_output = "groovy_processed"
+ compile_language = "GROOVY"
+ compile_pattern = "SOURCE_CODE"
+ source_code = """
+ import org.apache.seatunnel.api.table.catalog.Column
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
+ import org.apache.seatunnel.api.table.catalog.*
+ import org.apache.seatunnel.api.table.type.*
+
+ Column[] getInlineOutputColumns(CatalogTable
inputCatalogTable) {
+ Column[] columns = new Column[5]
+ columns[0] = PhysicalColumn.of("id", BasicType.INT_TYPE,
10L, true, "", "")
+ columns[1] = PhysicalColumn.of("name",
BasicType.STRING_TYPE, 50L, true, "", "")
+ columns[2] = PhysicalColumn.of("value",
BasicType.DOUBLE_TYPE, 10L, true, "", "")
+ columns[3] = PhysicalColumn.of("java_processed",
BasicType.STRING_TYPE, 50L, true, "", "")
+ columns[4] = PhysicalColumn.of("groovy_processed",
BasicType.STRING_TYPE, 50L, true, "", "Groovy processing result")
+
+ return columns
+ }
+
+ Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor
inputRow) {
+ def fieldValues = new Object[5]
+
+ // Pass through all previous values
+ fieldValues[0] = inputRow.getField(0)
+ fieldValues[1] = inputRow.getField(1)
+ fieldValues[2] = inputRow.getField(2)
+ fieldValues[3] = inputRow.getField(3)
+ def groovyResult = "GROOVY_STEP"
+ fieldValues[4] = groovyResult
+
+ return fieldValues
+ }
+ """
+ }
+
+ # Third transformation: Scala
+ DynamicCompile {
+ plugin_input = "groovy_processed"
+ plugin_output = "scala_processed"
+ compile_language = "SCALA"
+ compile_pattern = "SOURCE_CODE"
+ source_code = """
+ import org.apache.seatunnel.api.table.catalog.Column
+ import org.apache.seatunnel.api.table.catalog.CatalogTable
+ import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+ import
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+ import org.apache.seatunnel.api.table.`type`.BasicType
+ import java.util.ArrayList
+
+ class ScalaFinalProcessor {
+
+ def getInlineOutputColumns(inputCatalogTable:
CatalogTable): Array[Column] = {
+ val columns = new ArrayList[Column]()
+
+ columns.add(PhysicalColumn.of("id", BasicType.INT_TYPE,
10L, true, "", ""))
+ columns.add(PhysicalColumn.of("name",
BasicType.STRING_TYPE, 50L, true, "", ""))
+ columns.add(PhysicalColumn.of("value",
BasicType.DOUBLE_TYPE, 10L, true, "", ""))
+ columns.add(PhysicalColumn.of("java_processed",
BasicType.STRING_TYPE, 50L, true, "", ""))
+ columns.add(PhysicalColumn.of("groovy_processed",
BasicType.STRING_TYPE, 50L, true, "", ""))
+
+ // Add Scala processed column
+ columns.add(PhysicalColumn.of("scala_processed",
BasicType.STRING_TYPE, 100L, true, "", "Scala functional processing result"))
+ columns.toArray(new Array[Column](0))
+ }
+
+ def getInlineOutputFieldValues(inputRow:
SeaTunnelRowAccessor): Array[Object] = {
+ val id =
Option(inputRow.getField(0)).map(_.toString.toInt).getOrElse(0)
+ val name =
Option(inputRow.getField(1)).map(_.toString).getOrElse("")
+ val value =
Option(inputRow.getField(2)).map(_.toString.toDouble).getOrElse(0.0)
+ val javaProcessed =
Option(inputRow.getField(3)).map(_.toString).getOrElse("")
+ val groovyProcessed =
Option(inputRow.getField(4)).map(_.toString).getOrElse("")
+ Array[Object](
+ id.asInstanceOf[Object],
+ name,
+ value.asInstanceOf[Object],
+ javaProcessed,
+ groovyProcessed,
+ "SCALA_STEP"
+ )
+ }
+ }
+ """
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "scala_processed"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = value
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = java_processed
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "JAVA_STEP"
+ }
+ ]
+ },
+ {
+ field_name = groovy_processed
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "GROOVY_STEP"
+ }
+ ]
+ },
+ {
+ field_name = scala_processed
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "SCALA_STEP"
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
index ae25d50921..cb8bbf12e9 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf
@@ -55,17 +55,15 @@ transform {
public Column[] getInlineOutputColumns(CatalogTable
inputCatalogTable) {
ArrayList<Column> columns = new ArrayList<Column>();
- PhysicalColumn destColumn =
- PhysicalColumn.of(
+ PhysicalColumn destColumn =
+ PhysicalColumn.of(
"col1",
BasicType.STRING_TYPE,
10,
true,
"",
"");
- return new Column[]{
- destColumn
- };
+ return new Column[]{destColumn};
}
public Object[]
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_scala_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_scala_compile_transform.conf
new file mode 100644
index 0000000000..8d26ef1322
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_scala_compile_transform.conf
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### Simple Mixed Groovy + Scala Dynamic Compilation Test
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 100
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+ DynamicCompile {
+ plugin_input = "fake"
+ plugin_output = "fake1"
+ compile_language = "GROOVY"
+ compile_pattern = "SOURCE_CODE"
+ source_code = """
+ import org.apache.seatunnel.api.table.catalog.Column
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
+ import org.apache.seatunnel.api.table.catalog.*
+ import org.apache.seatunnel.api.table.type.*
+
+ Column[] getInlineOutputColumns(CatalogTable
inputCatalogTable) {
+ Column[] columns = new Column[3]
+ columns[0] = PhysicalColumn.of("name",
BasicType.STRING_TYPE, 50L, true, "", "")
+ columns[1] = PhysicalColumn.of("age", BasicType.INT_TYPE,
10L, true, "", "")
+ columns[2] = PhysicalColumn.of("groovy_col",
BasicType.STRING_TYPE, 50L, true, "", "")
+ return columns
+ }
+
+ Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor
inputRow) {
+ def fieldValues = new Object[3]
+ fieldValues[0] = inputRow.getField(0)
+ fieldValues[1] = inputRow.getField(1)
+ fieldValues[2] = "GROOVY_VALUE"
+ return fieldValues
+ }
+ """
+ }
+
+ # Second transformation: Scala
+ DynamicCompile {
+ plugin_input = "fake1"
+ plugin_output = "fake2"
+ compile_language = "SCALA"
+ compile_pattern = "SOURCE_CODE"
+ source_code = """
+ import org.apache.seatunnel.api.table.catalog.Column
+ import org.apache.seatunnel.api.table.catalog.CatalogTable
+ import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+ import
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+ import org.apache.seatunnel.api.table.`type`.BasicType
+ import java.util.ArrayList
+
+ class ScalaSimpleProcessor {
+
+ def getInlineOutputColumns(inputCatalogTable:
CatalogTable): Array[Column] = {
+ val columns = new ArrayList[Column]()
+ columns.add(PhysicalColumn.of("name",
BasicType.STRING_TYPE, 50L, true, "", ""))
+ columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE,
10L, true, "", ""))
+ columns.add(PhysicalColumn.of("groovy_col",
BasicType.STRING_TYPE, 50L, true, "", ""))
+ columns.add(PhysicalColumn.of("scala_col",
BasicType.STRING_TYPE, 50L, true, "", ""))
+ columns.toArray(new Array[Column](0))
+ }
+
+ def getInlineOutputFieldValues(inputRow:
SeaTunnelRowAccessor): Array[Object] = {
+ Array[Object](
+ inputRow.getField(0),
+ inputRow.getField(1),
+ inputRow.getField(2),
+ "SCALA_VALUE"
+ )
+ }
+ }
+ """
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "fake2"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = groovy_col
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "GROOVY_VALUE"
+ }
+ ]
+ },
+ {
+ field_name = scala_col
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "SCALA_VALUE"
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_java_scala_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_java_scala_compile_transform.conf
new file mode 100644
index 0000000000..631ad4949c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_java_scala_compile_transform.conf
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ }
+ }
+ }
+}
+
+transform {
+ DynamicCompile {
+ plugin_input = "fake"
+ plugin_output = "fake1"
+ compile_language="JAVA"
+ compile_pattern="SOURCE_CODE"
+ source_code="""
+ import org.apache.seatunnel.api.table.catalog.Column;
+ import
org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
+ import org.apache.seatunnel.api.table.catalog.*;
+ import org.apache.seatunnel.api.table.type.*;
+ import java.util.ArrayList;
+
+ public Column[] getInlineOutputColumns(CatalogTable
inputCatalogTable) {
+ ArrayList<Column> columns = new ArrayList<Column>();
+ PhysicalColumn destColumn =
+ PhysicalColumn.of(
+ "java_col",
+ BasicType.STRING_TYPE,
+ 10,
+ true,
+ "",
+ "");
+ return new Column[]{destColumn};
+ }
+
+ public Object[]
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+ Object[] fieldValues = new Object[1];
+ fieldValues[0] = "JAVA_VALUE";
+ return fieldValues;
+ }
+ """
+ }
+
+ DynamicCompile {
+ plugin_input = "fake1"
+ plugin_output = "fake2"
+ compile_language="SCALA"
+ compile_pattern="SOURCE_CODE"
+ source_code="""
+ import org.apache.seatunnel.api.table.catalog.Column
+ import org.apache.seatunnel.api.table.catalog.CatalogTable
+ import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+ import
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+ import org.apache.seatunnel.api.table.`type`.BasicType
+ import java.util.ArrayList
+
+ class ScalaDemo {
+ def getInlineOutputColumns(inputCatalogTable:
CatalogTable): Array[Column] = {
+ val columns = new ArrayList[Column]()
+ val destColumn = PhysicalColumn.of(
+ "scala_col",
+ BasicType.STRING_TYPE,
+ 10L,
+ true,
+ "",
+ ""
+ )
+ columns.add(destColumn)
+ columns.toArray(new Array[Column](0))
+ }
+
+ def getInlineOutputFieldValues(inputRow:
SeaTunnelRowAccessor): Array[Object] = {
+ val fieldValues = new Array[Object](1)
+ fieldValues(0) = "SCALA_VALUE"
+ fieldValues
+ }
+ }
+ """
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "fake2"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = java_col
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "JAVA_VALUE"
+ }
+ ]
+ },
+ {
+ field_name = scala_col
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "SCALA_VALUE"
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_scala_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_scala_compile_transform.conf
new file mode 100644
index 0000000000..8607315ecf
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_scala_compile_transform.conf
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ }
+ }
+ }
+}
+
+transform {
+ DynamicCompile {
+ plugin_input = "fake"
+ plugin_output = "fake1"
+ compile_language="SCALA"
+ compile_pattern="SOURCE_CODE"
+ source_code="""
+ import org.apache.seatunnel.api.table.catalog.Column
+ import org.apache.seatunnel.api.table.catalog.CatalogTable
+ import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+ import
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+ import org.apache.seatunnel.api.table.`type`.BasicType
+ import java.util.ArrayList
+
+ class ScalaDemo1 {
+ def getInlineOutputColumns(inputCatalogTable:
CatalogTable): Array[Column] = {
+ val columns = new ArrayList[Column]()
+ val destColumn = PhysicalColumn.of(
+ "scala_aa",
+ BasicType.STRING_TYPE,
+ 10L,
+ true,
+ "",
+ ""
+ )
+ columns.add(destColumn)
+ columns.toArray(new Array[Column](0))
+ }
+
+ def getInlineOutputFieldValues(inputRow:
SeaTunnelRowAccessor): Array[Object] = {
+ val fieldValues = new Array[Object](1)
+ fieldValues(0) = "SCALA_AA"
+ fieldValues
+ }
+ }
+ """
+ }
+
+ DynamicCompile {
+ plugin_input = "fake1"
+ plugin_output = "fake2"
+ compile_language="SCALA"
+ compile_pattern="SOURCE_CODE"
+ source_code="""
+ import org.apache.seatunnel.api.table.catalog.Column
+ import org.apache.seatunnel.api.table.catalog.CatalogTable
+ import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+ import
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+ import org.apache.seatunnel.api.table.`type`.BasicType
+ import java.util.ArrayList
+
+ class ScalaDemo2 {
+ def getInlineOutputColumns(inputCatalogTable:
CatalogTable): Array[Column] = {
+ val columns = new ArrayList[Column]()
+ val destColumn = PhysicalColumn.of(
+ "scala_bb",
+ BasicType.STRING_TYPE,
+ 10L,
+ true,
+ "",
+ ""
+ )
+ columns.add(destColumn)
+ columns.toArray(new Array[Column](0))
+ }
+
+ def getInlineOutputFieldValues(inputRow:
SeaTunnelRowAccessor): Array[Object] = {
+ val fieldValues = new Array[Object](1)
+ fieldValues(0) = "SCALA_BB"
+ fieldValues
+ }
+ }
+ """
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "fake2"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = scala_aa
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "SCALA_AA"
+ }
+ ]
+ },
+ {
+ field_name = scala_bb
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "SCALA_BB"
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_scala_compile_transform.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_scala_compile_transform.conf
new file mode 100644
index 0000000000..0821b887bd
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_scala_compile_transform.conf
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ }
+ }
+ }
+}
+
+transform {
+ DynamicCompile {
+ plugin_input = "fake"
+ plugin_output = "fake1"
+ compile_language="SCALA"
+ compile_pattern="SOURCE_CODE"
+ source_code="""
+ import org.apache.seatunnel.api.table.catalog.Column
+ import org.apache.seatunnel.api.table.catalog.CatalogTable
+ import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+ import
org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+ import org.apache.seatunnel.api.table.`type`.BasicType
+ import java.util.ArrayList
+
+ class ScalaDemo {
+ def getInlineOutputColumns(inputCatalogTable:
CatalogTable): Array[Column] = {
+ val columns = new ArrayList[Column]()
+ val destColumn = PhysicalColumn.of(
+ "scala_col1",
+ BasicType.STRING_TYPE,
+ 10L,
+ true,
+ "",
+ ""
+ )
+ columns.add(destColumn)
+ columns.toArray(new Array[Column](0))
+ }
+
+ def getInlineOutputFieldValues(inputRow:
SeaTunnelRowAccessor): Array[Object] = {
+ val fieldValues = new Array[Object](1)
+ fieldValues(0) = "SCALA_VALUE1"
+ fieldValues
+ }
+ }
+ """
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "fake1"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = scala_col1
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "SCALA_VALUE1"
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_scala_path_compile.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_scala_path_compile.conf
new file mode 100644
index 0000000000..c60df8af10
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_scala_path_compile.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ }
+ }
+ }
+}
+
+transform {
+ DynamicCompile {
+ plugin_input = "fake"
+ plugin_output = "fake1"
+ compile_language="SCALA"
+ compile_pattern="ABSOLUTE_PATH"
+ absolute_path="""/tmp/ScalaFile"""
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "fake1"
+ rules = {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = scala_col
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "SCALA_TEST"
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/ScalaFile
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/ScalaFile
new file mode 100644
index 0000000000..af6d6a676d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/ScalaFile
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.seatunnel.api.table.catalog.Column
+import org.apache.seatunnel.api.table.catalog.CatalogTable
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn
+import org.apache.seatunnel.api.table.`type`.SeaTunnelRowAccessor
+import org.apache.seatunnel.api.table.`type`.BasicType
+import java.util.ArrayList
+
+class ScalaDemo {
+ def getInlineOutputColumns(inputCatalogTable: CatalogTable): Array[Column] =
{
+ val columns = new ArrayList[Column]()
+ val destColumn = PhysicalColumn.of(
+ "scala_col",
+ BasicType.STRING_TYPE,
+ 10L,
+ true,
+ "",
+ ""
+ )
+ columns.add(destColumn)
+ columns.toArray(new Array[Column](0))
+ }
+
+ def getInlineOutputFieldValues(inputRow: SeaTunnelRowAccessor):
Array[Object] = {
+ val fieldValues = new Array[Object](1)
+ fieldValues(0) = "SCALA_TEST"
+ fieldValues
+ }
+}
diff --git a/seatunnel-shade/pom.xml b/seatunnel-shade/pom.xml
index 590f8d3b46..95e6e6d9e5 100644
--- a/seatunnel-shade/pom.xml
+++ b/seatunnel-shade/pom.xml
@@ -33,6 +33,7 @@
<module>seatunnel-thrift-service</module>
<module>seatunnel-hazelcast</module>
<module>seatunnel-janino</module>
+ <module>seatunnel-scala-compiler</module>
<module>seatunnel-jetty9-9.4.56</module>
<module>seatunnel-hadoop-aws</module>
<module>seatunnel-arrow</module>
diff --git a/seatunnel-shade/seatunnel-scala-compiler/pom.xml
b/seatunnel-shade/seatunnel-scala-compiler/pom.xml
new file mode 100644
index 0000000000..58b8d9bfd2
--- /dev/null
+++ b/seatunnel-shade/seatunnel-scala-compiler/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-shade</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>seatunnel-scala-compiler</artifactId>
+ <name>SeaTunnel : Shade : Scala</name>
+
+ <properties>
+ <scala.version>2.13.11</scala.version>
+ <scala.binary.version>2.13</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <finalName>seatunnel-scala</finalName>
+
<createSourcesJar>${enableSourceJarCreation}</createSourcesJar>
+ <shadeSourcesContent>true</shadeSourcesContent>
+
<shadedArtifactAttached>false</shadedArtifactAttached>
+
<createDependencyReducedPom>false</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <!-- Only shade compiler tools, completely
avoid scala.reflect -->
+ <relocation>
+ <pattern>scala.tools.nsc</pattern>
+
<shadedPattern>${seatunnel.shade.package}.scala.tools.nsc</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>scala.tools.util</pattern>
+
<shadedPattern>${seatunnel.shade.package}.scala.tools.util</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <artifacts>
+ <artifact>
+
<file>${basedir}/target/seatunnel-scala.jar</file>
+ <type>jar</type>
+ <classifier>optional</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/seatunnel-transforms-v2/pom.xml b/seatunnel-transforms-v2/pom.xml
index 9288a14531..9bc4c6aa06 100644
--- a/seatunnel-transforms-v2/pom.xml
+++ b/seatunnel-transforms-v2/pom.xml
@@ -84,11 +84,19 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-scala-compiler</artifactId>
+ <version>${project.version}</version>
+ <classifier>optional</classifier>
+ </dependency>
+
<dependency>
<groupId>org.apache.groovy</groupId>
<artifactId>groovy</artifactId>
<version>${groovy.version}</version>
</dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-janino</artifactId>
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
index be0e468e4d..ef96243598 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
@@ -19,5 +19,6 @@ package org.apache.seatunnel.transform.dynamiccompile;
public enum CompileLanguage {
GROOVY,
- JAVA
+ JAVA,
+ SCALA
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
index 7ff88d85d4..00cb874973 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
import org.apache.seatunnel.transform.dynamiccompile.parse.AbstractParse;
import org.apache.seatunnel.transform.dynamiccompile.parse.GroovyClassParse;
import org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse;
+import org.apache.seatunnel.transform.dynamiccompile.parse.ScalaClassParse;
import org.apache.seatunnel.transform.exception.TransformException;
import java.nio.file.Paths;
@@ -59,6 +60,10 @@ public class DynamicCompileTransform extends
MultipleFieldOutputTransform {
DynamicCompileParse = new GroovyClassParse();
} else if (CompileLanguage.JAVA.equals(compileLanguage)) {
DynamicCompileParse = new JavaClassParse();
+ } else if (CompileLanguage.SCALA.equals(compileLanguage)) {
+ DynamicCompileParse = new ScalaClassParse();
+ } else {
+ throw new IllegalArgumentException("Unsupported compile language:
" + compileLanguage);
}
compilePattern =
readonlyConfig.get(DynamicCompileTransformConfig.COMPILE_PATTERN);
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParse.java
similarity index 75%
copy from
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
copy to
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParse.java
index be0e468e4d..78bf36ae92 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParse.java
@@ -15,9 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.transform.dynamiccompile;
+package org.apache.seatunnel.transform.dynamiccompile.parse;
-public enum CompileLanguage {
- GROOVY,
- JAVA
+public class ScalaClassParse extends AbstractParse {
+
+ @Override
+ public Class<?> parseClassSourceCode(String sourceCode) {
+ return ScalaClassParser.parseSourceCodeWithCache(sourceCode);
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParser.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParser.java
new file mode 100644
index 0000000000..e50289d903
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ScalaClassParser.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.dynamiccompile.parse;
+
+import org.apache.seatunnel.shade.scala.tools.nsc.Settings;
+import org.apache.seatunnel.shade.scala.tools.nsc.interpreter.IMain;
+import
org.apache.seatunnel.shade.scala.tools.nsc.interpreter.shell.ReplReporterImpl;
+
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static
org.apache.seatunnel.transform.dynamiccompile.CompileTransformErrorCode.COMPILE_TRANSFORM_ERROR_CODE;
+
+public class ScalaClassParser extends AbstractParser {
+
+ private static final String SCALA_CLASS_NAME_PATTERN =
"(?:class|object)\\s+(\\w+)";
+ private static final Pattern CLASS_NAME_REGEX =
Pattern.compile(SCALA_CLASS_NAME_PATTERN);
+ private static IMain scalaInterpreter;
+
+ static {
+ try {
+ Settings settings = new Settings();
+ settings.usejavacp().v_$eq(true);
+ scalaInterpreter = new IMain(settings, new
ReplReporterImpl(settings));
+ } catch (Exception e) {
+ throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE,
e.getMessage());
+ }
+ }
+
+ public static Class<?> parseSourceCodeWithCache(String sourceCode) {
+ return classCache.computeIfAbsent(
+ getClassKey(sourceCode),
+ new Function<String, Class<?>>() {
+ @Override
+ public Class<?> apply(String classKey) {
+ String className = extractClassName(sourceCode);
+ return compileWithREPL(sourceCode, className);
+ }
+ });
+ }
+
+ /** Extract class name from Scala source code */
+ private static String extractClassName(String sourceCode) {
+ Matcher matcher = CLASS_NAME_REGEX.matcher(sourceCode);
+ if (matcher.find()) {
+ return matcher.group(1);
+ }
+ throw new IllegalArgumentException("Cannot extract class name from
Scala source code");
+ }
+
+ private static Class<?> compileWithREPL(String sourceCode, String
className) {
+ try {
+ boolean compileResult = scalaInterpreter.compileString(sourceCode);
+ if (!compileResult) {
+ throw new RuntimeException("Scala REPL compilation failed");
+ }
+ ClassLoader replClassLoader = scalaInterpreter.classLoader();
+ return replClassLoader.loadClass(className);
+ } catch (Exception e) {
+ throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE,
e.getMessage());
+ }
+ }
+}
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index ab7e75610b..353008ae0a 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -26,6 +26,9 @@ protostuff-collectionschema-1.8.0.jar
protostuff-core-1.8.0.jar
protostuff-runtime-1.8.0.jar
scala-library-2.12.15.jar
+scala-compiler-2.13.11.jar
+scala-reflect-2.13.11.jar
+seatunnel-scala-compiler-2.3.12-SNAPSHOT-optional.jar
seatunnel-jackson-2.3.12-SNAPSHOT-optional.jar
seatunnel-guava-2.3.12-SNAPSHOT-optional.jar
seatunnel-hazelcast-shade-2.3.12-SNAPSHOT-optional.jar
@@ -70,6 +73,7 @@ jetty-util-9.4.56.v20240826.jar
jetty-util-ajax-9.4.56.v20240826.jar
javax.servlet-api-3.1.0.jar
seatunnel-jetty9-9.4.56-2.3.12-SNAPSHOT-optional.jar
+jna-5.13.0.jar
jna-5.15.0.jar
jna-platform-5.15.0.jar
oshi-core-6.6.5.jar
@@ -121,3 +125,5 @@ netty-common-4.1.112.Final.jar
netty-handler-4.1.112.Final.jar
netty-resolver-4.1.112.Final.jar
eventstream-1.0.1.jar
+java-diff-utils-4.12.jar
+jline-3.22.0.jar
\ No newline at end of file