This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git
The following commit(s) were added to refs/heads/main by this push:
new e5092b2d [Improvement] Support query limit (#502)
e5092b2d is described below
commit e5092b2d8f67ed873497983f05d03572806c1f04
Author: s7monk <[email protected]>
AuthorDate: Thu Jul 11 16:43:13 2024 +0800
[Improvement] Support query limit (#502)
---
.../web/engine/flink/common/executor/Executor.java | 3 +-
.../flink/common/parser/CustomSqlParser.java | 86 +++++++++++++++
.../flink/common/parser/CustomSqlParserTest.java | 54 ++++++++++
.../flink/common/parser/StatementsConstant.java | 117 +++++++++++++++++++++
.../gateway/executor/FlinkSqlGatewayExecutor.java | 24 +++--
.../executor/FlinkSqlGatewayExecutorTest.java | 21 ++--
paimon-web-engine/paimon-web-engine-flink/pom.xml | 1 -
paimon-web-server/pom.xml | 7 --
.../paimon/web/server/data/dto/JobSubmitDTO.java | 2 +
.../web/server/service/impl/JobServiceImpl.java | 5 +-
10 files changed, 291 insertions(+), 29 deletions(-)
diff --git
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java
index 60a754f6..f3e8bece 100644
---
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java
+++
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java
@@ -31,10 +31,11 @@ public interface Executor {
* Executes an SQL statement.
*
* @param statement The SQL statement to be executed.
+ * @param maxRows The maximum number of rows to return in the result set.
* @return SubmitResult containing information about the execution result.
* @throws Exception if there is an error executing the SQL statement.
*/
- ExecutionResult executeSql(String statement) throws Exception;
+ ExecutionResult executeSql(String statement, int maxRows) throws Exception;
/**
* Fetches the results of a previously submitted SQL statement execution.
diff --git
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParser.java
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParser.java
new file mode 100644
index 00000000..395a7efc
--- /dev/null
+++
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParser.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.engine.flink.common.parser;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+
+/** CustomSqlParser to parse Sql list. */
+public class CustomSqlParser {
+
+ private static final SqlParser.Config config;
+ private final SqlParser parser;
+ private final int limit;
+
+ static {
+ config =
+ SqlParser.config()
+ .withParserFactory(FlinkSqlParserImpl.FACTORY)
+ .withConformance(FlinkSqlConformance.DEFAULT)
+ .withLex(Lex.JAVA)
+ .withIdentifierMaxLength(256);
+ }
+
+ public CustomSqlParser(String sql, int limit) {
+ this.parser = SqlParser.create(sql, config);
+ this.limit = limit;
+ }
+
+ public SqlNodeList parseStmtList() throws SqlParseException {
+ SqlNodeList nodeList = parser.parseStmtList();
+ for (SqlNode node : nodeList) {
+ if (node instanceof SqlSelect) {
+ SqlSelect select = (SqlSelect) node;
+ if (!hasAggregateOrGroupBy(select) && select.getFetch() ==
null) {
+ SqlLiteral sqlLiteral =
+
SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO);
+ select.setFetch(sqlLiteral);
+ }
+ }
+ }
+ return nodeList;
+ }
+
+ private boolean hasAggregateOrGroupBy(SqlSelect select) {
+ if (select.getGroup() != null && !select.getGroup().isEmpty()) {
+ return true;
+ }
+ return containsComplexOperations(select.getSelectList());
+ }
+
+ private boolean containsComplexOperations(SqlNodeList nodes) {
+ if (nodes != null) {
+ for (SqlNode node : nodes) {
+ if (!(node instanceof SqlIdentifier)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
diff --git
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParserTest.java
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParserTest.java
new file mode 100644
index 00000000..967c73a2
--- /dev/null
+++
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParserTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.engine.flink.common.parser;
+
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement1;
+import static
org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement2;
+import static
org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement3;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests of {@link CustomSqlParser}. */
+public class CustomSqlParserTest {
+
+ @Test
+ public void testParse() throws SqlParseException {
+ CustomSqlParser customSqlParser = new CustomSqlParser(statement1, 0);
+ SqlNodeList sqlNodeList = customSqlParser.parseStmtList();
+ assertThat(sqlNodeList.size()).isEqualTo(5);
+ }
+
+ @Test
+ public void testSelectLimit() throws SqlParseException {
+ CustomSqlParser customSqlParser = new CustomSqlParser(statement2, 500);
+ String actual = customSqlParser.parseStmtList().get(2).toString();
+ assertThat(actual)
+ .isEqualToIgnoringWhitespace("SELECT * FROM `t_order` FETCH
NEXT 500 ROWS ONLY");
+ }
+
+ @Test
+ public void testSelectWithoutLimit() throws SqlParseException {
+ CustomSqlParser customSqlParser = new CustomSqlParser(statement3, 200);
+ String actual = customSqlParser.parseStmtList().get(2).toString();
+ assertThat(actual).isEqualToIgnoringWhitespace("SELECT COUNT(*) FROM
`t_order`");
+ }
+}
diff --git
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/StatementsConstant.java
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/StatementsConstant.java
new file mode 100644
index 00000000..e029c4ba
--- /dev/null
+++
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/StatementsConstant.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.engine.flink.common.parser;
+
+/** Statements constant. */
+public class StatementsConstant {
+
+ public static String statement1 =
+ "DROP TABLE IF EXISTS t_order;\n"
+ + "CREATE TABLE IF NOT EXISTS t_order(\n"
+ + " --order id\n"
+ + " `order_id` BIGINT,\n"
+ + " --product\n"
+ + " `product` BIGINT,\n"
+ + " --amount\n"
+ + " `amount` BIGINT,\n"
+ + " --payment time\n"
+ + " `order_time` as CAST(CURRENT_TIMESTAMP AS
TIMESTAMP(3)),\n"
+ + " --WATERMARK\n"
+ + " WATERMARK FOR order_time AS order_time-INTERVAL '2'
SECOND\n"
+ + ") WITH(\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'rows-per-second' = '1',\n"
+ + " 'fields.order_id.min' = '1',\n"
+ + " 'fields.order_id.max' = '2',\n"
+ + " 'fields.amount.min' = '1',\n"
+ + " 'fields.amount.max' = '10',\n"
+ + " 'fields.product.min' = '1',\n"
+ + " 'fields.product.max' = '2'\n"
+ + ");\n"
+ + "-- SELECT * FROM t_order LIMIT 10;\n"
+ + "DROP TABLE IF EXISTS sink_table;\n"
+ + "CREATE TABLE IF NOT EXISTS sink_table(\n"
+ + " --product\n"
+ + " `product` BIGINT,\n"
+ + " --amount\n"
+ + " `amount` BIGINT,\n"
+ + " --payment time\n"
+ + " `order_time` TIMESTAMP(3),\n"
+ + " `one_minute_sum` BIGINT\n"
+ + ") WITH('connector' = 'print');\n"
+ + "\n"
+ + "INSERT INTO\n"
+ + " sink_table\n"
+ + "SELECT\n"
+ + " product,\n"
+ + " amount,\n"
+ + " order_time,\n"
+ + " 0 as one_minute_sum\n"
+ + "FROM\n"
+ + " t_order;";
+
+ public static String statement2 =
+ "DROP TABLE IF EXISTS t_order;\n"
+ + "CREATE TABLE IF NOT EXISTS t_order(\n"
+ + " --order id\n"
+ + " `order_id` BIGINT,\n"
+ + " --product\n"
+ + " `product` BIGINT,\n"
+ + " --amount\n"
+ + " `amount` BIGINT,\n"
+ + " --payment time\n"
+ + " `order_time` as CAST(CURRENT_TIMESTAMP AS
TIMESTAMP(3)),\n"
+ + " --WATERMARK\n"
+ + " WATERMARK FOR order_time AS order_time-INTERVAL '2'
SECOND\n"
+ + ") WITH(\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'rows-per-second' = '1',\n"
+ + " 'fields.order_id.min' = '1',\n"
+ + " 'fields.order_id.max' = '2',\n"
+ + " 'fields.amount.min' = '1',\n"
+ + " 'fields.amount.max' = '10',\n"
+ + " 'fields.product.min' = '1',\n"
+ + " 'fields.product.max' = '2'\n"
+ + ");\n"
+ + "SELECT * FROM t_order;";
+ public static String statement3 =
+ "DROP TABLE IF EXISTS t_order;\n"
+ + "CREATE TABLE IF NOT EXISTS t_order(\n"
+ + " --order id\n"
+ + " `order_id` BIGINT,\n"
+ + " --product\n"
+ + " `product` BIGINT,\n"
+ + " --amount\n"
+ + " `amount` BIGINT,\n"
+ + " --payment time\n"
+ + " `order_time` as CAST(CURRENT_TIMESTAMP AS
TIMESTAMP(3)),\n"
+ + " --WATERMARK\n"
+ + " WATERMARK FOR order_time AS order_time-INTERVAL '2'
SECOND\n"
+ + ") WITH(\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'rows-per-second' = '1',\n"
+ + " 'fields.order_id.min' = '1',\n"
+ + " 'fields.order_id.max' = '2',\n"
+ + " 'fields.amount.min' = '1',\n"
+ + " 'fields.amount.max' = '10',\n"
+ + " 'fields.product.min' = '1',\n"
+ + " 'fields.product.max' = '2'\n"
+ + ");\n"
+ + "SELECT count(*) FROM t_order;";
+}
diff --git
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java
index 12c5b4de..ab58e471 100644
---
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java
+++
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java
@@ -20,7 +20,7 @@ package
org.apache.paimon.web.engine.flink.sql.gateway.executor;
import org.apache.paimon.web.engine.flink.common.executor.Executor;
import
org.apache.paimon.web.engine.flink.common.operation.FlinkSqlOperationType;
-import org.apache.paimon.web.engine.flink.common.parser.StatementParser;
+import org.apache.paimon.web.engine.flink.common.parser.CustomSqlParser;
import org.apache.paimon.web.engine.flink.common.result.ExecutionResult;
import org.apache.paimon.web.engine.flink.common.result.FetchResultParams;
import org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient;
@@ -28,6 +28,8 @@ import
org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity;
import org.apache.paimon.web.engine.flink.sql.gateway.utils.CollectResultUtil;
import
org.apache.paimon.web.engine.flink.sql.gateway.utils.FlinkSqlStatementSetBuilder;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
import org.apache.flink.table.gateway.api.results.ResultSet;
import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
@@ -53,16 +55,18 @@ public class FlinkSqlGatewayExecutor implements Executor {
}
@Override
- public ExecutionResult executeSql(String multiStatement) throws Exception {
- String[] statements = StatementParser.parse(multiStatement);
+ public ExecutionResult executeSql(String multiStatement, int maxRows)
throws Exception {
+ CustomSqlParser customSqlParser = new CustomSqlParser(multiStatement,
maxRows);
+ SqlNodeList sqlNodeList = customSqlParser.parseStmtList();
List<String> insertStatements = new ArrayList<>();
ExecutionResult executionResult = null;
- for (String statement : statements) {
- FlinkSqlOperationType operationType =
FlinkSqlOperationType.getOperationType(statement);
+ for (SqlNode sqlNode : sqlNodeList) {
+ FlinkSqlOperationType operationType =
+ FlinkSqlOperationType.getOperationType(sqlNode.toString());
if (operationType == null) {
- String operationTypeString =
extractSqlOperationType(statement);
+ String operationTypeString =
extractSqlOperationType(sqlNode.toString());
throw new UnsupportedOperationException(
"Unsupported operation type: " + operationTypeString);
}
@@ -73,17 +77,17 @@ public class FlinkSqlGatewayExecutor implements Executor {
throw new UnsupportedOperationException(
"Cannot execute DQL statement with pending
INSERT statements.");
}
- executionResult = executeDqlStatement(statement,
operationType);
+ executionResult = executeDqlStatement(sqlNode.toString(),
operationType);
break;
case DML:
if
(operationType.getType().equals(FlinkSqlOperationType.INSERT.getType())) {
- insertStatements.add(statement);
+ insertStatements.add(sqlNode.toString());
} else {
- executionResult = executeDmlStatement(statement);
+ executionResult =
executeDmlStatement(sqlNode.toString());
}
break;
default:
- client.executeStatement(session.getSessionId(), statement,
null);
+ client.executeStatement(session.getSessionId(),
sqlNode.toString(), null);
break;
}
diff --git
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java
index 42620af7..69a6d8a8 100644
---
a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java
+++
b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java
@@ -25,6 +25,7 @@ import
org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient;
import
org.apache.paimon.web.engine.flink.sql.gateway.executor.FlinkSqlGatewayExecutor;
import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity;
+import org.apache.calcite.sql.parser.SqlParseException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -52,21 +53,23 @@ public class FlinkSqlGatewayExecutorTest extends TestBase {
@Test
public void testExecuteSql() throws Exception {
- ExecutionResult executionResult =
executor.executeSql(StatementsConstant.statement);
+ ExecutionResult executionResult =
executor.executeSql(StatementsConstant.statement, 0);
assertNotNull(executionResult);
assertNotNull(executionResult.getJobId());
}
@Test
public void testExecuteStatementSetSql() throws Exception {
- ExecutionResult executionResult =
executor.executeSql(StatementsConstant.statementSetSql);
+ ExecutionResult executionResult =
+ executor.executeSql(StatementsConstant.statementSetSql, 0);
assertNotNull(executionResult);
assertNotNull(executionResult.getJobId());
}
@Test
public void testExecutorStatementWithoutResult() throws Exception {
- ExecutionResult executionResult =
executor.executeSql(StatementsConstant.createStatement);
+ ExecutionResult executionResult =
+ executor.executeSql(StatementsConstant.createStatement, 0);
assertNull(executionResult);
}
@@ -77,7 +80,8 @@ public class FlinkSqlGatewayExecutorTest extends TestBase {
UnsupportedOperationException.class,
() -> {
executor.executeSql(
-
StatementsConstant.selectStatementWithPendingInsertStatements);
+
StatementsConstant.selectStatementWithPendingInsertStatements,
+ 0);
});
String expectedMessage = "Cannot execute DQL statement with pending
INSERT statements.";
String actualMessage = exception.getMessage();
@@ -88,18 +92,19 @@ public class FlinkSqlGatewayExecutorTest extends TestBase {
public void testExecuteBadSqlStatement() {
Exception exception =
assertThrows(
- UnsupportedOperationException.class,
+ SqlParseException.class,
() -> {
-
executor.executeSql(StatementsConstant.badStatement);
+
executor.executeSql(StatementsConstant.badStatement, 0);
});
- String expectedMessage = "Unsupported operation type: CREAT";
+ String expectedMessage = "Non-query expression encountered in illegal
context";
String actualMessage = exception.getMessage();
assertTrue(actualMessage.contains(expectedMessage));
}
@Test
public void testFetchResults() throws Exception {
- ExecutionResult executionResult =
executor.executeSql(StatementsConstant.selectStatement);
+ ExecutionResult executionResult =
+ executor.executeSql(StatementsConstant.selectStatement, 10);
assertNotNull(executionResult);
assertNotNull(executionResult.getJobId());
assertNotNull(executionResult.getSubmitId());
diff --git a/paimon-web-engine/paimon-web-engine-flink/pom.xml
b/paimon-web-engine/paimon-web-engine-flink/pom.xml
index 84055cbd..0b4ee57f 100644
--- a/paimon-web-engine/paimon-web-engine-flink/pom.xml
+++ b/paimon-web-engine/paimon-web-engine-flink/pom.xml
@@ -88,7 +88,6 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
diff --git a/paimon-web-server/pom.xml b/paimon-web-server/pom.xml
index 599ee71c..c68b50e7 100644
--- a/paimon-web-server/pom.xml
+++ b/paimon-web-server/pom.xml
@@ -248,13 +248,6 @@ under the License.
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
index 83e4c384..b3c4d6f7 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
@@ -39,4 +39,6 @@ public class JobSubmitDTO {
private Map<String, String> config;
private String statements;
+
+ private int maxRows;
}
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
index ceb46c73..2df400e2 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
@@ -142,7 +142,8 @@ public class JobServiceImpl extends ServiceImpl<JobMapper,
JobInfo> implements J
String.format(
"Starting to submit %s %s job...",
jobSubmitDTO.getTaskType(), executeMode));
- ExecutionResult executionResult =
executor.executeSql(jobSubmitDTO.getStatements());
+ ExecutionResult executionResult =
+ executor.executeSql(jobSubmitDTO.getStatements(),
jobSubmitDTO.getMaxRows());
if (StringUtils.isNotBlank(executionResult.getJobId())) {
JobInfo jobInfo = buildJobInfo(executionResult, jobSubmitDTO);
this.save(jobInfo);
@@ -318,7 +319,7 @@ public class JobServiceImpl extends ServiceImpl<JobMapper,
JobInfo> implements J
return;
}
- ExecutionResult executionResult =
executor.executeSql(SHOW_JOBS_STATEMENT);
+ ExecutionResult executionResult =
executor.executeSql(SHOW_JOBS_STATEMENT, 0);
List<Map<String, Object>> jobsData =
executionResult.getData();
for (Map<String, Object> jobData : jobsData) {
String jobId = (String) jobData.get("job id");