This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f20437e  [Bug][Transform] Fix spark&flink split transform udf register (#1394)
f20437e is described below

commit f20437ed39427b4748e48ab488c30c01d62a1975
Author: Simon <[email protected]>
AuthorDate: Mon Mar 7 21:11:29 2022 +0800

    [Bug][Transform] Fix spark&flink split transform udf register (#1394)
    
    * split_bugfix
    
    * fix
    
    * fix flink
---
 .../spark/configuration/transform-plugins/Split.md | 28 +++++++++++-----
 .../seatunnel/flink/batch/FlinkBatchExecution.java |  1 +
 .../apache/seatunnel/spark/transform/Split.scala   | 39 ++++++++++++----------
 3 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/docs/en/spark/configuration/transform-plugins/Split.md b/docs/en/spark/configuration/transform-plugins/Split.md
index 30e7752..b536afb 100644
--- a/docs/en/spark/configuration/transform-plugins/Split.md
+++ b/docs/en/spark/configuration/transform-plugins/Split.md
@@ -4,19 +4,19 @@
 
 ## Description
 
-Split string according to `delimiter`
+Split string according to `separator`
 
 ## Options
 
 | name           | type   | required | default value |
 | -------------- | ------ | -------- | ------------- |
-| delimiter      | string | no       | " " (space)   |
+| separator      | string | no       | " "           |
 | fields         | array  | yes      | -             |
 | source_field   | string | no       | raw_message   |
 | target_field   | string | no       | *root*        |
 | common-options | string | no       | -             |
 
-### delimiter [string]
+### separator [string]
 
 Separator, the input string is separated according to the separator. The default separator is a space `(" ")` .
 
@@ -37,24 +37,34 @@ The source field of the string before being split, if not configured, the defaul
 Transform plugin common parameters, please refer to [Transform Plugin](./transform-plugin.md) for details
 
 ## Examples
+- Split the `message` field in the source data according to `&`, you can use `field1` or `field2` as the key to get the corresponding value
 
 ```bash
 split {
     source_field = "message"
-    delimiter = "&"
+    separator = "&"
     fields = ["field1", "field2"]
 }
 ```
-
-> Split the `message` field in the source data according to `&`, you can use `field1` or `field2` as the key to get the corresponding value
+- Split the `message` field in the source data according to `,` , the split field is `info` , you can use `info.field1` or `info.field2` as the key to get the corresponding value
 
 ```bash
 split {
     source_field = "message"
     target_field = "info"
-    delimiter = ","
+    separator = ","
     fields = ["field1", "field2"]
 }
 ```
-
-> Split the `message` field in the source data according to `,` , the split field is `info` , you can use `info.field1` or `info.field2` as the key to get the corresponding value
+- Use `Split` as a UDF in SQL.
+```bash
+  # This just creates a udf called split
+  Split{
+    separator = "#"
+    fields = ["name","age"]
+  }
+  # Use the split function (confirm that the fake table exists)
+  sql {
+    sql = "select * from (select raw_message,split(raw_message) as info_row 
from fake) t1"
+  }
+```
\ No newline at end of file
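
The `Split`-as-UDF example added above works because the Spark transform now registers its UDF with the session (see the Split.scala change below). A minimal, self-contained Spark sketch of the same idea; the session setup, the `splitTo` helper, and the sample data are illustrative and not part of this commit:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local[*]").appName("split-udf-demo").getOrCreate()
import spark.implicits._

// Mirror the transform's contract: pad with "" or truncate to exactly n fields.
def splitTo(n: Int)(s: String, sep: String): Seq[String] = {
  val parts = s.split(sep).map(_.trim)
  if (parts.length >= n) parts.take(n).toSeq
  else parts.toSeq ++ Seq.fill(n - parts.length)("")
}

// Register under the same name the docs' sql example calls.
val splitUdf = udf((s: String) => splitTo(2)(s, "#"))
spark.udf.register("split", splitUdf)

Seq("Gary#28", "Alice#30").toDF("raw_message").createOrReplaceTempView("fake")
spark.sql("select raw_message, split(raw_message) as info_row from fake").show(false)
```

Registering the same `UserDefinedFunction` instance that the transform applies via `withColumn` is what makes the function resolvable from a downstream `sql` plugin.
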
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index fe199fe..3e27f9b 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -67,6 +67,7 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
             }
             input = transform.processBatch(flinkEnvironment, dataSet);
             registerResultTable(transform, input);
+            transform.registerFunction(flinkEnvironment);
         }
 
         for (FlinkBatchSink sink : sinks) {
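
The one-line fix above is the Flink half of the bug: the batch execution loop never called `transform.registerFunction(...)`, so UDFs exposed by transforms (such as `split`) were unavailable to later SQL stages in batch mode. As a rough illustration only, here is a hypothetical sketch against Flink's Table API (Flink 1.13-era calls; `SplitFunction` and the separator are made up, this is not the SeaTunnel implementation):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.functions.ScalarFunction

// Illustrative scalar UDF: split a string into trimmed parts.
class SplitFunction(separator: String) extends ScalarFunction {
  def eval(s: String): Array[String] = s.split(separator).map(_.trim)
}

val tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
tableEnv.createTemporarySystemFunction("split", new SplitFunction("&"))

// Only after a registration like this can batch SQL such as
//   select raw_message, split(raw_message) from fake
// resolve `split`; the added line performs the equivalent step per transform.
```
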
diff --git a/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala b/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
index 968b92e..6f88c92 100644
--- a/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
+++ b/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
@@ -16,14 +16,15 @@
  */
 package org.apache.seatunnel.spark.transform
 
-import org.apache.seatunnel.common.Constants
-
 import scala.collection.JavaConversions._
+
+import org.apache.seatunnel.common.Constants
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
 import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, udf}
 
 class Split extends BaseSparkTransform {
@@ -33,27 +34,29 @@ class Split extends BaseSparkTransform {
     val keys = config.getStringList("fields")
 
     // https://stackoverflow.com/a/33345698/1145750
-    config.getString("target_field") match {
-      case Constants.ROW_ROOT => {
-        val func = udf((s: String) => {
-          split(s, config.getString("delimiter"), keys.size())
+    var func: UserDefinedFunction = null
+    val ds = config.getString("target_field") match {
+      case Constants.ROW_ROOT =>
+        func = udf((s: String) => {
+          split(s, config.getString("separator"), keys.size())
         })
         var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
         for (i <- 0 until keys.size()) {
          filterDf = filterDf.withColumn(keys.get(i), col(Constants.ROW_TMP)(i))
         }
         filterDf.drop(Constants.ROW_TMP)
-      }
-      case targetField: String => {
-        val func = udf((s: String) => {
-          val values = split(s, config.getString("delimiter"), keys.size)
+      case targetField: String =>
+        func = udf((s: String) => {
+          val values = split(s, config.getString("separator"), keys.size)
           val kvs = (keys zip values).toMap
           kvs
         })
-
         df.withColumn(targetField, func(col(srcField)))
-      }
     }
+    if (func != null) {
+      env.getSparkSession.udf.register("Split", func)
+    }
+    ds
   }
 
   override def checkConfig(): CheckResult = {
@@ -63,21 +66,21 @@ class Split extends BaseSparkTransform {
   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "delimiter" -> " ",
+        "separator" -> " ",
         "source_field" -> "raw_message",
         "target_field" -> Constants.ROW_ROOT))
     config = config.withFallback(defaultConfig)
   }
 
   /**
-   * Split string by delimiter, if size of splited parts is less than fillLength,
+   * Split string by separator, if the size of split parts is less than fillLength,
   * empty string is filled; if greater than fillLength, parts will be truncated.
    */
-  private def split(str: String, delimiter: String, fillLength: Int): Seq[String] = {
-    val parts = str.split(delimiter).map(_.trim)
-    val filled = fillLength compare parts.size match {
+  private def split(str: String, separator: String, fillLength: Int): Seq[String] = {
+    val parts = str.split(separator).map(_.trim)
+    val filled = fillLength compare parts.length match {
       case 0 => parts
-      case 1 => parts ++ Array.fill[String](fillLength - parts.size)("")
+      case 1 => parts ++ Array.fill[String](fillLength - parts.length)("")
       case -1 => parts.slice(0, fillLength)
     }
     filled.toSeq
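
The helper itself keeps its pad-or-truncate behavior; the hunk only renames `delimiter` to `separator` and switches `parts.size` to `parts.length`. A standalone sketch of that contract with illustrative inputs (note that `String.split` treats the separator as a regex, as in the original code):

```scala
// Equivalent logic to the private helper above, for demonstration only.
def splitDemo(str: String, separator: String, fillLength: Int): Seq[String] = {
  val parts = str.split(separator).map(_.trim)
  fillLength compare parts.length match {
    case 0  => parts.toSeq                                            // exact fit
    case 1  => parts.toSeq ++ Seq.fill(fillLength - parts.length)("") // pad with ""
    case -1 => parts.take(fillLength).toSeq                           // drop extras
  }
}

splitDemo("a&b&c", "&", 3) // Seq("a", "b", "c")
splitDemo("a&b",   "&", 3) // Seq("a", "b", "") -- padded with one empty string
splitDemo("a&b&c", "&", 2) // Seq("a", "b")     -- third part truncated
```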
