wuchong commented on a change in pull request #13800:
URL: https://github.com/apache/flink/pull/13800#discussion_r548364189
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/DerbyDialect.java
##########
@@ -66,6 +66,15 @@ public String dialectName() {
return "Derby";
}
+ @Override
+ public String getLimitStatement(long limit) {
+ if (limit >= 0) {
+ return String.format(" fetch first %d rows only",
limit);
+ } else {
+ return "";
Review comment:
The else branch should also handled by framework. The `limit` parameter
should always be a valid limit.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/DerbyDialect.java
##########
@@ -66,6 +66,15 @@ public String dialectName() {
return "Derby";
}
+ @Override
+ public String getLimitStatement(long limit) {
+ if (limit >= 0) {
+ return String.format(" fetch first %d rows only",
limit);
Review comment:
1. Could you remove the the begging space? The space should be handled
by framework. It's easy to forget to add the space for new dialects.
2. Please use upper case for the SQL keywords.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/DerbyDialect.java
##########
@@ -66,6 +66,15 @@ public String dialectName() {
return "Derby";
}
+ @Override
+ public String getLimitStatement(long limit) {
+ if (limit >= 0) {
+ return String.format(" fetch first %d rows only",
limit);
+ } else {
+ return "";
Review comment:
The else branch should also handled by framework. The `limit` parameter
should always be a valid limit.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
##########
@@ -96,8 +102,9 @@ public ScanRuntimeProvider
getScanRuntimeProvider(ScanContext runtimeProviderCon
builder.setFetchSize(readOptions.getFetchSize());
}
final JdbcDialect dialect = options.getDialect();
- String query = dialect.getSelectFromStatement(
- options.getTableName(), physicalSchema.getFieldNames(),
new String[0]);
+ String query;
+ query = dialect.getSelectFromStatement(
+ options.getTableName(),
physicalSchema.getFieldNames(), new String[0], limit);
if (readOptions.getPartitionColumnName().isPresent()) {
Review comment:
Could you add a test that the source is configured with partition
columns and submit a select query with limit clause. I'm wondering currently
they can't work together well.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -57,6 +57,8 @@
*/
JdbcRowConverter getRowConverter(RowType rowType);
+ String getLimitStatement(long limit);
Review comment:
Please add javadoc on the method, including the description for limit
parameter, e.g. the value range.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
##########
@@ -96,8 +102,9 @@ public ScanRuntimeProvider
getScanRuntimeProvider(ScanContext runtimeProviderCon
builder.setFetchSize(readOptions.getFetchSize());
}
final JdbcDialect dialect = options.getDialect();
- String query = dialect.getSelectFromStatement(
- options.getTableName(), physicalSchema.getFieldNames(),
new String[0]);
+ String query;
+ query = dialect.getSelectFromStatement(
Review comment:
Why separate the initialization into another line?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -148,14 +150,15 @@ default String getDeleteStatement(String tableName,
String[] conditionFields) {
/**
* Get select fields statement by condition fields. Default use SELECT.
*/
- default String getSelectFromStatement(String tableName, String[]
selectFields, String[] conditionFields) {
+ default String getSelectFromStatement(String tableName, String[]
selectFields, String[] conditionFields, long limit) {
String selectExpressions = Arrays.stream(selectFields)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String fieldExpressions = Arrays.stream(conditionFields)
.map(f -> format("%s = :%s",
quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
return "SELECT " + selectExpressions + " FROM " +
- quoteIdentifier(tableName) +
(conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+ quoteIdentifier(tableName) +
(conditionFields.length > 0 ? " WHERE " + fieldExpressions : "") +
+ getLimitStatement(limit);
Review comment:
Personally, I don't like the current design of the new interface.
- it couples the `limit` to the select statement which makes it impossible
to inject additional where clause.
- dialects need to implement/take care 2 interfaces about limit.
- many invokers have to pass an invalid limit.
In my opinion, a better solution would be separate them, by having a new
interface for limit:
```java
String getLimitClause(long limit);
```
And concat the limit caluse with the select statement where needs the limit
clause.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]