This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3aa267b [FLINK-26076] Fix ArchUnit violations in
Source(Sink)MetricsITCase
3aa267b is described below
commit 3aa267bf511287e39b0a9d781b9aaf38843e1e91
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Feb 10 16:15:04 2022 +0100
[FLINK-26076] Fix ArchUnit violations in Source(Sink)MetricsITCase
This closes #18709
---
.../8bad5118-af5d-4976-ac57-382ed16f7f7e | 1 -
.../base/source/reader/SourceMetricsITCase.java | 42 ++++++++------------
.../flink/runtime/testutils/InMemoryReporter.java | 41 ++++++++++++--------
.../test/streaming/runtime/SinkMetricsITCase.java | 45 ++++++++--------------
4 files changed, 60 insertions(+), 69 deletions(-)
diff --git
a/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e
b/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e
index e82c8cd..1f0ea28 100644
---
a/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e
+++
b/flink-connectors/flink-connector-base/archunit-violations/8bad5118-af5d-4976-ac57-382ed16f7f7e
@@ -1,4 +1,3 @@
org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy:
contain any fields that are public, static, and of type MiniClusterExtension
and final and contain any fields that are public, static, and of type
AllCallbackWrapper and final and annotated with @RegisterExtension or contain
any fields that are public, static, and of type MiniClusterWithClientResource
and final and annotated with @ClassRule or contain any fields that is of type
MiniClusterWithClientResource and pu [...]
org.apache.flink.connector.base.source.reader.CoordinatedSourceITCase does not
satisfy: contain any fields that are public, static, and of type
MiniClusterExtension and final and contain any fields that are public, static,
and of type AllCallbackWrapper and final and annotated with @RegisterExtension
or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithClientRe [...]
org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase
does not satisfy: contain any fields that are public, static, and of type
MiniClusterExtension and final and contain any fields that are public, static,
and of type AllCallbackWrapper and final and annotated with @RegisterExtension
or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithC [...]
-org.apache.flink.connector.base.source.reader.SourceMetricsITCase does not
satisfy: contain any fields that are public, static, and of type
MiniClusterExtension and final and contain any fields that are public, static,
and of type AllCallbackWrapper and final and annotated with @RegisterExtension
or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithClientResour [...]
\ No newline at end of file
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
index 508e1ef..d298a26 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
@@ -44,8 +45,7 @@ import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -74,29 +74,16 @@ public class SourceMetricsITCase extends TestLogger {
// this basically is the time a build is allowed to be frozen before the
test fails
private static final long WATERMARK_EPSILON =
Duration.ofHours(6).toMillis();
@Rule public final SharedObjects sharedObjects = SharedObjects.create();
- private InMemoryReporter reporter;
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
- private MiniClusterWithClientResource miniClusterResource;
-
- @Before
- public void setup() throws Exception {
- reporter = InMemoryReporter.createWithRetainedMetrics();
- Configuration configuration = new Configuration();
- reporter.addToConfiguration(configuration);
- miniClusterResource =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
-
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
- .setConfiguration(configuration)
- .build());
- miniClusterResource.before();
- }
-
- @After
- public void teardown() {
- miniClusterResource.after();
- }
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .build());
@Test
public void testMetricsWithTimestamp() throws Exception {
@@ -145,9 +132,11 @@ public class SourceMetricsITCase extends TestLogger {
});
stream.addSink(new DiscardingSink<>());
JobClient jobClient = env.executeAsync();
+ final JobID jobId = jobClient.getJobID();
beforeBarrier.get().await();
assertSourceMetrics(
+ jobId,
reporter,
stopAtRecord1 + 1,
numRecordsPerSplit,
@@ -158,6 +147,7 @@ public class SourceMetricsITCase extends TestLogger {
beforeBarrier.get().await();
assertSourceMetrics(
+ jobId,
reporter,
stopAtRecord2 + 1,
numRecordsPerSplit,
@@ -170,13 +160,15 @@ public class SourceMetricsITCase extends TestLogger {
}
private void assertSourceMetrics(
+ JobID jobId,
InMemoryReporter reporter,
long processedRecordsPerSubtask,
long numTotalPerSubtask,
int parallelism,
int numSplits,
boolean hasTimestamps) {
- List<OperatorMetricGroup> groups =
reporter.findOperatorMetricGroups("MetricTestingSource");
+ List<OperatorMetricGroup> groups =
+ reporter.findOperatorMetricGroups(jobId,
"MetricTestingSource");
assertThat(groups, hasSize(parallelism));
int subtaskWithMetrics = 0;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
index 2254841..1a992e8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.testutils;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.LogicalScopeProvider;
@@ -27,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +43,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -102,9 +105,9 @@ public class InMemoryReporter implements MetricReporter {
}
}
- public Map<String, Metric> getMetricsByIdentifiers() {
+ public Map<String, Metric> getMetricsByIdentifiers(JobID jobId) {
synchronized (this) {
- return getMetricStream().collect(Collectors.toMap(Entry::getKey,
Entry::getValue));
+ return
getMetricStream(jobId).collect(Collectors.toMap(Entry::getKey,
Entry::getValue));
}
}
@@ -123,16 +126,16 @@ public class InMemoryReporter implements MetricReporter {
}
}
- public Map<String, Metric> findMetrics(String identifierPattern) {
+ public Map<String, Metric> findMetrics(JobID jobId, String
identifierPattern) {
synchronized (this) {
- return getMetricStream(identifierPattern)
+ return getMetricStream(jobId, identifierPattern)
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
}
}
- public Optional<Metric> findMetric(String patternString) {
+ public Optional<Metric> findMetric(JobID jobId, String patternString) {
synchronized (this) {
- return
getMetricStream(patternString).map(Entry::getValue).findFirst();
+ return getMetricStream(jobId,
patternString).map(Entry::getValue).findFirst();
}
}
@@ -148,14 +151,15 @@ public class InMemoryReporter implements MetricReporter {
}
}
- public List<OperatorMetricGroup> findOperatorMetricGroups(String
operatorPattern) {
+ public List<OperatorMetricGroup> findOperatorMetricGroups(JobID jobId,
String operatorPattern) {
Pattern pattern = Pattern.compile(operatorPattern);
synchronized (this) {
return metrics.keySet().stream()
.filter(
g ->
g instanceof OperatorMetricGroup
- &&
pattern.matcher(getOperatorName(g)).find())
+ &&
pattern.matcher(getOperatorName(g)).find()
+ &&
getJobId(g).equals(jobId.toString()))
.map(OperatorMetricGroup.class::cast)
.sorted(Comparator.comparing(this::getSubtaskId))
.collect(Collectors.toList());
@@ -163,11 +167,15 @@ public class InMemoryReporter implements MetricReporter {
}
private String getSubtaskId(OperatorMetricGroup g) {
- return g.getScopeComponents()[g.getScopeComponents().length - 1];
+ return g.getAllVariables().get(ScopeFormat.SCOPE_TASK_SUBTASK_INDEX);
}
private String getOperatorName(MetricGroup g) {
- return g.getScopeComponents()[g.getScopeComponents().length - 2];
+ return g.getAllVariables().get(ScopeFormat.SCOPE_OPERATOR_NAME);
+ }
+
+ private String getJobId(MetricGroup g) {
+ return g.getAllVariables().get(ScopeFormat.SCOPE_JOB_ID);
}
@Override
@@ -195,13 +203,15 @@ public class InMemoryReporter implements MetricReporter {
}
}
- private Stream<Entry<String, Metric>> getMetricStream(String
identifierPattern) {
+ private Stream<Entry<String, Metric>> getMetricStream(JobID jobID, String
identifierPattern) {
Pattern pattern = Pattern.compile(identifierPattern);
- return getMetricStream().filter(m ->
pattern.matcher(m.getKey()).find());
+ return getMetricStream(jobID).filter(m ->
pattern.matcher(m.getKey()).find());
}
- private Stream<Entry<String, Metric>> getMetricStream() {
- return metrics.entrySet().stream().flatMap(this::getGroupMetricStream);
+ private Stream<Entry<String, Metric>> getMetricStream(JobID jobId) {
+ return metrics.entrySet().stream()
+ .filter(gr -> Objects.equals(getJobId(gr.getKey()),
jobId.toString()))
+ .flatMap(this::getGroupMetricStream);
}
private Stream<MetricGroup> getGroupStream(String groupPattern) {
@@ -231,7 +241,7 @@ public class InMemoryReporter implements MetricReporter {
: group;
}
- public void addToConfiguration(Configuration configuration) {
+ public Configuration addToConfiguration(Configuration configuration) {
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "mini_cluster_resource_reporter."
@@ -240,6 +250,7 @@ public class InMemoryReporter implements MetricReporter {
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX +
"mini_cluster_resource_reporter." + ID,
id.toString());
+ return configuration;
}
/** The factory for the {@link InMemoryReporter}. */
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index f91a013..e2e4203 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -17,6 +17,7 @@
package org.apache.flink.test.streaming.runtime;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
@@ -35,8 +36,7 @@ import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -55,29 +55,16 @@ import static org.hamcrest.Matchers.hasSize;
public class SinkMetricsITCase extends TestLogger {
private static final int DEFAULT_PARALLELISM = 4;
@Rule public final SharedObjects sharedObjects = SharedObjects.create();
- private InMemoryReporter reporter;
-
- private MiniClusterWithClientResource miniClusterResource;
-
- @Before
- public void setup() throws Exception {
- reporter = InMemoryReporter.createWithRetainedMetrics();
- Configuration configuration = new Configuration();
- reporter.addToConfiguration(configuration);
- miniClusterResource =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
-
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
- .setConfiguration(configuration)
- .build());
- miniClusterResource.before();
- }
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
- @After
- public void teardown() {
- miniClusterResource.after();
- }
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .build());
@Test
public void testMetrics() throws Exception {
@@ -112,21 +99,23 @@ public class SinkMetricsITCase extends TestLogger {
.sinkTo(TestSink.newBuilder().setWriter(new
MetricWriter()).build())
.name("MetricTestSink");
JobClient jobClient = env.executeAsync();
+ final JobID jobId = jobClient.getJobID();
beforeBarrier.get().await();
- assertSinkMetrics(stopAtRecord1, env.getParallelism(), numSplits);
+ assertSinkMetrics(jobId, stopAtRecord1, env.getParallelism(),
numSplits);
afterBarrier.get().await();
beforeBarrier.get().await();
- assertSinkMetrics(stopAtRecord2, env.getParallelism(), numSplits);
+ assertSinkMetrics(jobId, stopAtRecord2, env.getParallelism(),
numSplits);
afterBarrier.get().await();
jobClient.getJobExecutionResult().get();
}
private void assertSinkMetrics(
- long processedRecordsPerSubtask, int parallelism, int numSplits) {
- List<OperatorMetricGroup> groups =
reporter.findOperatorMetricGroups("MetricTestSink");
+ JobID jobId, long processedRecordsPerSubtask, int parallelism, int
numSplits) {
+ List<OperatorMetricGroup> groups =
+ reporter.findOperatorMetricGroups(jobId, "MetricTestSink");
assertThat(groups, hasSize(parallelism));
int subtaskWithMetrics = 0;