This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1943af4 [Bug] [seatunnel-connector-spark-file] fix file source and sink default params (#1590)
1943af4 is described below
commit 1943af4494d1fce8b01dc9be8c286c8e3463e526
Author: bestcx <[email protected]>
AuthorDate: Mon Mar 28 22:07:41 2022 +0800
[Bug] [seatunnel-connector-spark-file] fix file source and sink default params (#1590)
---
.../src/main/scala/org/apache/seatunnel/spark/Config.scala | 4 +++-
.../main/scala/org/apache/seatunnel/spark/source/File.scala | 11 ++++++++++-
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala
index 4d43a2d..38561cc 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/Config.scala
@@ -23,7 +23,7 @@ object Config extends Serializable {
final val SAVE_MODE = "save_mode"
final val SERIALIZER = "serializer"
final val PATH_TIME_FORMAT = "path_time_format"
- final val DEFAULT_TIME_FORMAT = "path_time_format"
+ final val DEFAULT_TIME_FORMAT = "yyyyMMddHHmmss"
final val FORMAT = "format"
final val SAVE_MODE_ERROR = "error"
final val OPTION_PREFIX = "options."
@@ -34,4 +34,6 @@ object Config extends Serializable {
final val ORC = "orc"
final val CSV = "csv"
+ final val DEFAULT_FORMAT = JSON
+
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
index 2dc6a1c..e4e2d75 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/source/File.scala
@@ -19,6 +19,7 @@ package org.apache.seatunnel.spark.source
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfigThrowable
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.Config._
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
@@ -29,8 +30,16 @@ import scala.util.{Failure, Success, Try}
class File extends SparkBatchSource {
+ override def prepare(env: SparkEnvironment): Unit = {
+ val defaultConfig = ConfigFactory.parseMap(
+ Map(
+ FORMAT -> DEFAULT_FORMAT
+ ))
+ config = config.withFallback(defaultConfig)
+ }
+
override def checkConfig(): CheckResult = {
- checkAllExists(config, PATH, FORMAT)
+ checkAllExists(config, PATH)
}
override def getData(env: SparkEnvironment): Dataset[Row] = {