This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c4eed254 [fix](source)fix timestamp format push down error (#528)
c4eed254 is described below
commit c4eed254f434316bc5e82b8aaf0b655540b04325
Author: Petrichor <[email protected]>
AuthorDate: Tue Dec 17 14:18:10 2024 +0800
[fix](source)fix timestamp format push down error (#528)
---
.../doris/flink/table/DorisExpressionVisitor.java | 58 ++++++-
.../doris/flink/source/DorisSourceITCase.java | 186 +++++++++++++++++++++
2 files changed, 239 insertions(+), 5 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
index 66242e1e..93f15beb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
@@ -28,9 +28,17 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.StringUtils;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.List;
public class DorisExpressionVisitor implements ExpressionVisitor<String> {
+    /** Literal format for whole-second TIMESTAMP values pushed down to Doris. */
+    private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+    /** Literal format for TIMESTAMP values with a fractional second (microsecond digits). */
+    private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+    // DateTimeFormatter is immutable and thread-safe, so one shared instance per pattern is
+    // enough; names are kept lower-camel because visit(ValueLiteralExpression) refers to them.
+    private static final DateTimeFormatter dateTimeFormatter =
+            DateTimeFormatter.ofPattern(DATETIME_PATTERN);
+    private static final DateTimeFormatter dateTimev2Formatter =
+            DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
@Override
public String visit(CallExpression call) {
@@ -94,11 +102,47 @@ public class DorisExpressionVisitor implements ExpressionVisitor<String> {
+    /**
+     * Renders a literal for a filter pushed down to Doris.
+     *
+     * <p>DATE, TIMESTAMP_WITH_TIME_ZONE and TIMESTAMP_WITH_LOCAL_TIME_ZONE literals are quoted
+     * using their {@code toString()} form. TIMESTAMP_WITHOUT_TIME_ZONE literals bridged to
+     * {@link LocalDateTime} are quoted as {@code yyyy-MM-dd HH:mm:ss} when they have no fractional
+     * second and as {@code yyyy-MM-dd HH:mm:ss.SSSSSS} otherwise. Every other literal is returned
+     * unquoted via {@code toString()}.
+     *
+     * @throws DorisRuntimeException if a LocalDateTime-bridged literal carries no value
+     */
@Override
public String visit(ValueLiteralExpression valueLiteral) {
LogicalTypeRoot typeRoot =
valueLiteral.getOutputDataType().getLogicalType().getTypeRoot();
- if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
- || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
- || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
- || typeRoot.equals(LogicalTypeRoot.DATE)) {
- return "'" + valueLiteral + "'";
+
+        switch (typeRoot) {
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case DATE:
+                return wrapWithQuotes(valueLiteral.toString());
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                Class<?> conversionClass = valueLiteral.getOutputDataType().getConversionClass();
+                if (LocalDateTime.class.isAssignableFrom(conversionClass)) {
+                    // Thrown directly (no catch-all re-wrap) so the stack trace stays intact.
+                    LocalDateTime localDateTime =
+                            valueLiteral
+                                    .getValueAs(LocalDateTime.class)
+                                    .orElseThrow(
+                                            () ->
+                                                    new DorisRuntimeException(
+                                                            "Failed to get LocalDateTime value"));
+                    // Whole seconds use the plain DATETIME pattern. Otherwise exactly six
+                    // fractional digits are printed: sub-microsecond nanos are TRUNCATED here,
+                    // client-side, so Doris never receives (or rounds) them. Doris is expected to
+                    // accept the six-digit form even for lower-precision columns (verify against
+                    // the Doris version in use).
+                    DateTimeFormatter formatter =
+                            localDateTime.getNano() == 0 ? dateTimeFormatter : dateTimev2Formatter;
+                    return wrapWithQuotes(localDateTime.format(formatter));
+                }
+                // Literals bridged to any other conversion class fall back to the unquoted
+                // toString() rendering after the switch.
+                break;
+            default:
+                return valueLiteral.toString();
}
return valueLiteral.toString();
}
@@ -117,4 +161,8 @@ public class DorisExpressionVisitor implements ExpressionVisitor<String> {
public String visit(Expression expression) {
+        // Any expression kind without a more specific visit overload yields null; the push-down
+        // caller presumably treats null as "not translatable to Doris" — confirm at call site.
return null;
}
+
+    /** Returns {@code value} enclosed in single quotes; no escaping of the content is applied. */
+    private static String wrapWithQuotes(String value) {
+        return String.format("'%s'", value);
+    }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 6f148301..18de700e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -59,6 +60,8 @@ public class DorisSourceITCase extends AbstractITCaseService {
private static final String TABLE_READ_TBL_OLD_API =
"tbl_read_tbl_old_api";
private static final String TABLE_READ_TBL_ALL_OPTIONS =
"tbl_read_tbl_all_options";
private static final String TABLE_READ_TBL_PUSH_DOWN =
"tbl_read_tbl_push_down";
+    // Doris table created by initializeTimestampTable for the timestamp push-down test.
+    private static final String TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN =
+            "tbl_read_tbl_timestamp_push_down";
private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
"tbl_read_tbl_push_down_with_union_all";
static final String TABLE_CSV_JM = "tbl_csv_jm_source";
@@ -311,6 +314,138 @@ public class DorisSourceITCase extends AbstractITCaseService {
"testTableSourceFilterAndProjectionPushDown", expected,
actual.toArray());
}
+    /**
+     * Reads a Doris table with {@code datetime} and {@code datetime(6)} columns through the Flink
+     * SQL source and checks projection plus timestamp filters whose literals have second,
+     * microsecond and nanosecond precision.
+     */
+    @Test
+    public void testTableSourceTimestampFilterAndProjectionPushDown() throws Exception {
+        LOG.info("starting to execute testTableSourceTimestampFilterAndProjectionPushDown case.");
+        initializeTimestampTable(TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN);
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
+
+        final String flinkTable = "doris_source_datetime_filter_and_projection_push_down";
+        // Column types mirror the Doris table built by initializeTimestampTable
+        // (name varchar(256), birthday datetime, brilliant_time datetime(6)).
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE %s ("
+                                + "`id` int ,\n"
+                                + "`name` string,\n"
+                                + "`age` int,\n"
+                                + "`birthday` timestamp,\n"
+                                + "`brilliant_time` timestamp(6)\n"
+                                + ") WITH ("
+                                + " 'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        flinkTable,
+                        getFenodes(),
+                        DATABASE + "." + TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sourceDDL);
+
+        List<String> actualProjectionResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,birthday,brilliant_time FROM " + flinkTable + " order by id");
+
+        // Second-precision literal against the datetime column.
+        List<String> actualPushDownDatetimeResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,birthday FROM "
+                                + flinkTable
+                                + " where birthday >= '2023-01-01 00:00:00' order by id");
+        // Microsecond-precision literal against the datetime(6) column.
+        List<String> actualPushDownMicrosecondResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,brilliant_time FROM "
+                                + flinkTable
+                                + " where brilliant_time > '2023-01-01 00:00:00.000001'"
+                                + " order by id");
+        // Nanosecond-precision literals: digits beyond microseconds exceed the column precision.
+        List<String> actualPushDownNanosecondResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,brilliant_time FROM "
+                                + flinkTable
+                                + " where brilliant_time > '2023-01-01 00:00:00.000009001'"
+                                + " order by id");
+
+        List<String> actualPushDownNanosecondRoundDownResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,brilliant_time FROM "
+                                + flinkTable
+                                + " where brilliant_time >= '2023-01-01 00:00:00.999999001'"
+                                + " order by id");
+        List<String> actualPushDownNanosecondRoundUpResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,brilliant_time FROM "
+                                + flinkTable
+                                + " where brilliant_time >= '2023-01-01 00:00:00.999999999'"
+                                + " order by id");
+
+        String[] expectedProjectionResult =
+                new String[] {
+                    "+I[1, 2023-01-01T00:00, 2023-01-01T00:00:00.000001]",
+                    "+I[2, 2023-01-01T00:00:01, 2023-01-01T00:00:00.005]",
+                    "+I[3, 2023-01-01T00:00:02, 2023-01-01T00:00:00.000009]",
+                    "+I[4, 2023-01-01T00:00:02, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:02, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:02, 2023-01-01T00:00:01]"
+                };
+        String[] expectedPushDownDatetimeResult =
+                new String[] {
+                    "+I[1, 2023-01-01T00:00]",
+                    "+I[2, 2023-01-01T00:00:01]",
+                    "+I[3, 2023-01-01T00:00:02]",
+                    "+I[4, 2023-01-01T00:00:02]",
+                    "+I[5, 2023-01-01T00:00:02]",
+                    "+I[6, 2023-01-01T00:00:02]"
+                };
+        String[] expectedPushDownWithMicrosecondResult =
+                new String[] {
+                    "+I[2, 2023-01-01T00:00:00.005]",
+                    "+I[3, 2023-01-01T00:00:00.000009]",
+                    "+I[4, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:01]"
+                };
+
+        String[] expectedPushDownWithNanosecondResult =
+                new String[] {
+                    "+I[2, 2023-01-01T00:00:00.005]",
+                    "+I[4, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:01]"
+                };
+
+        String[] expectedPushDownWithNanosecondRoundDownResult =
+                new String[] {
+                    "+I[4, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:01]"
+                };
+
+        String[] expectedPushDownWithNanosecondRoundUpResult =
+                new String[] {
+                    "+I[4, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:01]"
+                };
+        // Distinct labels so a failing check can be identified from its report.
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown-projection",
+                expectedProjectionResult,
+                actualProjectionResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown-datetime",
+                expectedPushDownDatetimeResult,
+                actualPushDownDatetimeResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown-microsecond",
+                expectedPushDownWithMicrosecondResult,
+                actualPushDownMicrosecondResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown-nanosecond",
+                expectedPushDownWithNanosecondResult,
+                actualPushDownNanosecondResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown-nanosecondRoundDown",
+                expectedPushDownWithNanosecondRoundDownResult,
+                actualPushDownNanosecondRoundDownResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown-nanosecondRoundUp",
+                expectedPushDownWithNanosecondRoundUpResult,
+                actualPushDownNanosecondRoundUpResult.toArray());
+    }
+
@Test
public void testTableSourceFilterWithUnionAll() throws Exception {
LOG.info("starting to execute testTableSourceFilterWithUnionAll
case.");
@@ -566,6 +701,44 @@ public class DorisSourceITCase extends AbstractITCaseService {
String.format("insert into %s.%s values ('apache',12)",
DATABASE, table));
}
+    /**
+     * Recreates {@code table} in Doris with a {@code datetime} column ({@code birthday}) and a
+     * {@code datetime(6)} column ({@code brilliant_time}), then inserts six rows.
+     *
+     * <p>Rows 5 and 6 deliberately supply nanosecond literals for the datetime(6) column. The
+     * expected values in testTableSourceTimestampFilterAndProjectionPushDown assume Doris rounds
+     * them to microseconds on insert (.999999001 -> .999999, .999999999 -> next second).
+     */
+    private void initializeTimestampTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                // No trailing comma after the last column definition.
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`id` int,\n"
+                                + "`name` varchar(256),\n"
+                                + "`age` int,\n"
+                                + "`birthday` datetime,\n"
+                                + "`brilliant_time` datetime(6)\n"
+                                + ") DISTRIBUTED BY HASH(`id`) BUCKETS 3\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE,
+                        table),
+                String.format(
+                        "insert into %s.%s values "
+                                + "(1,'Kevin',54,'2023-01-01T00:00:00','2023-01-01T00:00:00.000001')",
+                        DATABASE,
+                        table),
+                String.format(
+                        "insert into %s.%s values "
+                                + "(2,'Dylan',25,'2023-01-01T00:00:01','2023-01-01T00:00:00.005000')",
+                        DATABASE,
+                        table),
+                String.format(
+                        "insert into %s.%s values "
+                                + "(3,'Darren',65,'2023-01-01T00:00:02','2023-01-01T00:00:00.000009')",
+                        DATABASE,
+                        table),
+                String.format(
+                        "insert into %s.%s values "
+                                + "(4,'Warren',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999')",
+                        DATABASE,
+                        table),
+                String.format(
+                        "insert into %s.%s values "
+                                + "(5,'Simba',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999001')",
+                        DATABASE,
+                        table),
+                String.format(
+                        "insert into %s.%s values "
+                                + "(6,'Jimmy',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999999')",
+                        DATABASE,
+                        table));
+    }
+
private void initializeTableWithData(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
@@ -610,4 +783,17 @@ public class DorisSourceITCase extends AbstractITCaseService {
}
return rows;
}
+
+    /**
+     * Executes {@code executeSql} and returns every produced row rendered with {@code
+     * Row#toString()}, in the order the result iterator yields them.
+     */
+    private List<String> generateExecuteSQLResult(StreamTableEnvironment tEnv, String executeSql)
+            throws Exception {
+        TableResult result = tEnv.executeSql(executeSql);
+        List<String> rendered = new ArrayList<>();
+        // try-with-resources closes the collect iterator even if reading a row fails.
+        try (CloseableIterator<Row> rows = result.collect()) {
+            rows.forEachRemaining(row -> rendered.add(row.toString()));
+        }
+        return rendered;
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]