This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c940f217b [flink] Support dateFormat compute functions (#1806)
c940f217b is described below

commit c940f217bc8dec11a1eda316fa1d6593bdc3013d
Author: ehui <[email protected]>
AuthorDate: Mon Aug 14 17:27:13 2023 +0800

    [flink] Support dateFormat compute functions (#1806)
---
 .../shortcodes/generated/compute_column.html       |  4 ++
 .../apache/paimon/flink/action/cdc/Expression.java | 48 +++++++++++++++++++++-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 13 +++++-
 3 files changed, 62 insertions(+), 3 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/compute_column.html 
b/docs/layouts/shortcodes/generated/compute_column.html
index a86de07eb..f9bf8697f 100644
--- a/docs/layouts/shortcodes/generated/compute_column.html
+++ b/docs/layouts/shortcodes/generated/compute_column.html
@@ -41,6 +41,10 @@ under the License.
         <td><h5>hour(date-column)</h5></td>
         <td>Extract hour from a DATE, DATETIME or TIMESTAMP. Output is an INT 
value represent the hour.</td>
     </tr>
+    <tr>
+        <td><h5>date_format(date-column)</h5></td>
+        <td>Convert date format from a DATE, DATETIME or TIMESTAMP. Output is 
a string value in converted date format.</td>
+    </tr>
     <tr>
         <td><h5>substring(column,beginInclusive)</h5></td>
         <td>Get column.substring(beginInclusive). Output is a STRING.</td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index e38270524..15e9c683f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
 
@@ -37,7 +38,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public interface Expression extends Serializable {
 
     List<String> SUPPORTED_EXPRESSION =
-            Arrays.asList("year", "month", "day", "hour", "substring", 
"truncate");
+            Arrays.asList("year", "month", "day", "hour", "date_format", 
"substring", "truncate");
 
     /** Return name of referenced field. */
     String fieldReference();
@@ -59,6 +60,8 @@ public interface Expression extends Serializable {
                 return day(fieldReference);
             case "hour":
                 return hour(fieldReference);
+            case "date_format":
+                return dateFormat(fieldReference, literals);
             case "substring":
                 return substring(fieldReference, literals);
             case "truncate":
@@ -88,6 +91,15 @@ public interface Expression extends Serializable {
         return new HourComputer(fieldReference);
     }
 
+    static Expression dateFormat(String fieldReference, String... literals) {
+        checkArgument(
+                literals.length == 1,
+                String.format(
+                        "'date_format' expression supports one argument, but 
found '%s'.",
+                        literals.length));
+        return new DateFormat(fieldReference, literals[0]);
+    }
+
     static Expression substring(String fieldReference, String... literals) {
         checkArgument(
                 literals.length == 1 || literals.length == 2,
@@ -239,6 +251,40 @@ public interface Expression extends Serializable {
         }
     }
 
+    /** date format from a time input. */
+    final class DateFormat implements Expression {
+
+        private static final long serialVersionUID = 1L;
+
+        private final String fieldReference;
+        private final String pattern;
+        private transient DateTimeFormatter formatter;
+
+        private DateFormat(String fieldReference, String pattern) {
+            this.fieldReference = fieldReference;
+            this.pattern = pattern;
+        }
+
+        @Override
+        public String fieldReference() {
+            return fieldReference;
+        }
+
+        @Override
+        public DataType outputType() {
+            return DataTypes.STRING();
+        }
+
+        @Override
+        public String eval(String input) {
+            if (formatter == null) {
+                formatter = DateTimeFormatter.ofPattern(pattern);
+            }
+            LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 
0);
+            return localDateTime.format(formatter);
+        }
+    }
+
     /** Get substring using {@link String#substring}. */
     final class Substring implements Expression {
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 1195f3dac..b4cf2131d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -773,6 +773,9 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         "_hour_date=hour(_date)",
                         "_hour_datetime=hour(_datetime)",
                         "_hour_timestamp=hour(_timestamp)",
+                        "_date_format_date=date_format(_date,yyyy)",
+                        
"_date_format_datetime=date_format(_datetime,yyyy-MM-dd)",
+                        
"_date_format_timestamp=date_format(_timestamp,yyyyMMdd)",
                         "_substring_date1=substring(_date,2)",
                         "_substring_date2=substring(_timestamp,5,10)",
                         "_truncate_date=truncate(pk,2)");
@@ -824,6 +827,9 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             DataTypes.INT(),
                             DataTypes.STRING(),
                             DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
                             DataTypes.INT()
                         },
                         new String[] {
@@ -843,14 +849,17 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             "_hour_date",
                             "_hour_datetime",
                             "_hour_timestamp",
+                            "_date_format_date",
+                            "_date_format_datetime",
+                            "_date_format_timestamp",
                             "_substring_date1",
                             "_substring_date2",
                             "_truncate_date"
                         });
         List<String> expected =
                 Arrays.asList(
-                        "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 23-03-23, 09-15, 0]",
-                        "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, 
NULL, 23, NULL, NULL, 0, NULL, NULL, 23-03-23, NULL, 2]");
+                        "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 2023, 2022-01-01, 20210915, 
23-03-23, 09-15, 0]",
+                        "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, 
NULL, 23, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2]");
         waitForResult(expected, table, rowType, Arrays.asList("pk", 
"_year_date"));
     }
 

Reply via email to