This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 9b49bc4 [fix](source) fix projection and function pushdown not functioning correctly (#383)
9b49bc4 is described below
commit 9b49bc43827406135ba19342fb33e48daffda4b0
Author: Petrichor <[email protected]>
AuthorDate: Sat May 11 14:46:39 2024 +0800
[fix](source) fix projection and function pushdown not functioning correctly (#383)
---
.../doris/flink/table/DorisDynamicTableSource.java | 20 ++++++++++----------
.../doris/flink/table/DorisExpressionVisitor.java | 11 +++++++++++
2 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 32851d4..e2b837c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -99,15 +99,6 @@ public final class DorisDynamicTableSource
String filterQuery =
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
readOptions.setFilterQuery(filterQuery);
}
- if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
- String[] selectFields =
- DataType.getFieldNames(physicalRowDataType).toArray(new
String[0]);
- readOptions.setReadFields(
- Arrays.stream(selectFields)
- .map(item -> String.format("`%s`",
item.trim().replace("`", "")))
- .collect(Collectors.joining(", ")));
- }
-
if (readOptions.getUseOldApi()) {
List<PartitionDefinition> dorisPartitions;
try {
@@ -194,7 +185,8 @@ public final class DorisDynamicTableSource
DorisExpressionVisitor expressionVisitor = new
DorisExpressionVisitor();
for (ResolvedExpression filter : filters) {
String filterQuery = filter.accept(expressionVisitor);
- if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
+ if
(StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())
+ && !StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
acceptedFilters.add(filter);
this.resolvedFilterQuery.add(filterQuery);
} else {
@@ -212,5 +204,13 @@ public final class DorisDynamicTableSource
@Override
public void applyProjection(int[][] projectedFields, DataType
producedDataType) {
this.physicalRowDataType =
Projection.of(projectedFields).project(physicalRowDataType);
+ if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
+ String[] selectFields =
+ DataType.getFieldNames(physicalRowDataType).toArray(new
String[0]);
+ this.readOptions.setReadFields(
+ Arrays.stream(selectFields)
+ .map(item -> String.format("`%s`",
item.trim().replace("`", "")))
+ .collect(Collectors.joining(", ")));
+ }
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
index 3f327fe..66242e1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.expressions.TypeLiteralExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.StringUtils;
import java.util.List;
@@ -66,12 +67,22 @@ public class DorisExpressionVisitor implements ExpressionVisitor<String> {
if
(BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
return combineLeftExpression("IS NOT NULL",
call.getResolvedChildren().get(0));
}
+
+ if
(BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) {
+ return call.getChildren().get(0).accept(this);
+ }
return null;
}
private String combineExpression(String operator, List<ResolvedExpression>
operand) {
String left = operand.get(0).accept(this);
+ if (StringUtils.isNullOrWhitespaceOnly(left)) {
+ return null;
+ }
String right = operand.get(1).accept(this);
+ if (StringUtils.isNullOrWhitespaceOnly(right)) {
+ return null;
+ }
return String.format("(%s %s %s)", left, operator, right);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]