This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c940f217b [flink] Support dateFormat compute functions (#1806)
c940f217b is described below
commit c940f217bc8dec11a1eda316fa1d6593bdc3013d
Author: ehui <[email protected]>
AuthorDate: Mon Aug 14 17:27:13 2023 +0800
[flink] Support dateFormat compute functions (#1806)
---
.../shortcodes/generated/compute_column.html | 4 ++
.../apache/paimon/flink/action/cdc/Expression.java | 48 +++++++++++++++++++++-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 13 +++++-
3 files changed, 62 insertions(+), 3 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/compute_column.html
b/docs/layouts/shortcodes/generated/compute_column.html
index a86de07eb..f9bf8697f 100644
--- a/docs/layouts/shortcodes/generated/compute_column.html
+++ b/docs/layouts/shortcodes/generated/compute_column.html
@@ -41,6 +41,10 @@ under the License.
<td><h5>hour(date-column)</h5></td>
<td>Extract hour from a DATE, DATETIME or TIMESTAMP. Output is an INT
value represent the hour.</td>
</tr>
+ <tr>
+ <td><h5>date_format(date-column)</h5></td>
+ <td>Convert date format from a DATE, DATETIME or TIMESTAMP. Output is
a string value in converted date format.</td>
+ </tr>
<tr>
<td><h5>substring(column,beginInclusive)</h5></td>
<td>Get column.substring(beginInclusive). Output is a STRING.</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index e38270524..15e9c683f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
@@ -37,7 +38,7 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
public interface Expression extends Serializable {
List<String> SUPPORTED_EXPRESSION =
- Arrays.asList("year", "month", "day", "hour", "substring",
"truncate");
+ Arrays.asList("year", "month", "day", "hour", "date_format",
"substring", "truncate");
/** Return name of referenced field. */
String fieldReference();
@@ -59,6 +60,8 @@ public interface Expression extends Serializable {
return day(fieldReference);
case "hour":
return hour(fieldReference);
+ case "date_format":
+ return dateFormat(fieldReference, literals);
case "substring":
return substring(fieldReference, literals);
case "truncate":
@@ -88,6 +91,15 @@ public interface Expression extends Serializable {
return new HourComputer(fieldReference);
}
+ static Expression dateFormat(String fieldReference, String... literals) {
+ checkArgument(
+ literals.length == 1,
+ String.format(
+ "'date_format' expression supports one argument, but
found '%s'.",
+ literals.length));
+ return new DateFormat(fieldReference, literals[0]);
+ }
+
static Expression substring(String fieldReference, String... literals) {
checkArgument(
literals.length == 1 || literals.length == 2,
@@ -239,6 +251,40 @@ public interface Expression extends Serializable {
}
}
+ /** date format from a time input. */
+ final class DateFormat implements Expression {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String fieldReference;
+ private final String pattern;
+ private transient DateTimeFormatter formatter;
+
+ private DateFormat(String fieldReference, String pattern) {
+ this.fieldReference = fieldReference;
+ this.pattern = pattern;
+ }
+
+ @Override
+ public String fieldReference() {
+ return fieldReference;
+ }
+
+ @Override
+ public DataType outputType() {
+ return DataTypes.STRING();
+ }
+
+ @Override
+ public String eval(String input) {
+ if (formatter == null) {
+ formatter = DateTimeFormatter.ofPattern(pattern);
+ }
+ LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input,
0);
+ return localDateTime.format(formatter);
+ }
+ }
+
/** Get substring using {@link String#substring}. */
final class Substring implements Expression {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 1195f3dac..b4cf2131d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -773,6 +773,9 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"_hour_date=hour(_date)",
"_hour_datetime=hour(_datetime)",
"_hour_timestamp=hour(_timestamp)",
+ "_date_format_date=date_format(_date,yyyy)",
+
"_date_format_datetime=date_format(_datetime,yyyy-MM-dd)",
+
"_date_format_timestamp=date_format(_timestamp,yyyyMMdd)",
"_substring_date1=substring(_date,2)",
"_substring_date2=substring(_timestamp,5,10)",
"_truncate_date=truncate(pk,2)");
@@ -824,6 +827,9 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
DataTypes.INT()
},
new String[] {
@@ -843,14 +849,17 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"_hour_date",
"_hour_datetime",
"_hour_timestamp",
+ "_date_format_date",
+ "_date_format_datetime",
+ "_date_format_timestamp",
"_substring_date1",
"_substring_date2",
"_truncate_date"
});
List<String> expected =
Arrays.asList(
- "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10,
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 23-03-23, 09-15, 0]",
- "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL,
NULL, 23, NULL, NULL, 0, NULL, NULL, 23-03-23, NULL, 2]");
+ "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10,
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 2023, 2022-01-01, 20210915,
23-03-23, 09-15, 0]",
+ "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL,
NULL, 23, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2]");
waitForResult(expected, table, rowType, Arrays.asList("pk",
"_year_date"));
}