This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3a2f719d2e [Feature][Transform] Add Zeta SQL UDF context and lifecycle 
hooks with e2e/docs updates (#10489)
3a2f719d2e is described below

commit 3a2f719d2e71c712ffc43baa65cc8ed2f7fa175b
Author: cosmosni <[email protected]>
AuthorDate: Tue Mar 17 21:48:23 2026 +0800

    [Feature][Transform] Add Zeta SQL UDF context and lifecycle hooks with 
e2e/docs updates (#10489)
---
 docs/en/transforms/sql-udf.md                      |  85 ++++++++++++++-
 docs/zh/transforms/sql-udf.md                      |  85 ++++++++++++++-
 .../seatunnel/e2e/transform/udf/ExampleUdfIT.java  |   8 ++
 .../resources/custom_udf_context_lifecycle.conf    |  68 ++++++++++++
 .../seatunnel/e2e/transform/udf/EncryptUDF.java    |  83 +++++++++++++++
 .../transform/sql/zeta/ZetaSQLEngine.java          |  53 +++++++++-
 .../transform/sql/zeta/ZetaSQLFunction.java        |  21 ++++
 .../seatunnel/transform/sql/zeta/ZetaUDF.java      |  29 +++++-
 .../transform/sql/zeta/ZetaUDFContext.java         | 116 +++++++++++++++++++++
 9 files changed, 541 insertions(+), 7 deletions(-)

diff --git a/docs/en/transforms/sql-udf.md b/docs/en/transforms/sql-udf.md
index 7b3d02d105..bc331b7051 100644
--- a/docs/en/transforms/sql-udf.md
+++ b/docs/en/transforms/sql-udf.md
@@ -34,9 +34,48 @@ public interface ZetaUDF {
      * @return result value
      */
     Object evaluate(List<Object> args);
+
+    /**
+     * Whether current udf requires row level context.
+     */
+    default boolean requiresContext() {
+        return false;
+    }
+
+    /**
+     * Evaluate with row level context.
+     */
+    default Object evaluateWithContext(List<Object> args, ZetaUDFContext 
context) {
+        return evaluate(args);
+    }
+
+    /**
+     * Initialize udf resources.
+     */
+    default void open() throws Exception {}
+
+    /**
+     * Release udf resources.
+     */
+    default void close() {}
 }
 ```
 
+`ZetaUDFContext` provides runtime row-level metadata and fields:
+
+- `getRawTableId()`
+- `getDatabase()`
+- `getSchema()`
+- `getTable()`
+- `getRowKind()`
+- `getAllFields()`
+
+Notes:
+
+- `database/schema/table` parsing follows `TablePath.of(tableId)` semantics.
+- If `tableId` is in an unsupported format, accessing `database/schema/table` 
throws `IllegalArgumentException`.
+- Existing UDFs remain backward compatible and continue using 
`evaluate(List<Object> args)`.
+
 ## UDF Implements Example
 
 Add these dependencies and provided scope to your maven project. **Dependency 
versions should match the runtime environment.**
@@ -94,6 +133,50 @@ public class ExampleUDF implements ZetaUDF {
 Package the UDF project and copy the jar to the path: ${SEATUNNEL_HOME}/lib. 
And if your UDF use third party library, you also need put it to 
${SEATUNNEL_HOME}/lib.  
 If you use cluster mode, you need put the lib to all your node's 
${SEATUNNEL_HOME}/lib folder and re-start the cluster.
 
+## Context-aware & lifecycle UDF example
+
+```java
+@AutoService(ZetaUDF.class)
+public class ContextLifecycleUdf implements ZetaUDF {
+
+    private transient String prefix;
+
+    @Override
+    public String functionName() {
+        return "CTX_LIFE";
+    }
+
+    @Override
+    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> 
argsType) {
+        return BasicType.STRING_TYPE;
+    }
+
+    @Override
+    public boolean requiresContext() {
+        return true;
+    }
+
+    @Override
+    public void open() {
+        this.prefix = "OPENED";
+    }
+
+    @Override
+    public Object evaluateWithContext(List<Object> args, ZetaUDFContext 
context) {
+        String arg = args.get(0) == null ? null : String.valueOf(args.get(0));
+        if (arg == null) {
+            return null;
+        }
+        return prefix + ":" + context.getRowKind().shortString() + ":" + arg;
+    }
+
+    @Override
+    public void close() {
+        this.prefix = null;
+    }
+}
+```
+
 ## Example
 
 The data read from source is a table like this:
@@ -130,4 +213,4 @@ Then the data in result table `fake1` will update to
 
 ### new version
 
-- Add UDF of SQL Transform Connector
+- Add UDF of SQL Transform Connector
\ No newline at end of file
diff --git a/docs/zh/transforms/sql-udf.md b/docs/zh/transforms/sql-udf.md
index fdaaaa393b..ee6c56a159 100644
--- a/docs/zh/transforms/sql-udf.md
+++ b/docs/zh/transforms/sql-udf.md
@@ -34,9 +34,48 @@ public interface ZetaUDF {
      * @return result value
      */
     Object evaluate(List<Object> args);
+
+    /**
+     * 是否需要行级上下文。
+     */
+    default boolean requiresContext() {
+        return false;
+    }
+
+    /**
+     * 带上下文执行。
+     */
+    default Object evaluateWithContext(List<Object> args, ZetaUDFContext 
context) {
+        return evaluate(args);
+    }
+
+    /**
+     * 初始化 UDF 资源。
+     */
+    default void open() throws Exception {}
+
+    /**
+     * 释放 UDF 资源。
+     */
+    default void close() {}
 }
 ```
 
+`ZetaUDFContext` 提供运行时行级元数据与字段:
+
+- `getRawTableId()`
+- `getDatabase()`
+- `getSchema()`
+- `getTable()`
+- `getRowKind()`
+- `getAllFields()`
+
+说明:
+
+- `database/schema/table` 的解析语义与 `TablePath.of(tableId)` 保持一致。
+- 如果 `tableId` 格式不被支持,访问 `database/schema/table` 时会抛出 
`IllegalArgumentException`。
+- 已有 UDF 保持向后兼容,仍可只实现 `evaluate(List<Object> args)`。
+
 ## UDF 实现示例
 
 将这些依赖项添加到您的 Maven 项目,并使用 provided 作用域。**依赖版本应与运行环境一致。**
@@ -93,6 +132,50 @@ public class ExampleUDF implements ZetaUDF {
 
 打包UDF项目并将jar文件复制到路径:${SEATUNNEL_HOME}/lib
 
+## 支持上下文与生命周期的 UDF 示例
+
+```java
+@AutoService(ZetaUDF.class)
+public class ContextLifecycleUdf implements ZetaUDF {
+
+    private transient String prefix;
+
+    @Override
+    public String functionName() {
+        return "CTX_LIFE";
+    }
+
+    @Override
+    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> 
argsType) {
+        return BasicType.STRING_TYPE;
+    }
+
+    @Override
+    public boolean requiresContext() {
+        return true;
+    }
+
+    @Override
+    public void open() {
+        this.prefix = "OPENED";
+    }
+
+    @Override
+    public Object evaluateWithContext(List<Object> args, ZetaUDFContext 
context) {
+        String arg = args.get(0) == null ? null : String.valueOf(args.get(0));
+        if (arg == null) {
+            return null;
+        }
+        return prefix + ":" + context.getRowKind().shortString() + ":" + arg;
+    }
+
+    @Override
+    public void close() {
+        this.prefix = null;
+    }
+}
+```
+
 ## 示例
 
 源端数据读取的表格如下:
@@ -129,4 +212,4 @@ transform {
 
 ### 新版本
 
-- 添加SQL转换连接器的UDF
+- 添加SQL转换连接器的UDF
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/java/org/apache/seatunnel/e2e/transform/udf/ExampleUdfIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/java/org/apache/seatunnel/e2e/transform/udf/ExampleUdfIT.java
index 5cf7f32135..b0d016969c 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/java/org/apache/seatunnel/e2e/transform/udf/ExampleUdfIT.java
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/java/org/apache/seatunnel/e2e/transform/udf/ExampleUdfIT.java
@@ -42,4 +42,12 @@ public class ExampleUdfIT extends TestSuiteBase {
         Container.ExecResult execResult = 
container.executeJob("/custom_udf.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    @TestTemplate
+    public void testCustomUdfContextLifecycle(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/custom_udf_context_lifecycle.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/resources/custom_udf_context_lifecycle.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/resources/custom_udf_context_lifecycle.conf
new file mode 100644
index 0000000000..ec5c8eb707
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/resources/custom_udf_context_lifecycle.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+      }
+    }
+    rows = [
+      {fields = [1, "Hello World"], kind = INSERT}
+    ]
+  }
+}
+
+transform {
+  sql {
+    plugin_input = "fake"
+    plugin_output = "fake1"
+    query = "select id, ENCRYPT(name) as name from dual"
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "fake1"
+    rules = {
+      field_rules = [
+        {
+          field_name = "id"
+          field_type = "int"
+          field_value = [
+            {equals_to = 1}
+          ]
+        },
+        {
+          field_name = "name"
+          field_type = "string"
+          field_value = [
+            {equals_to = "ENC(3135317):Hello World"}
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-udf/src/main/java/org/apache/seatunnel/e2e/transform/udf/EncryptUDF.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-udf/src/main/java/org/apache/seatunnel/e2e/transform/udf/EncryptUDF.java
new file mode 100644
index 0000000000..a764652785
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-udf/src/main/java/org/apache/seatunnel/e2e/transform/udf/EncryptUDF.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform.udf;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.sql.zeta.ZetaUDF;
+import org.apache.seatunnel.transform.sql.zeta.ZetaUDFContext;
+
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+
+@AutoService(ZetaUDF.class)
+public class EncryptUDF implements ZetaUDF {
+
+    private transient CryptoClient client;
+
+    @Override
+    public String functionName() {
+        return "ENCRYPT";
+    }
+
+    @Override
+    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> 
argsType) {
+        return BasicType.STRING_TYPE;
+    }
+
+    @Override
+    public void open() {
+        this.client = new CryptoClient();
+    }
+
+    @Override
+    public boolean requiresContext() {
+        return true;
+    }
+
+    @Override
+    public Object evaluate(List<Object> args) {
+        throw new UnsupportedOperationException("ENCRYPT should be called with 
context");
+    }
+
+    @Override
+    public Object evaluateWithContext(List<Object> args, ZetaUDFContext 
context) {
+        if (client == null) {
+            throw new IllegalStateException("open() was not called before 
evaluateWithContext()");
+        }
+        Object value = args.get(0);
+        if (value == null) {
+            return null;
+        }
+        String tableId = context.getRawTableId();
+        return client.encrypt(value, tableId);
+    }
+
+    @Override
+    public void close() {
+        this.client = null;
+    }
+
+    private static class CryptoClient {
+        private String encrypt(Object value, String tableId) {
+            int keySeed = tableId == null ? 0 : tableId.hashCode();
+            return "ENC(" + keySeed + "):" + value;
+        }
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index fff51a8998..86a12e6126 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -49,6 +49,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
@@ -68,6 +69,8 @@ public class ZetaSQLEngine implements SQLEngine {
     private ZetaSQLFunction zetaSQLFunction;
     private ZetaSQLFilter zetaSQLFilter;
     private ZetaSQLType zetaSQLType;
+    private List<ZetaUDF> udfList = Collections.emptyList();
+    private ZetaUDFContext udfContext;
 
     private Integer allColumnsCount = null;
 
@@ -84,15 +87,38 @@ public class ZetaSQLEngine implements SQLEngine {
         this.inputRowType = inputRowType;
         this.sql = sql;
 
-        List<ZetaUDF> udfList = new ArrayList<>();
-        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-        ServiceLoader.load(ZetaUDF.class, classLoader).forEach(udfList::add);
+        udfList = loadUDFs();
+        udfContext = new ZetaUDFContext();
 
         this.zetaSQLType = new ZetaSQLType(inputRowType, udfList);
-        this.zetaSQLFunction = new ZetaSQLFunction(inputRowType, zetaSQLType, 
udfList);
+        this.zetaSQLFunction = new ZetaSQLFunction(inputRowType, zetaSQLType, 
udfList, udfContext);
         this.zetaSQLFilter = new ZetaSQLFilter(zetaSQLFunction, zetaSQLType);
 
         parseSQL();
+        openUDFs();
+    }
+
+    protected List<ZetaUDF> loadUDFs() {
+        List<ZetaUDF> loadedUdfs = new ArrayList<>();
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        ServiceLoader.load(ZetaUDF.class, 
classLoader).forEach(loadedUdfs::add);
+        return loadedUdfs;
+    }
+
+    private void openUDFs() {
+        for (int i = 0; i < udfList.size(); i++) {
+            ZetaUDF udf = udfList.get(i);
+            try {
+                udf.open();
+            } catch (Exception e) {
+                closeUDFs(i - 1);
+                log.error("Open udf {} failed", udf.functionName(), e);
+                throw new TransformException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                        String.format(
+                                "Open udf %s failed: %s", udf.functionName(), 
e.getMessage()));
+            }
+        }
     }
 
     private void parseSQL() {
@@ -239,6 +265,7 @@ public class ZetaSQLEngine implements SQLEngine {
         // ------Physical Query Plan Execution------
         // Scan Table
         Object[] inputFields = scanTable(inputRow);
+        zetaSQLFunction.updateUDFContext(inputFields, inputRow);
 
         // Filter
         try {
@@ -313,4 +340,22 @@ public class ZetaSQLEngine implements SQLEngine {
                         - allColumnsCnt;
         return allColumnsCount;
     }
+
+    @Override
+    public void close() {
+        if (udfList == null || udfList.isEmpty()) {
+            return;
+        }
+        closeUDFs(udfList.size() - 1);
+    }
+
+    private void closeUDFs(int lastIndex) {
+        for (int i = lastIndex; i >= 0; i--) {
+            try {
+                udfList.get(i).close();
+            } catch (Exception e) {
+                log.warn("Close udf {} failed", udfList.get(i).functionName(), 
e);
+            }
+        }
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index de1a78eae2..e339434614 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -225,13 +225,30 @@ public class ZetaSQLFunction {
     private final ZetaSQLFilter zetaSQLFilter;
 
     private final List<ZetaUDF> udfList;
+    private final ZetaUDFContext udfContext;
 
     public ZetaSQLFunction(
             SeaTunnelRowType inputRowType, ZetaSQLType zetaSQLType, 
List<ZetaUDF> udfList) {
+        this(inputRowType, zetaSQLType, udfList, null);
+    }
+
+    public ZetaSQLFunction(
+            SeaTunnelRowType inputRowType,
+            ZetaSQLType zetaSQLType,
+            List<ZetaUDF> udfList,
+            ZetaUDFContext udfContext) {
         this.inputRowType = inputRowType;
         this.zetaSQLType = zetaSQLType;
         this.zetaSQLFilter = new ZetaSQLFilter(this, zetaSQLType);
         this.udfList = udfList;
+        this.udfContext = udfContext;
+    }
+
+    public void updateUDFContext(Object[] fields, SeaTunnelRow row) {
+        if (udfContext == null) {
+            return;
+        }
+        udfContext.update(fields, row);
     }
 
     public Object computeForValue(Expression expression, Object[] inputFields) 
{
@@ -648,6 +665,9 @@ public class ZetaSQLFunction {
             default:
                 for (ZetaUDF udf : udfList) {
                     if (udf.functionName().equalsIgnoreCase(functionName)) {
+                        if (udf.requiresContext() && udfContext != null) {
+                            return udf.evaluateWithContext(args, udfContext);
+                        }
                         return udf.evaluate(args);
                     }
                 }
@@ -847,6 +867,7 @@ public class ZetaSQLFunction {
             } else if (expression instanceof Function) {
                 List<SeaTunnelRow> next = new ArrayList<>();
                 for (SeaTunnelRow row : seaTunnelRows) {
+                    updateUDFContext(row.getFields(), row);
                     Object splitFieldValue = computeForValue(expression, 
row.getFields());
                     transformExplodeValue(
                             splitFieldValue,
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDF.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDF.java
index 6d14789c96..7281e9605e 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDF.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDF.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.transform.sql.zeta;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
+import java.io.Serializable;
 import java.util.List;
 
-public interface ZetaUDF {
+public interface ZetaUDF extends Serializable {
     /**
      * Function name
      *
@@ -44,4 +45,30 @@ public interface ZetaUDF {
      * @return result value
      */
     Object evaluate(List<Object> args);
+
+    /**
+     * Whether current udf requires row level context.
+     *
+     * @return true means engine should call evaluateWithContext instead of 
evaluate
+     */
+    default boolean requiresContext() {
+        return false;
+    }
+
+    /**
+     * Evaluate with row level context.
+     *
+     * @param args input arguments
+     * @param context row context
+     * @return result value
+     */
+    default Object evaluateWithContext(List<Object> args, ZetaUDFContext 
context) {
+        return evaluate(args);
+    }
+
+    /** Initialize udf resources. */
+    default void open() throws Exception {}
+
+    /** Release udf resources. */
+    default void close() {}
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
new file mode 100644
index 0000000000..b42dc9473c
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** Runtime context for zeta udf execution. */
+public class ZetaUDFContext {
+    private static final Object[] EMPTY_FIELDS = new Object[0];
+
+    @Nullable private String rawTableId;
+    private boolean tableIdIsNull;
+    @Nullable private String database;
+    @Nullable private String schema;
+    @Nullable private String table;
+    @Nullable private IllegalArgumentException tablePathParseException;
+    private boolean tablePathResolved;
+    private RowKind rowKind = RowKind.INSERT;
+    private Object[] allFields = EMPTY_FIELDS;
+
+    public ZetaUDFContext update(SeaTunnelRow row) {
+        return update(row.getFields(), row);
+    }
+
+    public ZetaUDFContext update(Object[] fields, SeaTunnelRow row) {
+        this.allFields = fields == null ? EMPTY_FIELDS : fields;
+        this.rowKind = row.getRowKind();
+        updateTableId(row.getTableId());
+        return this;
+    }
+
+    private void updateTableId(String tableId) {
+        if (Objects.equals(this.rawTableId, tableId)) {
+            return;
+        }
+        this.rawTableId = tableId;
+        this.tableIdIsNull = tableId == null;
+        this.database = null;
+        this.schema = null;
+        this.table = null;
+        this.tablePathParseException = null;
+        this.tablePathResolved = false;
+    }
+
+    private void resolveTablePathIfNeeded() {
+        if (tablePathResolved) {
+            if (tablePathParseException != null) {
+                throw tablePathParseException;
+            }
+            return;
+        }
+        tablePathResolved = true;
+
+        if (tableIdIsNull) {
+            return;
+        }
+
+        try {
+            TablePath tablePath = TablePath.of(rawTableId);
+            this.database = tablePath.getDatabaseName();
+            this.schema = tablePath.getSchemaName();
+            this.table = tablePath.getTableName();
+        } catch (IllegalArgumentException exception) {
+            this.tablePathParseException = exception;
+            throw exception;
+        }
+    }
+
+    @Nullable public String getRawTableId() {
+        return rawTableId;
+    }
+
+    @Nullable public String getDatabase() {
+        resolveTablePathIfNeeded();
+        return database;
+    }
+
+    @Nullable public String getSchema() {
+        resolveTablePathIfNeeded();
+        return schema;
+    }
+
+    @Nullable public String getTable() {
+        resolveTablePathIfNeeded();
+        return table;
+    }
+
+    public RowKind getRowKind() {
+        return rowKind;
+    }
+
+    public Object[] getAllFields() {
+        return allFields;
+    }
+}

Reply via email to