This is an automated email from the ASF dual-hosted git repository.
rickyhuo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/master by this push:
new 1d0b1bc [Feature][plugin-filter-date] Date filter supports Timestamp. (#973)
1d0b1bc is described below
commit 1d0b1bc11993f2ed3691d6a60b92d862b015a433
Author: Zonglei Dong <[email protected]>
AuthorDate: Mon Jan 10 22:53:45 2022 +0800
[Feature][plugin-filter-date] Date filter supports Timestamp. (#973)
* [Feature][plugin-filter-date] Date filter supports Timestamp.
* [Feature][plugin-filter-date] Date filter supports Timestamp.
* [Feature][plugin-filter-date] Fixes date filter Timestamp format.
* [Feature][plugin-filter-date] Fixes input Timestamp format.
* [Feature][plugin-filter-date] Fixes input Timestamp format.
---
.../interestinglab/waterdrop/filter/Date.scala | 3 +-
.../waterdrop/utils/TimestampParser.scala | 47 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Date.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Date.scala
index 10cfb9b..e2d3a41 100644
--- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Date.scala
+++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Date.scala
@@ -22,7 +22,7 @@ package io.github.interestinglab.waterdrop.filter
import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.apis.BaseFilter
import io.github.interestinglab.waterdrop.core.RowConstant
-import io.github.interestinglab.waterdrop.utils.{FormatParser, StringTemplate, UnixMSParser, UnixParser}
+import io.github.interestinglab.waterdrop.utils.{FormatParser, StringTemplate, UnixMSParser, UnixParser, TimestampParser}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -77,6 +77,7 @@ class Date extends BaseFilter {
val dateParser = config.getString("source_time_format") match {
case "UNIX" => new UnixParser(targetTimeFormat)
case "UNIX_MS" => new UnixMSParser(targetTimeFormat)
+ case "TIMESTAMP" => new TimestampParser(targetTimeFormat)
case sourceTimeFormat: String => new FormatParser(sourceTimeFormat, targetTimeFormat)
}
diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/utils/TimestampParser.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/utils/TimestampParser.scala
new file mode 100644
index 0000000..1f248f9
--- /dev/null
+++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/utils/TimestampParser.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package io.github.interestinglab.waterdrop.utils
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import scala.util.control.NonFatal
+
+class TimestampParser(targetTimeFormat: String) extends DateParser {
+
+ val targetFormat = targetTimeFormat
+
+ def parse(input: String): (Boolean, String) = {
+
+ try {
+ val inputDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+ parse(inputDateFormat.parse(input).getTime())
+ } catch {
+ case NonFatal(e) => (false, "")
+ }
+ }
+
+ def parse(input: Long): (Boolean, String) = {
+
+ val targetDateFormat = new SimpleDateFormat(this.targetFormat)
+ val date = new Date(input)
+ (true, targetDateFormat.format(date))
+ }
+}