This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e25e5213054442dd3bb913437e56bfaf4f00a37b Author: Alexander Fedulov <[email protected]> AuthorDate: Fri Feb 21 15:50:36 2020 +0100 [FLINK-16222][metrics][prometheus] Add plugin e2e test --- .../tests/PrometheusReporterEndToEndITCase.java | 92 +++++++++++++++++++--- 1 file changed, 83 insertions(+), 9 deletions(-) diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java index 1dba568..5bde088 100644 --- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java +++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.metrics.prometheus.tests; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.prometheus.PrometheusReporter; +import org.apache.flink.metrics.prometheus.PrometheusReporterFactory; import org.apache.flink.tests.util.AutoClosableProcess; import org.apache.flink.tests.util.CommandLineWrapper; import org.apache.flink.tests.util.cache.DownloadCache; @@ -46,15 +47,22 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.function.Consumer; import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase.TestParams.InstantiationType.FACTORY; +import static org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase.TestParams.InstantiationType.REFLECTION; import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking; import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking; @@ -62,6 +70,7 @@ import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking; * End-to-end test for the PrometheusReporter. */ @Category(TravisGroup1.class) +@RunWith(Parameterized.class) public class PrometheusReporterEndToEndITCase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class); @@ -70,6 +79,7 @@ public class PrometheusReporterEndToEndITCase extends TestLogger { private static final String PROMETHEUS_VERSION = "2.4.3"; private static final String PROMETHEUS_FILE_NAME; + private static final String PROMETHEUS_JAR_PREFIX = "flink-metrics-prometheus"; static { final String base = "prometheus-" + PROMETHEUS_VERSION + '.'; @@ -114,13 +124,36 @@ public class PrometheusReporterEndToEndITCase extends TestLogger { Assume.assumeFalse("This test does not run on Windows.", OperatingSystem.isWindows()); } + @Parameterized.Parameters(name = "{index}: {0}") + public static Collection<TestParams> testParameters() { + return Arrays.asList( + TestParams.from("Jar in 'lib'", + builder -> builder.copyJar(PROMETHEUS_JAR_PREFIX, JarLocation.OPT, JarLocation.LIB), + REFLECTION), + TestParams.from("Jar in 'lib'", + builder -> builder.copyJar(PROMETHEUS_JAR_PREFIX, JarLocation.OPT, JarLocation.LIB), + FACTORY), + TestParams.from("Jar in 'plugins'", + builder -> builder.copyJar(PROMETHEUS_JAR_PREFIX, JarLocation.OPT, JarLocation.PLUGINS), + FACTORY), + TestParams.from("Jar in 'lib' and 'plugins'", + builder -> { + builder.copyJar(PROMETHEUS_JAR_PREFIX, JarLocation.OPT, JarLocation.LIB); + builder.copyJar(PROMETHEUS_JAR_PREFIX, JarLocation.OPT, JarLocation.PLUGINS); + }, + FACTORY) + ); + } + @Rule - public final FlinkResource dist = new LocalStandaloneFlinkResourceFactory() - .create(FlinkResourceSetup.builder() - .moveJar("flink-metrics-prometheus", JarLocation.OPT, JarLocation.LIB) - .addConfiguration(getFlinkConfig()) - .build()) - .get(); + public final FlinkResource dist; + + public PrometheusReporterEndToEndITCase(TestParams params) { + final FlinkResourceSetup.FlinkResourceSetupBuilder builder = FlinkResourceSetup.builder(); + params.getBuilderSetup().accept(builder); + builder.addConfiguration(getFlinkConfig(params.getInstantiationType())); + dist = new LocalStandaloneFlinkResourceFactory().create(builder.build()).get(); + } @Rule public final TemporaryFolder tmp = new TemporaryFolder(); @@ -128,11 +161,18 @@ public class PrometheusReporterEndToEndITCase extends TestLogger { @Rule public final DownloadCache downloadCache = DownloadCache.get(); - private static Configuration getFlinkConfig() { + private static Configuration getFlinkConfig(TestParams.InstantiationType instantiationType) { final Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100"); + switch (instantiationType) { + case FACTORY: + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, PrometheusReporterFactory.class.getName()); + break; + case REFLECTION: + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); + } + + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100"); return config; } @@ -244,4 +284,38 @@ public class PrometheusReporterEndToEndITCase extends TestLogger { } throw new AssertionError("Could not retrieve metric " + metric + " from Prometheus.", reportedException); } + + static class TestParams { + private final String jarLocationDescription; + private final Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup; + private final InstantiationType instantiationType; + + private TestParams(String jarLocationDescription, Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup, InstantiationType instantiationType) { + this.jarLocationDescription = jarLocationDescription; + this.builderSetup = builderSetup; + this.instantiationType = instantiationType; + } + + public static TestParams from(String jarLocationDesription, Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup, InstantiationType instantiationType) { + return new TestParams(jarLocationDesription, builderSetup, instantiationType); + } + + public Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> getBuilderSetup() { + return builderSetup; + } + + public InstantiationType getInstantiationType() { + return instantiationType; + } + + @Override + public String toString() { + return jarLocationDescription + ", instantiated via " + instantiationType.name().toLowerCase(); + } + + public enum InstantiationType { + REFLECTION, + FACTORY + } + } }
