This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3d13c7f7 [Improve] revert pr 479 to remove resolvedFilterQuery in DorisSource (#497)
3d13c7f7 is described below
commit 3d13c7f7f439ebe55d6dc1ec875b927ae06aa4ef
Author: wudi <[email protected]>
AuthorDate: Fri Oct 11 18:21:49 2024 +0800
[Improve] revert pr 479 to remove resolvedFilterQuery in DorisSource (#497)
---
.../org/apache/doris/flink/source/DorisSource.java | 23 +---------------------
.../doris/flink/table/DorisDynamicTableSource.java | 6 +++++-
2 files changed, 6 insertions(+), 23 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 19a7fe36..5cdd406c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.StringUtils;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -65,18 +64,14 @@ public class DorisSource<OUT>
private final Boundedness boundedness;
private final DorisDeserializationSchema<OUT> deserializer;
- private final List<String> resolvedFilterQuery;
-
public DorisSource(
DorisOptions options,
DorisReadOptions readOptions,
Boundedness boundedness,
- List<String> resolvedFilterQuery,
DorisDeserializationSchema<OUT> deserializer) {
this.options = options;
this.readOptions = readOptions;
this.boundedness = boundedness;
- this.resolvedFilterQuery = resolvedFilterQuery;
this.deserializer = deserializer;
}
@@ -100,15 +95,6 @@ public class DorisSource<OUT>
public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint>
createEnumerator(
SplitEnumeratorContext<DorisSourceSplit> context) throws Exception
{
List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
- if (!resolvedFilterQuery.isEmpty()) {
- String filterQuery = String.join(" AND ", resolvedFilterQuery);
- if
(StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
- readOptions.setFilterQuery(filterQuery);
- } else {
- readOptions.setFilterQuery(
- String.join(" AND ", readOptions.getFilterQuery(),
filterQuery));
- }
- }
List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
for (int index = 0; index < partitions.size(); index++) {
@@ -162,7 +148,6 @@ public class DorisSource<OUT>
// Boundedness
private Boundedness boundedness;
private DorisDeserializationSchema<OUT> deserializer;
- private List<String> resolvedFilterQuery = new ArrayList<>();
DorisSourceBuilder() {
boundedness = Boundedness.BOUNDED;
@@ -208,11 +193,6 @@ public class DorisSource<OUT>
return this;
}
- public DorisSourceBuilder<OUT> setResolvedFilterQuery(List<String>
resolvedFilterQuery) {
- this.resolvedFilterQuery = resolvedFilterQuery;
- return this;
- }
-
/**
* Build the {@link DorisSource}.
*
@@ -222,8 +202,7 @@ public class DorisSource<OUT>
if (readOptions == null) {
readOptions = DorisReadOptions.builder().build();
}
- return new DorisSource<>(
- options, readOptions, boundedness, resolvedFilterQuery,
deserializer);
+ return new DorisSource<>(options, readOptions, boundedness,
deserializer);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 03d77bc3..9763a888 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -90,6 +90,11 @@ public final class DorisDynamicTableSource
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
+ if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+ String filterQuery =
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+ readOptions.setFilterQuery(filterQuery);
+ }
+
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new
String[0]);
@@ -123,7 +128,6 @@ public final class DorisDynamicTableSource
DorisSource.<RowData>builder()
.setDorisReadOptions(readOptions)
.setDorisOptions(options)
- .setResolvedFilterQuery(resolvedFilterQuery)
.setDeserializer(
new RowDataDeserializationSchema(
(RowType)
physicalRowDataType.getLogicalType()))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org