leonardBang commented on a change in pull request #13800:
URL: https://github.com/apache/flink/pull/13800#discussion_r547108176
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
##########
@@ -244,6 +244,7 @@ object FlinkStreamRuleSets {
PushProjectIntoLegacyTableSourceScanRule.INSTANCE,
PushFilterIntoTableSourceScanRule.INSTANCE,
PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
+ PushLimitIntoTableSourceScanRule.INSTANCE,
Review comment:
I think this rule should have been added when limit push-down was first
supported in the planner. Why are we only adding it now?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -148,14 +150,15 @@ default String getDeleteStatement(String tableName,
String[] conditionFields) {
/**
* Get select fields statement by condition fields. Default use SELECT.
*/
- default String getSelectFromStatement(String tableName, String[]
selectFields, String[] conditionFields) {
+ default String getSelectFromStatement(String tableName, String[]
selectFields, String[] conditionFields, long limit) {
String selectExpressions = Arrays.stream(selectFields)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String fieldExpressions = Arrays.stream(conditionFields)
.map(f -> format("%s = :%s",
quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
return "SELECT " + selectExpressions + " FROM " +
- quoteIdentifier(tableName) +
(conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+ quoteIdentifier(tableName) +
(conditionFields.length > 0 ? " WHERE " + fieldExpressions : "") +
+ (limit >= 0 ? " " + getLimit(limit) : "");
Review comment:
Could we move the `limit >= 0 ? " " + getLimit(limit) : ""` check inside
`getLimit(long limit)`?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
##########
@@ -156,11 +163,17 @@ public boolean equals(Object o) {
Objects.equals(readOptions, that.readOptions) &&
Objects.equals(lookupOptions, that.lookupOptions) &&
Objects.equals(physicalSchema, that.physicalSchema) &&
- Objects.equals(dialectName, that.dialectName);
+ Objects.equals(dialectName, that.dialectName) &&
+ Objects.equals(limit, that.limit);
}
@Override
public int hashCode() {
return Objects.hash(options, readOptions, lookupOptions,
physicalSchema, dialectName);
Review comment:
Please also add the missing `limit` to `hashCode()`, so it stays consistent with `equals()`.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -148,14 +150,15 @@ default String getDeleteStatement(String tableName,
String[] conditionFields) {
/**
* Get select fields statement by condition fields. Default use SELECT.
*/
- default String getSelectFromStatement(String tableName, String[]
selectFields, String[] conditionFields) {
+ default String getSelectFromStatement(String tableName, String[]
selectFields, String[] conditionFields, long limit) {
String selectExpressions = Arrays.stream(selectFields)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String fieldExpressions = Arrays.stream(conditionFields)
.map(f -> format("%s = :%s",
quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
return "SELECT " + selectExpressions + " FROM " +
- quoteIdentifier(tableName) +
(conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+ quoteIdentifier(tableName) +
(conditionFields.length > 0 ? " WHERE " + fieldExpressions : "") +
+ (limit >= 0 ? " " + getLimit(limit) : "");
Review comment:
How about moving the `limit >= 0 ? " " + getLimit(limit) : ""` check inside
`getLimit(long limit)`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]