This is an automated email from the ASF dual-hosted git repository.

rickyhuo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d0b1bc  [Feature][plugin-filter-date] Date filter supports Timestamp. (#973)
1d0b1bc is described below

commit 1d0b1bc11993f2ed3691d6a60b92d862b015a433
Author: Zonglei Dong <[email protected]>
AuthorDate: Mon Jan 10 22:53:45 2022 +0800

    [Feature][plugin-filter-date] Date filter supports Timestamp. (#973)
    
    * [Feature][plugin-filter-date] Date filter supports Timestamp.
    
    * [Feature][plugin-filter-date] Date filter supports Timestamp.
    
    * [Feature][plugin-filter-date] Fixes date filter Timestamp format.
    
    * [Feature][plugin-filter-date] Fixes input Timestamp format.
    
    * [Feature][plugin-filter-date] Fixes input Timestamp format.
---
 .../interestinglab/waterdrop/filter/Date.scala     |  3 +-
 .../waterdrop/utils/TimestampParser.scala          | 47 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Date.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Date.scala
index 10cfb9b..e2d3a41 100644
--- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Date.scala
+++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Date.scala
@@ -22,7 +22,7 @@ package io.github.interestinglab.waterdrop.filter
 import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
 import io.github.interestinglab.waterdrop.apis.BaseFilter
 import io.github.interestinglab.waterdrop.core.RowConstant
-import io.github.interestinglab.waterdrop.utils.{FormatParser, StringTemplate, UnixMSParser, UnixParser}
+import io.github.interestinglab.waterdrop.utils.{FormatParser, StringTemplate, UnixMSParser, UnixParser, TimestampParser}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
@@ -77,6 +77,7 @@ class Date extends BaseFilter {
     val dateParser = config.getString("source_time_format") match {
       case "UNIX" => new UnixParser(targetTimeFormat)
       case "UNIX_MS" => new UnixMSParser(targetTimeFormat)
+      case "TIMESTAMP" => new TimestampParser(targetTimeFormat)
       case sourceTimeFormat: String => new FormatParser(sourceTimeFormat, targetTimeFormat)
     }
 
diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/utils/TimestampParser.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/utils/TimestampParser.scala
new file mode 100644
index 0000000..1f248f9
--- /dev/null
+++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/utils/TimestampParser.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package io.github.interestinglab.waterdrop.utils
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import scala.util.control.NonFatal
+
+class TimestampParser(targetTimeFormat: String) extends DateParser {
+
+  val targetFormat = targetTimeFormat
+
+  def parse(input: String): (Boolean, String) = {
+
+    try {
+      val inputDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+      parse(inputDateFormat.parse(input).getTime())
+    } catch {
+      case NonFatal(e) => (false, "")
+    }
+  }
+
+  def parse(input: Long): (Boolean, String) = {
+
+    val targetDateFormat = new SimpleDateFormat(this.targetFormat)
+    val date = new Date(input)
+    (true, targetDateFormat.format(date))
+  }
+}

Reply via email to