This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8160dd2e [Fix] Fix SupportsFilterPushDown bug when flinksql unionall (#479)
8160dd2e is described below
commit 8160dd2e9d69f3ba32e981e4670482eb9f152ede
Author: xiayang <[email protected]>
AuthorDate: Fri Aug 30 17:45:28 2024 +0800
[Fix] Fix SupportsFilterPushDown bug when flinksql unionall (#479)
---
.../org/apache/doris/flink/source/DorisSource.java | 23 ++++++++-
.../doris/flink/table/DorisDynamicTableSource.java | 5 +-
.../doris/flink/source/DorisSourceITCase.java | 55 ++++++++++++++++++++--
3 files changed, 73 insertions(+), 10 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 3c71c068..6faed87d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -64,14 +65,18 @@ public class DorisSource<OUT>
private final Boundedness boundedness;
private final DorisDeserializationSchema<OUT> deserializer;
+ private final List<String> resolvedFilterQuery;
+
public DorisSource(
DorisOptions options,
DorisReadOptions readOptions,
Boundedness boundedness,
+ List<String> resolvedFilterQuery,
DorisDeserializationSchema<OUT> deserializer) {
this.options = options;
this.readOptions = readOptions;
this.boundedness = boundedness;
+ this.resolvedFilterQuery = resolvedFilterQuery;
this.deserializer = deserializer;
}
@@ -95,6 +100,15 @@ public class DorisSource<OUT>
public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint>
createEnumerator(
SplitEnumeratorContext<DorisSourceSplit> context) throws Exception
{
List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
+ if (!resolvedFilterQuery.isEmpty()) {
+ String filterQuery = String.join(" AND ", resolvedFilterQuery);
+ if
(StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+ readOptions.setFilterQuery(filterQuery);
+ } else {
+ readOptions.setFilterQuery(
+ String.join(" AND ", readOptions.getFilterQuery(),
filterQuery));
+ }
+ }
List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
for (int index = 0; index < partitions.size(); index++) {
@@ -147,6 +161,7 @@ public class DorisSource<OUT>
// Boundedness
private Boundedness boundedness;
private DorisDeserializationSchema<OUT> deserializer;
+ private List<String> resolvedFilterQuery = new ArrayList<>();
DorisSourceBuilder() {
boundedness = Boundedness.BOUNDED;
@@ -173,11 +188,17 @@ public class DorisSource<OUT>
return this;
}
+ public DorisSourceBuilder<OUT> setResolvedFilterQuery(List<String>
resolvedFilterQuery) {
+ this.resolvedFilterQuery = resolvedFilterQuery;
+ return this;
+ }
+
public DorisSource<OUT> build() {
if (readOptions == null) {
readOptions = DorisReadOptions.builder().build();
}
- return new DorisSource<>(options, readOptions, boundedness,
deserializer);
+ return new DorisSource<>(
+ options, readOptions, boundedness, resolvedFilterQuery,
deserializer);
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 5827f879..e55d3631 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -90,10 +90,6 @@ public final class DorisDynamicTableSource
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
- if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
- String filterQuery =
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
- readOptions.setFilterQuery(filterQuery);
- }
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new
String[0]);
@@ -127,6 +123,7 @@ public final class DorisDynamicTableSource
DorisSource.<RowData>builder()
.setDorisReadOptions(readOptions)
.setDorisOptions(options)
+ .setResolvedFilterQuery(resolvedFilterQuery)
.setDeserializer(
new RowDataDeserializationSchema(
(RowType)
physicalRowDataType.getLogicalType()))
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 027159db..a13e96f7 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -50,6 +50,8 @@ public class DorisSourceITCase extends DorisTestBase {
static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
static final String TABLE_READ_TBL_ALL_OPTIONS =
"tbl_read_tbl_all_options";
static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
+ static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
+ "tbl_read_tbl_push_down_with_union_all";
@Test
public void testSource() throws Exception {
@@ -77,7 +79,7 @@ public class DorisSourceITCase extends DorisTestBase {
actual.add(iterator.next().toString());
}
}
- List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]");
+ List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]",
"[apache, 12]");
Assert.assertArrayEquals(actual.toArray(), expected.toArray());
}
@@ -102,7 +104,7 @@ public class DorisSourceITCase extends DorisTestBase {
actual.add(iterator.next().toString());
}
}
- List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]");
+ List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]",
"[apache, 12]");
Assert.assertArrayEquals(actual.toArray(), expected.toArray());
}
@@ -136,7 +138,7 @@ public class DorisSourceITCase extends DorisTestBase {
actual.add(iterator.next().toString());
}
}
- String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
+ String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]",
"+I[apache, 12]"};
Assert.assertArrayEquals(expected, actual.toArray());
// fitler query
@@ -182,7 +184,7 @@ public class DorisSourceITCase extends DorisTestBase {
actual.add(iterator.next().toString());
}
}
- String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
+ String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]",
"+I[apache, 12]"};
Assert.assertArrayEquals(expected, actual.toArray());
}
@@ -228,7 +230,7 @@ public class DorisSourceITCase extends DorisTestBase {
actual.add(iterator.next().toString());
}
}
- String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
+ String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]",
"+I[apache, 12]"};
Assert.assertArrayEquals(expected, actual.toArray());
}
@@ -242,6 +244,7 @@ public class DorisSourceITCase extends DorisTestBase {
String sourceDDL =
String.format(
"CREATE TABLE doris_source ("
+ + " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
@@ -267,6 +270,46 @@ public class DorisSourceITCase extends DorisTestBase {
Assert.assertArrayEquals(expected, actual.toArray());
}
+ @Test
+ public void testTableSourceFilterWithUnionAll() throws Exception {
+ initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." +
TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL,
+ USERNAME,
+ PASSWORD);
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult =
+ tEnv.executeSql(
+ " SELECT * FROM doris_source where age = '18'"
+ + " UNION ALL "
+ + "SELECT * FROM doris_source where age = '10'
");
+
+ List<String> actual = new ArrayList<>();
+ try (CloseableIterator<Row> iterator = tableResult.collect()) {
+ while (iterator.hasNext()) {
+ actual.add(iterator.next().toString());
+ }
+ }
+ String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
+ Assert.assertArrayEquals(expected, actual.toArray());
+ }
+
private void initializeTable(String table) throws Exception {
try (Connection connection =
DriverManager.getConnection(
@@ -288,6 +331,8 @@ public class DorisSourceITCase extends DorisTestBase {
String.format("insert into %s.%s values ('doris',18)",
DATABASE, table));
statement.execute(
String.format("insert into %s.%s values ('flink',10)",
DATABASE, table));
+ statement.execute(
+ String.format("insert into %s.%s values ('apache',12)",
DATABASE, table));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]