This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 38026fde [Fix] Fix flink sql projection pushdown error (#428)
38026fde is described below
commit 38026fde2e1b010b0776df6d4e9cdbb2e49966b6
Author: wudi <[email protected]>
AuthorDate: Mon Jul 15 15:36:49 2024 +0800
[Fix] Fix flink sql projection pushdown error (#428)
---
.../doris/flink/table/DorisDynamicTableSource.java | 22 ++++++++-----
.../doris/flink/source/DorisSourceITCase.java | 36 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 8 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 9753361c..5827f879 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -94,6 +94,15 @@ public final class DorisDynamicTableSource
String filterQuery =
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
readOptions.setFilterQuery(filterQuery);
}
+ if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
+ String[] selectFields =
+ DataType.getFieldNames(physicalRowDataType).toArray(new
String[0]);
+ readOptions.setReadFields(
+ Arrays.stream(selectFields)
+ .map(item -> String.format("`%s`",
item.trim().replace("`", "")))
+ .collect(Collectors.joining(", ")));
+ }
+
if (readOptions.getUseOldApi()) {
List<PartitionDefinition> dorisPartitions;
try {
@@ -199,14 +208,11 @@ public final class DorisDynamicTableSource
@Override
public void applyProjection(int[][] projectedFields, DataType
producedDataType) {
this.physicalRowDataType =
Projection.of(projectedFields).project(physicalRowDataType);
- if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
- String[] selectFields =
- DataType.getFieldNames(physicalRowDataType).toArray(new
String[0]);
- this.readOptions.setReadFields(
- Arrays.stream(selectFields)
- .map(item -> String.format("`%s`",
item.trim().replace("`", "")))
- .collect(Collectors.joining(", ")));
- }
+ String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
+ this.readOptions.setReadFields(
+ Arrays.stream(selectFields)
+ .map(item -> String.format("`%s`",
item.trim().replace("`", "")))
+ .collect(Collectors.joining(", ")));
}
@VisibleForTesting
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index e13eeb36..027159db 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -49,6 +49,7 @@ public class DorisSourceITCase extends DorisTestBase {
static final String TABLE_READ_TBL = "tbl_read_tbl";
static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
static final String TABLE_READ_TBL_ALL_OPTIONS =
"tbl_read_tbl_all_options";
+ static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
@Test
public void testSource() throws Exception {
@@ -231,6 +232,41 @@ public class DorisSourceITCase extends DorisTestBase {
Assert.assertArrayEquals(expected, actual.toArray());
}
+ @Test
+ public void testTableSourceFilterAndProjectionPushDown() throws Exception {
+ initializeTable(TABLE_READ_TBL_PUSH_DOWN);
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source ("
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN,
+ USERNAME,
+ PASSWORD);
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("SELECT age FROM
doris_source where age = '18'");
+
+ List<String> actual = new ArrayList<>();
+ try (CloseableIterator<Row> iterator = tableResult.collect()) {
+ while (iterator.hasNext()) {
+ actual.add(iterator.next().toString());
+ }
+ }
+ String[] expected = new String[] {"+I[18]"};
+ Assert.assertArrayEquals(expected, actual.toArray());
+ }
+
private void initializeTable(String table) throws Exception {
try (Connection connection =
DriverManager.getConnection(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]