This is an automated email from the ASF dual-hosted git repository.
zhangshenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0f4d43ef47 [Fix][Transform-V2] Avoid eager zeta udf open and fix null table context (#10760)
0f4d43ef47 is described below
commit 0f4d43ef470d3f254aff72211b2476403af8f52a
Author: David Zollo <[email protected]>
AuthorDate: Thu Jun 11 21:53:52 2026 +0800
[Fix][Transform-V2] Avoid eager zeta udf open and fix null table context (#10760)
---
.../transform/sql/zeta/ZetaSQLEngine.java | 27 ++++-
.../transform/sql/zeta/ZetaUDFContext.java | 5 +-
.../transform/sql/zeta/ZetaSQLEngineTest.java | 116 +++++++++++++++++++++
.../transform/sql/zeta/ZetaUDFContextTest.java | 43 ++++++++
4 files changed, 187 insertions(+), 4 deletions(-)
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 86a12e6126..c998582665 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -73,6 +73,7 @@ public class ZetaSQLEngine implements SQLEngine {
private ZetaUDFContext udfContext;
private Integer allColumnsCount = null;
+ private boolean udfOpened;
public ZetaSQLEngine() {}
@@ -95,7 +96,6 @@ public class ZetaSQLEngine implements SQLEngine {
this.zetaSQLFilter = new ZetaSQLFilter(zetaSQLFunction, zetaSQLType);
parseSQL();
- openUDFs();
}
protected List<ZetaUDF> loadUDFs() {
@@ -111,6 +111,14 @@ public class ZetaSQLEngine implements SQLEngine {
try {
udf.open();
} catch (Exception e) {
+ try {
+ udf.close();
+ } catch (Exception closeException) {
+ log.warn(
+ "Best-effort close failed for udf {}",
+ udf.functionName(),
+ closeException);
+ }
closeUDFs(i - 1);
log.error("Open udf {} failed", udf.functionName(), e);
throw new TransformException(
@@ -121,6 +129,19 @@ public class ZetaSQLEngine implements SQLEngine {
}
}
+ private void ensureUdfOpened() {
+ if (udfOpened || CollectionUtils.isEmpty(udfList)) {
+ return;
+ }
+ synchronized (this) {
+ if (udfOpened) {
+ return;
+ }
+ openUDFs();
+ udfOpened = true;
+ }
+ }
+
private void parseSQL() {
try {
Statement statement = CCJSqlParserUtil.parse(sql);
@@ -262,6 +283,7 @@ public class ZetaSQLEngine implements SQLEngine {
@Override
public List<SeaTunnelRow> transformBySQL(SeaTunnelRow inputRow, SeaTunnelRowType outRowType) {
+ ensureUdfOpened();
// ------Physical Query Plan Execution------
// Scan Table
Object[] inputFields = scanTable(inputRow);
@@ -343,10 +365,11 @@ public class ZetaSQLEngine implements SQLEngine {
@Override
public void close() {
- if (udfList == null || udfList.isEmpty()) {
+ if (CollectionUtils.isEmpty(udfList) || !udfOpened) {
return;
}
closeUDFs(udfList.size() - 1);
+ udfOpened = false;
}
private void closeUDFs(int lastIndex) {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
index b42dc9473c..89fde0967b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
@@ -51,11 +51,12 @@ public class ZetaUDFContext {
}
private void updateTableId(String tableId) {
- if (Objects.equals(this.rawTableId, tableId)) {
+ boolean isNullTableId = tableId == null;
+ if (Objects.equals(this.rawTableId, tableId) && this.tableIdIsNull == isNullTableId) {
return;
}
this.rawTableId = tableId;
- this.tableIdIsNull = tableId == null;
+ this.tableIdIsNull = isNullTableId;
this.database = null;
this.schema = null;
this.table = null;
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
index 4513fdba4b..e955638b26 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
@@ -27,6 +27,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
public class ZetaSQLEngineTest {
@@ -95,4 +97,118 @@ public class ZetaSQLEngineTest {
rowType,
"insert into test(id, name, age) values (1, 'bad', 10)"));
}
+
+ @Test
+ public void testSchemaInferenceShouldNotOpenUdf() {
+ TrackingUdf trackingUdf = new TrackingUdf("tracking", false);
+ ZetaSQLEngine engine = new TestableZetaSQLEngine(Collections.singletonList(trackingUdf));
+ engine.init("test", "test", simpleRowType(), "select id, name from test");
+
+ SeaTunnelRowType outType = engine.typeMapping(new ArrayList<>());
+
+ Assertions.assertNotNull(outType);
+ Assertions.assertEquals(0, trackingUdf.getOpenCount());
+ Assertions.assertEquals(0, trackingUdf.getCloseCount());
+ }
+
+ @Test
+ public void testOpenUdfWhenExecuteAndCloseOnEngineClose() {
+ TrackingUdf trackingUdf = new TrackingUdf("tracking", false);
+ ZetaSQLEngine engine = new TestableZetaSQLEngine(Collections.singletonList(trackingUdf));
+ engine.init("test", "test", simpleRowType(), "select id from test");
+ SeaTunnelRowType outType = engine.typeMapping(new ArrayList<>());
+
+ SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {1, "Alice", 20});
+ engine.transformBySQL(inputRow, outType);
+ engine.transformBySQL(inputRow, outType);
+
+ Assertions.assertEquals(1, trackingUdf.getOpenCount());
+ Assertions.assertEquals(0, trackingUdf.getCloseCount());
+
+ engine.close();
+
+ Assertions.assertEquals(1, trackingUdf.getCloseCount());
+ }
+
+ @Test
+ public void testOpenFailureShouldCloseFailedAndOpenedUdfs() {
+ TrackingUdf firstUdf = new TrackingUdf("first", false);
+ TrackingUdf failedUdf = new TrackingUdf("failed", true);
+ ZetaSQLEngine engine = new TestableZetaSQLEngine(Arrays.asList(firstUdf, failedUdf));
+ engine.init("test", "test", simpleRowType(), "select id from test");
+ SeaTunnelRowType outType = engine.typeMapping(new ArrayList<>());
+
+ Assertions.assertThrows(
+ TransformException.class,
+ () ->
+ engine.transformBySQL(
+ new SeaTunnelRow(new Object[] {1, "Alice", 20}), outType));
+
+ Assertions.assertEquals(1, firstUdf.getOpenCount());
+ Assertions.assertEquals(1, firstUdf.getCloseCount());
+ Assertions.assertEquals(1, failedUdf.getOpenCount());
+ Assertions.assertEquals(1, failedUdf.getCloseCount());
+ }
+
+ private static final class TestableZetaSQLEngine extends ZetaSQLEngine {
+
+ private final List<ZetaUDF> testUdfs;
+
+ private TestableZetaSQLEngine(List<ZetaUDF> testUdfs) {
+ this.testUdfs = testUdfs;
+ }
+
+ @Override
+ protected List<ZetaUDF> loadUDFs() {
+ return new ArrayList<>(testUdfs);
+ }
+ }
+
+ private static final class TrackingUdf implements ZetaUDF {
+ private final String functionName;
+ private final boolean failOnOpen;
+ private int openCount;
+ private int closeCount;
+
+ private TrackingUdf(String functionName, boolean failOnOpen) {
+ this.functionName = functionName;
+ this.failOnOpen = failOnOpen;
+ }
+
+ @Override
+ public String functionName() {
+ return functionName;
+ }
+
+ @Override
+ public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+ return BasicType.STRING_TYPE;
+ }
+
+ @Override
+ public Object evaluate(List<Object> args) {
+ return null;
+ }
+
+ @Override
+ public void open() throws Exception {
+ openCount++;
+ if (failOnOpen) {
+ throw new Exception("open failed");
+ }
+ }
+
+ @Override
+ public void close() {
+ closeCount++;
+ }
+
+ private int getOpenCount() {
+ return openCount;
+ }
+
+ private int getCloseCount() {
+ return closeCount;
+ }
+ }
}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContextTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContextTest.java
new file mode 100644
index 0000000000..9df2a53d31
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContextTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ZetaUDFContextTest {
+
+ @Test
+ public void testNullTableIdShouldNotTriggerTablePathResolve() {
+ ZetaUDFContext context = new ZetaUDFContext();
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {1});
+ row.setTableId(null);
+
+ context.update(row);
+
+ Assertions.assertNull(context.getRawTableId());
+ Assertions.assertDoesNotThrow(context::getDatabase);
+ Assertions.assertDoesNotThrow(context::getSchema);
+ Assertions.assertDoesNotThrow(context::getTable);
+ Assertions.assertNull(context.getDatabase());
+ Assertions.assertNull(context.getSchema());
+ Assertions.assertNull(context.getTable());
+ }
+}