This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8efaa43d [Fix](source) fix IndexOutOfBoundsException for union all   
(#495)
8efaa43d is described below

commit 8efaa43df769a56f21de9c9f8c9b93e0cf9c57f1
Author: wudi <[email protected]>
AuthorDate: Fri Sep 27 15:46:11 2024 +0800

    [Fix](source) fix IndexOutOfBoundsException for union all   (#495)
---
 .../apache/doris/flink/cfg/DorisReadOptions.java   | 18 +++++++++
 .../doris/flink/table/DorisDynamicTableSource.java |  7 +++-
 .../doris/flink/source/DorisSourceITCase.java      | 47 ++++++++++++++++++++++
 3 files changed, 71 insertions(+), 1 deletion(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 2f6cd8a8..937d3286 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -187,6 +187,24 @@ public class DorisReadOptions implements Serializable {
                 flightSqlPort);
     }
 
+    public DorisReadOptions copy() {
+        return new DorisReadOptions(
+                readFields,
+                filterQuery,
+                requestTabletSize,
+                requestConnectTimeoutMs,
+                requestReadTimeoutMs,
+                requestQueryTimeoutS,
+                requestRetries,
+                requestBatchSize,
+                execMemLimit,
+                deserializeQueueSize,
+                deserializeArrowAsync,
+                useOldApi,
+                useFlightSql,
+                flightSqlPort);
+    }
+
     /** Builder of {@link DorisReadOptions}. */
     public static class Builder {
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index e55d3631..03d77bc3 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -166,9 +166,14 @@ public final class DorisDynamicTableSource
 
     @Override
     public DynamicTableSource copy() {
+        // filterQuery/readFields of readOption may be overwritten in union 
all sql
         DorisDynamicTableSource newSource =
                 new DorisDynamicTableSource(
-                        options, readOptions, lookupOptions, physicalSchema, 
physicalRowDataType);
+                        options,
+                        readOptions.copy(),
+                        lookupOptions,
+                        physicalSchema,
+                        physicalRowDataType);
         newSource.resolvedFilterQuery = new 
ArrayList<>(this.resolvedFilterQuery);
         return newSource;
     }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 4fb6fba8..6f148301 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -63,6 +63,8 @@ public class DorisSourceITCase extends AbstractITCaseService {
             "tbl_read_tbl_push_down_with_union_all";
     static final String TABLE_CSV_JM = "tbl_csv_jm_source";
     static final String TABLE_CSV_TM = "tbl_csv_tm_source";
+    private static final String 
TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER =
+            "tbl_read_tbl_push_down_with_union_all_not_eq_filter";
 
     @Rule
     public final MiniClusterWithClientResource miniClusterResource =
@@ -353,6 +355,51 @@ public class DorisSourceITCase extends 
AbstractITCaseService {
         checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, 
actual.toArray());
     }
 
+    @Test
+    public void testTableSourceFilterWithUnionAllNotEqualFilter() throws 
Exception {
+        LOG.info("starting to execute 
testTableSourceFilterWithUnionAllNotEqualFilter case.");
+        initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER);
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE doris_source_filter_with_union_all ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + 
TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sourceDDL);
+        String querySql =
+                "  SELECT * FROM doris_source_filter_with_union_all where name 
= 'doris'"
+                        + " UNION ALL "
+                        + "SELECT * FROM doris_source_filter_with_union_all 
where name in ('error','flink')";
+        TableResult tableResult = tEnv.executeSql(querySql);
+
+        List<String> actual = new ArrayList<>();
+        try (CloseableIterator<Row> iterator = tableResult.collect()) {
+            while (iterator.hasNext()) {
+                actual.add(iterator.next().toString());
+            }
+        }
+
+        String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"};
+        checkResultInAnyOrder(
+                "testTableSourceFilterWithUnionAllNotEqualFilter", expected, 
actual.toArray());
+    }
+
     @Test
     public void testJobManagerFailoverSource() throws Exception {
         LOG.info("start to test JobManagerFailoverSource.");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to