This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f20437e  [Bug][Transform] Fix spark&flink split transform udf register (#1394)
f20437e is described below
commit f20437ed39427b4748e48ab488c30c01d62a1975
Author: Simon <[email protected]>
AuthorDate: Mon Mar 7 21:11:29 2022 +0800
[Bug][Transform] Fix spark&flink split transform udf register (#1394)
* split_bugfix
* fix
* fix flink
---
.../spark/configuration/transform-plugins/Split.md | 28 +++++++++++-----
.../seatunnel/flink/batch/FlinkBatchExecution.java | 1 +
.../apache/seatunnel/spark/transform/Split.scala | 39 ++++++++++++----------
3 files changed, 41 insertions(+), 27 deletions(-)
diff --git a/docs/en/spark/configuration/transform-plugins/Split.md b/docs/en/spark/configuration/transform-plugins/Split.md
index 30e7752..b536afb 100644
--- a/docs/en/spark/configuration/transform-plugins/Split.md
+++ b/docs/en/spark/configuration/transform-plugins/Split.md
@@ -4,19 +4,19 @@
## Description
-Split string according to `delimiter`
+Split a string according to `separator`
## Options
| name           | type   | required | default value |
| -------------- | ------ | -------- | ------------- |
-| delimiter      | string | no       | " " (space)   |
+| separator      | string | no       | " "           |
| fields         | array  | yes      | -             |
| source_field   | string | no       | raw_message   |
| target_field   | string | no       | *root*        |
| common-options | string | no       | -             |
-### delimiter [string]
+### separator [string]
The input string is split according to this separator. The default separator is a space (`" "`).
@@ -37,24 +37,34 @@ The source field of the string before being split, if not configured, the defaul
Transform plugin common parameters, please refer to [Transform Plugin](./transform-plugin.md) for details
## Examples
+- Split the `message` field in the source data according to `&`; you can then use `field1` or `field2` as the key to get the corresponding value
```bash
split {
  source_field = "message"
-  delimiter = "&"
+  separator = "&"
  fields = ["field1", "field2"]
}
```
-
-> Split the `message` field in the source data according to `&`, you can use `field1` or `field2` as the key to get the corresponding value
+- Split the `message` field in the source data according to `,` with `info` as the target field; you can then use `info.field1` or `info.field2` as the key to get the corresponding value
```bash
split {
  source_field = "message"
  target_field = "info"
-  delimiter = ","
+  separator = ","
  fields = ["field1", "field2"]
}
```
-
-> Split the `message` field in the source data according to `,` , the split field is `info` , you can use `info.field1` or `info.field2` as the key to get the corresponding value
+- Use `Split` as a UDF in SQL.
+```bash
+  # This registers a UDF named "split"
+  Split {
+    separator = "#"
+    fields = ["name", "age"]
+  }
+  # Use the split function (confirm that the fake table exists)
+  sql {
+    sql = "select * from (select raw_message, split(raw_message) as info_row from fake) t1"
+  }
+```
\ No newline at end of file
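For reference, a self-contained Scala sketch of what the example above exercises once this fix lands: registering a split UDF and calling it from SQL. The local SparkSession, the sample rows, and the two-field padding are illustrative assumptions, not code from this patch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object SplitUdfDemo extends App {
  // Local session purely for illustration.
  val spark = SparkSession.builder().master("local[*]").appName("split-udf-demo").getOrCreate()
  import spark.implicits._

  // Stand-in for the "fake" table referenced in the doc example above.
  Seq("Simon#25", "Alice#30").toDF("raw_message").createOrReplaceTempView("fake")

  // Pad or truncate to the configured field count, mirroring the transform's split helper.
  val splitFunc = udf((s: String) => s.split("#").padTo(2, "").take(2).toSeq)
  spark.udf.register("split", splitFunc)

  spark.sql("select * from (select raw_message, split(raw_message) as info_row from fake) t1").show(false)
  spark.stop()
}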
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index fe199fe..3e27f9b 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -67,6 +67,7 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
    }
    input = transform.processBatch(flinkEnvironment, dataSet);
    registerResultTable(transform, input);
+   transform.registerFunction(flinkEnvironment);
}
for (FlinkBatchSink sink : sinks) {
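The one-line fix above ensures each transform's UDF is registered with the Flink table environment right after its result table. As a rough sketch of the kind of registration involved (the SplitFunction class, the separator, and the function name are hypothetical stand-ins, not SeaTunnel's actual registerFunction implementation):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical scalar function mirroring the Spark-side split UDF.
class SplitFunction(separator: String) extends ScalarFunction {
  def eval(s: String): Array[String] =
    if (s == null) Array.empty[String] else s.split(separator)
}

object RegisterSplitSketch extends App {
  val tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
  // Register the function instance so SQL can call it, in the same spirit as the fix above.
  tableEnv.createTemporarySystemFunction("split", new SplitFunction("#"))
}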
diff --git a/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala b/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
index 968b92e..6f88c92 100644
--- a/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
+++ b/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
@@ -16,14 +16,15 @@
*/
package org.apache.seatunnel.spark.transform
-import org.apache.seatunnel.common.Constants
-
import scala.collection.JavaConversions._
+
+import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
class Split extends BaseSparkTransform {
@@ -33,27 +34,29 @@ class Split extends BaseSparkTransform {
val keys = config.getStringList("fields")
// https://stackoverflow.com/a/33345698/1145750
- config.getString("target_field") match {
-   case Constants.ROW_ROOT => {
-     val func = udf((s: String) => {
-       split(s, config.getString("delimiter"), keys.size())
+ var func: UserDefinedFunction = null
+ val ds = config.getString("target_field") match {
+   case Constants.ROW_ROOT =>
+     func = udf((s: String) => {
+       split(s, config.getString("separator"), keys.size())
      })
      var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
      for (i <- 0 until keys.size()) {
        filterDf = filterDf.withColumn(keys.get(i), col(Constants.ROW_TMP)(i))
      }
      filterDf.drop(Constants.ROW_TMP)
-   }
-   case targetField: String => {
-     val func = udf((s: String) => {
-       val values = split(s, config.getString("delimiter"), keys.size)
+   case targetField: String =>
+     func = udf((s: String) => {
+       val values = split(s, config.getString("separator"), keys.size)
        val kvs = (keys zip values).toMap
        kvs
      })
-
      df.withColumn(targetField, func(col(srcField)))
-   }
  }
+ if (func != null) {
+   env.getSparkSession.udf.register("Split", func)
+ }
+ ds
}
override def checkConfig(): CheckResult = {
@@ -63,21 +66,21 @@ class Split extends BaseSparkTransform {
override def prepare(env: SparkEnvironment): Unit = {
  val defaultConfig = ConfigFactory.parseMap(
    Map(
-     "delimiter" -> " ",
+     "separator" -> " ",
      "source_field" -> "raw_message",
      "target_field" -> Constants.ROW_ROOT))
  config = config.withFallback(defaultConfig)
}
/**
- * Split string by delimiter, if size of splited parts is less than fillLength,
+ * Split string by separator; if the number of split parts is less than fillLength,
  * empty string is filled; if greater than fillLength, parts will be truncated.
  */
- private def split(str: String, delimiter: String, fillLength: Int): Seq[String] = {
-   val parts = str.split(delimiter).map(_.trim)
-   val filled = fillLength compare parts.size match {
+ private def split(str: String, separator: String, fillLength: Int): Seq[String] = {
+   val parts = str.split(separator).map(_.trim)
+   val filled = fillLength compare parts.length match {
      case 0 => parts
-     case 1 => parts ++ Array.fill[String](fillLength - parts.size)("")
+     case 1 => parts ++ Array.fill[String](fillLength - parts.length)("")
      case -1 => parts.slice(0, fillLength)
    }
    filled.toSeq
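The pad/truncate contract of the patched helper, exercised with a standalone copy of the same logic (assumed equivalent to the method above):

// Standalone copy of the helper's logic for a quick sanity check.
def split(str: String, separator: String, fillLength: Int): Seq[String] = {
  val parts = str.split(separator).map(_.trim)
  val filled = fillLength compare parts.length match {
    case 0  => parts
    case 1  => parts ++ Array.fill[String](fillLength - parts.length)("")
    case -1 => parts.slice(0, fillLength)
  }
  filled.toSeq
}

assert(split("a,b,c", ",", 3) == Seq("a", "b", "c"))   // exact fit
assert(split("a,b", ",", 3) == Seq("a", "b", ""))      // padded with empty strings
assert(split("a,b,c,d", ",", 3) == Seq("a", "b", "c")) // extra parts truncated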