zentol closed pull request #7003: [FLINK-10633][prometheus] Add E2E test
URL: https://github.com/apache/flink/pull/7003
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request on merge, it is
displayed below for the sake of provenance:

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
new file mode 100644
index 00000000000..1ef4a1bd954
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.8-SNAPSHOT</version>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-end-to-end-tests-common</artifactId>
+       <version>1.8-SNAPSHOT</version>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.squareup.okhttp3</groupId>
+                       <artifactId>okhttp</artifactId>
+                       <version>3.7.0</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-jackson</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>${junit.version}</version>
+                       <scope>compile</scope>
+               </dependency>
+               <dependency>
+                       <!-- To ensure that flink-dist is built beforehand -->
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-dist_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+       </dependencies>
+
+</project>
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java
new file mode 100644
index 00000000000..0098889ead6
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Utility class to delete a given {@link Path} when exiting a 
try-with-resources statement.
+ */
+public final class AutoClosablePath implements AutoCloseable {
+
+       private final Path path;
+
+       public AutoClosablePath(final Path path) {
+               Preconditions.checkNotNull(path, "Path must not be null.");
+               Preconditions.checkArgument(path.isAbsolute(), "Path must be 
absolute.");
+               this.path = path;
+       }
+
+       @Override
+       public void close() throws IOException {
+               FileUtils.deleteFileOrDirectory(path.toFile());
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
new file mode 100644
index 00000000000..02359302e13
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utility class to terminate a given {@link Process} when exiting a 
try-with-resources statement.
+ */
+public class AutoClosableProcess implements AutoCloseable {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(AutoClosableProcess.class);
+
+       private final Process process;
+
+       public AutoClosableProcess(final Process process) {
+               Preconditions.checkNotNull(process);
+               this.process = process;
+       }
+
+       public static AutoClosableProcess runNonBlocking(String step, String... 
commands) throws IOException {
+               LOG.info("Step Started: " + step);
+               Process process = new ProcessBuilder()
+                       .command(commands)
+                       .inheritIO()
+                       .start();
+               return new AutoClosableProcess(process);
+       }
+
+       public static void runBlocking(String step, String... commands) throws 
IOException {
+               runBlocking(step, Duration.ofSeconds(30), commands);
+       }
+
+       public static void runBlocking(String step, Duration timeout, String... 
commands) throws IOException {
+               LOG.info("Step started: " + step);
+               Process process = new ProcessBuilder()
+                       .command(commands)
+                       .inheritIO()
+                       .start();
+
+               try (AutoClosableProcess autoProcess = new 
AutoClosableProcess(process)) {
+                       final boolean success = 
process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
+                       if (!success) {
+                               throw new TimeoutException();
+                       }
+               } catch (TimeoutException | InterruptedException e) {
+                       throw new RuntimeException(step + " failed due to 
timeout.");
+               }
+               LOG.info("Step complete: " + step);
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (process.isAlive()) {
+                       process.destroy();
+                       try {
+                               process.waitFor(10, TimeUnit.SECONDS);
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       }
+               }
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
new file mode 100644
index 00000000000..50fd2f81f02
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * Utility class for setting up command-line tool usages in a readable fashion.
 */
public enum CommandLineWrapper {
	; // enum with no instances: guarantees non-instantiability of this utility holder

	/**
	 * Creates a builder for a {@code wget} invocation downloading the given URL.
	 *
	 * @param url URL to download
	 * @return builder producing the wget command line
	 */
	public static WGetBuilder wget(String url) {
		return new WGetBuilder(url);
	}

	/**
	 * Wrapper around wget used for downloading files.
	 */
	public static final class WGetBuilder {

		private final String url;
		private Path targetDir;

		WGetBuilder(String url) {
			this.url = url;
		}

		/** Sets the directory the download is stored in ({@code -P}). */
		public WGetBuilder targetDir(Path dir) {
			this.targetDir = dir;
			return this;
		}

		/**
		 * @return the assembled command, suitable for {@link ProcessBuilder#command(String...)}
		 */
		public String[] build() {
			final List<String> commandsList = new ArrayList<>(5);
			commandsList.add("wget");
			commandsList.add("-q"); // silent; keeps CI logs free of progress output
			if (targetDir != null) {
				commandsList.add("-P");
				commandsList.add(targetDir.toAbsolutePath().toString());
			}
			commandsList.add(url);
			return commandsList.toArray(new String[0]);
		}
	}

	/**
	 * Creates a builder for a {@code sed} invocation applying the given expression to the given file.
	 *
	 * @param command sed expression, e.g. a substitution such as {@code s/old/new/}
	 * @param file file to process
	 * @return builder producing the sed command line
	 */
	public static SedBuilder sed(final String command, final Path file) {
		return new SedBuilder(command, file);
	}

	/**
	 * Wrapper around sed used for processing text.
	 */
	public static final class SedBuilder {

		private final String command;
		private final Path file;

		private boolean inPlace = false;

		SedBuilder(final String command, final Path file) {
			this.command = command;
			this.file = file;
		}

		/** Edits the file in place ({@code -i}) instead of writing to stdout. */
		public SedBuilder inPlace() {
			inPlace = true;
			return this;
		}

		/**
		 * @return the assembled command, suitable for {@link ProcessBuilder#command(String...)}
		 */
		public String[] build() {
			final List<String> commandsList = new ArrayList<>(5);
			commandsList.add("sed");
			if (inPlace) {
				commandsList.add("-i");
			}
			commandsList.add("-e");
			commandsList.add(command);
			commandsList.add(file.toAbsolutePath().toString());
			return commandsList.toArray(new String[0]);
		}
	}

	/**
	 * Creates a builder for a {@code tar} invocation operating on the given archive.
	 *
	 * @param file archive file to operate on
	 * @return builder producing the tar command line
	 */
	public static TarBuilder tar(final Path file) {
		return new TarBuilder(file);
	}

	/**
	 * Wrapper around tar used for extracting .tar archives.
	 */
	public static final class TarBuilder {

		private final Path file;
		private boolean zipped = false;
		private boolean extract = false;
		private Path targetDir;

		public TarBuilder(final Path file) {
			this.file = file;
		}

		/** Treats the archive as gzip-compressed ({@code -z}). */
		public TarBuilder zipped() {
			zipped = true;
			return this;
		}

		/** Extracts the archive contents ({@code -x}). */
		public TarBuilder extract() {
			extract = true;
			return this;
		}

		/** Sets the directory to extract into ({@code --directory}). */
		public TarBuilder targetDir(final Path dir) {
			targetDir = dir;
			return this;
		}

		/**
		 * @return the assembled command, suitable for {@link ProcessBuilder#command(String...)}
		 */
		public String[] build() {
			// up to 7 elements: tar, -z, -x, --directory, <dir>, -f, <file>
			final List<String> commandsList = new ArrayList<>(7);
			commandsList.add("tar");
			if (zipped) {
				commandsList.add("-z");
			}
			if (extract) {
				commandsList.add("-x");
			}
			if (targetDir != null) {
				commandsList.add("--directory");
				commandsList.add(targetDir.toAbsolutePath().toString());
			}
			commandsList.add("-f");
			commandsList.add(file.toAbsolutePath().toString());
			return commandsList.toArray(new String[0]);
		}
	}
}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
new file mode 100644
index 00000000000..7031d88c890
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
/**
 * A wrapper around a Flink distribution.
 *
 * <p>Used as a JUnit {@link ExternalResource}: {@link #before()} backs up the original
 * flink-conf.yaml, {@link #after()} stops the cluster, restores the configuration and
 * deletes every file registered for cleanup. The distribution location is taken from
 * the {@code distDir} system property.
 */
public final class FlinkDistribution extends ExternalResource {

	private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml");
	private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak");

	// Files created during a test run (config backup, copied jars) that must be removed in after().
	private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);

	private final Path opt;
	private final Path lib;
	private final Path conf;
	private final Path log;
	private final Path bin;

	// Snapshot of the distribution's configuration, taken in before(); immutable thereafter.
	private Configuration defaultConfig;

	public FlinkDistribution() {
		final String distDirProperty = System.getProperty("distDir");
		if (distDirProperty == null) {
			Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir=<path> .");
		}
		final Path flinkDir = Paths.get(distDirProperty);
		bin = flinkDir.resolve("bin");
		opt = flinkDir.resolve("opt");
		lib = flinkDir.resolve("lib");
		conf = flinkDir.resolve("conf");
		log = flinkDir.resolve("log");
	}

	@Override
	protected void before() throws IOException {
		// Load the pristine configuration and back up flink-conf.yaml so after() can restore it.
		// NOTE(review): Files.copy without REPLACE_EXISTING fails if a stale backup is left over
		// from an aborted run — confirm whether that is intended.
		defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
		final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
		final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
		Files.copy(originalConfig, backupConfig);
		filesToDelete.add(new AutoClosablePath(backupConfig));
	}

	@Override
	protected void after() {
		// Best-effort teardown: each step only logs on failure so the remaining cleanup still runs.
		try {
			stopFlinkCluster();
		} catch (IOException e) {
			LOG.error("Failure while shutting down Flink cluster.", e);
		}

		final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
		final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);

		try {
			Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING);
		} catch (IOException e) {
			LOG.error("Failed to restore flink-conf.yaml", e);
		}

		for (AutoCloseable fileToDelete : filesToDelete) {
			try {
				fileToDelete.close();
			} catch (Exception e) {
				LOG.error("Failure while cleaning up file.", e);
			}
		}
	}

	/**
	 * Starts the cluster via bin/start-cluster.sh, then polls the dispatcher REST endpoint
	 * (up to 30 attempts, 1s apart) until it reports at least one taskmanager.
	 *
	 * @throws IOException if the start script could not be run
	 * @throws AssertionError if the REST endpoint does not come up in time; earlier poll
	 *     failures are attached as the cause
	 */
	public void startFlinkCluster() throws IOException {
		AutoClosableProcess.runBlocking("Start Flink cluster", bin.resolve("start-cluster.sh").toAbsolutePath().toString());

		final OkHttpClient client = new OkHttpClient();

		final Request request = new Request.Builder()
			.get()
			.url("http://localhost:8081/taskmanagers")
			.build();

		Exception reportedException = null;
		for (int retryAttempt = 0; retryAttempt < 30; retryAttempt++) {
			try (Response response = client.newCall(request).execute()) {
				if (response.isSuccessful()) {
					final String json = response.body().string();
					final JsonNode taskManagerList = OBJECT_MAPPER.readTree(json)
						.get("taskmanagers");

					// Only consider the cluster up once a taskmanager has registered.
					if (taskManagerList != null && taskManagerList.size() > 0) {
						LOG.info("Dispatcher REST endpoint is up.");
						return;
					}
				}
			} catch (IOException ioe) {
				// Remember the first failure; later ones are added as suppressed exceptions.
				reportedException = ExceptionUtils.firstOrSuppressed(ioe, reportedException);
			}

			LOG.info("Waiting for dispatcher REST endpoint to come up...");
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				reportedException = ExceptionUtils.firstOrSuppressed(e, reportedException);
			}
		}
		throw new AssertionError("Dispatcher REST endpoint did not start in time.", reportedException);
	}

	/** Stops the cluster via bin/stop-cluster.sh. */
	public void stopFlinkCluster() throws IOException {
		AutoClosableProcess.runBlocking("Stop Flink Cluster", bin.resolve("stop-cluster.sh").toAbsolutePath().toString());
	}

	/**
	 * Copies the first jar found under opt/ whose file name starts with the given prefix
	 * into lib/, and registers the copy for deletion in {@link #after()}.
	 *
	 * @param jarNamePrefix file-name prefix identifying the jar
	 * @throws FileNotFoundException if no jar under opt/ matches the prefix
	 * @throws IOException if the copy fails
	 */
	public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException {
		final Optional<Path> reporterJarOptional = Files.walk(opt)
			.filter(path -> path.getFileName().toString().startsWith(jarNamePrefix))
			.findFirst();
		if (reporterJarOptional.isPresent()) {
			final Path optReporterJar = reporterJarOptional.get();
			final Path libReporterJar = lib.resolve(optReporterJar.getFileName());
			Files.copy(optReporterJar, libReporterJar);
			filesToDelete.add(new AutoClosablePath(libReporterJar));
		} else {
			throw new FileNotFoundException("No jar could be found matching the pattern " + jarNamePrefix + ".");
		}
	}

	/**
	 * Merges the given options over the distribution's default configuration and rewrites
	 * flink-conf.yaml with the result. Despite the name this replaces the file contents,
	 * not just appends to it; the original is restored in {@link #after()}.
	 */
	public void appendConfiguration(Configuration config) throws IOException {
		final Configuration mergedConfig = new Configuration();
		mergedConfig.addAll(defaultConfig);
		mergedConfig.addAll(config);

		final List<String> configurationLines = mergedConfig.toMap().entrySet().stream()
			.map(entry -> entry.getKey() + ": " + entry.getValue())
			.collect(Collectors.toList());

		Files.write(conf.resolve("flink-conf.yaml"), configurationLines);
	}

	/**
	 * Scans every *.log file in the log directory line by line and collects the result of
	 * {@code matchProcessor} for each line fully matching {@code pattern} (Matcher#matches,
	 * i.e. the pattern must cover the entire line).
	 *
	 * @param pattern pattern a whole log line must match
	 * @param matchProcessor extracts the result string from a successful match
	 * @return stream of processed matches, in file/line order
	 * @throws IOException if the log directory or a log file cannot be read
	 */
	public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> matchProcessor) throws IOException {
		final List<String> matches = new ArrayList<>(2);

		try (Stream<Path> logFilesStream = Files.list(log)) {
			final Iterator<Path> logFiles = logFilesStream.iterator();
			while (logFiles.hasNext()) {
				final Path logFile = logFiles.next();
				if (!logFile.getFileName().toString().endsWith(".log")) {
					// ignore logs for previous runs that have a number suffix
					continue;
				}
				try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(logFile.toFile()), StandardCharsets.UTF_8))) {
					String line;
					while ((line = br.readLine()) != null) {
						Matcher matcher = pattern.matcher(line);
						if (matcher.matches()) {
							matches.add(matchProcessor.apply(matcher));
						}
					}
				}
			}
		}
		return matches.stream();
	}
}
diff --git 
a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/pom.xml 
b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/pom.xml
new file mode 100644
index 00000000000..1efb40ed38e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.8-SNAPSHOT</version>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-metrics-reporter-prometheus-test</artifactId>
+       <version>1.8-SNAPSHOT</version>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-prometheus</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>com.squareup.okhttp3</groupId>
+                       <artifactId>okhttp</artifactId>
+                       <version>3.7.0</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-jackson</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-end-to-end-tests-common</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <profiles>
+               <profile>
+                       <id>e2e-prometheus</id>
+                       <activation>
+                               <property>
+                                       <name>e2e-metrics</name>
+                               </property>
+                       </activation>
+                       <build>
+                               <plugins>
+                                       <plugin>
+                                               
<groupId>org.apache.maven.plugins</groupId>
+                                               
<artifactId>maven-surefire-plugin</artifactId>
+                                               <executions>
+                                                       <execution>
+                                                               
<id>e2e-prometheus</id>
+                                                               
<phase>integration-test</phase>
+                                                               <goals>
+                                                                       
<goal>test</goal>
+                                                               </goals>
+                                                               <configuration>
+                                                                       
<includes>
+                                                                               
<include>**/*ITCase.*</include>
+                                                                       
</includes>
+                                                               </configuration>
+                                                       </execution>
+                                               </executions>
+                                       </plugin>
+                               </plugins>
+                       </build>
+               </profile>
+       </profiles>
+
+</project>
diff --git 
a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 
b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
new file mode 100644
index 00000000000..269754eaf69
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus.tests;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.prometheus.PrometheusReporter;
+import org.apache.flink.tests.util.AutoClosableProcess;
+import org.apache.flink.tests.util.CommandLineWrapper;
+import org.apache.flink.tests.util.FlinkDistribution;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking;
+import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking;
+
+/**
+ * End-to-end test for the PrometheusReporter.
+ */
+public class PrometheusReporterEndToEndITCase extends TestLogger {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class);
+
+       private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+       private static final String PROMETHEUS_VERSION = "2.4.3";
+       private static final String PROMETHEUS_FILE_NAME;
+
+       static {
+               final String base = "prometheus-" + PROMETHEUS_VERSION + '.';
+               switch (OperatingSystem.getCurrentOperatingSystem()) {
+                       case MAC_OS:
+                               PROMETHEUS_FILE_NAME = base + "darwin-amd64";
+                               break;
+                       case WINDOWS:
+                               PROMETHEUS_FILE_NAME = base + "windows-amd64";
+                               break;
+                       default:
+                               PROMETHEUS_FILE_NAME = base + "linux-amd64";
+                               break;
+               }
+       }
+
+       private static final Pattern LOG_REPORTER_PORT_PATTERN = 
Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*");
+
+       @BeforeClass
+       public static void checkOS() {
+               Assume.assumeFalse("This test does not run on Windows.", 
OperatingSystem.isWindows());
+       }
+
+       @Rule
+       public final FlinkDistribution dist = new FlinkDistribution();
+
+       @Rule
+       public final TemporaryFolder tmp = new TemporaryFolder();
+
+       @Test
+       public void testReporter() throws Exception {
+               dist.copyOptJarsToLib("flink-metrics-prometheus");
+
+               final Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getCanonicalName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom.port", "9000-9100");
+
+               dist.appendConfiguration(config);
+
+               final Path tmpPrometheusDir = 
tmp.newFolder().toPath().resolve("prometheus");
+               final Path prometheusArchive = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz");
+               final Path prometheusBinDir = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME);
+               final Path prometheusConfig = 
prometheusBinDir.resolve("prometheus.yml");
+               final Path prometheusBinary = 
prometheusBinDir.resolve("prometheus");
+               Files.createDirectory(tmpPrometheusDir);
+
+               runBlocking(
+                       "Download of Prometheus",
+                       Duration.ofMinutes(5),
+                       CommandLineWrapper
+                               
.wget("https://github.com/prometheus/prometheus/releases/download/v"; + 
PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
+                               .targetDir(tmpPrometheusDir)
+                               .build());
+
+               runBlocking("Extraction of Prometheus archive",
+                       CommandLineWrapper
+                               .tar(prometheusArchive)
+                               .extract()
+                               .zipped()
+                               .targetDir(tmpPrometheusDir)
+                               .build());
+
+               runBlocking("Set Prometheus scrape interval",
+                       CommandLineWrapper
+                               .sed("s/\\(scrape_interval:\\).*/\\1 1s/", 
prometheusConfig)
+                               .inPlace()
+                               .build());
+
+               dist.startFlinkCluster();
+
+               final List<Integer> ports = dist
+                       .searchAllLogs(LOG_REPORTER_PORT_PATTERN, matcher -> 
matcher.group(1))
+                       .map(Integer::valueOf)
+                       .collect(Collectors.toList());
+
+               final String scrapeTargets = ports.stream()
+                       .map(port -> "'localhost:" + port + "'")
+                       .collect(Collectors.joining(", "));
+
+               runBlocking("Set Prometheus scrape targets to (" + 
scrapeTargets + ")",
+                       CommandLineWrapper
+                               .sed("s/\\(targets:\\).*/\\1 [" + scrapeTargets 
+ "]/", prometheusConfig)
+                               .inPlace()
+                               .build());
+
+               try (AutoClosableProcess prometheus = runNonBlocking(
+                       "Start Prometheus server",
+                       prometheusBinary.toAbsolutePath().toString(),
+                       "--config.file=" + prometheusConfig.toAbsolutePath(),
+                       "--storage.tsdb.path=" + 
prometheusBinDir.resolve("data").toAbsolutePath())) {
+
+                       final OkHttpClient client = new OkHttpClient();
+
+                       checkMetricAvailability(client, 
"flink_jobmanager_numRegisteredTaskManagers");
+                       checkMetricAvailability(client, 
"flink_taskmanager_Status_Network_TotalMemorySegments");
+               }
+       }
+
+       private static void checkMetricAvailability(final OkHttpClient client, 
final String metric) throws InterruptedException {
+               final Request jobManagerRequest = new Request.Builder()
+                       .get()
+                       .url("http://localhost:9090/api/v1/query?query="; + 
metric)
+                       .build();
+
+               Exception reportedException = null;
+               for (int x = 0; x < 30; x++) {
+                       try (Response response = 
client.newCall(jobManagerRequest).execute()) {
+                               if (response.isSuccessful()) {
+                                       final String json = 
response.body().string();
+
+                                       // Sample response:
+                                       //{
+                                       //      "status": "success",
+                                       //      "data": {
+                                       //              "resultType": "vector",
+                                       //              "result": [{
+                                       //                      "metric": {
+                                       //                              
"__name__": "flink_jobmanager_numRegisteredTaskManagers",
+                                       //                              "host": 
"localhost",
+                                       //                              
"instance": "localhost:9000",
+                                       //                              "job": 
"prometheus"
+                                       //                      },
+                                       //                      "value": 
[1540548500.107, "1"]
+                                       //              }]
+                                       //      }
+                                       //}
+                                       OBJECT_MAPPER.readTree(json)
+                                               .get("data")
+                                               .get("result")
+                                               .get(0)
+                                               .get("value")
+                                               .get(1).asInt();
+                                       // if we reach this point some value 
for the given metric was reported to prometheus
+                                       return;
+                               } else {
+                                       LOG.info("Retrieving metric failed. 
Retrying... " + response.code() + ":" + response.message());
+                                       Thread.sleep(1000);
+                               }
+                       } catch (Exception e) {
+                               reportedException = 
ExceptionUtils.firstOrSuppressed(e, reportedException);
+                               Thread.sleep(1000);
+                       }
+               }
+               throw new AssertionError("Could not retrieve metric " + metric 
+ " from Prometheus.", reportedException);
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/resources/log4j-test.properties
 
b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..f7425cd14c0
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Root logger level is set to INFO below; set it to OFF to avoid
+# flooding build logs when debugging output is not needed
+log4j.rootLogger=INFO, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%m%n
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 07b65f9a1e3..4f132f923b9 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -56,6 +56,8 @@ under the License.
                <module>flink-streaming-file-sink-test</module>
                <module>flink-state-evolution-test</module>
                <module>flink-e2e-test-utils</module>
+               <module>flink-end-to-end-tests-common</module>
+               <module>flink-metrics-reporter-prometheus-test</module>
        </modules>
 
        <build>
@@ -67,6 +69,22 @@ under the License.
                                        <skip>true</skip>
                                </configuration>
                        </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <executions>
+                                       <!-- Do not execute any tests by 
default.
+                                               E2E tests should specify an 
additional execution within a profile. -->
+                                       <execution>
+                                               <id>default-test</id>
+                                               <phase>none</phase>
+                                       </execution>
+                                       <execution>
+                                               <id>integration-tests</id>
+                                               <phase>none</phase>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
 
                <pluginManagement>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to