This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new ac7be62 [Improvement] SupportsFilterPushDown and ProjectionPushDown
in DorisSource (#348)
ac7be62 is described below
commit ac7be62fb42693d11803cdf2f5d3c6ba50b0087a
Author: wudi <[email protected]>
AuthorDate: Fri Mar 22 15:43:41 2024 +0800
[Improvement] SupportsFilterPushDown and ProjectionPushDown in DorisSource
(#348)
---
.../doris/flink/catalog/DorisCatalogFactory.java | 4 -
.../org/apache/doris/flink/rest/RestService.java | 2 +-
.../doris/flink/table/DorisConfigOptions.java | 12 ---
.../flink/table/DorisDynamicTableFactory.java | 9 +-
.../doris/flink/table/DorisDynamicTableSource.java | 63 ++++++++++--
.../doris/flink/table/DorisExpressionVisitor.java | 109 +++++++++++++++++++++
.../flink/table/DorisDynamicTableSourceTest.java | 9 +-
7 files changed, 172 insertions(+), 36 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index d837395..06bbbc1 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -33,8 +33,6 @@ import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
-import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -86,8 +84,6 @@ public class DorisCatalogFactory implements CatalogFactory {
options.add(USERNAME);
options.add(PASSWORD);
- options.add(DORIS_READ_FIELD);
- options.add(DORIS_FILTER_QUERY);
options.add(DORIS_TABLET_SIZE);
options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index fcf31c3..5ac139e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -605,7 +605,7 @@ public class RestService implements Serializable {
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " where " + readOptions.getFilterQuery();
}
- logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
+ logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
HttpPost httpPost = new HttpPost(getUriStr(options, logger) +
QUERY_PLAN);
String entity = "{\"sql\": \"" + sql + "\"}";
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index b91b04b..6d74bf8 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -80,18 +80,6 @@ public class DorisConfigOptions {
"Use automatic redirection of fe without
explicitly obtaining the be list");
// source config options
- public static final ConfigOption<String> DORIS_READ_FIELD =
- ConfigOptions.key("doris.read.field")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "List of column names in the Doris table,
separated by commas");
- public static final ConfigOption<String> DORIS_FILTER_QUERY =
- ConfigOptions.key("doris.filter.query")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Filter expression of the query, which is
transparently transmitted to Doris. Doris uses this expression to complete
source-side data filtering");
public static final ConfigOption<Integer> DORIS_TABLET_SIZE =
ConfigOptions.key("doris.request.tablet.size")
.intType()
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index f327b9c..a276696 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -45,8 +45,6 @@ import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
-import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
-import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -118,8 +116,6 @@ public final class DorisDynamicTableFactory
options.add(JDBC_URL);
options.add(AUTO_REDIRECT);
- options.add(DORIS_READ_FIELD);
- options.add(DORIS_FILTER_QUERY);
options.add(DORIS_TABLET_SIZE);
options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
@@ -181,7 +177,8 @@ public final class DorisDynamicTableFactory
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisLookupOptions(helper.getOptions()),
- physicalSchema);
+ physicalSchema,
+ context.getPhysicalRowDataType());
}
private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
@@ -205,8 +202,6 @@ public final class DorisDynamicTableFactory
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
- .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
- .setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(
(int)
readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 9057f1f..6b09735 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -21,6 +21,7 @@ import
org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
@@ -31,8 +32,10 @@ import
org.apache.flink.table.connector.source.TableFunctionProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.StringUtils;
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
@@ -45,6 +48,7 @@ import org.apache.doris.flink.source.DorisSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -58,23 +62,31 @@ import java.util.stream.Collectors;
* SourceFunction} and its {@link DeserializationSchema} for runtime. Both
instances are
* parameterized to return internal data structures (i.e. {@link RowData}).
*/
-public final class DorisDynamicTableSource implements ScanTableSource,
LookupTableSource {
+public final class DorisDynamicTableSource
+ implements ScanTableSource,
+ LookupTableSource,
+ SupportsFilterPushDown,
+ SupportsProjectionPushDown {
private static final Logger LOG =
LoggerFactory.getLogger(DorisDynamicTableSource.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
private DorisLookupOptions lookupOptions;
private TableSchema physicalSchema;
+ private List<String> resolvedFilterQuery = new ArrayList<>();
+ private DataType physicalRowDataType;
public DorisDynamicTableSource(
DorisOptions options,
DorisReadOptions readOptions,
DorisLookupOptions lookupOptions,
- TableSchema physicalSchema) {
+ TableSchema physicalSchema,
+ DataType physicalRowDataType) {
this.options = options;
this.lookupOptions = lookupOptions;
this.readOptions = readOptions;
this.physicalSchema = physicalSchema;
+ this.physicalRowDataType = physicalRowDataType;
}
public DorisDynamicTableSource(
@@ -93,8 +105,11 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
+ String filterQuery =
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+ readOptions.setFilterQuery(filterQuery);
+ String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
readOptions.setReadFields(
- Arrays.stream(physicalSchema.getFieldNames())
+ Arrays.stream(selectFields)
.map(item -> String.format("`%s`",
item.trim().replace("`", "")))
.collect(Collectors.joining(", ")));
@@ -114,7 +129,7 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
.setTableIdentifier(options.getTableIdentifier())
.setPartitions(dorisPartitions)
.setReadOptions(readOptions)
- .setRowType((RowType)
physicalSchema.toRowDataType().getLogicalType());
+ .setRowType((RowType)
physicalRowDataType.getLogicalType());
return InputFormatProvider.of(builder.build());
} else {
// Read data using the interface of the FLIP-27 specification
@@ -124,10 +139,7 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
.setDorisOptions(options)
.setDeserializer(
new RowDataDeserializationSchema(
- (RowType)
- physicalSchema
- .toRowDataType()
- .getLogicalType()))
+ (RowType)
physicalRowDataType.getLogicalType()))
.build();
return SourceProvider.of(build);
}
@@ -135,7 +147,6 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
- DataType physicalRowDataType = physicalSchema.toRowDataType();
String[] keyNames = new String[context.getKeys().length];
int[] keyIndexs = new int[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
@@ -168,11 +179,43 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
@Override
public DynamicTableSource copy() {
- return new DorisDynamicTableSource(options, readOptions,
lookupOptions, physicalSchema);
+ DorisDynamicTableSource newSource =
+ new DorisDynamicTableSource(
+ options, readOptions, lookupOptions, physicalSchema,
physicalRowDataType);
+ newSource.resolvedFilterQuery = new
ArrayList<>(this.resolvedFilterQuery);
+ return newSource;
}
@Override
public String asSummaryString() {
return "Doris Table Source";
}
+
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+ List<ResolvedExpression> remainingFilters = new ArrayList<>();
+
+ DorisExpressionVisitor expressionVisitor = new
DorisExpressionVisitor();
+ for (ResolvedExpression filter : filters) {
+ String filterQuery = filter.accept(expressionVisitor);
+ if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
+ acceptedFilters.add(filter);
+ this.resolvedFilterQuery.add(filterQuery);
+ } else {
+ remainingFilters.add(filter);
+ }
+ }
+ return Result.of(acceptedFilters, remainingFilters);
+ }
+
+ @Override
+ public boolean supportsNestedProjection() {
+ return false;
+ }
+
+ @Override
+ public void applyProjection(int[][] projectedFields, DataType
producedDataType) {
+ this.physicalRowDataType =
Projection.of(projectedFields).project(physicalRowDataType);
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
new file mode 100644
index 0000000..3f327fe
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Translates resolved Flink filter expressions into Doris SQL predicate text for filter
+ * push-down in {@link DorisDynamicTableSource}.
+ *
+ * <p>Every {@code visit} method returns {@code null} when the expression, or any of its
+ * sub-expressions, cannot be translated. Callers must treat {@code null} as "not pushable" and
+ * leave that filter to Flink; a partial translation is never returned.
+ */
+public class DorisExpressionVisitor implements ExpressionVisitor<String> {
+
+    @Override
+    public String visit(CallExpression call) {
+        FunctionDefinition function = call.getFunctionDefinition();
+        List<ResolvedExpression> children = call.getResolvedChildren();
+        if (BuiltInFunctionDefinitions.EQUALS.equals(function)) {
+            return combineExpression("=", children);
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(function)) {
+            return combineExpression("<", children);
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(function)) {
+            return combineExpression("<=", children);
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(function)) {
+            return combineExpression(">", children);
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(function)) {
+            return combineExpression(">=", children);
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(function)) {
+            return combineExpression("<>", children);
+        }
+        // AND / OR may carry more than two operands, so every operand has to be rendered.
+        if (BuiltInFunctionDefinitions.OR.equals(function)) {
+            return combineAllExpressions("OR", children);
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(function)) {
+            return combineAllExpressions("AND", children);
+        }
+        // A LIKE call with any extra operand (e.g. an ESCAPE character) is rejected, not truncated.
+        if (BuiltInFunctionDefinitions.LIKE.equals(function)) {
+            return combineExpression("LIKE", children);
+        }
+        if (BuiltInFunctionDefinitions.IS_NULL.equals(function)) {
+            return combineLeftExpression("IS NULL", children);
+        }
+        if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(function)) {
+            return combineLeftExpression("IS NOT NULL", children);
+        }
+        return null;
+    }
+
+    /**
+     * Renders a binary predicate as {@code (left operator right)}.
+     *
+     * @return the predicate text, or {@code null} unless there are exactly two operands and both
+     *     can be translated
+     */
+    private String combineExpression(String operator, List<ResolvedExpression> operands) {
+        if (operands.size() != 2) {
+            return null;
+        }
+        String left = operands.get(0).accept(this);
+        String right = operands.get(1).accept(this);
+        // Without this check an untranslatable operand would be embedded as the text "null".
+        if (left == null || right == null) {
+            return null;
+        }
+        return String.format("(%s %s %s)", left, operator, right);
+    }
+
+    /**
+     * Renders an n-ary boolean connective as {@code (op1 operator op2 operator ...)}.
+     *
+     * @return the predicate text, or {@code null} if fewer than two operands are given or any
+     *     operand cannot be translated (dropping an operand would change the filter's meaning)
+     */
+    private String combineAllExpressions(String operator, List<ResolvedExpression> operands) {
+        if (operands.size() < 2) {
+            return null;
+        }
+        List<String> parts = new ArrayList<>(operands.size());
+        for (ResolvedExpression operand : operands) {
+            String part = operand.accept(this);
+            if (part == null) {
+                return null;
+            }
+            parts.add(part);
+        }
+        return "(" + String.join(" " + operator + " ", parts) + ")";
+    }
+
+    /**
+     * Renders a postfix unary predicate such as IS NULL as {@code (operand operator)}.
+     *
+     * @return the predicate text, or {@code null} unless there is exactly one translatable operand
+     */
+    private String combineLeftExpression(String operator, List<ResolvedExpression> operands) {
+        if (operands.size() != 1) {
+            return null;
+        }
+        String left = operands.get(0).accept(this);
+        if (left == null) {
+            return null;
+        }
+        return String.format("(%s %s)", left, operator);
+    }
+
+    @Override
+    public String visit(ValueLiteralExpression valueLiteral) {
+        LogicalTypeRoot typeRoot =
+                valueLiteral.getOutputDataType().getLogicalType().getTypeRoot();
+        // Date/time literals are wrapped in single quotes; all other literals use toString() as-is.
+        // NOTE(review): the exact text toString() yields for timestamp values (ISO 'T' separator,
+        // trailing 'Z' for instants?) has not been checked against Doris's parser -- confirm.
+        if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
+                || typeRoot.equals(LogicalTypeRoot.DATE)) {
+            return "'" + valueLiteral + "'";
+        }
+        return valueLiteral.toString();
+    }
+
+    /** Renders a column reference back-quoted, matching how the connector quotes read fields. */
+    @Override
+    public String visit(FieldReferenceExpression fieldReference) {
+        return String.format("`%s`", fieldReference.getName().trim().replace("`", ""));
+    }
+
+    @Override
+    public String visit(TypeLiteralExpression typeLiteral) {
+        return typeLiteral.getOutputDataType().toString();
+    }
+
+    /** Expression kinds not handled above cannot be translated. */
+    @Override
+    public String visit(Expression expression) {
+        return null;
+    }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
index ecc08f2..290e193 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
import
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.doris.flink.source.DorisSource;
@@ -48,7 +49,9 @@ public class DorisDynamicTableSourceTest {
new DorisDynamicTableSource(
OptionUtils.buildDorisOptions(),
builder.build(),
- TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+ DorisLookupOptions.builder().build(),
+ TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA),
+ FactoryMocks.PHYSICAL_DATA_TYPE);
ScanTableSource.ScanRuntimeProvider provider =
actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertDorisSource(provider);
@@ -60,7 +63,9 @@ public class DorisDynamicTableSourceTest {
new DorisDynamicTableSource(
OptionUtils.buildDorisOptions(),
OptionUtils.buildDorisReadOptions(),
- TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+ DorisLookupOptions.builder().build(),
+ TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA),
+ FactoryMocks.PHYSICAL_DATA_TYPE);
ScanTableSource.ScanRuntimeProvider provider =
actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertDorisSource(provider);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]