This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ab71d266b flink transform module support multi split, and add custom
split function name (#2268)
ab71d266b is described below
commit ab71d266b3205584d2f0fb3c990eeeb604b7e45a
Author: lcyyyyyy <[email protected]>
AuthorDate: Thu Jul 28 10:00:48 2022 +0800
flink transform module support multi split, and add custom split function
name (#2268)
---
.../main/java/org/apache/seatunnel/flink/transform/Split.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
index 3e8ba6dd1..0d9909653 100644
---
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
@@ -40,9 +40,12 @@ public class Split implements FlinkStreamTransform,
FlinkBatchTransform {
private static final String SEPARATOR = "separator";
private static final String FIELDS = "fields";
+ private static final String NAME = "name";
private String separator = ",";
+ private String name = "split";
+
private int num;
private List<String> fields;
@@ -64,11 +67,11 @@ public class Split implements FlinkStreamTransform,
FlinkBatchTransform {
if (flinkEnvironment.isStreaming()) {
flinkEnvironment
.getStreamTableEnvironment()
- .registerFunction("split", new ScalarSplit(rowTypeInfo,
num, separator));
+ .registerFunction(name, new ScalarSplit(rowTypeInfo, num,
separator));
} else {
flinkEnvironment
.getBatchTableEnvironment()
- .registerFunction("split", new ScalarSplit(rowTypeInfo,
num, separator));
+ .registerFunction(name, new ScalarSplit(rowTypeInfo, num,
separator));
}
}
@@ -94,6 +97,9 @@ public class Split implements FlinkStreamTransform,
FlinkBatchTransform {
if (config.hasPath(SEPARATOR)) {
separator = config.getString(SEPARATOR);
}
+ if (config.hasPath(NAME)) {
+ name = config.getString(NAME);
+ }
TypeInformation<?>[] types = new TypeInformation[fields.size()];
for (int i = 0; i < types.length; i++) {
types[i] = Types.STRING();