zentol closed pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml new file mode 100644 index 00000000000..1ef4a1bd954 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml @@ -0,0 +1,63 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.8-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-end-to-end-tests-common</artifactId> + <version>1.8-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>3.7.0</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-jackson</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <!-- To ensure that flink-dist is built beforehand --> + <groupId>org.apache.flink</groupId> + <artifactId>flink-dist_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + +</project> diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java new file mode 100644 index 00000000000..0098889ead6 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Utility class to delete a given {@link Path} when exiting a try-with-resources statement. + */ +public final class AutoClosablePath implements AutoCloseable { + + private final Path path; + + public AutoClosablePath(final Path path) { + Preconditions.checkNotNull(path, "Path must not be null."); + Preconditions.checkArgument(path.isAbsolute(), "Path must be absolute."); + this.path = path; + } + + @Override + public void close() throws IOException { + FileUtils.deleteFileOrDirectory(path.toFile()); + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java new file mode 100644 index 00000000000..02359302e13 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Utility class to terminate a given {@link Process} when exiting a try-with-resources statement. + */ +public class AutoClosableProcess implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class); + + private final Process process; + + public AutoClosableProcess(final Process process) { + Preconditions.checkNotNull(process); + this.process = process; + } + + public static AutoClosableProcess runNonBlocking(String step, String... commands) throws IOException { + LOG.info("Step Started: " + step); + Process process = new ProcessBuilder() + .command(commands) + .inheritIO() + .start(); + return new AutoClosableProcess(process); + } + + public static void runBlocking(String step, String... commands) throws IOException { + runBlocking(step, Duration.ofSeconds(30), commands); + } + + public static void runBlocking(String step, Duration timeout, String... commands) throws IOException { + LOG.info("Step started: " + step); + Process process = new ProcessBuilder() + .command(commands) + .inheritIO() + .start(); + + try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) { + final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS); + if (!success) { + throw new TimeoutException(); + } + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(step + " failed due to timeout."); + } + LOG.info("Step complete: " + step); + } + + @Override + public void close() throws IOException { + if (process.isAlive()) { + process.destroy(); + try { + process.waitFor(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java new file mode 100644 index 00000000000..50fd2f81f02 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class for setting up command-line tool usages in a readable fashion. + */ +public enum CommandLineWrapper { + ; + + public static WGetBuilder wget(String url) { + return new WGetBuilder(url); + } + + /** + * Wrapper around wget used for downloading files. + */ + public static final class WGetBuilder { + + private final String url; + private Path targetDir; + + WGetBuilder(String url) { + this.url = url; + } + + public WGetBuilder targetDir(Path dir) { + this.targetDir = dir; + return this; + } + + public String[] build() { + final List<String> commandsList = new ArrayList<>(5); + commandsList.add("wget"); + commandsList.add("-q"); // silent + //commandsList.add("--show-progress"); // enable progress bar + if (targetDir != null) { + commandsList.add("-P"); + commandsList.add(targetDir.toAbsolutePath().toString()); + } + commandsList.add(url); + return commandsList.toArray(new String[commandsList.size()]); + } + } + + public static SedBuilder sed(final String command, final Path file) { + return new SedBuilder(command, file); + } + + /** + * Wrapper around sed used for processing text. + */ + public static final class SedBuilder { + + private final String command; + private final Path file; + + private boolean inPlace = false; + + SedBuilder(final String command, final Path file) { + this.command = command; + this.file = file; + } + + public SedBuilder inPlace() { + inPlace = true; + return this; + } + + public String[] build() { + final List<String> commandsList = new ArrayList<>(5); + commandsList.add("sed"); + if (inPlace) { + commandsList.add("-i"); + } + commandsList.add("-e"); + commandsList.add(command); + commandsList.add(file.toAbsolutePath().toString()); + return commandsList.toArray(new String[commandsList.size()]); + } + } + + public static TarBuilder tar(final Path file) { + return new TarBuilder(file); + } + + /** + * Wrapper around tar used for extracting .tar archives. + */ + public static final class TarBuilder { + + private final Path file; + private boolean zipped = false; + private boolean extract = false; + private Path targetDir; + + public TarBuilder(final Path file) { + this.file = file; + } + + public TarBuilder zipped() { + zipped = true; + return this; + } + + public TarBuilder extract() { + extract = true; + return this; + } + + public TarBuilder targetDir(final Path dir) { + targetDir = dir; + return this; + } + + public String[] build() { + final List<String> commandsList = new ArrayList<>(4); + commandsList.add("tar"); + if (zipped) { + commandsList.add("-z"); + } + if (extract) { + commandsList.add("-x"); + } + if (targetDir != null) { + commandsList.add("--directory"); + commandsList.add(targetDir.toAbsolutePath().toString()); + } + commandsList.add("-f"); + commandsList.add(file.toAbsolutePath().toString()); + return commandsList.toArray(new String[commandsList.size()]); + } + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java new file mode 100644 index 00000000000..7031d88c890 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. + */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir=<path> ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try { + Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.error("Failed to restore flink-conf.yaml", e); + } + + for (AutoCloseable fileToDelete : filesToDelete) { + try { + fileToDelete.close(); + } catch (Exception e) { + LOG.error("Failure while cleaning up file.", e); + } + } + } + + public void startFlinkCluster() throws IOException { + AutoClosableProcess.runBlocking("Start Flink cluster", bin.resolve("start-cluster.sh").toAbsolutePath().toString()); + + final OkHttpClient client = new OkHttpClient(); + + final Request request = new Request.Builder() + .get() + .url("http://localhost:8081/taskmanagers") + .build(); + + Exception reportedException = null; + for (int retryAttempt = 0; retryAttempt < 30; retryAttempt++) { + try (Response response = client.newCall(request).execute()) { + if (response.isSuccessful()) { + final String json = response.body().string(); + final JsonNode taskManagerList = OBJECT_MAPPER.readTree(json) + .get("taskmanagers"); + + if (taskManagerList != null && taskManagerList.size() > 0) { + LOG.info("Dispatcher REST endpoint is up."); + return; + } + } + } catch (IOException ioe) { + reportedException = ExceptionUtils.firstOrSuppressed(ioe, reportedException); + } + + LOG.info("Waiting for dispatcher REST endpoint to come up..."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + reportedException = ExceptionUtils.firstOrSuppressed(e, reportedException); + } + } + throw new AssertionError("Dispatcher REST endpoint did not start in time.", reportedException); + } + + public void stopFlinkCluster() throws IOException { + AutoClosableProcess.runBlocking("Stop Flink Cluster", bin.resolve("stop-cluster.sh").toAbsolutePath().toString()); + } + + public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException { + final Optional<Path> reporterJarOptional = Files.walk(opt) + .filter(path -> path.getFileName().toString().startsWith(jarNamePrefix)) + .findFirst(); + if (reporterJarOptional.isPresent()) { + final Path optReporterJar = reporterJarOptional.get(); + final Path libReporterJar = lib.resolve(optReporterJar.getFileName()); + Files.copy(optReporterJar, libReporterJar); + filesToDelete.add(new AutoClosablePath(libReporterJar)); + } else { + throw new FileNotFoundException("No jar could be found matching the pattern " + jarNamePrefix + "."); + } + } + + public void appendConfiguration(Configuration config) throws IOException { + final Configuration mergedConfig = new Configuration(); + mergedConfig.addAll(defaultConfig); + mergedConfig.addAll(config); + + final List<String> configurationLines = mergedConfig.toMap().entrySet().stream() + .map(entry -> entry.getKey() + ": " + entry.getValue()) + .collect(Collectors.toList()); + + Files.write(conf.resolve("flink-conf.yaml"), configurationLines); + } + + public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> matchProcessor) throws IOException { + final List<String> matches = new ArrayList<>(2); + + try (Stream<Path> logFilesStream = Files.list(log)) { + final Iterator<Path> logFiles = logFilesStream.iterator(); + while (logFiles.hasNext()) { + final Path logFile = logFiles.next(); + if (!logFile.getFileName().toString().endsWith(".log")) { + // ignore logs for previous runs that have a number suffix + continue; + } + try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(logFile.toFile()), StandardCharsets.UTF_8))) { + String line; + while ((line = br.readLine()) != null) { + Matcher matcher = pattern.matcher(line); + if (matcher.matches()) { + matches.add(matchProcessor.apply(matcher)); + } + } + } + } + } + return matches.stream(); + } +} diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/pom.xml b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/pom.xml new file mode 100644 index 00000000000..1efb40ed38e --- /dev/null +++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/pom.xml @@ -0,0 +1,104 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.8-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-metrics-reporter-prometheus-test</artifactId> + <version>1.8-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-prometheus</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>3.7.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-jackson</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests-common</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>e2e-prometheus</id> + <activation> + <property> + <name>e2e-metrics</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>e2e-prometheus</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <includes> + <include>**/*ITCase.*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java new file mode 100644 index 00000000000..269754eaf69 --- /dev/null +++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus.tests; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.prometheus.PrometheusReporter; +import org.apache.flink.tests.util.AutoClosableProcess; +import org.apache.flink.tests.util.CommandLineWrapper; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking; +import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking; + +/** + * End-to-end test for the PrometheusReporter. + */ +public class PrometheusReporterEndToEndITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String PROMETHEUS_VERSION = "2.4.3"; + private static final String PROMETHEUS_FILE_NAME; + + static { + final String base = "prometheus-" + PROMETHEUS_VERSION + '.'; + switch (OperatingSystem.getCurrentOperatingSystem()) { + case MAC_OS: + PROMETHEUS_FILE_NAME = base + "darwin-amd64"; + break; + case WINDOWS: + PROMETHEUS_FILE_NAME = base + "windows-amd64"; + break; + default: + PROMETHEUS_FILE_NAME = base + "linux-amd64"; + break; + } + } + + private static final Pattern LOG_REPORTER_PORT_PATTERN = Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*"); + + @BeforeClass + public static void checkOS() { + Assume.assumeFalse("This test does not run on Windows.", OperatingSystem.isWindows()); + } + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testReporter() throws Exception { + dist.copyOptJarsToLib("flink-metrics-prometheus"); + + final Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100"); + + dist.appendConfiguration(config); + + final Path tmpPrometheusDir = tmp.newFolder().toPath().resolve("prometheus"); + final Path prometheusArchive = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz"); + final Path prometheusBinDir = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME); + final Path prometheusConfig = prometheusBinDir.resolve("prometheus.yml"); + final Path prometheusBinary = prometheusBinDir.resolve("prometheus"); + Files.createDirectory(tmpPrometheusDir); + + runBlocking( + "Download of Prometheus", + Duration.ofMinutes(5), + CommandLineWrapper + .wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName()) + .targetDir(tmpPrometheusDir) + .build()); + + runBlocking("Extraction of Prometheus archive", + CommandLineWrapper + .tar(prometheusArchive) + .extract() + .zipped() + .targetDir(tmpPrometheusDir) + .build()); + + runBlocking("Set Prometheus scrape interval", + CommandLineWrapper + .sed("s/\\(scrape_interval:\\).*/\\1 1s/", prometheusConfig) + .inPlace() + .build()); + + dist.startFlinkCluster(); + + final List<Integer> ports = dist + .searchAllLogs(LOG_REPORTER_PORT_PATTERN, matcher -> matcher.group(1)) + .map(Integer::valueOf) + .collect(Collectors.toList()); + + final String scrapeTargets = ports.stream() + .map(port -> "'localhost:" + port + "'") + .collect(Collectors.joining(", ")); + + runBlocking("Set Prometheus scrape targets to (" + scrapeTargets + ")", + CommandLineWrapper + .sed("s/\\(targets:\\).*/\\1 [" + scrapeTargets + "]/", prometheusConfig) + .inPlace() + .build()); + + try (AutoClosableProcess prometheus = runNonBlocking( + "Start Prometheus server", + prometheusBinary.toAbsolutePath().toString(), + "--config.file=" + prometheusConfig.toAbsolutePath(), + "--storage.tsdb.path=" + prometheusBinDir.resolve("data").toAbsolutePath())) { + + final OkHttpClient client = new OkHttpClient(); + + checkMetricAvailability(client, "flink_jobmanager_numRegisteredTaskManagers"); + checkMetricAvailability(client, "flink_taskmanager_Status_Network_TotalMemorySegments"); + } + } + + private static void checkMetricAvailability(final OkHttpClient client, final String metric) throws InterruptedException { + final Request jobManagerRequest = new Request.Builder() + .get() + .url("http://localhost:9090/api/v1/query?query=" + metric) + .build(); + + Exception reportedException = null; + for (int x = 0; x < 30; x++) { + try (Response response = client.newCall(jobManagerRequest).execute()) { + if (response.isSuccessful()) { + final String json = response.body().string(); + + // Sample response: + //{ + // "status": "success", + // "data": { + // "resultType": "vector", + // "result": [{ + // "metric": { + // "__name__": "flink_jobmanager_numRegisteredTaskManagers", + // "host": "localhost", + // "instance": "localhost:9000", + // "job": "prometheus" + // }, + // "value": [1540548500.107, "1"] + // }] + // } + //} + OBJECT_MAPPER.readTree(json) + .get("data") + .get("result") + .get(0) + .get("value") + .get(1).asInt(); + // if we reach this point some value for the given metric was reported to prometheus + return; + } else { + LOG.info("Retrieving metric failed. Retrying... " + response.code() + ":" + response.message()); + Thread.sleep(1000); + } + } catch (Exception e) { + reportedException = ExceptionUtils.firstOrSuppressed(e, reportedException); + Thread.sleep(1000); + } + } + throw new AssertionError("Could not retrieve metric " + metric + " from Prometheus.", reportedException); + } +} diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/resources/log4j-test.properties b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/resources/log4j-test.properties new file mode 100644 index 00000000000..f7425cd14c0 --- /dev/null +++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=INFO, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%m%n diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 07b65f9a1e3..4f132f923b9 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -56,6 +56,8 @@ under the License. <module>flink-streaming-file-sink-test</module> <module>flink-state-evolution-test</module> <module>flink-e2e-test-utils</module> + <module>flink-end-to-end-tests-common</module> + <module>flink-metrics-reporter-prometheus-test</module> </modules> <build> @@ -67,6 +69,22 @@ under the License. <skip>true</skip> </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <!-- Do not execute any tests by default. + E2E tests should specify an additional execution within a profile. --> + <execution> + <id>default-test</id> + <phase>none</phase> + </execution> + <execution> + <id>integration-tests</id> + <phase>none</phase> + </execution> + </executions> + </plugin> </plugins> <pluginManagement> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
