This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8efaa43d [Fix](source) fix IndexOutOfBoundsException for union all (#495)
8efaa43d is described below
commit 8efaa43df769a56f21de9c9f8c9b93e0cf9c57f1
Author: wudi <[email protected]>
AuthorDate: Fri Sep 27 15:46:11 2024 +0800
[Fix](source) fix IndexOutOfBoundsException for union all (#495)
---
.../apache/doris/flink/cfg/DorisReadOptions.java | 18 +++++++++
.../doris/flink/table/DorisDynamicTableSource.java | 7 +++-
.../doris/flink/source/DorisSourceITCase.java | 47 ++++++++++++++++++++++
3 files changed, 71 insertions(+), 1 deletion(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 2f6cd8a8..937d3286 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -187,6 +187,24 @@ public class DorisReadOptions implements Serializable {
flightSqlPort);
}
+ public DorisReadOptions copy() {
+ return new DorisReadOptions(
+ readFields,
+ filterQuery,
+ requestTabletSize,
+ requestConnectTimeoutMs,
+ requestReadTimeoutMs,
+ requestQueryTimeoutS,
+ requestRetries,
+ requestBatchSize,
+ execMemLimit,
+ deserializeQueueSize,
+ deserializeArrowAsync,
+ useOldApi,
+ useFlightSql,
+ flightSqlPort);
+ }
+
/** Builder of {@link DorisReadOptions}. */
public static class Builder {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index e55d3631..03d77bc3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -166,9 +166,14 @@ public final class DorisDynamicTableSource
@Override
public DynamicTableSource copy() {
+ // filterQuery/readFields of readOption may be overwritten in union all sql
DorisDynamicTableSource newSource =
new DorisDynamicTableSource(
- options, readOptions, lookupOptions, physicalSchema,
physicalRowDataType);
+ options,
+ readOptions.copy(),
+ lookupOptions,
+ physicalSchema,
+ physicalRowDataType);
newSource.resolvedFilterQuery = new
ArrayList<>(this.resolvedFilterQuery);
return newSource;
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 4fb6fba8..6f148301 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -63,6 +63,8 @@ public class DorisSourceITCase extends AbstractITCaseService {
"tbl_read_tbl_push_down_with_union_all";
static final String TABLE_CSV_JM = "tbl_csv_jm_source";
static final String TABLE_CSV_TM = "tbl_csv_tm_source";
+ private static final String
TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER =
+ "tbl_read_tbl_push_down_with_union_all_not_eq_filter";
@Rule
public final MiniClusterWithClientResource miniClusterResource =
@@ -353,6 +355,51 @@ public class DorisSourceITCase extends
AbstractITCaseService {
checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected,
actual.toArray());
}
+ @Test
+ public void testTableSourceFilterWithUnionAllNotEqualFilter() throws
Exception {
+ LOG.info("starting to execute
testTableSourceFilterWithUnionAllNotEqualFilter case.");
+ initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER);
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source_filter_with_union_all ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = '"
+ + DorisConfigOptions.IDENTIFIER
+ + "',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." +
TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sourceDDL);
+ String querySql =
+ " SELECT * FROM doris_source_filter_with_union_all where name
= 'doris'"
+ + " UNION ALL "
+ + "SELECT * FROM doris_source_filter_with_union_all
where name in ('error','flink')";
+ TableResult tableResult = tEnv.executeSql(querySql);
+
+ List<String> actual = new ArrayList<>();
+ try (CloseableIterator<Row> iterator = tableResult.collect()) {
+ while (iterator.hasNext()) {
+ actual.add(iterator.next().toString());
+ }
+ }
+
+ String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"};
+ checkResultInAnyOrder(
+ "testTableSourceFilterWithUnionAllNotEqualFilter", expected,
actual.toArray());
+ }
+
@Test
public void testJobManagerFailoverSource() throws Exception {
LOG.info("start to test JobManagerFailoverSource.");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]