This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3dd2be7 [Improve]Compatible with previous filter query options (#373)
3dd2be7 is described below
commit 3dd2be7a896a9cbfc252e98cd5fdfce87169d2a9
Author: wudi <[email protected]>
AuthorDate: Mon May 6 22:12:43 2024 +0800
[Improve]Compatible with previous filter query options (#373)
---
.../apache/doris/flink/table/DorisConfigOptions.java | 18 ++++++++++++++++++
.../doris/flink/table/DorisDynamicTableFactory.java | 6 ++++++
.../doris/flink/table/DorisDynamicTableSource.java | 19 ++++++++++++-------
3 files changed, 36 insertions(+), 7 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 63e550a..77349a6 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -80,6 +80,24 @@ public class DorisConfigOptions {
"Use automatic redirection of fe without
explicitly obtaining the be list");
// source config options
+ // This is compatible with the previous writing method.
+ // Some expressions may not be pushed down by FlinkSQL.
+ @Deprecated
+ public static final ConfigOption<String> DORIS_FILTER_QUERY =
+ ConfigOptions.key("doris.filter.query")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Filter expression of the query, which is
transparently transmitted to Doris. Doris uses this expression to complete
source-side data filtering");
+
+ @Deprecated
+ public static final ConfigOption<String> DORIS_READ_FIELD =
+ ConfigOptions.key("doris.read.field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "List of column names in the Doris table,
separated by commas");
+
public static final ConfigOption<Integer> DORIS_TABLET_SIZE =
ConfigOptions.key("doris.request.tablet.size")
.intType()
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index a276696..b198ca3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -45,6 +45,8 @@ import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
@@ -116,6 +118,8 @@ public final class DorisDynamicTableFactory
options.add(JDBC_URL);
options.add(AUTO_REDIRECT);
+ options.add(DORIS_READ_FIELD);
+ options.add(DORIS_FILTER_QUERY);
options.add(DORIS_TABLET_SIZE);
options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
@@ -202,6 +206,8 @@ public final class DorisDynamicTableFactory
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
+ .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
+ .setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(
(int)
readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index b6cd1a7..32851d4 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -95,13 +95,18 @@ public final class DorisDynamicTableSource
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
- String filterQuery =
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
- readOptions.setFilterQuery(filterQuery);
- String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
- readOptions.setReadFields(
- Arrays.stream(selectFields)
- .map(item -> String.format("`%s`",
item.trim().replace("`", "")))
- .collect(Collectors.joining(", ")));
+ if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+ String filterQuery =
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+ readOptions.setFilterQuery(filterQuery);
+ }
+ if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
+ String[] selectFields =
+ DataType.getFieldNames(physicalRowDataType).toArray(new
String[0]);
+ readOptions.setReadFields(
+ Arrays.stream(selectFields)
+ .map(item -> String.format("`%s`",
item.trim().replace("`", "")))
+ .collect(Collectors.joining(", ")));
+ }
if (readOptions.getUseOldApi()) {
List<PartitionDefinition> dorisPartitions;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]