This is an automated email from the ASF dual-hosted git repository.
rickyhuo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a13267d [SeaTunnel #1040] Add success message for CheckResult by Constants.CHECK_SUCCESS (#1041)
a13267d is described below
commit a13267daff28d79e9f8b2b352c4c98d2e52292dd
Author: Simon <[email protected]>
AuthorDate: Fri Jan 14 22:55:41 2022 +0800
[SeaTunnel #1040] Add success message for CheckResult by Constants.CHECK_SUCCESS (#1041)
* checkConfig
* fix
* codestyle
* success
* success
* fix
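
For context, the core change swaps the empty success message for a shared constant. A minimal illustrative sketch of the before/after pattern (not itself part of the commit):

    // before: success carried an empty message
    public CheckResult checkConfig() {
        return new CheckResult(true, "");
    }

    // after: success references the new shared constant
    public CheckResult checkConfig() {
        return new CheckResult(true, Constants.CHECK_SUCCESS);
    }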
---
.../seatunnel/flink/batch/FlinkBatchExecution.java | 3 +-
.../flink/stream/FlinkStreamExecution.java | 3 +-
.../seatunnel/flink/util/EnvironmentUtil.java | 5 +-
.../apache/seatunnel/spark/SparkEnvironment.scala | 12 +++--
.../spark/batch/SparkBatchExecution.scala | 6 ++-
.../spark/stream/SparkStreamingExecution.scala | 6 ++-
.../StructuredStreamingExecution.scala | 5 +-
.../common/{RowConstant.java => Constants.java} | 12 +++--
.../seatunnel/common/config/CheckConfigUtil.java | 4 +-
.../apache/seatunnel/flink/sink/ConsoleSink.java | 3 +-
.../seatunnel/flink/source/FakeSourceStream.java | 3 +-
.../seatunnel/flink/source/SocketStream.java | 3 +-
.../apache/seatunnel/spark/sink/Clickhouse.scala | 62 +++++++++++-----------
.../org/apache/seatunnel/spark/sink/Console.scala | 3 +-
.../org/apache/seatunnel/spark/source/Fake.scala | 9 ++--
.../apache/seatunnel/spark/source/FakeStream.scala | 11 ++--
.../apache/seatunnel/spark/sink/FileSinkBase.scala | 5 +-
.../org/apache/seatunnel/spark/source/File.scala | 11 ++--
.../org/apache/seatunnel/spark/source/Hive.scala | 8 ++-
.../org/apache/seatunnel/spark/sink/Kafka.scala | 10 ++--
.../org/apache/seatunnel/spark/sink/Kudu.scala | 9 +---
.../org/apache/seatunnel/spark/source/Kudu.scala | 6 +--
.../apache/seatunnel/spark/source/MongoDB.scala | 11 ++--
.../org/apache/seatunnel/spark/source/Redis.scala | 7 +--
.../seatunnel/spark/source/SocketStream.scala | 12 +++--
.../apache/seatunnel/spark/transform/Json.scala | 17 +++---
.../apache/seatunnel/spark/transform/Split.scala | 14 ++---
.../org/apache/seatunnel/spark/transform/Sql.scala | 7 +--
28 files changed, 140 insertions(+), 127 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 18bb8cd..6c44969 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.flink.batch;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -120,7 +121,7 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, "");
+ return new CheckResult(true, Constants.CHECK_SUCCESS);
}
@Override
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index ee5a361..660cdc0 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.flink.stream;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -120,7 +121,7 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, "");
+ return new CheckResult(true, Constants.CHECK_SUCCESS);
}
@Override
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
index 9e21b0f..1951317 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.flink.util;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.flink.api.common.ExecutionConfig;
@@ -76,9 +77,9 @@ public class EnvironmentUtil {
}
break;
default:
- return new CheckResult(true, "");
+ return new CheckResult(true, Constants.CHECK_SUCCESS);
}
}
- return new CheckResult(true, "");
+ return new CheckResult(true, Constants.CHECK_SUCCESS);
}
}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
index cb0b57f..f6a430a 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/SparkEnvironment.scala
@@ -16,16 +16,18 @@
*/
package org.apache.seatunnel.spark
+import java.lang
+
+import scala.collection.JavaConversions._
+
+import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
import org.apache.seatunnel.env.RuntimeEnv
+import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import java.lang
-import scala.collection.JavaConversions._
-
class SparkEnvironment extends RuntimeEnv {
private var sparkSession: SparkSession = _
@@ -38,7 +40,7 @@ class SparkEnvironment extends RuntimeEnv {
override def getConfig: Config = config
- override def checkConfig(): CheckResult = new CheckResult(true, "")
+ override def checkConfig(): CheckResult = new CheckResult(true, Constants.CHECK_SUCCESS)
override def prepare(prepareEnv: lang.Boolean): Unit = {
val sparkConf = createSparkConf()
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
index 9ed16fc..dad3e3f 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/batch/SparkBatchExecution.scala
@@ -21,8 +21,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
import org.apache.seatunnel.env.Execution
import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource, BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.{Dataset, Row}
-
import java.util.{List => JList}
+
+import org.apache.seatunnel.common.Constants
+
import scala.collection.JavaConversions._
class SparkBatchExecution(environment: SparkEnvironment)
@@ -34,7 +36,7 @@ class SparkBatchExecution(environment: SparkEnvironment)
override def getConfig: Config = config
- override def checkConfig(): CheckResult = new CheckResult(true, "")
+ override def checkConfig(): CheckResult = new CheckResult(true, Constants.CHECK_SUCCESS)
override def prepare(prepareEnv: Void): Unit = {}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
index 0c4e9b2..d822782 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
@@ -22,8 +22,10 @@ import org.apache.seatunnel.env.Execution
import org.apache.seatunnel.spark.batch.SparkBatchExecution
import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource, BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.{Dataset, Row}
-
import java.util.{List => JList}
+
+import org.apache.seatunnel.common.Constants
+
import scala.collection.JavaConversions._
class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
@@ -76,7 +78,7 @@ class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
override def getConfig: Config = config
- override def checkConfig(): CheckResult = new CheckResult(true, "")
+ override def checkConfig(): CheckResult = new CheckResult(true, Constants.CHECK_SUCCESS)
override def prepare(void: Void): Unit = {}
}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
index 6505d81..d12131f 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.scala
@@ -20,9 +20,10 @@ import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
import org.apache.seatunnel.env.Execution
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-
import java.util.{List => JList}
+import org.apache.seatunnel.common.Constants
+
class StructuredStreamingExecution(environment: SparkEnvironment)
extends Execution[StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink] {
@@ -32,7 +33,7 @@ class StructuredStreamingExecution(environment: SparkEnvironment)
override def getConfig: Config = config
- override def checkConfig(): CheckResult = new CheckResult(true, "")
+ override def checkConfig(): CheckResult = new CheckResult(true, Constants.CHECK_SUCCESS)
override def prepare(void: Void): Unit = {}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/RowConstant.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
similarity index 74%
rename from seatunnel-common/src/main/java/org/apache/seatunnel/common/RowConstant.java
rename to seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index ec796a7..ebf1ec6 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/RowConstant.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -17,11 +17,13 @@
package org.apache.seatunnel.common;
-public final class RowConstant {
- public static final String ROOT = "__root__";
- public static final String TMP = "__tmp__";
- public static final String JSON = "__json__";
+public final class Constants {
+ public static final String ROW_ROOT = "__root__";
+ public static final String ROW_TMP = "__tmp__";
+ public static final String ROW_JSON = "__json__";
- private RowConstant() {
+ public static final String CHECK_SUCCESS = "All check is success";
+
+ private Constants() {
}
}
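
The rename from RowConstant to Constants also prefixes the row constants, which the Scala transforms further down pick up. A quick reference sketch (names taken from the hunk above):

    // formerly RowConstant.ROOT / RowConstant.TMP / RowConstant.JSON
    String root = Constants.ROW_ROOT; // "__root__"
    String tmp = Constants.ROW_TMP;   // "__tmp__"
    String json = Constants.ROW_JSON; // "__json__"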
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
index bfec572..5946d68 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/CheckConfigUtil.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.common.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import static org.apache.seatunnel.common.Constants.CHECK_SUCCESS;
+
public class CheckConfigUtil {
public static CheckResult check(Config config, String... params) {
@@ -34,7 +36,7 @@ public class CheckConfigUtil {
missingParams.deleteCharAt(missingParams.length() - 1));
return new CheckResult(false, errorMsg);
} else {
- return new CheckResult(true, "");
+ return new CheckResult(true, CHECK_SUCCESS);
}
}
}
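
Several connectors below also replace hand-rolled hasPath checks with CheckConfigUtil.check, which returns CHECK_SUCCESS when every required parameter is present. A hedged usage sketch (parameter names taken from the Kudu diff further down):

    // succeeds with CHECK_SUCCESS if both params exist in the config,
    // otherwise fails with a message listing the missing params
    CheckResult result = CheckConfigUtil.check(config, "kudu_master", "kudu_table");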
diff --git a/seatunnel-connectors/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
index fe6109a..151618f 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.flink.sink;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
@@ -58,7 +59,7 @@ public class ConsoleSink extends RichOutputFormat<Row> implements FlinkBatchSink
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, "");
+ return new CheckResult(true, Constants.CHECK_SUCCESS);
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java b/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
index 4c7e0ed..e3dfcf8 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSourceStream.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.types.Row;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -55,7 +56,7 @@ public class FakeSourceStream extends RichParallelSourceFunction<Row> implements
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, "");
+ return new CheckResult(true, Constants.CHECK_SUCCESS);
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java b/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
index f0ade5b..5513951 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/source/SocketStream.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.flink.source;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -64,7 +65,7 @@ public class SocketStream implements FlinkStreamSource<Row> {
@Override
public CheckResult checkConfig() {
- return new CheckResult(true, "");
+ return new CheckResult(true, Constants.CHECK_SUCCESS);
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index 25ccd9e..811070d 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -16,23 +16,25 @@
*/
package org.apache.seatunnel.spark.sink
-import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
-import org.apache.seatunnel.spark.SparkEnvironment
-import org.apache.seatunnel.spark.batch.SparkBatchSink
-import org.apache.spark.sql.{Dataset, Row}
-import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException}
-import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl}
-
import java.math.BigDecimal
import java.sql.PreparedStatement
import java.text.SimpleDateFormat
import java.util
import java.util.Properties
+
import scala.collection.JavaConversions._
import scala.collection.immutable.HashMap
-import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}
+import scala.util.matching.Regex
+
+import org.apache.seatunnel.common.Constants
+import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.sql.{Dataset, Row}
+import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl}
+import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException}
class Clickhouse extends SparkBatchSink {
@@ -106,7 +108,7 @@ class Clickhouse extends SparkBatchSink {
}
.mkString(", ") + " as non-empty string")
} else if (config.hasPath("username") && !config.hasPath("password") ||
config.hasPath(
- "password")
+ "password")
&& !config.hasPath("username")) {
new CheckResult(false, "please specify username and password at the same
time")
} else {
@@ -130,7 +132,7 @@ class Clickhouse extends SparkBatchSink {
this.fields = config.getStringList("fields")
acceptedClickHouseSchema()
} else {
- new CheckResult(true, "")
+ new CheckResult(true, Constants.CHECK_SUCCESS)
}
}
}
@@ -151,8 +153,8 @@ class Clickhouse extends SparkBatchSink {
}
private def getClickHouseSchema(
- conn: ClickHouseConnectionImpl,
- table: String): Map[String, String] = {
+ conn: ClickHouseConnectionImpl,
+ table: String): Map[String, String] = {
val sql = String.format("desc %s", table)
val resultSet = conn.createStatement.executeQuery(sql)
var schema = new HashMap[String, String]()
@@ -196,15 +198,15 @@ class Clickhouse extends SparkBatchSink {
.map { case (option) => "[" + option + "]" }
.mkString(", ") + " not support in current version.")
} else {
- new CheckResult(true, "")
+ new CheckResult(true, Constants.CHECK_SUCCESS)
}
}
}
private def renderDefaultStatement(
- index: Int,
- fieldType: String,
- statement: PreparedStatement): Unit = {
+ index: Int,
+ fieldType: String,
+ statement: PreparedStatement): Unit = {
fieldType match {
case "DateTime" | "Date" | "String" =>
statement.setString(index + 1, Clickhouse.renderStringDefault(fieldType))
@@ -224,9 +226,9 @@ class Clickhouse extends SparkBatchSink {
}
private def renderNullStatement(
- index: Int,
- fieldType: String,
- statement: PreparedStatement): Unit = {
+ index: Int,
+ fieldType: String,
+ statement: PreparedStatement): Unit = {
fieldType match {
case "String" =>
statement.setNull(index + 1, java.sql.Types.VARCHAR)
@@ -242,11 +244,11 @@ class Clickhouse extends SparkBatchSink {
}
private def renderBaseTypeStatement(
- index: Int,
- fieldIndex: Int,
- fieldType: String,
- item: Row,
- statement: PreparedStatement): Unit = {
+ index: Int,
+ fieldIndex: Int,
+ fieldType: String,
+ item: Row,
+ statement: PreparedStatement): Unit = {
fieldType match {
case "DateTime" | "Date" | "String" =>
statement.setString(index + 1, item.getAs[String](fieldIndex))
@@ -264,10 +266,10 @@ class Clickhouse extends SparkBatchSink {
}
private def renderStatement(
- fields: util.List[String],
- item: Row,
- dsFields: Array[String],
- statement: PreparedStatement): Unit = {
+ fields: util.List[String],
+ item: Row,
+ dsFields: Array[String],
+ statement: PreparedStatement): Unit = {
for (i <- 0 until fields.size()) {
val field = fields.get(i)
val fieldType = tableSchema(field)
@@ -284,7 +286,7 @@ class Clickhouse extends SparkBatchSink {
case "String" | "DateTime" | "Date" | Clickhouse.arrayPattern(_) =>
renderBaseTypeStatement(i, fieldIndex, fieldType, item, statement)
case Clickhouse.floatPattern(_) | Clickhouse.intPattern(_) | Clickhouse.uintPattern(
- _) =>
+ _) =>
renderBaseTypeStatement(i, fieldIndex, fieldType, item, statement)
case Clickhouse.nullablePattern(dataType) =>
renderBaseTypeStatement(i, fieldIndex, dataType, item, statement)
diff --git a/seatunnel-connectors/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/sink/Console.scala b/seatunnel-connectors/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/sink/Console.scala
index e623849..9b44ad9 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/sink/Console.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/sink/Console.scala
@@ -16,6 +16,7 @@
*/
package org.apache.seatunnel.spark.sink
+import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
@@ -53,7 +54,7 @@ class Console extends SparkBatchSink {
override def checkConfig(): CheckResult = {
!config.hasPath("limit") || (config.hasPath("limit") &&
config.getInt("limit") >= -1) match {
- case true => new CheckResult(true, "")
+ case true => new CheckResult(true, Constants.CHECK_SUCCESS)
case false =>
new CheckResult(false, "please specify [limit] as Number[-1, " +
Int.MaxValue + "]")
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala b/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
index 9f4b253..b2c2256 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/Fake.scala
@@ -16,12 +16,13 @@
*/
package org.apache.seatunnel.spark.source
-import org.apache.spark.sql.{Dataset, Row, RowFactory}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.types.{DataTypes, StructType}
+import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.spark.sql.{Dataset, Row, RowFactory}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.{DataTypes, StructType}
class Fake extends SparkBatchSource {
@@ -41,6 +42,6 @@ class Fake extends SparkBatchSource {
}
override def checkConfig(): CheckResult = {
- new CheckResult(true, "")
+ new CheckResult(true, Constants.CHECK_SUCCESS)
}
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/FakeStream.scala b/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/FakeStream.scala
index de7c1cb..3117e5c 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/FakeStream.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/source/FakeStream.scala
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
-
import java.security.SecureRandom
+
+import org.apache.seatunnel.common.config.CheckConfigUtil.check
+
import scala.collection.JavaConversions._
class FakeStream extends SparkStreamingSource[String] {
@@ -54,12 +56,7 @@ class FakeStream extends SparkStreamingSource[String] {
}
override def checkConfig(): CheckResult = {
-
- if (config.hasPath("content") && config.getStringList("content").nonEmpty)
{
- new CheckResult(true, "")
- } else {
- new CheckResult(false, "please make sure [content] is of type string
array")
- }
+ check(config, "content")
}
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
index 08e9643..65b6a15 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/sink/FileSinkBase.scala
@@ -17,6 +17,9 @@
package org.apache.seatunnel.spark.sink
import java.util
+
+import org.apache.seatunnel.common.Constants
+
import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.{Dataset, Row}
@@ -34,7 +37,7 @@ abstract class FileSinkBase extends SparkBatchSink {
val dir = config.getString("path")
dir.startsWith("/") || uriInAllowedSchema(dir, allowedURISchema) match
{
- case true => new CheckResult(true, "")
+ case true => new CheckResult(true, Constants.CHECK_SUCCESS)
case false =>
new CheckResult(
false,
diff --git a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
index 13950e4..fa68c92 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
@@ -18,10 +18,12 @@ package org.apache.seatunnel.spark.source
import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.common.config.CheckConfigUtil.check
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
class File extends SparkBatchSource {
@@ -33,12 +35,7 @@ class File extends SparkBatchSource {
}
override def checkConfig(): CheckResult = {
- config.hasPath("path") match {
- case true =>
- new CheckResult(true, "")
- case false =>
- new CheckResult(false, "please specify [path] as string")
- }
+ check(config, "path")
}
protected def fileReader(spark: SparkSession, path: String): Dataset[Row] = {
diff --git a/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/source/Hive.scala b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/source/Hive.scala
index 4bfa811..267ed48 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/source/Hive.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/source/Hive.scala
@@ -16,20 +16,18 @@
*/
package org.apache.seatunnel.spark.source
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.seatunnel.common.config.CheckConfigUtil.check
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.spark.sql.{Dataset, Row}
class Hive extends SparkBatchSource {
override def prepare(env: SparkEnvironment): Unit = {}
override def checkConfig(): CheckResult = {
- config.hasPath("pre_sql") match {
- case true => new CheckResult(true, "")
- case false => new CheckResult(false, "please specify [pre_sql]")
- }
+ check(config, "pre_sql")
}
override def getData(env: SparkEnvironment): Dataset[Row] = {
diff --git a/seatunnel-connectors/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/sink/Kafka.scala b/seatunnel-connectors/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/sink/Kafka.scala
index cd302be..fa33eec 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/sink/Kafka.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/sink/Kafka.scala
@@ -16,6 +16,11 @@
*/
package org.apache.seatunnel.spark.sink
+import java.util.Properties
+
+import scala.collection.JavaConversions._
+
+import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
@@ -24,9 +29,6 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Row}
-import java.util.Properties
-import scala.collection.JavaConversions._
-
class Kafka extends SparkBatchSink with Logging {
val producerPrefix = "producer."
@@ -38,7 +40,7 @@ class Kafka extends SparkBatchSink with Logging {
val producerConfig = TypesafeConfigUtils.extractSubConfig(config, producerPrefix, false)
config.hasPath("topic") && producerConfig.hasPath("bootstrap.servers") match {
- case true => new CheckResult(true, "")
+ case true => new CheckResult(true, Constants.CHECK_SUCCESS)
case false =>
new CheckResult(false, "please specify [topic] and [producer.bootstrap.servers]")
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
index af7fcec..9f7ef86 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/sink/Kudu.scala
@@ -18,8 +18,8 @@
package org.apache.seatunnel.spark.sink
import scala.collection.JavaConversions._
-
import org.apache.kudu.spark.kudu._
+import org.apache.seatunnel.common.config.CheckConfigUtil.check
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
@@ -36,12 +36,7 @@ class Kudu extends SparkBatchSink {
}
override def checkConfig(): CheckResult = {
- config.hasPath("kudu_master") && config.hasPath("kudu_table") match {
- case true =>
- new CheckResult(true, "")
- case false =>
- new CheckResult(false, "please specify [kudu_master] and [kudu_table]
")
- }
+ check(config, "kudu_master", "kudu_table")
}
override def output(df: Dataset[Row], environment: SparkEnvironment): Unit = {
diff --git a/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/source/Kudu.scala b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/source/Kudu.scala
index 29d4315..8334677 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/source/Kudu.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/source/Kudu.scala
@@ -18,6 +18,7 @@
package org.apache.seatunnel.spark.source
import org.apache.kudu.spark.kudu._
+import org.apache.seatunnel.common.config.CheckConfigUtil.check
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
@@ -26,10 +27,7 @@ import org.apache.spark.sql.{Dataset, Row}
class Kudu extends SparkBatchSource {
override def checkConfig(): CheckResult = {
- config.hasPath("kudu_master") && config.hasPath("kudu_table") match {
- case true => new CheckResult(true, "")
- case false => new CheckResult(false, "please specify [kudu_master] and
[kudu_table]")
- }
+ check(config, "kudu_master", "kudu_table")
}
override def getData(env: SparkEnvironment): Dataset[Row] = {
diff --git a/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala b/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
index e4d2f05..4ceda87 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
@@ -17,15 +17,17 @@
package org.apache.seatunnel.spark.source
import scala.collection.JavaConversions._
+
import com.alibaba.fastjson.JSON
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.types.StructType
+import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
import org.apache.seatunnel.spark.utils.SparkStructTypeUtil
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.types.StructType
class MongoDB extends SparkBatchSource {
@@ -67,15 +69,14 @@ class MongoDB extends SparkBatchSource {
override def checkConfig(): CheckResult = {
TypesafeConfigUtils.hasSubConfig(config, confPrefix) match {
- case true => {
+ case true =>
val read = TypesafeConfigUtils.extractSubConfig(config, confPrefix, false)
read.hasPath("uri") && read.hasPath("database") && read.hasPath("collection") match {
- case true => new CheckResult(true, "")
+ case true => new CheckResult(true, Constants.CHECK_SUCCESS)
case false => new CheckResult(
false,
"please specify [readconfig.uri] and [readconfig.database] and
[readconfig.collection]")
}
- }
case false => new CheckResult(false, "please specify [readconfig]")
}
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/source/Redis.scala b/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/source/Redis.scala
index cd4b7f6..c807ab5 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/source/Redis.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/source/Redis.scala
@@ -17,11 +17,12 @@
package org.apache.seatunnel.spark.source
-import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, toRedisContext}
-import org.apache.spark.sql.{Dataset, Row}
+import com.redislabs.provider.redis.{toRedisContext, RedisConfig, RedisEndpoint}
+import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.spark.sql.{Dataset, Row}
class Redis extends SparkBatchSource {
val defaultPort: Int = 6379
@@ -45,7 +46,7 @@ class Redis extends SparkBatchSource {
new CheckResult(
false,
"please specify [key_pattern] as non-empty string, multiple key
patterns separated by ','")
- case _ => new CheckResult(true, "")
+ case _ => new CheckResult(true, Constants.CHECK_SUCCESS)
}
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/source/SocketStream.scala b/seatunnel-connectors/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/source/SocketStream.scala
index 81b7c58..db93261 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/source/SocketStream.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/source/SocketStream.scala
@@ -17,14 +17,16 @@
package org.apache.seatunnel.spark.source
import scala.collection.JavaConversions._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession}
-import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
-import org.apache.spark.streaming.dstream.DStream
+
+import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.stream.SparkStreamingSource
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession}
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.apache.spark.streaming.dstream.DStream
class SocketStream extends SparkStreamingSource[String] {
@@ -41,7 +43,7 @@ class SocketStream extends SparkStreamingSource[String] {
}
override def checkConfig(): CheckResult = {
- new CheckResult(true, "")
+ new CheckResult(true, Constants.CHECK_SUCCESS)
}
override def rdd2dataset(sparkSession: SparkSession, rdd: RDD[String]): Dataset[Row] = {
diff --git a/seatunnel-transforms/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala b/seatunnel-transforms/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
index 07f72f5..bd9134b 100644
--- a/seatunnel-transforms/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
+++ b/seatunnel-transforms/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
@@ -16,7 +16,6 @@
*/
package org.apache.seatunnel.spark.transform
-import org.apache.seatunnel.common.RowConstant
import org.apache.seatunnel.common.config.{CheckResult, Common, ConfigRuntimeException}
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
@@ -24,9 +23,11 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory
-
import java.io.File
import java.nio.file.Paths
+
+import org.apache.seatunnel.common.Constants
+
import scala.collection.JavaConversions._
import scala.io.Source
import scala.util.{Failure, Success, Try}
@@ -45,7 +46,7 @@ class Json extends BaseSparkTransform {
import spark.implicits._
config.getString("target_field") match {
- case RowConstant.ROOT => {
+ case Constants.ROW_ROOT => {
val jsonRDD = df.select(srcField).as[String].rdd
@@ -64,11 +65,11 @@ class Json extends BaseSparkTransform {
case s: String => {
val schema =
if (this.useCustomSchema) this.customSchema else spark.read.json(jsonRDD).schema
- var tmpDf = df.withColumn(RowConstant.TMP, from_json(col(s), schema))
+ var tmpDf = df.withColumn(Constants.ROW_TMP, from_json(col(s), schema))
schema.map { field =>
- tmpDf = tmpDf.withColumn(field.name, col(RowConstant.TMP)(field.name))
+ tmpDf = tmpDf.withColumn(field.name, col(Constants.ROW_TMP)(field.name))
}
- tmpDf.drop(RowConstant.TMP)
+ tmpDf.drop(Constants.ROW_TMP)
}
}
@@ -90,13 +91,13 @@ class Json extends BaseSparkTransform {
}
}
- override def checkConfig(): CheckResult = new CheckResult(true, "")
+ override def checkConfig(): CheckResult = new CheckResult(true, Constants.CHECK_SUCCESS)
override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(
Map(
"source_field" -> "raw_message",
- "target_field" -> RowConstant.ROOT,
+ "target_field" -> Constants.ROW_ROOT,
"schema_dir" -> Paths
.get(Common.pluginFilesDir("json").toString, "schemas")
.toString,
diff --git a/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala b/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
index 2c47772..5bc326b 100644
--- a/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
+++ b/seatunnel-transforms/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
@@ -16,9 +16,9 @@
*/
package org.apache.seatunnel.spark.transform
-import scala.collection.JavaConversions._
+import org.apache.seatunnel.common.Constants
-import org.apache.seatunnel.common.RowConstant
+import scala.collection.JavaConversions._
import org.apache.seatunnel.common.config.CheckConfigUtil.check
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
@@ -34,15 +34,15 @@ class Split extends BaseSparkTransform {
// https://stackoverflow.com/a/33345698/1145750
config.getString("target_field") match {
- case RowConstant.ROOT => {
+ case Constants.ROW_ROOT => {
val func = udf((s: String) => {
split(s, config.getString("delimiter"), keys.size())
})
- var filterDf = df.withColumn(RowConstant.TMP, func(col(srcField)))
+ var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
for (i <- 0 until keys.size()) {
- filterDf = filterDf.withColumn(keys.get(i), col(RowConstant.TMP)(i))
+ filterDf = filterDf.withColumn(keys.get(i), col(Constants.ROW_TMP)(i))
}
- filterDf.drop(RowConstant.TMP)
+ filterDf.drop(Constants.ROW_TMP)
}
case targetField: String => {
val func = udf((s: String) => {
@@ -65,7 +65,7 @@ class Split extends BaseSparkTransform {
Map(
"delimiter" -> " ",
"source_field" -> "raw_message",
- "target_field" -> RowConstant.ROOT))
+ "target_field" -> Constants.ROW_ROOT))
config = config.withFallback(defaultConfig)
}
diff --git a/seatunnel-transforms/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
index 14e8ae9..0a7f6d2 100644
--- a/seatunnel-transforms/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
@@ -16,6 +16,7 @@
*/
package org.apache.seatunnel.spark.transform
+import org.apache.seatunnel.common.config.CheckConfigUtil.check
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.{Dataset, Row}
@@ -27,11 +28,7 @@ class Sql extends BaseSparkTransform {
}
override def checkConfig(): CheckResult = {
- if (config.hasPath("sql")) {
- new CheckResult(true, "")
- } else {
- new CheckResult(false, "please specify [sql]")
- }
+ check(config, "sql")
}
override def prepare(env: SparkEnvironment): Unit = {}