This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 31bb1f49bf4ccc657ad6879bdf5d954cc24062d1 Author: Shengkai <1059623...@qq.com> AuthorDate: Mon Aug 1 11:55:51 2022 +0800 [FLINK-27770][sql-gateway] Refactor SqlGateway E2E tests This closes #20174 --- .../table/endpoint/hive/HiveServer2Endpoint.java | 6 +- .../flink-sql-connector-hive-2.3.9/pom.xml | 9 + flink-dist/src/main/flink-bin/bin/flink-console.sh | 4 +- flink-dist/src/main/flink-bin/bin/flink-daemon.sh | 4 +- .../tests/util/kafka/SQLClientKafkaITCase.java | 16 +- .../org/apache/flink/tests/util/TestUtils.java | 16 ++ .../flink/tests/util/flink/ClusterController.java | 2 +- .../flink/tests/util/flink/FlinkDistribution.java | 75 +++++-- .../flink/tests/util/flink/FlinkResource.java | 9 + ...usterController.java => GatewayController.java} | 17 +- .../util/flink/LocalStandaloneFlinkResource.java | 35 +++- .../tests/util/hbase/SQLClientHBaseITCase.java | 2 +- .../table/sql/codegen/PlannerScalaFreeITCase.java | 2 +- .../flink-sql-gateway-test/pom.xml | 221 +++++++++++++++------ .../flink/table/gateway/SQLGatewayITCase.java | 71 ------- .../flink/table/gateway/SqlGatewayE2ECase.java | 197 ++++++++++++++++++ .../table/gateway/containers/HiveContainer.java | 53 +++++ .../src/test/resources/gateway_e2e.sql | 30 +++ .../src/test/resources/hive-site.xml | 86 ++++++++ .../src/test/resources/log4j2-test.properties | 32 +-- flink-end-to-end-tests/pom.xml | 2 +- flink-table/flink-sql-gateway/bin/sql-gateway.sh | 60 +++--- .../org/apache/flink/table/gateway/SqlGateway.java | 46 +++-- .../apache/flink/table/gateway/SqlGatewayTest.java | 53 +++-- .../table/planner/delegation/PlannerBase.scala | 2 +- .../org/apache/flink/util/DockerImageVersions.java | 2 + .../apache/flink/test/util/SQLJobSubmission.java | 23 ++- .../modules-skipping-deployment.modulelist | 1 + 28 files changed, 795 insertions(+), 281 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java index 07597450a4f..8ae5bb23954 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java @@ -318,9 +318,9 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin resp.setServerProtocolVersion(sessionVersion.getVersion()); resp.setSessionHandle(toTSessionHandle(sessionHandle)); resp.setConfiguration(service.getSessionConfig(sessionHandle)); - } catch (Exception e) { - LOG.error("Failed to OpenSession.", e); - resp.setStatus(toTStatus(e)); + } catch (Throwable t) { + LOG.error("Failed to OpenSession.", t); + resp.setStatus(toTStatus(t)); } return resp; } diff --git a/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml b/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml index e22865a8f0c..a948ed33bcf 100644 --- a/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml +++ b/flink-connectors/flink-sql-connector-hive-2.3.9/pom.xml @@ -104,6 +104,15 @@ under the License. <shadedPattern>org.apache.flink.hive.shaded.com.google</shadedPattern> </relocation> </relocations> + <!-- Table planner also contains calcite dependency. Exclude to prevent class conflicts. 
--> + <filters> + <filter> + <artifact>org.apache.hive:hive-exec</artifact> + <excludes> + <exclude>org/apache/calcite/**</exclude> + </excludes> + </filter> + </filters> </configuration> </execution> </executions> diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh index 05b9d42632e..68181d2023b 100755 --- a/flink-dist/src/main/flink-bin/bin/flink-console.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh @@ -19,7 +19,7 @@ # Start a Flink service as a console application. Must be stopped with Ctrl-C # or with SIGTERM by kill or the controlling process. -USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sqlgateway) [args]" +USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sql-gateway) [args]" SERVICE=$1 ARGS=("${@:2}") # get remaining arguments as array @@ -62,7 +62,7 @@ case $SERVICE in CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner ;; - (sqlgateway) + (sql-gateway) CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar` ;; diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh index 246b540a972..d6a27416594 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh @@ -18,7 +18,7 @@ ################################################################################ # Start/stop a Flink daemon. -USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sqlgateway) [args]" +USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sql-gateway) [args]" STARTSTOP=$1 DAEMON=$2 @@ -50,7 +50,7 @@ case $DAEMON in CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint ;; - (sqlgateway) + (sql-gateway) CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar` ;; diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java index 4d88ccca7fd..eee3c478e36 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.junit.Assert.assertThat; @@ -174,7 +175,7 @@ public class SQLClientKafkaITCase extends TestLogger { } private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) - throws IOException { + throws Exception { LOG.info("Executing Kafka {} end-to-end SQL statements.", kafkaSQLVersion); clusterController.submitSQLJob( new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) @@ -234,17 +235,4 @@ public 
class SQLClientKafkaITCase extends TestLogger { } Assert.assertTrue("Did not get expected results before timeout.", success); } - - private static List<String> readCsvResultFiles(Path path) throws IOException { - File filePath = path.toFile(); - // list all the non-hidden files - File[] csvFiles = filePath.listFiles((dir, name) -> !name.startsWith(".")); - List<String> result = new ArrayList<>(); - if (csvFiles != null) { - for (File file : csvFiles) { - result.addAll(Files.readAllLines(file.toPath())); - } - } - return result; - } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java index 4fe8c7a5b66..a76f2b8e420 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.tests.util; import org.apache.flink.test.parameters.ParameterProperty; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.FileAlreadyExistsException; @@ -31,6 +32,7 @@ import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardCopyOption; import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.regex.Pattern; @@ -132,4 +134,18 @@ public enum TestUtils { return destination; } + + /** Read the all files with the specified path. */ + public static List<String> readCsvResultFiles(Path path) throws IOException { + File filePath = path.toFile(); + // list all the non-hidden files + File[] csvFiles = filePath.listFiles((dir, name) -> !name.startsWith(".")); + List<String> result = new ArrayList<>(); + if (csvFiles != null) { + for (File file : csvFiles) { + result.addAll(Files.readAllLines(file.toPath())); + } + } + return result; + } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java index f732e07ce37..3d7c7219b53 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java @@ -45,5 +45,5 @@ public interface ClusterController extends AutoCloseableAsync { * @param timeout the maximum time to wait. * @throws IOException if any IO error happen. 
*/ - void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException; + void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception; } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java index c699adaeb8f..86efed246f1 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java @@ -28,6 +28,7 @@ import org.apache.flink.test.util.SQLJobSubmission; import org.apache.flink.tests.util.AutoClosableProcess; import org.apache.flink.tests.util.TestUtils; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.FutureTaskWithException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -45,14 +46,19 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; +import java.net.InetAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -71,6 +77,7 @@ final class FlinkDistribution { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final Pattern ROOT_LOGGER_PATTERN = Pattern.compile("(rootLogger.level =).*"); + private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver"; private final Path opt; private final Path lib; @@ -106,13 +113,15 @@ final class FlinkDistribution { bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start"); } - public void startSQLGateway(String arg) throws IOException { + public void startSqlGateway() throws IOException { LOG.info("Starting Flink SQL Gateway."); - AutoClosableProcess.runBlocking( - bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start", arg); + AutoClosableProcess.create( + bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start") + .setStdoutProcessor(LOG::info) + .runBlocking(); } - public void stopSQLGateway() throws IOException { + public void stopSqlGateway() throws IOException { LOG.info("Stopping Flink SQL Gateway."); AutoClosableProcess.runBlocking( bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "stop"); @@ -221,18 +230,56 @@ final class FlinkDistribution { } } - public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException { + public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception { final List<String> commands = new ArrayList<>(); - commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString()); - for (String jar : job.getJars()) { - commands.add("--jar"); - commands.add(jar); - } - AutoClosableProcess.create(commands.toArray(new String[0])) - .setStdInputs(job.getSqlLines().toArray(new String[0])) - .setStdoutProcessor(LOG::info) // logging the SQL 
statements and error message - .runBlocking(timeout); + if (job.getClientMode() == SQLJobSubmission.ClientMode.SQL_CLIENT) { + commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString()); + for (String jar : job.getJars()) { + commands.add("--jar"); + commands.add(jar); + } + + AutoClosableProcess.create(commands.toArray(new String[0])) + .setStdInputs(job.getSqlLines().toArray(new String[0])) + .setStdoutProcessor(LOG::info) // logging the SQL statements and error message + .runBlocking(timeout); + } else if (job.getClientMode() == SQLJobSubmission.ClientMode.HIVE_JDBC) { + FutureTaskWithException<Void> future = + new FutureTaskWithException<>( + () -> { + // register HiveDriver to the DriverManager + Class.forName(HIVE_DRIVER); + Map<String, String> configMap = + GlobalConfiguration.loadConfiguration( + conf.toAbsolutePath().toString()) + .toMap(); + String host = + configMap.getOrDefault( + "sql-gateway.endpoint.hiveserver2.host", + InetAddress.getByName("localhost") + .getHostAddress()); + String port = + configMap.getOrDefault( + "sql-gateway.endpoint.hiveserver2.thrift.port", + "10000"); + try (Connection connection = + DriverManager.getConnection( + String.format( + "jdbc:hive2://%s:%s/default;auth=noSasl;", + host, port)); + Statement statement = connection.createStatement()) { + for (String jar : job.getJars()) { + statement.execute(String.format("ADD JAR '%s'", jar)); + } + for (String sql : job.getSqlLines()) { + statement.execute(sql); + } + } + }); + new Thread(future).start(); + future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } } public void performJarAddition(JarAddition addition) throws IOException { diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java index 4bbf212235b..397d7428823 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java @@ -46,6 +46,15 @@ public interface FlinkResource extends ExternalResource { */ ClusterController startCluster(int numTaskManagers) throws IOException; + /** + * Starts a sqlserver and returns the {@link GatewayController} which can be used to shut down + * the process. + * + * @return controller for interacting with the cluster + * @throws IOException + */ + GatewayController startSqlGateway() throws IOException; + /** * Searches the logs of all processes for the given pattern, and applies the given processor for * every line for which {@link Matcher#matches()} returned true. 
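The HIVE_JDBC submission path added to FlinkDistribution above is plain java.sql usage against the gateway's HiveServer2 endpoint. Below is a minimal standalone sketch of that flow, assuming the default localhost:10000 endpoint address that the diff falls back to; the jar path and statement are placeholders, not part of this commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HiveJdbcSubmissionSketch {
    public static void main(String[] args) throws Exception {
        // Register the Hive JDBC driver with the DriverManager, mirroring submitSQLJob.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Endpoint address; the test reads it from sql-gateway.endpoint.hiveserver2.host
        // and sql-gateway.endpoint.hiveserver2.thrift.port (default port 10000).
        String url = "jdbc:hive2://localhost:10000/default;auth=noSasl;";
        try (Connection connection = DriverManager.getConnection(url);
                Statement statement = connection.createStatement()) {
            // Jars are shipped first via ADD JAR, then the SQL lines run one by one.
            statement.execute("ADD JAR '/path/to/connector.jar'"); // placeholder path
            statement.execute("SHOW TABLES");
        }
    }
}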
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/GatewayController.java similarity index 72% copy from flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java copy to flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/GatewayController.java index f732e07ce37..2b49face588 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/GatewayController.java @@ -18,25 +18,14 @@ package org.apache.flink.tests.util.flink; -import org.apache.flink.test.util.JobSubmission; import org.apache.flink.test.util.SQLJobSubmission; import org.apache.flink.util.AutoCloseableAsync; import java.io.IOException; import java.time.Duration; -/** Controller for interacting with a cluster. */ -public interface ClusterController extends AutoCloseableAsync { - - /** - * Submits the given job to the cluster. - * - * @param job job to submit - * @param timeout the maximum time to wait. - * @return JobController for the submitted job - * @throws IOException - */ - JobController submitJob(JobSubmission job, Duration timeout) throws IOException; +/** Controller for interacting with a SqlGateway. */ +public interface GatewayController extends AutoCloseableAsync { /** * Submits the given SQL job to the cluster. @@ -45,5 +34,5 @@ public interface ClusterController extends AutoCloseableAsync { * @param timeout the maximum time to wait. * @throws IOException if any IO error happen. 
*/ - void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException; + void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception; } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java index dc767d382e0..7b5658867f4 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java @@ -113,7 +113,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource { private void shutdownCluster() { try { distribution.stopFlinkCluster(); - distribution.stopSQLGateway(); + distribution.stopSqlGateway(); } catch (IOException e) { LOG.warn("Error while shutting down Flink cluster.", e); } @@ -184,8 +184,11 @@ public class LocalStandaloneFlinkResource implements FlinkResource { throw new RuntimeException("Cluster did not start in expected time-frame."); } - public void startSQLGateway(String arg) throws IOException { - distribution.startSQLGateway(arg); + @Override + public GatewayController startSqlGateway() throws IOException { + distribution.startSqlGateway(); + + return new GatewayClusterControllerImpl(distribution); } @Override @@ -194,6 +197,30 @@ public class LocalStandaloneFlinkResource implements FlinkResource { return distribution.searchAllLogs(pattern, matchProcessor); } + private static class GatewayClusterControllerImpl implements GatewayController { + + private final FlinkDistribution distribution; + + public GatewayClusterControllerImpl(FlinkDistribution distribution) { + this.distribution = distribution; + } + + @Override + public CompletableFuture<Void> closeAsync() { + try { + distribution.stopSqlGateway(); + return CompletableFuture.completedFuture(null); + } catch (IOException e) { + return FutureUtils.completedExceptionally(e); + } + } + + @Override + public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception { + distribution.submitSQLJob(job, timeout); + } + } + private static class StandaloneClusterController implements ClusterController { private final FlinkDistribution distribution; @@ -210,7 +237,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource { } @Override - public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOException { + public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exception { distribution.submitSQLJob(job, timeout); } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java index 165d0f7d832..0f15579d156 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java @@ -228,7 +228,7 @@ public class SQLClientHBaseITCase extends TestLogger { } private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) - throws IOException { + throws Exception { 
LOG.info("Executing SQL: HBase source table -> HBase sink table"); clusterController.submitSQLJob( new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java index 79e6c59db08..683e72cb155 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java @@ -131,7 +131,7 @@ public class PlannerScalaFreeITCase extends TestLogger { } private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) - throws IOException { + throws Exception { LOG.info("Executing end-to-end SQL statements {}.", sqlLines); clusterController.submitSQLJob( new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml index 3b8cc597bb2..9c30c64b880 100644 --- a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml @@ -17,68 +17,167 @@ specific language governing permissions and limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>flink-end-to-end-tests</artifactId> - <groupId>org.apache.flink</groupId> - <version>1.16-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.16-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> - <artifactId>flink-sql-gateway-test</artifactId> - <name>Flink : E2E Tests : SQL Gateway</name> - <packaging>jar</packaging> + <artifactId>flink-sql-gateway-test</artifactId> + <name>Flink : E2E Tests : SQL Gateway</name> + <packaging>jar</packaging> - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-end-to-end-tests-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-jdbc</artifactId> - <version>${hive.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils-junit</artifactId> - </dependency> - </dependencies> + <properties> + <!-- The test container uses hive-2.1.0 --> + <hive.version>2.3.9</hive.version> + <flink.hadoop.version>2.8.5</flink.hadoop.version> + </properties> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>copy</id> - <phase>pre-integration-test</phase> - <goals> - <goal>copy</goal> - </goals> - 
</execution> - </executions> - <configuration> - <artifactItems> - <artifactItem> - <groupId>org.apache.flink</groupId> - <artifactId>flink-sql-connector-hive-2.3.9_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <destFileName>flink-sql-connector-hive-2.3.9_${scala.binary.version}-${project.version}.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies</outputDirectory> - </artifactItem> - </artifactItems> - </configuration> - </plugin> - </plugins> - </build> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version} + </artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <!-- It contains jackson-annotations that is conflicts with TestContainer --> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <version>${hive.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + </dependency> + + <!-- hadoop dependencies for end-to-end test --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${flink.hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <!-- This dependency is no longer shipped with the JDK since Java 9.--> + <groupId>jdk.tools</groupId> + <artifactId>jdk.tools</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <executions> + <execution> + <id>dependency-convergence</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy</id> + <phase>pre-integration-test</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId> + flink-sql-connector-hive-${hive.version}_${scala.binary.version} + </artifactId> + <version>${project.version}</version> + <destFileName> + flink-sql-connector-hive-${hive.version}_${scala.binary.version}-${project.version}.jar + </destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies + </outputDirectory> + </artifactItem> + </artifactItems> + </configuration> + </execution> + <execution> + <id>store-classpath-in-target-for-tests</id> + <phase>package</phase> + <goals> + <goal>build-classpath</goal> + </goals> + <configuration> + 
<outputFile>${project.build.directory}/hadoop.classpath</outputFile> + <excludeGroupIds>org.apache.flink</excludeGroupIds> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>hive3</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java deleted file mode 100644 index 5b037b1ccae..00000000000 --- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.gateway; - -import org.apache.flink.tests.util.TestUtils; -import org.apache.flink.tests.util.flink.ClusterController; -import org.apache.flink.tests.util.flink.FlinkResource; -import org.apache.flink.tests.util.flink.FlinkResourceSetup; -import org.apache.flink.tests.util.flink.JarLocation; -import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource; -import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory; -import org.apache.flink.util.TestLogger; - -import org.junit.Rule; -import org.junit.Test; - -import java.nio.file.Path; -import java.sql.DriverManager; -import java.sql.SQLException; - -import static org.assertj.core.api.Assertions.assertThat; - -public class SQLGatewayITCase extends TestLogger { - - private static String JDBC_URL = "jdbc:hive2://localhost:8084/default;auth=noSasl"; - private static String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"; - - private static final Path HIVE_SQL_CONNECOTR_JAR = - TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar"); - - @Rule - public final FlinkResource flink = - new LocalStandaloneFlinkResourceFactory() - .create( - FlinkResourceSetup.builder() - .addJar(HIVE_SQL_CONNECOTR_JAR, JarLocation.LIB) - .build()); - - @Test - public void testGateway() throws Exception { - try (ClusterController clusterController = flink.startCluster(1)) { - ((LocalStandaloneFlinkResource) flink) - .startSQLGateway("-Dsql-gateway.endpoint.type=hiveserver2"); - Thread.sleep(2000); - Class.forName(DRIVER_NAME); - try { - DriverManager.getConnection(JDBC_URL); - } catch (SQLException e) { - assertThat(e.getMessage()) - .contains( - "Embedded metastore is not allowed. 
Make sure you have set a valid value for hive.metastore.uris"); - } - } - } -} diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java new file mode 100644 index 00000000000..b7a4e903ffa --- /dev/null +++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.endpoint.hive.HiveServer2Endpoint; +import org.apache.flink.table.gateway.containers.HiveContainer; +import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.ClusterController; +import org.apache.flink.tests.util.flink.FlinkResource; +import org.apache.flink.tests.util.flink.FlinkResourceSetup; +import org.apache.flink.tests.util.flink.GatewayController; +import org.apache.flink.tests.util.flink.JarLocation; +import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.hive.HiveCatalog.isEmbeddedMetastore; +import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.CATALOG_HIVE_CONF_DIR; +import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions.THRIFT_PORT; +import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles; +import static org.junit.Assert.assertEquals; + +/** E2E Tests for {@code SqlGateway} with {@link HiveServer2Endpoint}. 
*/ +public class SqlGatewayE2ECase extends TestLogger { + + private static final Path HIVE_SQL_CONNECTOR_JAR = + TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar"); + private static final Path HADOOP_CLASS_PATH = TestUtils.getResource(".*hadoop.classpath"); + private static final String GATEWAY_E2E_SQL = "gateway_e2e.sql"; + private static final Configuration ENDPOINT_CONFIG = new Configuration(); + private static final String RESULT_KEY = "$RESULT"; + + @ClassRule public static final TemporaryFolder FOLDER = new TemporaryFolder(); + @ClassRule public static final HiveContainer HIVE_CONTAINER = new HiveContainer(); + @Rule public final FlinkResource flinkResource = buildFlinkResource(); + + private static NetUtils.Port port; + + @BeforeClass + public static void beforeClass() { + ENDPOINT_CONFIG.setString( + getPrefixedConfigOptionName(CATALOG_HIVE_CONF_DIR), createHiveConf().getParent()); + } + + @AfterClass + public static void afterClass() throws Exception { + port.close(); + } + + @Test + public void testExecuteStatement() throws Exception { + URL url = SqlGatewayE2ECase.class.getClassLoader().getResource(GATEWAY_E2E_SQL); + if (url == null) { + throw new FileNotFoundException(GATEWAY_E2E_SQL); + } + File result = FOLDER.newFolder("csv"); + String sql = + Files.readAllLines(new File(url.getFile()).toPath()).stream() + .filter(line -> !line.trim().startsWith("--")) + .collect(Collectors.joining()); + List<String> lines = + Arrays.stream(sql.split(";")) + .map(line -> line.replace(RESULT_KEY, result.getAbsolutePath())) + .collect(Collectors.toList()); + + try (GatewayController gateway = flinkResource.startSqlGateway(); + ClusterController ignore = flinkResource.startCluster(1)) { + gateway.submitSQLJob( + new SQLJobSubmission.SQLJobSubmissionBuilder(lines) + .setClientMode(SQLJobSubmission.ClientMode.HIVE_JDBC) + .build(), + Duration.ofSeconds(60)); + } + assertEquals(Collections.singletonList("1"), readCsvResultFiles(result.toPath())); + } + + private static File createHiveConf() { + HiveConf hiveConf = new HiveConf(); + try (InputStream inputStream = + new FileInputStream( + new File( + Objects.requireNonNull( + SqlGatewayE2ECase.class + .getClassLoader() + .getResource(HiveCatalog.HIVE_SITE_FILE)) + .toURI()))) { + hiveConf.addResource(inputStream, HiveCatalog.HIVE_SITE_FILE); + // trigger a read from the conf so that the input stream is read + isEmbeddedMetastore(hiveConf); + } catch (Exception e) { + throw new RuntimeException("Failed to load hive-site.xml from specified path", e); + } + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_CONTAINER.getHiveMetastoreURI()); + try { + File site = FOLDER.newFile(HiveCatalog.HIVE_SITE_FILE); + try (OutputStream out = new FileOutputStream(site)) { + hiveConf.writeXml(out); + } + return site; + } catch (Exception e) { + throw new RuntimeException("Failed to create hive conf.", e); + } + } + + /** + * Build required environment. It prepares all hadoop jars to mock HADOOP_CLASSPATH, use + * hadoop.classpath which contains all hadoop jars. It also moves planner to the lib and remove + * the planner load to make the Hive sql connector works. 
+ */ + private static FlinkResource buildFlinkResource() { + // add hive jar and planner jar + FlinkResourceSetup.FlinkResourceSetupBuilder builder = + FlinkResourceSetup.builder() + .addJar(HIVE_SQL_CONNECTOR_JAR, JarLocation.LIB) + .moveJar("flink-table-planner", JarLocation.OPT, JarLocation.LIB) + .moveJar("flink-table-planner-loader", JarLocation.LIB, JarLocation.OPT); + // add hadoop jars + File hadoopClasspathFile = new File(HADOOP_CLASS_PATH.toAbsolutePath().toString()); + if (!hadoopClasspathFile.exists()) { + throw new RuntimeException( + "File that contains hadoop classpath " + + HADOOP_CLASS_PATH + + " does not exist."); + } + try { + String classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile); + Arrays.stream(classPathContent.split(":")) + .map(Paths::get) + .forEach(jar -> builder.addJar(jar, JarLocation.LIB)); + } catch (Exception e) { + throw new RuntimeException("Failed to build the FlinkResource.", e); + } + // add hive server2 endpoint related configs + port = NetUtils.getAvailablePort(); + Map<String, String> endpointConfig = new HashMap<>(); + endpointConfig.put("sql-gateway.endpoint.type", "hiveserver2"); + endpointConfig.put( + getPrefixedConfigOptionName(THRIFT_PORT), String.valueOf(port.getPort())); + ENDPOINT_CONFIG.addAll(Configuration.fromMap(endpointConfig)); + builder.addConfiguration(ENDPOINT_CONFIG); + + return new LocalStandaloneFlinkResourceFactory().create(builder.build()); + } + + private static String getPrefixedConfigOptionName(ConfigOption<?> option) { + String prefix = "sql-gateway.endpoint.hiveserver2."; + return prefix + option.key(); + } +} diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java new file mode 100644 index 00000000000..35dc6325779 --- /dev/null +++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.containers; + +import org.apache.flink.util.DockerImageVersions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +/** Test Container for hive. 
*/ +public class HiveContainer extends GenericContainer<HiveContainer> { + + private static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class); + + public static final String HOST_NAME = "hadoop-master"; + public static final int HIVE_METASTORE_PORT = 9083; + + public HiveContainer() { + super(DockerImageName.parse(DockerImageVersions.HIVE2)); + withExtraHost(HOST_NAME, "127.0.0.1"); + addExposedPort(HIVE_METASTORE_PORT); + } + + @Override + protected void doStart() { + super.doStart(); + if (LOG.isInfoEnabled()) { + followOutput(new Slf4jLogConsumer(LOG)); + } + } + + public String getHiveMetastoreURI() { + return String.format("thrift://%s:%s", getHost(), getMappedPort(HIVE_METASTORE_PORT)); + } +} diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/gateway_e2e.sql b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/gateway_e2e.sql new file mode 100644 index 00000000000..88fd47f445e --- /dev/null +++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/gateway_e2e.sql @@ -0,0 +1,30 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +SET table.sql-dialect=default; + +CREATE TABLE CsvTable ( + val INT +) WITH ( + 'connector' = 'filesystem', + 'path' = '$RESULT', + 'sink.rolling-policy.rollover-interval' = '2s', + 'sink.rolling-policy.check-interval' = '2s', + 'format' = 'csv', + 'csv.disable-quote-character' = 'true' +); + +INSERT INTO CsvTable SELECT 1; diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/hive-site.xml b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/hive-site.xml new file mode 100644 index 00000000000..9f74734e801 --- /dev/null +++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/hive-site.xml @@ -0,0 +1,86 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<configuration> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + <property> + <name>hive.metastore.uris</name> + <value>thrift://localhost:9083</value> + </property> + + <property> + <name>javax.jdo.option.ConnectionURL</name> + <value>jdbc:mysql://localhost:3306/metastore?useSSL=false</value> + </property> + + <property> + <name>javax.jdo.option.ConnectionDriverName</name> + <value>com.mysql.cj.jdbc.Driver</value> + </property> + + <property> + <name>javax.jdo.option.ConnectionUserName</name> + <value>root</value> + </property> + + <property> + <name>javax.jdo.option.ConnectionPassword</name> + <value>root</value> + </property> + + <property> + <name>hive.metastore.connect.retries</name> + <value>15</value> + </property> + + <property> + <name>hive.metastore.disallow.incompatible.col.type.changes</name> + <value>false</value> + </property> + + <property> + <!-- https://community.hortonworks.com/content/supportkb/247055/errorjavalangunsupportedoperationexception-storage.html --> + <name>metastore.storage.schema.reader.impl</name> + <value>org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader</value> + </property> + + <property> + <name>hive.support.concurrency</name> + <value>true</value> + </property> + + <property> + <name>hive.txn.manager</name> + <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value> + </property> + + <property> + <name>hive.compactor.initiator.on</name> + <value>true</value> + </property> + + <property> + <name>hive.compactor.worker.threads</name> + <value>1</value> + </property> + + <property> + <name>hive.users.in.admin.role</name> + <value>hdfs,hive</value> + </property> + +</configuration> diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/log4j2-test.properties similarity index 56% copy from tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist copy to flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/log4j2-test.properties index a1ac43e7cad..835c2ec9a3d 100644 --- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist +++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/resources/log4j2-test.properties @@ -16,27 +16,13 @@ # limitations under the License. ################################################################################ -# These modules are not deployed to maven central, despite their use of the shade plugin. 
+# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger -flink-examples-streaming-gcp-pubsub -flink-yarn-tests -flink-docs -flink-datastream-allround-test -flink-queryable-state-test -flink-confluent-schema-registry -flink-stream-stateful-job-upgrade-test -flink-elasticsearch7-test -flink-stream-state-ttl-test -flink-state-evolution-test -flink-elasticsearch6-test -flink-rocksdb-state-memory-control-test -flink-python-test -flink-streaming-kinesis-test -flink-tpch-test -flink-streaming-kafka-test-base -flink-heavy-deployment-stress-test -flink-high-parallelism-iterations-test -flink-end-to-end-tests-common-kafka -flink-end-to-end-tests-pulsar -flink-end-to-end-tests-elasticsearch7 -flink-end-to-end-tests-elasticsearch6 +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 31bd7facf9a..bc477e349cf 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -55,6 +55,7 @@ under the License. <module>flink-confluent-schema-registry</module> <module>flink-stream-state-ttl-test</module> <module>flink-sql-client-test</module> + <module>flink-sql-gateway-test</module> <module>flink-file-sink-test</module> <module>flink-state-evolution-test</module> <module>flink-rocksdb-state-memory-control-test</module> @@ -83,7 +84,6 @@ under the License. <module>flink-end-to-end-tests-elasticsearch7</module> <module>flink-end-to-end-tests-common-elasticsearch</module> <module>flink-end-to-end-tests-sql</module> - <module>flink-sql-gateway-test</module> </modules> <dependencyManagement> diff --git a/flink-table/flink-sql-gateway/bin/sql-gateway.sh b/flink-table/flink-sql-gateway/bin/sql-gateway.sh index f925a25adcf..5b300fb1c48 100644 --- a/flink-table/flink-sql-gateway/bin/sql-gateway.sh +++ b/flink-table/flink-sql-gateway/bin/sql-gateway.sh @@ -17,34 +17,16 @@ # limitations under the License. # -# Start/stop a Flink SQL Gateway. 
- function usage() { - echo "Usage: bin/sql-gateway.sh command" + echo "Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]" echo " commands:" - echo " start - Run a SQL Gateway as a daemon" - echo " start-foreground - Run a SQL Gateway as a console application" - echo " stop - Stop the SQL Gateway daemon" - echo " stop-all - Stop all the SQL Gateway daemons" - echo " -h | --help - Show this help message" + echo " start - Run a SQL Gateway as a daemon" + echo " start-foreground - Run a SQL Gateway as a console application" + echo " stop - Stop the SQL Gateway daemon" + echo " stop-all - Stop all the SQL Gateway daemons" + echo " -h | --help - Show this help message" } -if [[ "$*" = *--help ]] || [[ "$*" = *-h ]]; then - usage - exit 0 -fi - -STARTSTOP=$1 - -if [ -z "$STARTSTOP" ]; then - STARTSTOP="start" -fi - -if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then - usage - exit 1 -fi - ################################################################################ # Adopted from "flink" bash script ################################################################################ @@ -75,7 +57,35 @@ if [ "$FLINK_IDENT_STRING" = "" ]; then FLINK_IDENT_STRING="$USER" fi -ENTRYPOINT=sqlgateway +################################################################################ +# SQL gateway specific logic +################################################################################ + +ENTRYPOINT=sql-gateway + +if [[ "$1" = *--help ]] || [[ "$1" = *-h ]]; then + usage + exit 0 +fi + +STARTSTOP=$1 + +if [ -z "$STARTSTOP" ]; then + STARTSTOP="start" +fi + +if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then + usage + exit 1 +fi + +# ./sql-gateway.sh start --help, print the message to the console +if [[ "$STARTSTOP" = start* ]] && ( [[ "$*" = *--help* ]] || [[ "$*" = *-h* ]] ); then + FLINK_TM_CLASSPATH=`constructFlinkClassPath` + SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar` + "$JAVA_RUN" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.gateway.SqlGateway "${@:2}" + exit 0 +fi if [[ $STARTSTOP == "start-foreground" ]]; then exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${@:2}" diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java index f2b2fa384f6..9f6f409063b 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/SqlGateway.java @@ -67,18 +67,16 @@ public class SqlGateway { sessionManager.start(); SqlGatewayService sqlGatewayService = new SqlGatewayServiceImpl(sessionManager); - endpoints.addAll( - SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint( - sqlGatewayService, context.getFlinkConfig())); - - for (SqlGatewayEndpoint endpoint : endpoints) { - try { + try { + endpoints.addAll( + SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint( + sqlGatewayService, context.getFlinkConfig())); + for (SqlGatewayEndpoint endpoint : endpoints) { endpoint.start(); - } catch (Throwable t) { - LOG.error("Failed to start the endpoint.", t); - stop(); - throw new SqlGatewayException("Failed to start the endpoint.", t); } + } catch (Throwable t) { + 
LOG.error("Failed to start the endpoints.", t); + throw new SqlGatewayException("Failed to start the endpoints.", t); } } @@ -97,11 +95,6 @@ public class SqlGateway { } public static void main(String[] args) { - // startup checks and logging - EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args); - SignalHandler.register(LOG); - JvmShutdownSafeguard.installAsShutdownHook(LOG); - startSqlGateway(System.out, args); } @@ -114,6 +107,11 @@ public class SqlGateway { return; } + // startup checks and logging + EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + SqlGateway gateway = new SqlGateway(cliOptions.getDynamicConfigs()); try { Runtime.getRuntime().addShutdownHook(new ShutdownThread(gateway)); @@ -128,11 +126,19 @@ public class SqlGateway { // make space in terminal stream.println(); stream.println(); - LOG.error( - "SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.", - t); - throw new SqlGatewayException( - "Unexpected exception. This is a bug. Please consider filing an issue.", t); + + if (t instanceof SqlGatewayException) { + // Exception that the gateway can not handle. + throw (SqlGatewayException) t; + } else { + LOG.error( + "SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.", + t); + throw new SqlGatewayException( + "Unexpected exception. This is a bug. Please consider filing an issue.", t); + } + } finally { + gateway.stop(); } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java index 10095dd3d76..57d6bcac381 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/SqlGatewayTest.java @@ -38,6 +38,7 @@ import java.util.UUID; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; /** Tests for the {@link SqlGateway}. */ public class SqlGatewayTest { @@ -95,26 +96,36 @@ public class SqlGatewayTest { "-Dsql-gateway.endpoint.mocked.host=localhost", "-Dsql-gateway.endpoint.mocked.port=9999" }; - PrintStream stream = new PrintStream(output); - Thread thread = - new ExecutorThreadFactory( - "SqlGateway-thread-pool", - (t, exception) -> exception.printStackTrace(stream)) - .newThread(() -> SqlGateway.startSqlGateway(stream, args)); - thread.start(); - - CommonTestUtils.waitUtil( - () -> MockedSqlGatewayEndpoint.isRunning(id), - Duration.ofSeconds(10), - "Failed to get the endpoint starts."); - - thread.interrupt(); - CommonTestUtils.waitUtil( - () -> !thread.isAlive(), - Duration.ofSeconds(10), - "Failed to get the endpoint starts."); - assertThat(output.toString()) - .doesNotContain( - "Unexpected exception. This is a bug. 
Please consider filing an issue."); + try (PrintStream stream = new PrintStream(output)) { + Thread thread = + new ExecutorThreadFactory( + "SqlGateway-thread-pool", + (t, exception) -> exception.printStackTrace(stream)) + .newThread(() -> SqlGateway.startSqlGateway(stream, args)); + thread.start(); + + CommonTestUtils.waitUtil( + () -> MockedSqlGatewayEndpoint.isRunning(id), + Duration.ofSeconds(10), + "Failed to get the endpoint starts."); + + thread.interrupt(); + CommonTestUtils.waitUtil( + () -> !thread.isAlive(), + Duration.ofSeconds(10), + "Failed to get the endpoint starts."); + assertThat(output.toString()) + .doesNotContain( + "Unexpected exception. This is a bug. Please consider filing an issue."); + } + } + + @Test + public void testFailedToStartSqlGateway() { + try (PrintStream stream = new PrintStream(output)) { + assertThatThrownBy(() -> SqlGateway.startSqlGateway(stream, new String[0])) + .doesNotHaveToString( + "Unexpected exception. This is a bug. Please consider filing an issue."); + } } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 8d00017f46e..e2ef76b26eb 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -36,6 +36,7 @@ import org.apache.flink.table.planner.calcite._ import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema import org.apache.flink.table.planner.connectors.DynamicSinkUtils import org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast +import org.apache.flink.table.planner.delegation.DialectFactory.DefaultParserContext import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.operations.PlannerQueryOperation @@ -55,7 +56,6 @@ import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter import _root_.scala.collection.JavaConversions._ -import DialectFactory.DefaultParserContext import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.plan.{RelTrait, RelTraitDef} import org.apache.calcite.rel.RelNode diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java index e59e939500a..f0f367f7bf9 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java @@ -54,4 +54,6 @@ public class DockerImageVersions { public static final String GOOGLE_CLOUD_PUBSUB_EMULATOR = "gcr.io/google.com/cloudsdktool/cloud-sdk:379.0.0"; + + public static final String HIVE2 = "prestodb/hdp2.6-hive:10"; } diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java index cff93ccaf0d..5ec15f8dfe2 100644 --- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java @@ -27,14 +27,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** Programmatic definition of a SQL job-submission. */ public class SQLJobSubmission { + private final ClientMode clientMode; private final List<String> sqlLines; private final List<String> jars; - private SQLJobSubmission(List<String> sqlLines, List<String> jars) { + private SQLJobSubmission(ClientMode clientMode, List<String> sqlLines, List<String> jars) { + this.clientMode = clientMode; this.sqlLines = checkNotNull(sqlLines); this.jars = checkNotNull(jars); } + public ClientMode getClientMode() { + return clientMode; + } + public List<String> getJars() { return this.jars; } @@ -45,6 +51,7 @@ public class SQLJobSubmission { /** Builder for the {@link SQLJobSubmission}. */ public static class SQLJobSubmissionBuilder { + private ClientMode clientMode = ClientMode.SQL_CLIENT; private final List<String> sqlLines; private final List<String> jars = new ArrayList<>(); @@ -52,6 +59,11 @@ public class SQLJobSubmission { this.sqlLines = sqlLines; } + public SQLJobSubmissionBuilder setClientMode(ClientMode clientMode) { + this.clientMode = clientMode; + return this; + } + public SQLJobSubmissionBuilder addJar(Path jarFile) { this.jars.add(jarFile.toAbsolutePath().toString()); return this; @@ -70,7 +82,14 @@ public class SQLJobSubmission { } public SQLJobSubmission build() { - return new SQLJobSubmission(sqlLines, jars); + return new SQLJobSubmission(clientMode, sqlLines, jars); } } + + /** Use which client to submit job. */ + public enum ClientMode { + SQL_CLIENT, + + HIVE_JDBC + } } diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist index a1ac43e7cad..7b0be584fee 100644 --- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist +++ b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist @@ -40,3 +40,4 @@ flink-end-to-end-tests-common-kafka flink-end-to-end-tests-pulsar flink-end-to-end-tests-elasticsearch7 flink-end-to-end-tests-elasticsearch6 +flink-sql-gateway-test
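Taken together, the refactored E2E flow is: start the SQL Gateway and a standalone cluster from the FlinkResource, then submit the SQL through the new HIVE_JDBC client mode. A condensed sketch of the loop in SqlGatewayE2ECase, using only the APIs introduced above; the sqlLines argument stands in for the statements read from gateway_e2e.sql:

import java.time.Duration;
import java.util.List;

import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkResource;
import org.apache.flink.tests.util.flink.GatewayController;

class GatewayE2EFlowSketch {
    static void run(FlinkResource flinkResource, List<String> sqlLines) throws Exception {
        // GatewayController and ClusterController are AutoCloseableAsync, so
        // try-with-resources stops the gateway and the cluster after submission.
        try (GatewayController gateway = flinkResource.startSqlGateway();
                ClusterController ignore = flinkResource.startCluster(1)) {
            gateway.submitSQLJob(
                    new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                            .setClientMode(SQLJobSubmission.ClientMode.HIVE_JDBC)
                            .build(),
                    Duration.ofSeconds(60));
        }
    }
}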