This is an automated email from the ASF dual-hosted git repository.

zhangshenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0f4d43ef47 [Fix][Transform-V2] Avoid eager zeta udf open and fix null table context (#10760)
0f4d43ef47 is described below

commit 0f4d43ef470d3f254aff72211b2476403af8f52a
Author: David Zollo <[email protected]>
AuthorDate: Thu Jun 11 21:53:52 2026 +0800

    [Fix][Transform-V2] Avoid eager zeta udf open and fix null table context (#10760)
---
 .../transform/sql/zeta/ZetaSQLEngine.java          |  27 ++++-
 .../transform/sql/zeta/ZetaUDFContext.java         |   5 +-
 .../transform/sql/zeta/ZetaSQLEngineTest.java      | 116 +++++++++++++++++++++
 .../transform/sql/zeta/ZetaUDFContextTest.java     |  43 ++++++++
 4 files changed, 187 insertions(+), 4 deletions(-)

diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 86a12e6126..c998582665 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -73,6 +73,7 @@ public class ZetaSQLEngine implements SQLEngine {
     private ZetaUDFContext udfContext;
 
     private Integer allColumnsCount = null;
+    private boolean udfOpened;
 
     public ZetaSQLEngine() {}
 
@@ -95,7 +96,6 @@ public class ZetaSQLEngine implements SQLEngine {
         this.zetaSQLFilter = new ZetaSQLFilter(zetaSQLFunction, zetaSQLType);
 
         parseSQL();
-        openUDFs();
     }
 
     protected List<ZetaUDF> loadUDFs() {
@@ -111,6 +111,14 @@ public class ZetaSQLEngine implements SQLEngine {
             try {
                 udf.open();
             } catch (Exception e) {
+                try {
+                    udf.close();
+                } catch (Exception closeException) {
+                    log.warn(
+                            "Best-effort close failed for udf {}",
+                            udf.functionName(),
+                            closeException);
+                }
                 closeUDFs(i - 1);
                 log.error("Open udf {} failed", udf.functionName(), e);
                 throw new TransformException(
@@ -121,6 +129,19 @@ public class ZetaSQLEngine implements SQLEngine {
         }
     }
 
+    private void ensureUdfOpened() {
+        if (udfOpened || CollectionUtils.isEmpty(udfList)) {
+            return;
+        }
+        synchronized (this) {
+            if (udfOpened) {
+                return;
+            }
+            openUDFs();
+            udfOpened = true;
+        }
+    }
+
     private void parseSQL() {
         try {
             Statement statement = CCJSqlParserUtil.parse(sql);
@@ -262,6 +283,7 @@ public class ZetaSQLEngine implements SQLEngine {
 
     @Override
     public List<SeaTunnelRow> transformBySQL(SeaTunnelRow inputRow, 
SeaTunnelRowType outRowType) {
+        ensureUdfOpened();
         // ------Physical Query Plan Execution------
         // Scan Table
         Object[] inputFields = scanTable(inputRow);
@@ -343,10 +365,11 @@ public class ZetaSQLEngine implements SQLEngine {
 
     @Override
     public void close() {
-        if (udfList == null || udfList.isEmpty()) {
+        if (CollectionUtils.isEmpty(udfList) || !udfOpened) {
             return;
         }
         closeUDFs(udfList.size() - 1);
+        udfOpened = false;
     }
 
     private void closeUDFs(int lastIndex) {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
index b42dc9473c..89fde0967b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
@@ -51,11 +51,12 @@ public class ZetaUDFContext {
     }
 
     private void updateTableId(String tableId) {
-        if (Objects.equals(this.rawTableId, tableId)) {
+        boolean isNullTableId = tableId == null;
+        if (Objects.equals(this.rawTableId, tableId) && this.tableIdIsNull == 
isNullTableId) {
             return;
         }
         this.rawTableId = tableId;
-        this.tableIdIsNull = tableId == null;
+        this.tableIdIsNull = isNullTableId;
         this.database = null;
         this.schema = null;
         this.table = null;
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
index 4513fdba4b..e955638b26 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java
@@ -27,6 +27,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 public class ZetaSQLEngineTest {
@@ -95,4 +97,118 @@ public class ZetaSQLEngineTest {
                                 rowType,
                                 "insert into test(id, name, age) values (1, 
'bad', 10)"));
     }
+
+    @Test
+    public void testSchemaInferenceShouldNotOpenUdf() {
+        TrackingUdf trackingUdf = new TrackingUdf("tracking", false);
+        ZetaSQLEngine engine = new 
TestableZetaSQLEngine(Collections.singletonList(trackingUdf));
+        engine.init("test", "test", simpleRowType(), "select id, name from 
test");
+
+        SeaTunnelRowType outType = engine.typeMapping(new ArrayList<>());
+
+        Assertions.assertNotNull(outType);
+        Assertions.assertEquals(0, trackingUdf.getOpenCount());
+        Assertions.assertEquals(0, trackingUdf.getCloseCount());
+    }
+
+    @Test
+    public void testOpenUdfWhenExecuteAndCloseOnEngineClose() {
+        TrackingUdf trackingUdf = new TrackingUdf("tracking", false);
+        ZetaSQLEngine engine = new 
TestableZetaSQLEngine(Collections.singletonList(trackingUdf));
+        engine.init("test", "test", simpleRowType(), "select id from test");
+        SeaTunnelRowType outType = engine.typeMapping(new ArrayList<>());
+
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {1, "Alice", 
20});
+        engine.transformBySQL(inputRow, outType);
+        engine.transformBySQL(inputRow, outType);
+
+        Assertions.assertEquals(1, trackingUdf.getOpenCount());
+        Assertions.assertEquals(0, trackingUdf.getCloseCount());
+
+        engine.close();
+
+        Assertions.assertEquals(1, trackingUdf.getCloseCount());
+    }
+
+    @Test
+    public void testOpenFailureShouldCloseFailedAndOpenedUdfs() {
+        TrackingUdf firstUdf = new TrackingUdf("first", false);
+        TrackingUdf failedUdf = new TrackingUdf("failed", true);
+        ZetaSQLEngine engine = new 
TestableZetaSQLEngine(Arrays.asList(firstUdf, failedUdf));
+        engine.init("test", "test", simpleRowType(), "select id from test");
+        SeaTunnelRowType outType = engine.typeMapping(new ArrayList<>());
+
+        Assertions.assertThrows(
+                TransformException.class,
+                () ->
+                        engine.transformBySQL(
+                                new SeaTunnelRow(new Object[] {1, "Alice", 
20}), outType));
+
+        Assertions.assertEquals(1, firstUdf.getOpenCount());
+        Assertions.assertEquals(1, firstUdf.getCloseCount());
+        Assertions.assertEquals(1, failedUdf.getOpenCount());
+        Assertions.assertEquals(1, failedUdf.getCloseCount());
+    }
+
+    private static final class TestableZetaSQLEngine extends ZetaSQLEngine {
+
+        private final List<ZetaUDF> testUdfs;
+
+        private TestableZetaSQLEngine(List<ZetaUDF> testUdfs) {
+            this.testUdfs = testUdfs;
+        }
+
+        @Override
+        protected List<ZetaUDF> loadUDFs() {
+            return new ArrayList<>(testUdfs);
+        }
+    }
+
+    private static final class TrackingUdf implements ZetaUDF {
+        private final String functionName;
+        private final boolean failOnOpen;
+        private int openCount;
+        private int closeCount;
+
+        private TrackingUdf(String functionName, boolean failOnOpen) {
+            this.functionName = functionName;
+            this.failOnOpen = failOnOpen;
+        }
+
+        @Override
+        public String functionName() {
+            return functionName;
+        }
+
+        @Override
+        public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> 
argsType) {
+            return BasicType.STRING_TYPE;
+        }
+
+        @Override
+        public Object evaluate(List<Object> args) {
+            return null;
+        }
+
+        @Override
+        public void open() throws Exception {
+            openCount++;
+            if (failOnOpen) {
+                throw new Exception("open failed");
+            }
+        }
+
+        @Override
+        public void close() {
+            closeCount++;
+        }
+
+        private int getOpenCount() {
+            return openCount;
+        }
+
+        private int getCloseCount() {
+            return closeCount;
+        }
+    }
 }
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContextTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContextTest.java
new file mode 100644
index 0000000000..9df2a53d31
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContextTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ZetaUDFContextTest {
+
+    @Test
+    public void testNullTableIdShouldNotTriggerTablePathResolve() {
+        ZetaUDFContext context = new ZetaUDFContext();
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1});
+        row.setTableId(null);
+
+        context.update(row);
+
+        Assertions.assertNull(context.getRawTableId());
+        Assertions.assertDoesNotThrow(context::getDatabase);
+        Assertions.assertDoesNotThrow(context::getSchema);
+        Assertions.assertDoesNotThrow(context::getTable);
+        Assertions.assertNull(context.getDatabase());
+        Assertions.assertNull(context.getSchema());
+        Assertions.assertNull(context.getTable());
+    }
+}

Reply via email to