This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f8091fbe [cdc] Time related computed functions support to handle 
epoch time (#2989)
8f8091fbe is described below

commit 8f8091fbe47863c9c3ba09133a84eaefd58de1f5
Author: yuzelin <[email protected]>
AuthorDate: Tue Mar 12 13:22:44 2024 +0800

    [cdc] Time related computed functions support to handle epoch time (#2989)
---
 docs/content/cdc-ingestion/overview.md             |  33 ++-
 .../{compute_column.html => other_functions.html}  |  30 --
 .../shortcodes/generated/temporal_functions.html   |  57 ++++
 .../flink/action/cdc/ComputedColumnUtils.java      |   2 +-
 .../apache/paimon/flink/action/cdc/Expression.java | 312 +++++++++------------
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 177 ++++++++++++
 .../src/test/resources/mysql/sync_table_setup.sql  |  20 ++
 7 files changed, 412 insertions(+), 219 deletions(-)

diff --git a/docs/content/cdc-ingestion/overview.md 
b/docs/content/cdc-ingestion/overview.md
index 16b31a79b..90e2235d3 100644
--- a/docs/content/cdc-ingestion/overview.md
+++ b/docs/content/cdc-ingestion/overview.md
@@ -78,9 +78,38 @@ behaviors of `RENAME TABLE` and `DROP COLUMN` will be 
ignored, `RENAME COLUMN` w
 
 ## Computed Functions
 
-`--computed_column` are the definitions of computed columns. The argument 
field is from source table field name. Supported expressions are:
+`--computed_column` are the definitions of computed columns. The argument 
field is from source table field name. 
 
-{{< generated/compute_column >}}
+### Temporal Functions
+
+Temporal functions can convert date and epoch time to another form. A common 
use case is to generate partition values.
+
+{{< generated/temporal_functions >}}
+
+The data type of the temporal-column can be one of the following cases:
+1. DATE, DATETIME or TIMESTAMP.
+2. Any integer numeric type (such as INT and BIGINT). In this case, the data 
will be considered as epoch time of `1970-01-01 00:00:00`. 
+You should set precision of the value (default is 0).
+3. STRING. In this case, if you didn't set the time unit, the data will be 
considered as formatted string of DATE, 
+DATETIME or TIMESTAMP value. Otherwise, the data will be considered as string 
value of epoch time. So you must set time 
+unit in the latter case.
+
+The precision represents the unit of the epoch time. Currently, There are four 
valid precisions: `0` (for epoch seconds),
+`3` (for epoch milliseconds), `6`(for epoch microseconds) and `9` (for epoch 
nanoseconds). Take the time point 
+`1970-01-01 00:00:00.123456789` as an example, the epoch seconds are 0, the 
epoch milliseconds are 123, the epoch microseconds 
+are 123456, and the epoch nanoseconds are 123456789. The precision should 
match the input values. You can set precision 
+in this way: `date_format(epoch_col, yyyy-MM-dd, 0)`.
+
+`date_format` is a flexible function which is able to convert the temporal 
value to various formats with different format 
+strings. A most common format string is `yyyy-MM-dd HH:mm:ss.SSS`. Another 
example is `yyyy-ww` which can extract the year 
+and the week-of-the-year from the input. Note that the output is affected by 
the locale. For example, in some regions the 
+first day of a week is Monday while in others is Sunday, so if you use 
`date_format(date_col, yyyy-ww)` and the input of 
+date_col is 2024-01-07 (Sunday), the output maybe `2024-01` (if the first day 
of a week is Monday) or `2024-02` (if the 
+first day of a week is Sunday).
+
+### Other Functions
+
+{{< generated/other_functions >}}
 
 ## Special Data Type Mapping
 
diff --git a/docs/layouts/shortcodes/generated/compute_column.html 
b/docs/layouts/shortcodes/generated/other_functions.html
similarity index 55%
rename from docs/layouts/shortcodes/generated/compute_column.html
rename to docs/layouts/shortcodes/generated/other_functions.html
index cabeb5276..f367c97d0 100644
--- a/docs/layouts/shortcodes/generated/compute_column.html
+++ b/docs/layouts/shortcodes/generated/other_functions.html
@@ -25,36 +25,6 @@ under the License.
     </tr>
     </thead>
     <tbody>
-    <tr>
-        <td><h5>year(date-column)</h5></td>
-        <td>Extract year from a DATE, DATETIME or TIMESTAMP (or its 
corresponding string format). Output is an INT value represent the year.</td>
-    </tr>
-    <tr>
-        <td><h5>month(date-column)</h5></td>
-        <td>Extract month of year from a DATE, DATETIME or TIMESTAMP (or its 
corresponding string format). Output is an INT value represent the month of 
year.</td>
-    </tr>
-    <tr>
-        <td><h5>day(date-column)</h5></td>
-        <td>Extract day of month from a DATE, DATETIME or TIMESTAMP (or its 
corresponding string format). Output is an INT value represent the day of 
month.</td>
-    </tr>
-    <tr>
-        <td><h5>hour(date-column)</h5></td>
-        <td>Extract hour from a DATE, DATETIME or TIMESTAMP (or its 
corresponding string format). Output is an INT value represent the hour.</td>
-    </tr>
-    <tr>
-        <td><h5>minute(date-column)</h5></td>
-        <td>Extract minute from a DATE, DATETIME or TIMESTAMP (or its 
corresponding string format). Output is an INT value represent the minute.</td>
-    </tr>
-    <tr>
-        <td><h5>second(date-column)</h5></td>
-        <td>Extract second from a DATE, DATETIME or TIMESTAMP (or its 
corresponding string format). Output is an INT value represent the second.</td>
-    </tr>
-    <tr>
-        <td><h5>date_format(date-column,format)</h5></td>
-        <td>Convert date format from a DATE, DATETIME or TIMESTAMP (or its 
corresponding string format).
-            'format' is compatible with Java's DateTimeFormatter String (for 
example, 'yyyy-MM-dd'). Output is a string value in converted date format.
-        </td>
-    </tr>
     <tr>
         <td><h5>substring(column,beginInclusive)</h5></td>
         <td>Get column.substring(beginInclusive). Output is a STRING.</td>
diff --git a/docs/layouts/shortcodes/generated/temporal_functions.html 
b/docs/layouts/shortcodes/generated/temporal_functions.html
new file mode 100644
index 000000000..d652a5a0e
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/temporal_functions.html
@@ -0,0 +1,57 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 30%">Function</th>
+        <th class="text-left" style="width: 70%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>year(temporal-column [, precision])</h5></td>
+        <td>Extract year from the input. Output is an INT value represent the 
year.</td>
+    </tr>
+    <tr>
+        <td><h5>month(temporal-column [, precision])</h5></td>
+        <td>Extract month of year from the input. Output is an INT value 
represent the month of year.</td>
+    </tr>
+    <tr>
+        <td><h5>day(temporal-column [, precision])</h5></td>
+        <td>Extract day of month from the input. Output is an INT value 
represent the day of month.</td>
+    </tr>
+    <tr>
+        <td><h5>hour(temporal-column [, precision])</h5></td>
+        <td>Extract hour from the input. Output is an INT value represent the 
hour.</td>
+    </tr>
+    <tr>
+        <td><h5>minute(temporal-column [, precision])</h5></td>
+        <td>Extract minute from the input. Output is an INT value represent 
the minute.</td>
+    </tr>
+    <tr>
+        <td><h5>second(temporal-column [, precision])</h5></td>
+        <td>Extract second from the input. Output is an INT value represent 
the second.</td>
+    </tr>
+    <tr>
+        <td><h5>date_format(temporal-column, format-string [, 
precision])</h5></td>
+        <td>Convert the input to desired formatted string. Output type is 
STRING.</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index 70b1898c0..a18355ca6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -88,7 +88,7 @@ public class ComputedColumnUtils {
                             Expression.create(
                                     exprName,
                                     fieldReference,
-                                    typeMapping.get(fieldReference),
+                                    typeMapping.get(fieldReferenceCheckForm),
                                     literals)));
         }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 554b8fc9e..417758f4b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -18,9 +18,12 @@
 
 package org.apache.paimon.flink.action.cdc;
 
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.SerializableSupplier;
 
 import javax.annotation.Nullable;
 
@@ -31,6 +34,7 @@ import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Function;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -62,19 +66,25 @@ public interface Expression extends Serializable {
             String exprName, String fieldReference, DataType fieldType, 
String... literals) {
         switch (exprName.toLowerCase()) {
             case "year":
-                return year(fieldReference);
+                return TemporalToIntConverter.create(
+                        fieldReference, fieldType, () -> 
LocalDateTime::getYear, literals);
             case "month":
-                return month(fieldReference);
+                return TemporalToIntConverter.create(
+                        fieldReference, fieldType, () -> 
LocalDateTime::getMonthValue, literals);
             case "day":
-                return day(fieldReference);
+                return TemporalToIntConverter.create(
+                        fieldReference, fieldType, () -> 
LocalDateTime::getDayOfMonth, literals);
             case "hour":
-                return hour(fieldReference);
+                return TemporalToIntConverter.create(
+                        fieldReference, fieldType, () -> 
LocalDateTime::getHour, literals);
             case "minute":
-                return minute(fieldReference);
+                return TemporalToIntConverter.create(
+                        fieldReference, fieldType, () -> 
LocalDateTime::getMinute, literals);
             case "second":
-                return second(fieldReference);
+                return TemporalToIntConverter.create(
+                        fieldReference, fieldType, () -> 
LocalDateTime::getSecond, literals);
             case "date_format":
-                return dateFormat(fieldReference, literals);
+                return DateFormat.create(fieldReference, fieldType, literals);
             case "substring":
                 return substring(fieldReference, literals);
             case "truncate":
@@ -88,39 +98,6 @@ public interface Expression extends Serializable {
         }
     }
 
-    static Expression year(String fieldReference) {
-        return new YearComputer(fieldReference);
-    }
-
-    static Expression month(String fieldReference) {
-        return new MonthComputer(fieldReference);
-    }
-
-    static Expression day(String fieldReference) {
-        return new DayComputer(fieldReference);
-    }
-
-    static Expression hour(String fieldReference) {
-        return new HourComputer(fieldReference);
-    }
-
-    static Expression minute(String fieldReference) {
-        return new MinuteComputer(fieldReference);
-    }
-
-    static Expression second(String fieldReference) {
-        return new SecondComputer(fieldReference);
-    }
-
-    static Expression dateFormat(String fieldReference, String... literals) {
-        checkArgument(
-                literals.length == 1,
-                String.format(
-                        "'date_format' expression supports one argument, but 
found '%s'.",
-                        literals.length));
-        return new DateFormat(fieldReference, literals[0]);
-    }
-
     static Expression substring(String fieldReference, String... literals) {
         checkArgument(
                 literals.length == 1 || literals.length == 2,
@@ -160,99 +137,35 @@ public interface Expression extends Serializable {
         return new TruncateComputer(fieldReference, fieldType, literals[0]);
     }
 
-    /** Compute year from a time input. */
-    final class YearComputer implements Expression {
+    /** Expression to handle temporal value. */
+    abstract class TemporalExpressionBase<T> implements Expression {
 
         private static final long serialVersionUID = 1L;
 
-        private final String fieldReference;
-
-        private YearComputer(String fieldReference) {
-            this.fieldReference = fieldReference;
-        }
-
-        @Override
-        public String fieldReference() {
-            return fieldReference;
-        }
-
-        @Override
-        public DataType outputType() {
-            return DataTypes.INT();
-        }
-
-        @Override
-        public String eval(String input) {
-            LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 
0);
-            return String.valueOf(localDateTime.getYear());
-        }
-    }
-
-    /** Compute month from a time input. */
-    final class MonthComputer implements Expression {
-
-        private static final long serialVersionUID = 1L;
+        private static final List<Integer> SUPPORTED_PRECISION = 
Arrays.asList(0, 3, 6, 9);
 
         private final String fieldReference;
+        @Nullable private final Integer precision;
 
-        private MonthComputer(String fieldReference) {
-            this.fieldReference = fieldReference;
-        }
-
-        @Override
-        public String fieldReference() {
-            return fieldReference;
-        }
-
-        @Override
-        public DataType outputType() {
-            return DataTypes.INT();
-        }
-
-        @Override
-        public String eval(String input) {
-            LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 
0);
-            return String.valueOf(localDateTime.getMonthValue());
-        }
-    }
-
-    /** Compute day from a time input. */
-    final class DayComputer implements Expression {
-
-        private static final long serialVersionUID = 1L;
-
-        private final String fieldReference;
+        private transient Function<LocalDateTime, T> converter;
 
-        private DayComputer(String fieldReference) {
+        private TemporalExpressionBase(
+                String fieldReference, DataType fieldType, @Nullable Integer 
precision) {
             this.fieldReference = fieldReference;
-        }
-
-        @Override
-        public String fieldReference() {
-            return fieldReference;
-        }
 
-        @Override
-        public DataType outputType() {
-            return DataTypes.INT();
-        }
-
-        @Override
-        public String eval(String input) {
-            LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 
0);
-            return String.valueOf(localDateTime.getDayOfMonth());
-        }
-    }
-
-    /** Compute hour from a time input. */
-    final class HourComputer implements Expression {
-
-        private static final long serialVersionUID = 1L;
+            // when the input is INTEGER_NUMERIC, the precision must be set
+            if 
(fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
+                    && precision == null) {
+                precision = 0;
+            }
 
-        private final String fieldReference;
+            checkArgument(
+                    precision == null || 
SUPPORTED_PRECISION.contains(precision),
+                    "Unsupported precision of temporal function: %d. Supported 
precisions are: "
+                            + "0 (for epoch seconds), 3 (for epoch 
milliseconds), 6 (for epoch microseconds) and 9 (for epoch nanoseconds).",
+                    precision);
 
-        private HourComputer(String fieldReference) {
-            this.fieldReference = fieldReference;
+            this.precision = precision;
         }
 
         @Override
@@ -260,6 +173,7 @@ public interface Expression extends Serializable {
             return fieldReference;
         }
 
+        /** If not, this must be overridden! */
         @Override
         public DataType outputType() {
             return DataTypes.INT();
@@ -267,98 +181,124 @@ public interface Expression extends Serializable {
 
         @Override
         public String eval(String input) {
-            LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 
0);
-            return String.valueOf(localDateTime.getHour());
-        }
-    }
-
-    /** Compute minute from a time input. */
-    final class MinuteComputer implements Expression {
-
-        private static final long serialVersionUID = 1L;
-
-        private final String fieldReference;
-
-        private MinuteComputer(String fieldReference) {
-            this.fieldReference = fieldReference;
-        }
-
-        @Override
-        public String fieldReference() {
-            return fieldReference;
-        }
+            if (converter == null) {
+                this.converter = createConverter();
+            }
 
-        @Override
-        public DataType outputType() {
-            return DataTypes.INT();
+            T result = converter.apply(toLocalDateTime(input));
+            return String.valueOf(result);
+        }
+
+        private LocalDateTime toLocalDateTime(String input) {
+            if (precision == null) {
+                return DateTimeUtils.toLocalDateTime(input, 9);
+            } else {
+                long numericValue = Long.parseLong(input);
+                long milliseconds = 0;
+                int nanosOfMillisecond = 0;
+                switch (precision) {
+                    case 0:
+                        milliseconds = numericValue * 1000L;
+                        break;
+                    case 3:
+                        milliseconds = numericValue;
+                        break;
+                    case 6:
+                        milliseconds = numericValue / 1000;
+                        nanosOfMillisecond = (int) (numericValue % 1000 * 
1000);
+                        break;
+                    case 9:
+                        milliseconds = numericValue / 1_000_000;
+                        nanosOfMillisecond = (int) (numericValue % 1_000_000);
+                        break;
+                        // no error case because precision is validated
+                }
+                return Timestamp.fromEpochMillis(milliseconds, 
nanosOfMillisecond)
+                        .toLocalDateTime();
+            }
         }
 
-        @Override
-        public String eval(String input) {
-            LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 
0);
-            return String.valueOf(localDateTime.getMinute());
-        }
+        protected abstract Function<LocalDateTime, T> createConverter();
     }
 
-    /** Compute second from a time input. */
-    final class SecondComputer implements Expression {
+    /** Convert the temporal value to an integer. */
+    final class TemporalToIntConverter extends TemporalExpressionBase<Integer> 
{
 
         private static final long serialVersionUID = 1L;
 
-        private final String fieldReference;
-
-        private SecondComputer(String fieldReference) {
-            this.fieldReference = fieldReference;
-        }
-
-        @Override
-        public String fieldReference() {
-            return fieldReference;
-        }
+        private final SerializableSupplier<Function<LocalDateTime, Integer>> 
converterSupplier;
 
-        @Override
-        public DataType outputType() {
-            return DataTypes.INT();
+        private TemporalToIntConverter(
+                String fieldReference,
+                DataType fieldType,
+                @Nullable Integer precision,
+                SerializableSupplier<Function<LocalDateTime, Integer>> 
converterSupplier) {
+            super(fieldReference, fieldType, precision);
+            this.converterSupplier = converterSupplier;
         }
 
         @Override
-        public String eval(String input) {
-            LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 
0);
-            return String.valueOf(localDateTime.getSecond());
+        protected Function<LocalDateTime, Integer> createConverter() {
+            return converterSupplier.get();
+        }
+
+        private static TemporalToIntConverter create(
+                String fieldReference,
+                DataType fieldType,
+                SerializableSupplier<Function<LocalDateTime, Integer>> 
converterSupplier,
+                String... literals) {
+            checkArgument(
+                    literals.length == 0 || literals.length == 1,
+                    "TemporalToIntConverter supports 0 or 1 argument, but 
found '%s'.",
+                    literals.length);
+
+            return new TemporalToIntConverter(
+                    fieldReference,
+                    fieldType,
+                    literals.length == 0 ? null : Integer.valueOf(literals[0]),
+                    converterSupplier);
         }
     }
 
-    /** date format from a time input. */
-    final class DateFormat implements Expression {
+    /** Convert the temporal value to desired formatted string. */
+    final class DateFormat extends TemporalExpressionBase<String> {
 
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = 2L;
 
-        private final String fieldReference;
         private final String pattern;
-        private transient DateTimeFormatter formatter;
 
-        private DateFormat(String fieldReference, String pattern) {
-            this.fieldReference = fieldReference;
+        private DateFormat(
+                String fieldReference,
+                DataType fieldType,
+                String pattern,
+                @Nullable Integer precision) {
+            super(fieldReference, fieldType, precision);
             this.pattern = pattern;
         }
 
-        @Override
-        public String fieldReference() {
-            return fieldReference;
-        }
-
         @Override
         public DataType outputType() {
             return DataTypes.STRING();
         }
 
         @Override
-        public String eval(String input) {
-            if (formatter == null) {
-                formatter = DateTimeFormatter.ofPattern(pattern);
-            }
-            LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 
0);
-            return localDateTime.format(formatter);
+        protected Function<LocalDateTime, String> createConverter() {
+            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
+            return localDateTime -> localDateTime.format(formatter);
+        }
+
+        private static DateFormat create(
+                String fieldReference, DataType fieldType, String... literals) 
{
+            checkArgument(
+                    literals.length == 1 || literals.length == 2,
+                    "'date_format' supports 1 or 2 arguments, but found '%s'.",
+                    literals.length);
+
+            return new DateFormat(
+                    fieldReference,
+                    fieldType,
+                    literals[0],
+                    literals.length == 1 ? null : 
Integer.valueOf(literals[1]));
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index aee4c484f..6cd35eec8 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -37,13 +37,18 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.sql.SQLException;
 import java.sql.Statement;
+import java.time.DayOfWeek;
 import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.WeekFields;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
@@ -819,6 +824,178 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table, rowType, Arrays.asList("pk", 
"_year_date"));
     }
 
+    @Test
+    @Timeout(60)
+    public void testTemporalToIntWithEpochTime() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "test_time_to_int_epoch");
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        // pick field reference & time unit
+        int fieldRefIndex = random.nextInt(5);
+        String fieldReference =
+                Arrays.asList(
+                                "_second_val0",
+                                "_second_val1",
+                                "_millis_val",
+                                "_micros_val",
+                                "_nanos_val")
+                        .get(fieldRefIndex);
+        String precision = Arrays.asList("", ",0", ",3", ",6", 
",9").get(fieldRefIndex);
+
+        // pick test expression
+        int expIndex = random.nextInt(6);
+        String expression =
+                Arrays.asList("year", "month", "day", "hour", "minute", 
"second").get(expIndex);
+
+        String computedColumnDef =
+                String.format("_time_to_int=%s(%s%s)", expression, 
fieldReference, precision);
+
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withComputedColumnArgs(computedColumnDef)
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement()) {
+            statement.execute("USE " + DATABASE_NAME);
+            insertEpochTime(
+                    "test_time_to_int_epoch", 1, 
"2024-01-01T00:01:02.123456789Z", statement);
+            insertEpochTime(
+                    "test_time_to_int_epoch", 2, 
"2024-12-31T12:59:59.123456789Z", statement);
+        }
+
+        FileStoreTable table = getFileStoreTable();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.INT()
+                        },
+                        new String[] {
+                            "pk",
+                            "_second_val0",
+                            "_second_val1",
+                            "_millis_val",
+                            "_micros_val",
+                            "_nanos_val",
+                            "_time_to_int"
+                        });
+
+        int result1 = Arrays.asList(2024, 1, 1, 0, 1, 2).get(expIndex);
+        int result2 = Arrays.asList(2024, 12, 31, 12, 59, 59).get(expIndex);
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, 1704067262, 1704067262, 1704067262123, 
1704067262123456, 1704067262123456789, "
+                                + result1
+                                + "]",
+                        "+I[2, 1735649999, 1735649999, 1735649999123, 
1735649999123456, 1735649999123456789, "
+                                + result2
+                                + "]");
+        waitForResult(expected, table, rowType, 
Collections.singletonList("pk"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testDateFormatWithEpochTime() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "test_date_format_epoch");
+
+        List<String> computedColumnDefs =
+                Arrays.asList(
+                        "_from_second0_default=date_format(_second_val0, 
yyyy-MM-dd HH:mm:ss)",
+                        "_from_second0=date_format(_second_val0, yyyy-MM-dd 
HH:mm:ss, 0)",
+                        "_from_second1=date_format(_second_val1, yyyy-MM-dd 
HH:mm:ss, 0)",
+                        // test week format
+                        "_from_second1_week=date_format(_second_val1, yyyy-ww, 
0)",
+                        "_from_millisecond=date_format(_millis_val, yyyy-MM-dd 
HH:mm:ss.SSS, 3)",
+                        "_from_microsecond=date_format(_micros_val, yyyy-MM-dd 
HH:mm:ss.SSSSSS, 6)",
+                        "_from_nanoseconds=date_format(_nanos_val, yyyy-MM-dd 
HH:mm:ss.SSSSSSSSS, 9)");
+
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withComputedColumnArgs(computedColumnDefs)
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement()) {
+            statement.execute("USE " + DATABASE_NAME);
+            insertEpochTime(
+                    "test_date_format_epoch", 1, 
"2024-01-07T00:01:02.123456789Z", statement);
+        }
+
+        FileStoreTable table = getFileStoreTable();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {
+                            "pk",
+                            "_second_val0",
+                            "_second_val1",
+                            "_millis_val",
+                            "_micros_val",
+                            "_nanos_val",
+                            "_from_second0_default",
+                            "_from_second0",
+                            "_from_second1",
+                            "_from_second1_week",
+                            "_from_millisecond",
+                            "_from_microsecond",
+                            "_from_nanoseconds"
+                        });
+
+        // depends on the Locale setting
+        WeekFields weekFields = WeekFields.of(Locale.getDefault());
+        int week = weekFields.getFirstDayOfWeek() == DayOfWeek.MONDAY ? 1 : 2;
+
+        List<String> expected =
+                Collections.singletonList(
+                        "+I[1, 1704585662, 1704585662, 1704585662123, 
1704585662123456, 1704585662123456789, "
+                                + "2024-01-07 00:01:02, 2024-01-07 00:01:02, 
2024-01-07 00:01:02, "
+                                + String.format("2024-0%s, ", week)
+                                + "2024-01-07 00:01:02.123, 2024-01-07 
00:01:02.123456, 2024-01-07 00:01:02.123456789]");
+        waitForResult(expected, table, rowType, 
Collections.singletonList("pk"));
+    }
+
+    private void insertEpochTime(String table, int pk, String dateStr, 
Statement statement)
+            throws SQLException {
+        Instant instant = Instant.parse(dateStr);
+        long epochSecond = instant.getEpochSecond();
+        int nano = instant.getNano();
+
+        statement.executeUpdate(
+                String.format(
+                        "INSERT INTO %s VALUES (%d, %d, %d, %d, %d, %d)",
+                        table,
+                        pk,
+                        epochSecond,
+                        epochSecond,
+                        epochSecond * 1000 + nano / 1_000_000,
+                        epochSecond * 1000_000 + nano / 1_000,
+                        epochSecond * 1_000_000_000 + nano));
+    }
+
     @Test
     @Timeout(60)
     public void testSyncShards() throws Exception {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index d119b1c89..949f1c99d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -281,6 +281,26 @@ CREATE TABLE test_computed_column (
     PRIMARY KEY (pk)
 );
 
+CREATE TABLE test_time_to_int_epoch (
+    pk INT,
+    _second_val0 INT,
+    _second_val1 BIGINT,
+    _millis_val BIGINT,
+    _micros_val BIGINT,
+    _nanos_val BIGINT,
+    PRIMARY KEY (pk)
+);
+
+CREATE TABLE test_date_format_epoch (
+    pk INT,
+    _second_val0 INT,
+    _second_val1 BIGINT,
+    _millis_val BIGINT,
+    _micros_val BIGINT,
+    _nanos_val BIGINT,
+    PRIMARY KEY (pk)
+);
+
 CREATE TABLE test_options_change (
    pk INT,
    _date DATE,


Reply via email to