This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1732b71b045b22375376864c5a8fab998dbaa428 Author: Chesnay Schepler <[email protected]> AuthorDate: Fri Jun 26 14:39:37 2020 +0200 [FLINK-18435][metrics] Add support for intercepting reflection-based instantiations --- .../metrics/reporter/InstantiateViaFactory.java | 3 ++ .../InterceptInstantiationViaReflection.java | 37 ++++++++++++++++++++++ .../flink/runtime/metrics/ReporterSetup.java | 27 +++++++++++++--- .../flink/runtime/metrics/ReporterSetupTest.java | 35 ++++++++++++++++++++ 4 files changed, 98 insertions(+), 4 deletions(-) diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java index 454ec91..a5c6ed1 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java @@ -28,6 +28,9 @@ import java.lang.annotation.Target; * backwards-compatibility with existing reflection-based configurations. * * <p>When an annotated reporter is configured to be used via reflection the given factory will be used instead. + * + * <p>Attention: This annotation does not work if the reporter is loaded as a plugin. For these cases, annotate the + * factory with {@link InterceptInstantiationViaReflection} instead. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java new file mode 100644 index 0000000..6a6f05b --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.reporter; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation for {@link MetricReporterFactory MetricReporterFactories} that want to maintain + * backwards-compatibility with existing reflection-based configurations. + * + * <p>When a reporter is configured to be used via reflection the annotated factory will be used instead. + * + * @see InstantiateViaFactory + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface InterceptInstantiationViaReflection { + String reporterClassName(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java index da15079..0427918 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.MetricOptions; import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.InstantiateViaFactory; +import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; import org.apache.flink.runtime.metrics.scope.ScopeFormat; @@ -259,6 +260,17 @@ public final class ReporterSetup { } if (reporterClassName != null) { + final Optional<MetricReporterFactory> interceptingFactory = reporterFactories.values().stream() + .filter(factory -> { + InterceptInstantiationViaReflection annotation = factory.getClass().getAnnotation(InterceptInstantiationViaReflection.class); + return annotation != null && annotation.reporterClassName().equals(reporterClassName); + }) + .findAny(); + + if (interceptingFactory.isPresent()) { + return loadViaFactory(reporterConfig, interceptingFactory.get()); + } + return loadViaReflection(reporterClassName, reporterName, reporterConfig, reporterFactories); } @@ -278,13 +290,20 @@ public final class ReporterSetup { LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: {}.", factoryClassName, reporterName, reporterFactories.keySet()); return Optional.empty(); } else { - final MetricConfig metricConfig = new MetricConfig(); - reporterConfig.addAllToProperties(metricConfig); - - return Optional.of(factory.createMetricReporter(metricConfig)); + return loadViaFactory(reporterConfig, factory); } } + private static Optional<MetricReporter> loadViaFactory( + final Configuration reporterConfig, + final MetricReporterFactory factory) { + + final MetricConfig metricConfig = new MetricConfig(); + reporterConfig.addAllToProperties(metricConfig); + + return Optional.of(factory.createMetricReporter(metricConfig)); + } + private static Optional<MetricReporter> loadViaReflection( final String reporterClassName, final String reporterName, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java index f687c66..728f783 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java @@ -21,8 +21,10 @@ package org.apache.flink.runtime.metrics; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.plugin.TestingPluginManager; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.InstantiateViaFactory; +import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; import org.apache.flink.runtime.metrics.scope.ScopeFormat; @@ -32,6 +34,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -342,6 +345,26 @@ public class ReporterSetupTest extends TestLogger { } /** + * Verifies that the factory approach is used if the factory is annotated with {@link org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection}. + */ + @Test + public void testReflectionInterception() { + final Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName()); + + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, new TestingPluginManager(Collections.singletonMap( + MetricReporterFactory.class, + Collections.singletonList(new InterceptingInstantiationTypeTrackingTestReporterFactory()).iterator()))); + + assertEquals(1, reporterSetups.size()); + + final ReporterSetup reporterSetup = reporterSetups.get(0); + final InstantiationTypeTrackingTestReporter metricReporter = (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter(); + + assertTrue(metricReporter.createdByFactory); + } + + /** * Factory that exposed the last provided metric config. */ public static class ConfigExposingReporterFactory implements MetricReporterFactory { @@ -391,6 +414,18 @@ public class ReporterSetupTest extends TestLogger { } /** + * Factory for {@link InstantiationTypeTrackingTestReporter} that intercepts reflection-based instantiation attempts. + */ + @InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporter") + public static class InterceptingInstantiationTypeTrackingTestReporterFactory implements MetricReporterFactory { + + @Override + public MetricReporter createMetricReporter(Properties config) { + return new InstantiationTypeTrackingTestReporter(true); + } + } + + /** * Reporter that exposes which constructor was called. */ protected static class InstantiationTypeTrackingTestReporter extends TestReporter {
