reswqa commented on code in PR #21132:
URL: https://github.com/apache/flink/pull/21132#discussion_r1003944639
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -43,120 +43,77 @@
import org.junit.Test;
Review Comment:
We should use `org.junit.jupiter.api.Test` instead of `org.junit.Test`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -43,120 +43,77 @@
import org.junit.Test;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/** Tests for the MetricFetcher. */
public class MetricFetcherTest extends TestLogger {
@Test
public void testUpdate() {
Review Comment:
```suggestion
void testUpdate() {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -43,120 +43,77 @@
import org.junit.Test;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/** Tests for the MetricFetcher. */
public class MetricFetcherTest extends TestLogger {
Review Comment:
```suggestion
@ExtendWith(TestLoggerExtension.class)
class MetricFetcherTest {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -43,120 +43,77 @@
import org.junit.Test;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/** Tests for the MetricFetcher. */
public class MetricFetcherTest extends TestLogger {
@Test
public void testUpdate() {
final Time timeout = Time.seconds(10L);
-
- // ========= setup TaskManager
- //
=================================================================================
-
JobID jobID = new JobID();
ResourceID tmRID = ResourceID.generate();
- // ========= setup QueryServices
- //
================================================================================
-
- final MetricQueryServiceGateway jmQueryService =
- new TestingMetricQueryServiceGateway.Builder()
- .setQueryMetricsSupplier(
- () ->
- CompletableFuture.completedFuture(
- new MetricDumpSerialization
-
.MetricSerializationResult(
- new byte[0],
- new byte[0],
- new byte[0],
- new byte[0],
- 0,
- 0,
- 0,
- 0)))
- .build();
-
- MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer
=
- createRequestDumpAnswer(tmRID, jobID);
- final MetricQueryServiceGateway tmQueryService =
- new TestingMetricQueryServiceGateway.Builder()
- .setQueryMetricsSupplier(
- () ->
CompletableFuture.completedFuture(requestMetricsAnswer))
- .build();
-
- // ========= setup JobManager
- //
==================================================================================
-
- final TestingRestfulGateway restfulGateway =
- new TestingRestfulGateway.Builder()
- .setRequestMultipleJobDetailsSupplier(
- () ->
- CompletableFuture.completedFuture(
- new
MultipleJobsDetails(Collections.emptyList())))
- .setRequestMetricQueryServiceGatewaysSupplier(
- () ->
- CompletableFuture.completedFuture(
-
Collections.singleton(jmQueryService.getAddress())))
-
.setRequestTaskManagerMetricQueryServiceGatewaysSupplier(
- () ->
- CompletableFuture.completedFuture(
- Collections.singleton(
- Tuple2.of(
- tmRID,
-
tmQueryService.getAddress()))))
- .build();
-
- final GatewayRetriever<RestfulGateway> retriever =
- () -> CompletableFuture.completedFuture(restfulGateway);
-
- // ========= start MetricFetcher testing
- //
=======================================================================
+ // Create metric fetcher
MetricFetcher fetcher =
- new MetricFetcherImpl<>(
- retriever,
- address ->
CompletableFuture.completedFuture(tmQueryService),
- Executors.directExecutor(),
+ createMetricFetcherWithServiceGateways(
+ jobID,
+ tmRID,
timeout,
-
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
+
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue(),
+ 0,
+ null);
// verify that update fetches metrics and updates the store
fetcher.update();
MetricStore store = fetcher.getMetricStore();
synchronized (store) {
- assertEquals("7",
store.getJobManagerMetricStore().getMetric("abc.hist_min"));
- assertEquals("6",
store.getJobManagerMetricStore().getMetric("abc.hist_max"));
- assertEquals("4.0",
store.getJobManagerMetricStore().getMetric("abc.hist_mean"));
- assertEquals("0.5",
store.getJobManagerMetricStore().getMetric("abc.hist_median"));
- assertEquals("5.0",
store.getJobManagerMetricStore().getMetric("abc.hist_stddev"));
- assertEquals("0.75",
store.getJobManagerMetricStore().getMetric("abc.hist_p75"));
- assertEquals("0.9",
store.getJobManagerMetricStore().getMetric("abc.hist_p90"));
- assertEquals("0.95",
store.getJobManagerMetricStore().getMetric("abc.hist_p95"));
- assertEquals("0.98",
store.getJobManagerMetricStore().getMetric("abc.hist_p98"));
- assertEquals("0.99",
store.getJobManagerMetricStore().getMetric("abc.hist_p99"));
- assertEquals("0.999",
store.getJobManagerMetricStore().getMetric("abc.hist_p999"));
-
- assertEquals(
- "x",
-
store.getTaskManagerMetricStore(tmRID.toString()).metrics.get("abc.gauge"));
- assertEquals("5.0",
store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc"));
- assertEquals(
- "2",
- store.getTaskMetricStore(jobID.toString(),
"taskid").metrics.get("2.abc.tc"));
- assertEquals(
- "1",
- store.getTaskMetricStore(jobID.toString(), "taskid")
- .metrics
- .get("2.opname.abc.oc"));
+
assertThat("7").isEqualTo(store.getJobManagerMetricStore().getMetric("abc.hist_min"));
Review Comment:
```suggestion
assertThat(store.getJobManagerMetricStore().getMetric("abc.hist_min")).isEqualTo("7");
```
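The other assertions should also be changed in the same way: pass the actual value to `assertThat(...)` and the expected value to `isEqualTo(...)`.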
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -263,7 +220,151 @@ public void testShortUpdateInterval() throws
InterruptedException {
fetcher.update();
- assertThat(requestMetricQueryServiceGatewaysCounter.get(), is(2));
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(2);
+ }
+
+ @Test
+ public void testIgnoreUpdateRequestWhenFetchingMetrics() throws
InterruptedException {
+ final long updateInterval = 1000L;
+ final long waitTimeBeforeReturnMetricResults = updateInterval * 2;
+ final Time timeout = Time.seconds(10L);
+ final AtomicInteger requestMetricQueryServiceGatewaysCounter = new
AtomicInteger(0);
+ final JobID jobID = new JobID();
+ final ResourceID tmRID = ResourceID.generate();
+
+ // Create metric fetcher
+ MetricFetcher fetcher =
+ createMetricFetcherWithServiceGateways(
+ jobID,
+ tmRID,
+ timeout,
+ updateInterval,
+ waitTimeBeforeReturnMetricResults,
+ requestMetricQueryServiceGatewaysCounter);
+
+ fetcher.update();
+
+ final long start = System.currentTimeMillis();
+ long difference = 0L;
+
+ while (difference <= updateInterval) {
+ Thread.sleep((int) (updateInterval * 1.5f));
+ difference = System.currentTimeMillis() - start;
+ }
+
+ fetcher.update();
+
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(1);
+ }
+
+ @Nonnull
+ private MetricFetcher createMetricFetcherWithServiceGateways(
+ JobID jobID,
+ ResourceID tmRID,
+ Time timeout,
+ long updateInterval,
+ long waitTimeBeforeReturnMetricResults,
+ @Nullable AtomicInteger requestMetricQueryServiceGatewaysCounter) {
+ final ExecutorService executor =
java.util.concurrent.Executors.newSingleThreadExecutor();
+ // ========= setup QueryServices
+ //
================================================================================
+
+ final MetricQueryServiceGateway jmQueryService =
+ new TestingMetricQueryServiceGateway.Builder()
+ .setQueryMetricsSupplier(
+ () ->
+ CompletableFuture.completedFuture(
+ new MetricDumpSerialization
+
.MetricSerializationResult(
+ new byte[0],
+ new byte[0],
+ new byte[0],
+ new byte[0],
+ 0,
+ 0,
+ 0,
+ 0)))
+ .build();
+
+ MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer
=
+ createRequestDumpAnswer(tmRID, jobID);
+ final MetricQueryServiceGateway tmQueryService =
+ new TestingMetricQueryServiceGateway.Builder()
+ .setQueryMetricsSupplier(
+ () -> {
+ if (waitTimeBeforeReturnMetricResults > 0)
{
+ CompletableFuture<
+ MetricDumpSerialization
+
.MetricSerializationResult>
+ metricsAnswerFuture = new
CompletableFuture<>();
+ CompletableFuture.completedFuture(null)
+ .thenRunAsync(
+ waitTimeMs(
+
waitTimeBeforeReturnMetricResults),
+ executor)
+ .whenCompleteAsync(
+ (ignore, throwable) ->
{
+ if (throwable !=
null) {
+
fail(throwable.getMessage());
Review Comment:
We should try to avoid using the `fail` method. Maybe we can use another
approach to achieve similar logic, such as completing the future
exceptionally.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -263,7 +220,151 @@ public void testShortUpdateInterval() throws
InterruptedException {
fetcher.update();
- assertThat(requestMetricQueryServiceGatewaysCounter.get(), is(2));
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(2);
+ }
+
+ @Test
+ public void testIgnoreUpdateRequestWhenFetchingMetrics() throws
InterruptedException {
Review Comment:
```suggestion
void testIgnoreUpdateRequestWhenFetchingMetrics() throws
InterruptedException {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -263,7 +220,151 @@ public void testShortUpdateInterval() throws
InterruptedException {
fetcher.update();
- assertThat(requestMetricQueryServiceGatewaysCounter.get(), is(2));
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(2);
+ }
+
+ @Test
+ public void testIgnoreUpdateRequestWhenFetchingMetrics() throws
InterruptedException {
+ final long updateInterval = 1000L;
+ final long waitTimeBeforeReturnMetricResults = updateInterval * 2;
+ final Time timeout = Time.seconds(10L);
+ final AtomicInteger requestMetricQueryServiceGatewaysCounter = new
AtomicInteger(0);
+ final JobID jobID = new JobID();
+ final ResourceID tmRID = ResourceID.generate();
+
+ // Create metric fetcher
+ MetricFetcher fetcher =
+ createMetricFetcherWithServiceGateways(
+ jobID,
+ tmRID,
+ timeout,
+ updateInterval,
+ waitTimeBeforeReturnMetricResults,
+ requestMetricQueryServiceGatewaysCounter);
+
+ fetcher.update();
+
+ final long start = System.currentTimeMillis();
+ long difference = 0L;
+
+ while (difference <= updateInterval) {
+ Thread.sleep((int) (updateInterval * 1.5f));
+ difference = System.currentTimeMillis() - start;
+ }
+
+ fetcher.update();
+
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(1);
+ }
+
+ @Nonnull
Review Comment:
If I remember correctly, everything is non-null by default in Flink, except for
the special cases that we annotate as `@Nullable`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -239,7 +196,7 @@ public void testLongUpdateInterval() {
fetcher.update();
fetcher.update();
- assertThat(requestMetricQueryServiceGatewaysCounter.get(), is(1));
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(1);
Review Comment:
```suggestion
assertThat(requestMetricQueryServiceGatewaysCounter).hasValue(1);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -263,7 +220,151 @@ public void testShortUpdateInterval() throws
InterruptedException {
fetcher.update();
- assertThat(requestMetricQueryServiceGatewaysCounter.get(), is(2));
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(2);
Review Comment:
```suggestion
assertThat(requestMetricQueryServiceGatewaysCounter).hasValue(2);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -50,9 +50,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
Review Comment:
Sure, I will take a look.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -263,7 +220,151 @@ public void testShortUpdateInterval() throws
InterruptedException {
fetcher.update();
- assertThat(requestMetricQueryServiceGatewaysCounter.get(), is(2));
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(2);
+ }
+
+ @Test
+ public void testIgnoreUpdateRequestWhenFetchingMetrics() throws
InterruptedException {
+ final long updateInterval = 1000L;
+ final long waitTimeBeforeReturnMetricResults = updateInterval * 2;
+ final Time timeout = Time.seconds(10L);
+ final AtomicInteger requestMetricQueryServiceGatewaysCounter = new
AtomicInteger(0);
+ final JobID jobID = new JobID();
+ final ResourceID tmRID = ResourceID.generate();
+
+ // Create metric fetcher
+ MetricFetcher fetcher =
+ createMetricFetcherWithServiceGateways(
+ jobID,
+ tmRID,
+ timeout,
+ updateInterval,
+ waitTimeBeforeReturnMetricResults,
+ requestMetricQueryServiceGatewaysCounter);
+
+ fetcher.update();
+
+ final long start = System.currentTimeMillis();
+ long difference = 0L;
+
+ while (difference <= updateInterval) {
+ Thread.sleep((int) (updateInterval * 1.5f));
+ difference = System.currentTimeMillis() - start;
+ }
+
+ fetcher.update();
+
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(1);
Review Comment:
```suggestion
assertThat(requestMetricQueryServiceGatewaysCounter).hasValue(1);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -263,7 +220,151 @@ public void testShortUpdateInterval() throws
InterruptedException {
fetcher.update();
- assertThat(requestMetricQueryServiceGatewaysCounter.get(), is(2));
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(2);
+ }
+
+ @Test
+ public void testIgnoreUpdateRequestWhenFetchingMetrics() throws
InterruptedException {
+ final long updateInterval = 1000L;
+ final long waitTimeBeforeReturnMetricResults = updateInterval * 2;
+ final Time timeout = Time.seconds(10L);
+ final AtomicInteger requestMetricQueryServiceGatewaysCounter = new
AtomicInteger(0);
+ final JobID jobID = new JobID();
+ final ResourceID tmRID = ResourceID.generate();
+
+ // Create metric fetcher
+ MetricFetcher fetcher =
+ createMetricFetcherWithServiceGateways(
+ jobID,
+ tmRID,
+ timeout,
+ updateInterval,
+ waitTimeBeforeReturnMetricResults,
+ requestMetricQueryServiceGatewaysCounter);
+
+ fetcher.update();
+
+ final long start = System.currentTimeMillis();
+ long difference = 0L;
+
+ while (difference <= updateInterval) {
+ Thread.sleep((int) (updateInterval * 1.5f));
+ difference = System.currentTimeMillis() - start;
+ }
+
+ fetcher.update();
+
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(1);
+ }
+
+ @Nonnull
+ private MetricFetcher createMetricFetcherWithServiceGateways(
+ JobID jobID,
+ ResourceID tmRID,
+ Time timeout,
+ long updateInterval,
+ long waitTimeBeforeReturnMetricResults,
+ @Nullable AtomicInteger requestMetricQueryServiceGatewaysCounter) {
+ final ExecutorService executor =
java.util.concurrent.Executors.newSingleThreadExecutor();
+ // ========= setup QueryServices
+ //
================================================================================
+
+ final MetricQueryServiceGateway jmQueryService =
+ new TestingMetricQueryServiceGateway.Builder()
+ .setQueryMetricsSupplier(
+ () ->
+ CompletableFuture.completedFuture(
+ new MetricDumpSerialization
+
.MetricSerializationResult(
+ new byte[0],
+ new byte[0],
+ new byte[0],
+ new byte[0],
+ 0,
+ 0,
+ 0,
+ 0)))
+ .build();
+
+ MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer
=
+ createRequestDumpAnswer(tmRID, jobID);
+ final MetricQueryServiceGateway tmQueryService =
+ new TestingMetricQueryServiceGateway.Builder()
+ .setQueryMetricsSupplier(
+ () -> {
+ if (waitTimeBeforeReturnMetricResults > 0)
{
+ CompletableFuture<
+ MetricDumpSerialization
+
.MetricSerializationResult>
+ metricsAnswerFuture = new
CompletableFuture<>();
+ CompletableFuture.completedFuture(null)
+ .thenRunAsync(
+ waitTimeMs(
+
waitTimeBeforeReturnMetricResults),
+ executor)
+ .whenCompleteAsync(
+ (ignore, throwable) ->
{
+ if (throwable !=
null) {
+
fail(throwable.getMessage());
+ }
+
metricsAnswerFuture.complete(
+
requestMetricsAnswer);
+ });
+ return metricsAnswerFuture;
+ } else {
+ return
CompletableFuture.completedFuture(
+ requestMetricsAnswer);
+ }
+ })
+ .build();
+
+ // ========= setup JobManager
+ //
==================================================================================
+
+ final TestingRestfulGateway restfulGateway =
+ new TestingRestfulGateway.Builder()
+ .setRequestMultipleJobDetailsSupplier(
+ () ->
+ CompletableFuture.completedFuture(
+ new
MultipleJobsDetails(Collections.emptyList())))
+ .setRequestMetricQueryServiceGatewaysSupplier(
+ () -> {
+ if
(requestMetricQueryServiceGatewaysCounter != null) {
+
requestMetricQueryServiceGatewaysCounter.incrementAndGet();
+ }
+ return CompletableFuture.completedFuture(
+
Collections.singleton(jmQueryService.getAddress()));
+ })
+
.setRequestTaskManagerMetricQueryServiceGatewaysSupplier(
+ () ->
+ CompletableFuture.completedFuture(
+ Collections.singleton(
+ Tuple2.of(
+ tmRID,
+
tmQueryService.getAddress()))))
+ .build();
+
+ final GatewayRetriever<RestfulGateway> retriever =
+ () -> CompletableFuture.completedFuture(restfulGateway);
+
+ // ========= start MetricFetcher testing
+ //
=======================================================================
+ return new MetricFetcherImpl<>(
+ retriever,
+ address -> CompletableFuture.completedFuture(tmQueryService),
+ Executors.directExecutor(),
+ timeout,
+ updateInterval);
+ }
+
+ private static Runnable waitTimeMs(long sleepTimeMs) {
+ return () -> {
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (Throwable throwable) {
+ fail(throwable.getMessage());
Review Comment:
Maybe just throwing an exception directly is OK.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -43,120 +43,77 @@
import org.junit.Test;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/** Tests for the MetricFetcher. */
public class MetricFetcherTest extends TestLogger {
Review Comment:
Unless otherwise specified, the test class and methods should be
package-private.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]