This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d13c7f7 [Improve] revert pr 479 to remove resolvedFilterQuery in DorisSource (#497)
3d13c7f7 is described below

commit 3d13c7f7f439ebe55d6dc1ec875b927ae06aa4ef
Author: wudi <[email protected]>
AuthorDate: Fri Oct 11 18:21:49 2024 +0800

    [Improve] revert pr 479 to remove resolvedFilterQuery in DorisSource (#497)
---
 .../org/apache/doris/flink/source/DorisSource.java | 23 +---------------------
 .../doris/flink/table/DorisDynamicTableSource.java |  6 +++++-
 2 files changed, 6 insertions(+), 23 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 19a7fe36..5cdd406c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.StringUtils;
 
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -65,18 +64,14 @@ public class DorisSource<OUT>
     private final Boundedness boundedness;
     private final DorisDeserializationSchema<OUT> deserializer;
 
-    private final List<String> resolvedFilterQuery;
-
     public DorisSource(
             DorisOptions options,
             DorisReadOptions readOptions,
             Boundedness boundedness,
-            List<String> resolvedFilterQuery,
             DorisDeserializationSchema<OUT> deserializer) {
         this.options = options;
         this.readOptions = readOptions;
         this.boundedness = boundedness;
-        this.resolvedFilterQuery = resolvedFilterQuery;
         this.deserializer = deserializer;
     }
 
@@ -100,15 +95,6 @@ public class DorisSource<OUT>
     public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> createEnumerator(
             SplitEnumeratorContext<DorisSourceSplit> context) throws Exception {
         List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
-        if (!resolvedFilterQuery.isEmpty()) {
-            String filterQuery = String.join(" AND ", resolvedFilterQuery);
-            if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
-                readOptions.setFilterQuery(filterQuery);
-            } else {
-                readOptions.setFilterQuery(
-                        String.join(" AND ", readOptions.getFilterQuery(), filterQuery));
-            }
-        }
         List<PartitionDefinition> partitions =
                 RestService.findPartitions(options, readOptions, LOG);
         for (int index = 0; index < partitions.size(); index++) {
@@ -162,7 +148,6 @@ public class DorisSource<OUT>
         // Boundedness
         private Boundedness boundedness;
         private DorisDeserializationSchema<OUT> deserializer;
-        private List<String> resolvedFilterQuery = new ArrayList<>();
 
         DorisSourceBuilder() {
             boundedness = Boundedness.BOUNDED;
@@ -208,11 +193,6 @@ public class DorisSource<OUT>
             return this;
         }
 
-        public DorisSourceBuilder<OUT> setResolvedFilterQuery(List<String> resolvedFilterQuery) {
-            this.resolvedFilterQuery = resolvedFilterQuery;
-            return this;
-        }
-
         /**
          * Build the {@link DorisSource}.
          *
@@ -222,8 +202,7 @@ public class DorisSource<OUT>
             if (readOptions == null) {
                 readOptions = DorisReadOptions.builder().build();
             }
-            return new DorisSource<>(
-                    options, readOptions, boundedness, resolvedFilterQuery, deserializer);
+            return new DorisSource<>(options, readOptions, boundedness, deserializer);
         }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 03d77bc3..9763a888 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -90,6 +90,11 @@ public final class DorisDynamicTableSource
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+            String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+            readOptions.setFilterQuery(filterQuery);
+        }
+
         if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
             String[] selectFields =
                     DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
@@ -123,7 +128,6 @@ public final class DorisDynamicTableSource
                     DorisSource.<RowData>builder()
                             .setDorisReadOptions(readOptions)
                             .setDorisOptions(options)
-                            .setResolvedFilterQuery(resolvedFilterQuery)
                             .setDeserializer(
                                     new RowDataDeserializationSchema(
                                             (RowType) physicalRowDataType.getLogicalType()))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to