fsk119 commented on code in PR #20609:
URL: https://github.com/apache/flink/pull/20609#discussion_r953549271
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointFactory.java:
##########
@@ -94,6 +92,10 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return Collections.emptySet();
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(BIND_ADDRESS);
+ options.add(PORT);
+ options.add(BIND_PORT);
+ return options;
Review Comment:
Thanks for the explanation. +1 for the change. Could you also describe this
behaviour in the config options' descriptions?
##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java:
##########
@@ -267,41 +275,144 @@ public void submitSQLJob(SQLJobSubmission job, Duration
timeout) throws Exceptio
.setEnv(job.getEnvProcessor())
.runBlocking(timeout);
} else if (job.getClientMode() ==
SQLJobSubmission.ClientMode.HIVE_JDBC) {
- FutureTaskWithException<Void> future =
- new FutureTaskWithException<>(
- () -> {
- // register HiveDriver to the DriverManager
- Class.forName(HIVE_DRIVER);
- Map<String, String> configMap =
- GlobalConfiguration.loadConfiguration(
-
conf.toAbsolutePath().toString())
- .toMap();
- String host =
- configMap.getOrDefault(
-
"sql-gateway.endpoint.hiveserver2.host",
-
InetAddress.getByName("localhost")
- .getHostAddress());
- String port =
- configMap.getOrDefault(
-
"sql-gateway.endpoint.hiveserver2.thrift.port",
- "10000");
- try (Connection connection =
- DriverManager.getConnection(
- String.format(
-
"jdbc:hive2://%s:%s/default;auth=noSasl;",
- host, port));
- Statement statement =
connection.createStatement()) {
- for (String jar : job.getJars()) {
- statement.execute(String.format("ADD
JAR '%s'", jar));
- }
- for (String sql : job.getSqlLines()) {
- statement.execute(sql);
- }
+ // register HiveDriver to the DriverManager
+ Class.forName(HIVE_DRIVER);
+ Map<String, String> configMap =
+
GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()).toMap();
+ String host =
+ configMap.getOrDefault(
+ "sql-gateway.endpoint.hiveserver2.host",
+
InetAddress.getByName("localhost").getHostAddress());
+ String port =
+
configMap.getOrDefault("sql-gateway.endpoint.hiveserver2.thrift.port", "10000");
+
+ submitSQL(
+ () -> {
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(
+
"jdbc:hive2://%s:%s/default;auth=noSasl;",
+ host, port));
+ Statement statement =
connection.createStatement()) {
+ for (String jar : job.getJars()) {
+ statement.execute(String.format("ADD JAR
'%s'", jar));
+ }
+ for (String sql : job.getSqlLines()) {
+ statement.execute(sql);
+ }
+ }
+ },
+ timeout);
+ } else if (job.getClientMode() == SQLJobSubmission.ClientMode.REST) {
+ Map<String, String> configMap =
+
GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()).toMap();
+ String host =
+ configMap.getOrDefault(
+ "sql-gateway.endpoint.rest.address",
+
InetAddress.getByName("localhost").getHostAddress());
+ String port =
configMap.getOrDefault("sql-gateway.endpoint.rest.port", "8083");
+ submitSQL(
+ () -> {
+ // Open a session
+ String sessionHandle = openSession(host, port);
+ List<String> sqlLines = new ArrayList<>();
+ for (String jar : job.getJars()) {
+ sqlLines.add(String.format("ADD JAR '%s'", jar));
+ }
+ sqlLines.addAll(job.getSqlLines());
+ // Execute statement
+ for (String sql : sqlLines) {
+ String operationHandle =
+ executeStatement(sessionHandle, sql, host,
port);
+ long nextToken = 0L;
+ while (true) {
+ // Fetch results
+ String results =
+ fetchResults(
+ sessionHandle,
+ operationHandle,
+ host,
+ port,
+ nextToken);
+ if (results.contains("EOS")) {
+ break;
+ } else if (results.contains("PAYLOAD")) {
+ ++nextToken;
}
- });
- new Thread(future).start();
- future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
+ }
+ },
+ timeout);
+ }
+ }
+
+ private void submitSQL(RunnableWithException command, Duration timeout)
throws Exception {
+ FutureTaskWithException<Void> future = new
FutureTaskWithException<>(command);
+ new Thread(future).start();
+ future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ private String openSession(String host, String port) throws Exception {
+ FormBody.Builder builder = new FormBody.Builder();
+ FormBody requestBody = builder.build();
+ final Request request =
+ new Request.Builder()
+ .post(requestBody)
+ .url(String.format("http://%s:%s/v1/sessions/", host,
port))
+ .build();
+ final JsonNode jsonNode = OBJECT_MAPPER.readTree(sendRequest(request));
+ return jsonNode.get("sessionHandle").asText();
+ }
+
+ private String executeStatement(String sessionHandle, String sql, String
host, String port)
+ throws Exception {
+ ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
+ objectNode.put("statement", sql);
+ RequestBody requestBody =
+ RequestBody.create(
+ MediaType.parse("application/json; charset=utf-8"),
+ OBJECT_MAPPER.writeValueAsString(objectNode));
+ final Request request =
+ new Request.Builder()
+ .post(requestBody)
+ .url(
+ String.format(
+
"http://%s:%s/v1/sessions/%s/statements",
+ host, port, sessionHandle))
+ .build();
+ final JsonNode jsonNode = OBJECT_MAPPER.readTree(sendRequest(request));
+ return jsonNode.get("operationHandle").asText();
+ }
+
+ private String fetchResults(
+ String sessionHandle, String operationHandle, String host, String
port, Long nextToken)
+ throws Exception {
+ final Request request =
+ new Request.Builder()
+ .get()
+ .url(
Review Comment:
Can you wrap these methods into a `TestClient`? I think that would be much
clearer.
##########
flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java:
##########
@@ -91,36 +94,52 @@ public static void beforeClass() {
@AfterClass
public static void afterClass() throws Exception {
- port.close();
+ hiveserver2Port.close();
+ restPort.close();
}
@Test
- public void testExecuteStatement() throws Exception {
- URL url =
SqlGatewayE2ECase.class.getClassLoader().getResource(GATEWAY_E2E_SQL);
- if (url == null) {
- throw new FileNotFoundException(GATEWAY_E2E_SQL);
- }
+ public void testHiveserver2ExecuteStatement() throws Exception {
File result = FOLDER.newFolder("csv");
- String sql =
- Files.readAllLines(new File(url.getFile()).toPath()).stream()
- .filter(line -> !line.trim().startsWith("--"))
- .collect(Collectors.joining());
- List<String> lines =
- Arrays.stream(sql.split(";"))
- .map(line -> line.replace(RESULT_KEY,
result.getAbsolutePath()))
- .collect(Collectors.toList());
-
try (GatewayController gateway = flinkResource.startSqlGateway();
ClusterController ignore = flinkResource.startCluster(1)) {
gateway.submitSQLJob(
- new SQLJobSubmission.SQLJobSubmissionBuilder(lines)
+ new
SQLJobSubmission.SQLJobSubmissionBuilder(getSqlLines(result))
.setClientMode(SQLJobSubmission.ClientMode.HIVE_JDBC)
.build(),
Duration.ofSeconds(60));
}
assertEquals(Collections.singletonList("1"),
readCsvResultFiles(result.toPath()));
}
+ @Test
+ public void testRestExecuteStatement() throws Exception {
+ File result = FOLDER.newFolder("csv");
+ try (GatewayController gateway = flinkResource.startSqlGateway();
+ ClusterController ignore = flinkResource.startCluster(1)) {
+ gateway.submitSQLJob(
+ new
SQLJobSubmission.SQLJobSubmissionBuilder(getSqlLines(result))
+ .setClientMode(SQLJobSubmission.ClientMode.REST)
+ .build(),
+ Duration.ofSeconds(60));
+ }
+ assertEquals(Collections.singletonList("1"),
readCsvResultFiles(result.toPath()));
Review Comment:
`testRestExecuteStatement` and `testHiveserver2ExecuteStatement` are very
similar. Can we add a helper method like `runExecuteStatement(ClientMode
clientMode)` to share the common logic?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]