This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dd2be7  [Improve]Compatible with previous filter query options (#373)
3dd2be7 is described below

commit 3dd2be7a896a9cbfc252e98cd5fdfce87169d2a9
Author: wudi <[email protected]>
AuthorDate: Mon May 6 22:12:43 2024 +0800

    [Improve]Compatible with previous filter query options (#373)
---
 .../apache/doris/flink/table/DorisConfigOptions.java  | 18 ++++++++++++++++++
 .../doris/flink/table/DorisDynamicTableFactory.java   |  6 ++++++
 .../doris/flink/table/DorisDynamicTableSource.java    | 19 ++++++++++++-------
 3 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 63e550a..77349a6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -80,6 +80,24 @@ public class DorisConfigOptions {
                             "Use automatic redirection of fe without 
explicitly obtaining the be list");
 
     // source config options
+    // This is compatible with the previous writing method.
+    // Some expressions may not be pushed down by FlinkSQL.
+    @Deprecated
+    public static final ConfigOption<String> DORIS_FILTER_QUERY =
+            ConfigOptions.key("doris.filter.query")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Filter expression of the query, which is 
transparently transmitted to Doris. Doris uses this expression to complete 
source-side data filtering");
+
+    @Deprecated
+    public static final ConfigOption<String> DORIS_READ_FIELD =
+            ConfigOptions.key("doris.read.field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "List of column names in the Doris table, 
separated by commas");
+
     public static final ConfigOption<Integer> DORIS_TABLET_SIZE =
             ConfigOptions.key("doris.request.tablet.size")
                     .intType()
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index a276696..b198ca3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -45,6 +45,8 @@ import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
+import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -116,6 +118,8 @@ public final class DorisDynamicTableFactory
         options.add(JDBC_URL);
         options.add(AUTO_REDIRECT);
 
+        options.add(DORIS_READ_FIELD);
+        options.add(DORIS_FILTER_QUERY);
         options.add(DORIS_TABLET_SIZE);
         options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
         options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
@@ -202,6 +206,8 @@ public final class DorisDynamicTableFactory
         
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
                 
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
                 
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
+                .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
+                .setReadFields(readableConfig.get(DORIS_READ_FIELD))
                 .setRequestQueryTimeoutS(
                         (int) 
readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
                 .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index b6cd1a7..32851d4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -95,13 +95,18 @@ public final class DorisDynamicTableSource
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext 
runtimeProviderContext) {
-        String filterQuery = 
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
-        readOptions.setFilterQuery(filterQuery);
-        String[] selectFields = 
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
-        readOptions.setReadFields(
-                Arrays.stream(selectFields)
-                        .map(item -> String.format("`%s`", 
item.trim().replace("`", "")))
-                        .collect(Collectors.joining(", ")));
+        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+            String filterQuery = 
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+            readOptions.setFilterQuery(filterQuery);
+        }
+        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
+            String[] selectFields =
+                    DataType.getFieldNames(physicalRowDataType).toArray(new 
String[0]);
+            readOptions.setReadFields(
+                    Arrays.stream(selectFields)
+                            .map(item -> String.format("`%s`", 
item.trim().replace("`", "")))
+                            .collect(Collectors.joining(", ")));
+        }
 
         if (readOptions.getUseOldApi()) {
             List<PartitionDefinition> dorisPartitions;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to