This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8f8091fbe [cdc] Time related computed functions support to handle
epoch time (#2989)
8f8091fbe is described below
commit 8f8091fbe47863c9c3ba09133a84eaefd58de1f5
Author: yuzelin <[email protected]>
AuthorDate: Tue Mar 12 13:22:44 2024 +0800
[cdc] Time related computed functions support to handle epoch time (#2989)
---
docs/content/cdc-ingestion/overview.md | 33 ++-
.../{compute_column.html => other_functions.html} | 30 --
.../shortcodes/generated/temporal_functions.html | 57 ++++
.../flink/action/cdc/ComputedColumnUtils.java | 2 +-
.../apache/paimon/flink/action/cdc/Expression.java | 312 +++++++++------------
.../cdc/mysql/MySqlSyncTableActionITCase.java | 177 ++++++++++++
.../src/test/resources/mysql/sync_table_setup.sql | 20 ++
7 files changed, 412 insertions(+), 219 deletions(-)
diff --git a/docs/content/cdc-ingestion/overview.md
b/docs/content/cdc-ingestion/overview.md
index 16b31a79b..90e2235d3 100644
--- a/docs/content/cdc-ingestion/overview.md
+++ b/docs/content/cdc-ingestion/overview.md
@@ -78,9 +78,38 @@ behaviors of `RENAME TABLE` and `DROP COLUMN` will be
ignored, `RENAME COLUMN` w
## Computed Functions
-`--computed_column` are the definitions of computed columns. The argument
field is from source table field name. Supported expressions are:
+`--computed_column` are the definitions of computed columns. The argument
field is from source table field name.
-{{< generated/compute_column >}}
+### Temporal Functions
+
+Temporal functions can convert date and epoch time to another form. A common
use case is to generate partition values.
+
+{{< generated/temporal_functions >}}
+
+The data type of the temporal-column can be one of the following cases:
+1. DATE, DATETIME or TIMESTAMP.
+2. Any integer numeric type (such as INT and BIGINT). In this case, the data
will be considered as epoch time relative to `1970-01-01 00:00:00`.
+You should set the precision of the value (default is 0).
+3. STRING. In this case, if you don't set the precision, the data will be
considered as a formatted string of a DATE,
+DATETIME or TIMESTAMP value. Otherwise, the data will be considered as the string
value of an epoch time, so you must set the
+precision in the latter case.
+
+The precision represents the unit of the epoch time. Currently, there are four
valid precisions: `0` (for epoch seconds),
+`3` (for epoch milliseconds), `6` (for epoch microseconds) and `9` (for epoch
nanoseconds). Take the time point
+`1970-01-01 00:00:00.123456789` as an example: the epoch seconds are 0, the
epoch milliseconds are 123, the epoch microseconds
+are 123456, and the epoch nanoseconds are 123456789. The precision should
match the input values. You can set the precision
+in this way: `date_format(epoch_col, yyyy-MM-dd, 0)`.
+
+`date_format` is a flexible function which is able to convert the temporal
value to various formats with different format
+strings. A commonly used format string is `yyyy-MM-dd HH:mm:ss.SSS`. Another
example is `yyyy-ww` which can extract the year
+and the week-of-the-year from the input. Note that the output is affected by
the locale. For example, in some regions the
+first day of a week is Monday while in others it is Sunday, so if you use
`date_format(date_col, yyyy-ww)` and the input of
+date_col is 2024-01-07 (Sunday), the output may be `2024-01` (if the first day
of a week is Monday) or `2024-02` (if the
+first day of a week is Sunday).
+
+### Other Functions
+
+{{< generated/other_functions >}}
## Special Data Type Mapping
diff --git a/docs/layouts/shortcodes/generated/compute_column.html
b/docs/layouts/shortcodes/generated/other_functions.html
similarity index 55%
rename from docs/layouts/shortcodes/generated/compute_column.html
rename to docs/layouts/shortcodes/generated/other_functions.html
index cabeb5276..f367c97d0 100644
--- a/docs/layouts/shortcodes/generated/compute_column.html
+++ b/docs/layouts/shortcodes/generated/other_functions.html
@@ -25,36 +25,6 @@ under the License.
</tr>
</thead>
<tbody>
- <tr>
- <td><h5>year(date-column)</h5></td>
- <td>Extract year from a DATE, DATETIME or TIMESTAMP (or its
corresponding string format). Output is an INT value represent the year.</td>
- </tr>
- <tr>
- <td><h5>month(date-column)</h5></td>
- <td>Extract month of year from a DATE, DATETIME or TIMESTAMP (or its
corresponding string format). Output is an INT value represent the month of
year.</td>
- </tr>
- <tr>
- <td><h5>day(date-column)</h5></td>
- <td>Extract day of month from a DATE, DATETIME or TIMESTAMP (or its
corresponding string format). Output is an INT value represent the day of
month.</td>
- </tr>
- <tr>
- <td><h5>hour(date-column)</h5></td>
- <td>Extract hour from a DATE, DATETIME or TIMESTAMP (or its
corresponding string format). Output is an INT value represent the hour.</td>
- </tr>
- <tr>
- <td><h5>minute(date-column)</h5></td>
- <td>Extract minute from a DATE, DATETIME or TIMESTAMP (or its
corresponding string format). Output is an INT value represent the minute.</td>
- </tr>
- <tr>
- <td><h5>second(date-column)</h5></td>
- <td>Extract second from a DATE, DATETIME or TIMESTAMP (or its
corresponding string format). Output is an INT value represent the second.</td>
- </tr>
- <tr>
- <td><h5>date_format(date-column,format)</h5></td>
- <td>Convert date format from a DATE, DATETIME or TIMESTAMP (or its
corresponding string format).
- 'format' is compatible with Java's DateTimeFormatter String (for
example, 'yyyy-MM-dd'). Output is a string value in converted date format.
- </td>
- </tr>
<tr>
<td><h5>substring(column,beginInclusive)</h5></td>
<td>Get column.substring(beginInclusive). Output is a STRING.</td>
diff --git a/docs/layouts/shortcodes/generated/temporal_functions.html
b/docs/layouts/shortcodes/generated/temporal_functions.html
new file mode 100644
index 000000000..d652a5a0e
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/temporal_functions.html
@@ -0,0 +1,57 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 30%">Function</th>
+ <th class="text-left" style="width: 70%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>year(temporal-column [, precision])</h5></td>
+ <td>Extract year from the input. Output is an INT value representing the
year.</td>
+ </tr>
+ <tr>
+ <td><h5>month(temporal-column [, precision])</h5></td>
+ <td>Extract month of year from the input. Output is an INT value
representing the month of year.</td>
+ </tr>
+ <tr>
+ <td><h5>day(temporal-column [, precision])</h5></td>
+ <td>Extract day of month from the input. Output is an INT value
representing the day of month.</td>
+ </tr>
+ <tr>
+ <td><h5>hour(temporal-column [, precision])</h5></td>
+ <td>Extract hour from the input. Output is an INT value representing the
hour.</td>
+ </tr>
+ <tr>
+ <td><h5>minute(temporal-column [, precision])</h5></td>
+ <td>Extract minute from the input. Output is an INT value representing
the minute.</td>
+ </tr>
+ <tr>
+ <td><h5>second(temporal-column [, precision])</h5></td>
+ <td>Extract second from the input. Output is an INT value representing
the second.</td>
+ </tr>
+ <tr>
+ <td><h5>date_format(temporal-column, format-string [,
precision])</h5></td>
+ <td>Convert the input to desired formatted string. Output type is
STRING.</td>
+ </tr>
+ </tbody>
+</table>
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index 70b1898c0..a18355ca6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -88,7 +88,7 @@ public class ComputedColumnUtils {
Expression.create(
exprName,
fieldReference,
- typeMapping.get(fieldReference),
+ typeMapping.get(fieldReferenceCheckForm),
literals)));
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 554b8fc9e..417758f4b 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -18,9 +18,12 @@
package org.apache.paimon.flink.action.cdc;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.SerializableSupplier;
import javax.annotation.Nullable;
@@ -31,6 +34,7 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
+import java.util.function.Function;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -62,19 +66,25 @@ public interface Expression extends Serializable {
String exprName, String fieldReference, DataType fieldType,
String... literals) {
switch (exprName.toLowerCase()) {
case "year":
- return year(fieldReference);
+ return TemporalToIntConverter.create(
+ fieldReference, fieldType, () ->
LocalDateTime::getYear, literals);
case "month":
- return month(fieldReference);
+ return TemporalToIntConverter.create(
+ fieldReference, fieldType, () ->
LocalDateTime::getMonthValue, literals);
case "day":
- return day(fieldReference);
+ return TemporalToIntConverter.create(
+ fieldReference, fieldType, () ->
LocalDateTime::getDayOfMonth, literals);
case "hour":
- return hour(fieldReference);
+ return TemporalToIntConverter.create(
+ fieldReference, fieldType, () ->
LocalDateTime::getHour, literals);
case "minute":
- return minute(fieldReference);
+ return TemporalToIntConverter.create(
+ fieldReference, fieldType, () ->
LocalDateTime::getMinute, literals);
case "second":
- return second(fieldReference);
+ return TemporalToIntConverter.create(
+ fieldReference, fieldType, () ->
LocalDateTime::getSecond, literals);
case "date_format":
- return dateFormat(fieldReference, literals);
+ return DateFormat.create(fieldReference, fieldType, literals);
case "substring":
return substring(fieldReference, literals);
case "truncate":
@@ -88,39 +98,6 @@ public interface Expression extends Serializable {
}
}
- static Expression year(String fieldReference) {
- return new YearComputer(fieldReference);
- }
-
- static Expression month(String fieldReference) {
- return new MonthComputer(fieldReference);
- }
-
- static Expression day(String fieldReference) {
- return new DayComputer(fieldReference);
- }
-
- static Expression hour(String fieldReference) {
- return new HourComputer(fieldReference);
- }
-
- static Expression minute(String fieldReference) {
- return new MinuteComputer(fieldReference);
- }
-
- static Expression second(String fieldReference) {
- return new SecondComputer(fieldReference);
- }
-
- static Expression dateFormat(String fieldReference, String... literals) {
- checkArgument(
- literals.length == 1,
- String.format(
- "'date_format' expression supports one argument, but
found '%s'.",
- literals.length));
- return new DateFormat(fieldReference, literals[0]);
- }
-
static Expression substring(String fieldReference, String... literals) {
checkArgument(
literals.length == 1 || literals.length == 2,
@@ -160,99 +137,35 @@ public interface Expression extends Serializable {
return new TruncateComputer(fieldReference, fieldType, literals[0]);
}
- /** Compute year from a time input. */
- final class YearComputer implements Expression {
+ /** Expression to handle temporal value. */
+ abstract class TemporalExpressionBase<T> implements Expression {
private static final long serialVersionUID = 1L;
- private final String fieldReference;
-
- private YearComputer(String fieldReference) {
- this.fieldReference = fieldReference;
- }
-
- @Override
- public String fieldReference() {
- return fieldReference;
- }
-
- @Override
- public DataType outputType() {
- return DataTypes.INT();
- }
-
- @Override
- public String eval(String input) {
- LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input,
0);
- return String.valueOf(localDateTime.getYear());
- }
- }
-
- /** Compute month from a time input. */
- final class MonthComputer implements Expression {
-
- private static final long serialVersionUID = 1L;
+ private static final List<Integer> SUPPORTED_PRECISION =
Arrays.asList(0, 3, 6, 9);
private final String fieldReference;
+ @Nullable private final Integer precision;
- private MonthComputer(String fieldReference) {
- this.fieldReference = fieldReference;
- }
-
- @Override
- public String fieldReference() {
- return fieldReference;
- }
-
- @Override
- public DataType outputType() {
- return DataTypes.INT();
- }
-
- @Override
- public String eval(String input) {
- LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input,
0);
- return String.valueOf(localDateTime.getMonthValue());
- }
- }
-
- /** Compute day from a time input. */
- final class DayComputer implements Expression {
-
- private static final long serialVersionUID = 1L;
-
- private final String fieldReference;
+ private transient Function<LocalDateTime, T> converter;
- private DayComputer(String fieldReference) {
+ private TemporalExpressionBase(
+ String fieldReference, DataType fieldType, @Nullable Integer
precision) {
this.fieldReference = fieldReference;
- }
-
- @Override
- public String fieldReference() {
- return fieldReference;
- }
- @Override
- public DataType outputType() {
- return DataTypes.INT();
- }
-
- @Override
- public String eval(String input) {
- LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input,
0);
- return String.valueOf(localDateTime.getDayOfMonth());
- }
- }
-
- /** Compute hour from a time input. */
- final class HourComputer implements Expression {
-
- private static final long serialVersionUID = 1L;
+ // when the input is INTEGER_NUMERIC and no precision is given, default to 0 (epoch seconds)
+ if
(fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
+ && precision == null) {
+ precision = 0;
+ }
- private final String fieldReference;
+ checkArgument(
+ precision == null ||
SUPPORTED_PRECISION.contains(precision),
+ "Unsupported precision of temporal function: %d. Supported
precisions are: "
+ + "0 (for epoch seconds), 3 (for epoch
milliseconds), 6 (for epoch microseconds) and 9 (for epoch nanoseconds).",
+ precision);
- private HourComputer(String fieldReference) {
- this.fieldReference = fieldReference;
+ this.precision = precision;
}
@Override
@@ -260,6 +173,7 @@ public interface Expression extends Serializable {
return fieldReference;
}
+ /** Returns INT by default; a subclass whose output type is not INT must override this. */
@Override
public DataType outputType() {
return DataTypes.INT();
@@ -267,98 +181,124 @@ public interface Expression extends Serializable {
@Override
public String eval(String input) {
- LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input,
0);
- return String.valueOf(localDateTime.getHour());
- }
- }
-
- /** Compute minute from a time input. */
- final class MinuteComputer implements Expression {
-
- private static final long serialVersionUID = 1L;
-
- private final String fieldReference;
-
- private MinuteComputer(String fieldReference) {
- this.fieldReference = fieldReference;
- }
-
- @Override
- public String fieldReference() {
- return fieldReference;
- }
+ if (converter == null) {
+ this.converter = createConverter();
+ }
- @Override
- public DataType outputType() {
- return DataTypes.INT();
+ T result = converter.apply(toLocalDateTime(input));
+ return String.valueOf(result);
+ }
+
+ private LocalDateTime toLocalDateTime(String input) {
+ if (precision == null) {
+ return DateTimeUtils.toLocalDateTime(input, 9);
+ } else {
+ long numericValue = Long.parseLong(input);
+ long milliseconds = 0;
+ int nanosOfMillisecond = 0;
+ switch (precision) {
+ case 0:
+ milliseconds = numericValue * 1000L;
+ break;
+ case 3:
+ milliseconds = numericValue;
+ break;
+ case 6:
+ milliseconds = numericValue / 1000;
+ nanosOfMillisecond = (int) (numericValue % 1000 *
1000);
+ break;
+ case 9:
+ milliseconds = numericValue / 1_000_000;
+ nanosOfMillisecond = (int) (numericValue % 1_000_000);
+ break;
+ // no error case because precision is validated
+ }
+ return Timestamp.fromEpochMillis(milliseconds,
nanosOfMillisecond)
+ .toLocalDateTime();
+ }
}
- @Override
- public String eval(String input) {
- LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input,
0);
- return String.valueOf(localDateTime.getMinute());
- }
+ protected abstract Function<LocalDateTime, T> createConverter();
}
- /** Compute second from a time input. */
- final class SecondComputer implements Expression {
+ /** Convert the temporal value to an integer. */
+ final class TemporalToIntConverter extends TemporalExpressionBase<Integer>
{
private static final long serialVersionUID = 1L;
- private final String fieldReference;
-
- private SecondComputer(String fieldReference) {
- this.fieldReference = fieldReference;
- }
-
- @Override
- public String fieldReference() {
- return fieldReference;
- }
+ private final SerializableSupplier<Function<LocalDateTime, Integer>>
converterSupplier;
- @Override
- public DataType outputType() {
- return DataTypes.INT();
+ private TemporalToIntConverter(
+ String fieldReference,
+ DataType fieldType,
+ @Nullable Integer precision,
+ SerializableSupplier<Function<LocalDateTime, Integer>>
converterSupplier) {
+ super(fieldReference, fieldType, precision);
+ this.converterSupplier = converterSupplier;
}
@Override
- public String eval(String input) {
- LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input,
0);
- return String.valueOf(localDateTime.getSecond());
+ protected Function<LocalDateTime, Integer> createConverter() {
+ return converterSupplier.get();
+ }
+
+ private static TemporalToIntConverter create(
+ String fieldReference,
+ DataType fieldType,
+ SerializableSupplier<Function<LocalDateTime, Integer>>
converterSupplier,
+ String... literals) {
+ checkArgument(
+ literals.length == 0 || literals.length == 1,
+ "TemporalToIntConverter supports 0 or 1 argument, but
found '%s'.",
+ literals.length);
+
+ return new TemporalToIntConverter(
+ fieldReference,
+ fieldType,
+ literals.length == 0 ? null : Integer.valueOf(literals[0]),
+ converterSupplier);
}
}
- /** date format from a time input. */
- final class DateFormat implements Expression {
+ /** Convert the temporal value to desired formatted string. */
+ final class DateFormat extends TemporalExpressionBase<String> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
- private final String fieldReference;
private final String pattern;
- private transient DateTimeFormatter formatter;
- private DateFormat(String fieldReference, String pattern) {
- this.fieldReference = fieldReference;
+ private DateFormat(
+ String fieldReference,
+ DataType fieldType,
+ String pattern,
+ @Nullable Integer precision) {
+ super(fieldReference, fieldType, precision);
this.pattern = pattern;
}
- @Override
- public String fieldReference() {
- return fieldReference;
- }
-
@Override
public DataType outputType() {
return DataTypes.STRING();
}
@Override
- public String eval(String input) {
- if (formatter == null) {
- formatter = DateTimeFormatter.ofPattern(pattern);
- }
- LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input,
0);
- return localDateTime.format(formatter);
+ protected Function<LocalDateTime, String> createConverter() {
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
+ return localDateTime -> localDateTime.format(formatter);
+ }
+
+ private static DateFormat create(
+ String fieldReference, DataType fieldType, String... literals)
{
+ checkArgument(
+ literals.length == 1 || literals.length == 2,
+ "'date_format' supports 1 or 2 arguments, but found '%s'.",
+ literals.length);
+
+ return new DateFormat(
+ fieldReference,
+ fieldType,
+ literals[0],
+ literals.length == 1 ? null :
Integer.valueOf(literals[1]));
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index aee4c484f..6cd35eec8 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -37,13 +37,18 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.sql.SQLException;
import java.sql.Statement;
+import java.time.DayOfWeek;
import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.WeekFields;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -819,6 +824,178 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
waitForResult(expected, table, rowType, Arrays.asList("pk",
"_year_date"));
}
+ @Test
+ @Timeout(60)
+ public void testTemporalToIntWithEpochTime() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "test_time_to_int_epoch");
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ // pick field reference & time unit
+ int fieldRefIndex = random.nextInt(5);
+ String fieldReference =
+ Arrays.asList(
+ "_second_val0",
+ "_second_val1",
+ "_millis_val",
+ "_micros_val",
+ "_nanos_val")
+ .get(fieldRefIndex);
+ String precision = Arrays.asList("", ",0", ",3", ",6",
",9").get(fieldRefIndex);
+
+ // pick test expression
+ int expIndex = random.nextInt(6);
+ String expression =
+ Arrays.asList("year", "month", "day", "hour", "minute",
"second").get(expIndex);
+
+ String computedColumnDef =
+ String.format("_time_to_int=%s(%s%s)", expression,
fieldReference, precision);
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withComputedColumnArgs(computedColumnDef)
+ .build();
+ runActionWithDefaultEnv(action);
+
+ try (Statement statement = getStatement()) {
+ statement.execute("USE " + DATABASE_NAME);
+ insertEpochTime(
+ "test_time_to_int_epoch", 1,
"2024-01-01T00:01:02.123456789Z", statement);
+ insertEpochTime(
+ "test_time_to_int_epoch", 2,
"2024-12-31T12:59:59.123456789Z", statement);
+ }
+
+ FileStoreTable table = getFileStoreTable();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.INT()
+ },
+ new String[] {
+ "pk",
+ "_second_val0",
+ "_second_val1",
+ "_millis_val",
+ "_micros_val",
+ "_nanos_val",
+ "_time_to_int"
+ });
+
+ int result1 = Arrays.asList(2024, 1, 1, 0, 1, 2).get(expIndex);
+ int result2 = Arrays.asList(2024, 12, 31, 12, 59, 59).get(expIndex);
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, 1704067262, 1704067262, 1704067262123,
1704067262123456, 1704067262123456789, "
+ + result1
+ + "]",
+ "+I[2, 1735649999, 1735649999, 1735649999123,
1735649999123456, 1735649999123456789, "
+ + result2
+ + "]");
+ waitForResult(expected, table, rowType,
Collections.singletonList("pk"));
+ }
+
+ @Test
+ @Timeout(60)
+ public void testDateFormatWithEpochTime() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "test_date_format_epoch");
+
+ List<String> computedColumnDefs =
+ Arrays.asList(
+ "_from_second0_default=date_format(_second_val0,
yyyy-MM-dd HH:mm:ss)",
+ "_from_second0=date_format(_second_val0, yyyy-MM-dd
HH:mm:ss, 0)",
+ "_from_second1=date_format(_second_val1, yyyy-MM-dd
HH:mm:ss, 0)",
+ // test week format
+ "_from_second1_week=date_format(_second_val1, yyyy-ww,
0)",
+ "_from_millisecond=date_format(_millis_val, yyyy-MM-dd
HH:mm:ss.SSS, 3)",
+ "_from_microsecond=date_format(_micros_val, yyyy-MM-dd
HH:mm:ss.SSSSSS, 6)",
+ "_from_nanoseconds=date_format(_nanos_val, yyyy-MM-dd
HH:mm:ss.SSSSSSSSS, 9)");
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withComputedColumnArgs(computedColumnDefs)
+ .build();
+ runActionWithDefaultEnv(action);
+
+ try (Statement statement = getStatement()) {
+ statement.execute("USE " + DATABASE_NAME);
+ insertEpochTime(
+ "test_date_format_epoch", 1,
"2024-01-07T00:01:02.123456789Z", statement);
+ }
+
+ FileStoreTable table = getFileStoreTable();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {
+ "pk",
+ "_second_val0",
+ "_second_val1",
+ "_millis_val",
+ "_micros_val",
+ "_nanos_val",
+ "_from_second0_default",
+ "_from_second0",
+ "_from_second1",
+ "_from_second1_week",
+ "_from_millisecond",
+ "_from_microsecond",
+ "_from_nanoseconds"
+ });
+
+ // depends on the Locale setting
+ WeekFields weekFields = WeekFields.of(Locale.getDefault());
+ int week = weekFields.getFirstDayOfWeek() == DayOfWeek.MONDAY ? 1 : 2;
+
+ List<String> expected =
+ Collections.singletonList(
+ "+I[1, 1704585662, 1704585662, 1704585662123,
1704585662123456, 1704585662123456789, "
+ + "2024-01-07 00:01:02, 2024-01-07 00:01:02,
2024-01-07 00:01:02, "
+ + String.format("2024-0%s, ", week)
+ + "2024-01-07 00:01:02.123, 2024-01-07
00:01:02.123456, 2024-01-07 00:01:02.123456789]");
+ waitForResult(expected, table, rowType,
Collections.singletonList("pk"));
+ }
+
+ private void insertEpochTime(String table, int pk, String dateStr,
Statement statement)
+ throws SQLException {
+ Instant instant = Instant.parse(dateStr);
+ long epochSecond = instant.getEpochSecond();
+ int nano = instant.getNano();
+
+ statement.executeUpdate(
+ String.format(
+ "INSERT INTO %s VALUES (%d, %d, %d, %d, %d, %d)",
+ table,
+ pk,
+ epochSecond,
+ epochSecond,
+ epochSecond * 1000 + nano / 1_000_000,
+ epochSecond * 1000_000 + nano / 1_000,
+ epochSecond * 1_000_000_000 + nano));
+ }
+
@Test
@Timeout(60)
public void testSyncShards() throws Exception {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index d119b1c89..949f1c99d 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -281,6 +281,26 @@ CREATE TABLE test_computed_column (
PRIMARY KEY (pk)
);
+CREATE TABLE test_time_to_int_epoch (
+ pk INT,
+ _second_val0 INT,
+ _second_val1 BIGINT,
+ _millis_val BIGINT,
+ _micros_val BIGINT,
+ _nanos_val BIGINT,
+ PRIMARY KEY (pk)
+);
+
+CREATE TABLE test_date_format_epoch (
+ pk INT,
+ _second_val0 INT,
+ _second_val1 BIGINT,
+ _millis_val BIGINT,
+ _micros_val BIGINT,
+ _nanos_val BIGINT,
+ PRIMARY KEY (pk)
+);
+
CREATE TABLE test_options_change (
pk INT,
_date DATE,