This is an automated email from the ASF dual-hosted git repository.
dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 41b5a88d61 [Hotfix][Transforms-v2] DynamicCompile Plugin compatibility
fix (#8057)
41b5a88d61 is described below
commit 41b5a88d611b58486d1c996127d6ed0b2b78c337
Author: zhangdonghao <[email protected]>
AuthorDate: Fri Nov 15 15:32:58 2024 +0800
[Hotfix][Transforms-v2] DynamicCompile Plugin compatibility fix (#8057)
---
.../e2e/transform/TestDynamicCompileIT.java | 9 ++
..._dynamic_java_compile_transform_compatible.conf | 115 +++++++++++++++++++++
.../dynamiccompile/DynamicCompileTransform.java | 21 +++-
3 files changed, 143 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
index 2528499fc1..2ad2f32e4b 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
@@ -116,6 +116,15 @@ public class TestDynamicCompileIT extends TestSuiteBase
implements TestResource
Assertions.assertEquals(0, execResult.getExitCode());
}
+ @TestTemplate
+ public void testDynamicSingleCompileJavaOldVersionCompatible(TestContainer
container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob(
+ basePath +
"single_dynamic_java_compile_transform_compatible.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
@TestTemplate
public void testDynamicMultipleCompileGroovy(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf
new file mode 100644
index 0000000000..f26dd21060
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ }
+ }
+ }
+}
+
+transform {
+ DynamicCompile {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ compile_language = "JAVA"
+ compile_pattern = "SOURCE_CODE"
+ source_code = """
+ import org.apache.seatunnel.api.table.catalog.Column;
+ import
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+ import org.apache.seatunnel.api.table.catalog.*;
+ import org.apache.seatunnel.api.table.type.*;
+ import java.util.ArrayList;
+
+
+ public Column[] getInlineOutputColumns(CatalogTable
inputCatalogTable) {
+
+ ArrayList<Column> columns = new ArrayList<Column>();
+ PhysicalColumn destColumn =
+ PhysicalColumn.of(
+ "col1",
+ BasicType.STRING_TYPE,
+ 10,
+ true,
+ "",
+ "");
+ return new Column[]{
+ destColumn
+ };
+
+ }
+ public Object[]
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+
+ Object[] fieldValues = new Object[1];
+ fieldValues[0]="test1";
+ return fieldValues;
+ }
+ """
+
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "fake1"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = col1
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "test1"
+
+ }
+
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
index bfae2b8d2a..7ff88d85d4 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.transform.dynamiccompile;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.common.utils.ReflectionUtils;
@@ -30,6 +31,7 @@ import
org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse;
import org.apache.seatunnel.transform.exception.TransformException;
import java.nio.file.Paths;
+import java.util.Optional;
import static
org.apache.seatunnel.transform.dynamiccompile.CompileTransformErrorCode.COMPILE_TRANSFORM_ERROR_CODE;
@@ -42,6 +44,8 @@ public class DynamicCompileTransform extends
MultipleFieldOutputTransform {
private final String sourceCode;
+ private final boolean compatibilityMode;
+
private final CompilePattern compilePattern;
private AbstractParse DynamicCompileParse;
@@ -68,6 +72,9 @@ public class DynamicCompileTransform extends
MultipleFieldOutputTransform {
readonlyConfig.get(
DynamicCompileTransformConfig.ABSOLUTE_PATH)));
}
+ compatibilityMode =
+ sourceCode.contains(
+
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor.class.getName());
}
@Override
@@ -98,14 +105,24 @@ public class DynamicCompileTransform extends
MultipleFieldOutputTransform {
try {
result =
ReflectionUtils.invoke(
- getCompileLanguageInstance(),
getInlineOutputFieldValues, inputRow);
-
+ getCompileLanguageInstance(),
+ getInlineOutputFieldValues,
+ getCompatibilityAccessor(inputRow));
} catch (Exception e) {
throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE,
e.getMessage());
}
return (Object[]) result;
}
+ private Object getCompatibilityAccessor(SeaTunnelRowAccessor inputRow) {
+ if (compatibilityMode) {
+ Optional<Object> field = ReflectionUtils.getField(inputRow, "row");
+ SeaTunnelRow row = (SeaTunnelRow) field.get();
+ return new
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor(row);
+ }
+ return inputRow;
+ }
+
private Object getCompileLanguageInstance()
throws InstantiationException, IllegalAccessException {
Class<?> compileClass =
DynamicCompileParse.parseClassSourceCode(sourceCode);