zhuzhurk commented on code in PR #21197:
URL: https://github.com/apache/flink/pull/21197#discussion_r1020004748
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -192,10 +193,13 @@ public int currentParallelism() {
// OperatorCoordinator Interface
// ------------------------------------------------------------------------
- public void start() throws Exception {
+ public void start(MetricGroup metricGroup) throws Exception {
mainThreadExecutor.assertRunningInMainThread();
checkState(context.isInitialized(), "Coordinator Context is not yet initialized");
coordinator.start();
+ if (Objects.nonNull(metricGroup)) {
Review Comment:
We should call `checkNotNull(metricGroup)` right after the `checkState` above.
The `if` can then be removed, because we never expect `metricGroup` to be
null.
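A minimal sketch of the suggested shape, assuming hypothetical stand-ins so it compiles on its own: `MetricGroup` and `Coordinator` are placeholders rather than Flink's types, and `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull` (both throw `NullPointerException`). The body of the removed `if` is not visible in the diff; here it is assumed to call `registerMetrics`, which the new interface Javadoc says runs after `start`.

```java
import java.util.Objects;

/** Hypothetical sketch: validate metricGroup up front instead of branching on null later. */
public class CoordinatorHolderSketch {

    /** Hypothetical stand-in for org.apache.flink.metrics.MetricGroup. */
    public interface MetricGroup {}

    /** Hypothetical stand-in for OperatorCoordinator with the new metrics hook. */
    public interface Coordinator {
        void start() throws Exception;

        default void registerMetrics(MetricGroup metricGroup) {
            // do nothing by default
        }
    }

    private final Coordinator coordinator;
    private final boolean contextInitialized;

    public CoordinatorHolderSketch(Coordinator coordinator, boolean contextInitialized) {
        this.coordinator = coordinator;
        this.contextInitialized = contextInitialized;
    }

    public void start(MetricGroup metricGroup) throws Exception {
        // Mirrors checkState(...): IllegalStateException if the context is not ready.
        if (!contextInitialized) {
            throw new IllegalStateException("Coordinator Context is not yet initialized");
        }
        // Mirrors checkNotNull(metricGroup): a null group is a caller bug, so fail fast here.
        Objects.requireNonNull(metricGroup);
        coordinator.start();
        // No null branch needed any more (assumed to be what the removed if-block did).
        coordinator.registerMetrics(metricGroup);
    }
}
```

Checking before `coordinator.start()` means a missing group fails the call before the coordinator has started, instead of silently skipping metric registration.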
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -603,7 +603,7 @@ public final void startScheduling() {
executionGraph::registerJobStatusListener,
executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
jobStatusMetricsSettings);
- operatorCoordinatorHandler.startAllOperatorCoordinators();
+ operatorCoordinatorHandler.startAllOperatorCoordinators(this.jobManagerJobMetricGroup);
Review Comment:
The Flink code style avoids `this.` except in constructors, because it
unnecessarily complicates the code.
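For illustration, a hypothetical class (not Flink code) showing where `this.` is still warranted: in a constructor whose parameter shadows the field. Everywhere else the bare field name already refers to the same field.

```java
/** Hypothetical class illustrating when `this.` is warranted under the described style. */
public class ThisQualifierExample {

    private final Object metricGroup;

    public ThisQualifierExample(Object metricGroup) {
        // The parameter shadows the field, so `this.` is required to assign it.
        this.metricGroup = metricGroup;
    }

    public Object getMetricGroup() {
        // No shadowing here: the plain field name is unambiguous.
        return metricGroup;
    }
}
```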
##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java:
##########
@@ -238,4 +240,13 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) {
return factory.apply(context);
}
}
+
+ @Override
+ public void registerMetrics(MetricGroup metricGroup) {
+ this.metricGroups.add(metricGroup);
Review Comment:
```suggestion
metricGroups.add(metricGroup);
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandlerTest.java:
##########
@@ -43,6 +46,7 @@
import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.comparesEqualTo;
Review Comment:
It looks like this matcher is imported but never used?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java:
##########
@@ -259,6 +265,22 @@ public void setExpectedExecuting(Consumer<ExecutionGraph> asserter) {
executingStateValidator.expectInput(asserter);
}
+ @Override
+ public MetricGroup getJobMetricGroup() {
+ final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new CompletableFuture<>();
Review Comment:
You can simply return an `UnregisteredJobManagerJobMetricGroup` for testing.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandlerTest.java:
##########
@@ -51,6 +55,12 @@ public class DefaultOperatorCoordinatorHandlerTest {
public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorResource();
+ private MetricRegistryImpl registry =
+ new MetricRegistryImpl(
+ MetricRegistryTestUtils.defaultMetricRegistryConfiguration());
+ private final JobManagerMetricGroup jobManagerMetricGroup =
Review Comment:
You can simply use an `UnregisteredJobManagerJobMetricGroup` for testing.
`MetricRegistryImpl` is heavy and must be closed after the test.
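A sketch of the trade-off, using hypothetical stand-ins (`HeavyRegistry`, `NoOpGroup`) rather than Flink's `MetricRegistryImpl` and `UnregisteredJobManagerJobMetricGroup`: a registry that holds resources is `AutoCloseable` and has to be closed, whereas a group that discards metrics leaves nothing to clean up.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fixtures contrasting a closeable registry with a no-op metric group. */
public class MetricFixtureSketch {

    /** Stand-in for a metric group that coordinators register metrics on. */
    public interface MetricGroup {
        void gauge(String name);
    }

    /** Stand-in for a heavyweight registry: it holds state until closed, so tests must close it. */
    public static final class HeavyRegistry implements AutoCloseable {
        private final List<String> registered = new ArrayList<>();
        private boolean closed;

        public MetricGroup createGroup() {
            return name -> {
                if (closed) {
                    throw new IllegalStateException("registry already closed");
                }
                registered.add(name);
            };
        }

        public List<String> registered() {
            return registered;
        }

        public boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            // Releases what the registry holds; a real registry would also stop reporters/threads.
            closed = true;
            registered.clear();
        }
    }

    /** Stand-in for an unregistered group: accepts metrics and drops them, nothing to close. */
    public static final class NoOpGroup implements MetricGroup {
        @Override
        public void gauge(String name) {
            // intentionally ignored
        }
    }
}
```

With the closeable fixture every test needs try-with-resources or a teardown method that calls `close()`; with the no-op group the field can simply be initialized inline.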
##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java:
##########
@@ -57,6 +60,12 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
private final GlobalFailureHandler globalFailureHandler = (t) -> globalFailure = t;
private Throwable globalFailure;
+ private MetricRegistryImpl registry =
Review Comment:
You can simply use an `UnregisteredJobManagerJobMetricGroup` for testing.
`MetricRegistryImpl` is heavy and must be closed after the test.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java:
##########
@@ -101,6 +102,15 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
@Override
void close() throws Exception;
+ /**
+ * Init job manager metric group for current operator coordinator for report metric.
+ * Called after start method.
+ * @throws Exception Any exception thrown from this method causes a full job failure.
+ */
+ default void registerMetrics(MetricGroup metricGroup) {
+ // do nothing default.
Review Comment:
```suggestion
// do nothing by default
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java:
##########
@@ -78,5 +79,5 @@ CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
*/
void registerAndStartNewCoordinators(
Review Comment:
The documentation is not updated.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1060,6 +1064,11 @@ public boolean canScaleUp(ExecutionGraph executionGraph) {
return false;
}
+ @Override
+ public MetricGroup getJobMetricGroup() {
+ return this.jobManagerJobMetricGroup;
Review Comment:
```suggestion
return jobManagerJobMetricGroup;
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -290,7 +290,7 @@ private void updateTopology(final List<ExecutionJobVertex> newlyInitializedJobVe
private void initializeOperatorCoordinatorsFor(ExecutionJobVertex vertex) {
operatorCoordinatorHandler.registerAndStartNewCoordinators(
- vertex.getOperatorCoordinators(), getMainThreadExecutor());
+ vertex.getOperatorCoordinators(), getMainThreadExecutor(), this.jobManagerJobMetricGroup);
Review Comment:
```suggestion
vertex.getOperatorCoordinators(), getMainThreadExecutor(), jobManagerJobMetricGroup);
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java:
##########
@@ -42,7 +43,7 @@ public interface OperatorCoordinatorHandler {
void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor);
/** Start all operator coordinators. */
Review Comment:
The documentation is not updated.
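One possible wording for the updated Javadoc, as a sketch only: the parameter name `metricGroup` and the `MetricGroup` stand-in are assumptions, not taken from the PR. `registerAndStartNewCoordinators` would need a matching `@param`.

```java
/** Hypothetical excerpt of OperatorCoordinatorHandler with docs covering the new parameter. */
public interface CoordinatorHandlerDocSketch {

    /** Stand-in for org.apache.flink.metrics.MetricGroup. */
    interface MetricGroup {}

    /**
     * Start all operator coordinators and register their metrics.
     *
     * @param metricGroup the job-level metric group that each coordinator registers its metrics
     *     with once it has been started; must not be null
     */
    void startAllOperatorCoordinators(MetricGroup metricGroup);
}
```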
##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java:
##########
@@ -35,6 +36,7 @@
/** A simple testing implementation of the {@link OperatorCoordinator}. */
public class TestingOperatorCoordinator implements OperatorCoordinator {
+ private final List<MetricGroup> metricGroups = new ArrayList<>();
Review Comment:
The indentation is wrong, and this field should be placed after the
`static final` fields.
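A trimmed, hypothetical version of the class showing the requested order: the constant `EXAMPLE_CONSTANT` and the `MetricGroup` stand-in are placeholders, not members of the real `TestingOperatorCoordinator`. Constants come first, then instance fields at the class-body indentation, and `registerMetrics` adds to the list without a `this.` prefix.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, trimmed-down test coordinator showing the suggested member order. */
public class TestingCoordinatorLayoutSketch {

    /** Stand-in for org.apache.flink.metrics.MetricGroup. */
    public interface MetricGroup {}

    // static final fields first (placeholder constant)
    public static final long EXAMPLE_CONSTANT = -1L;

    // instance fields after the constants, indented like the rest of the class body
    private final List<MetricGroup> metricGroups = new ArrayList<>();

    public void registerMetrics(MetricGroup metricGroup) {
        metricGroups.add(metricGroup);
    }

    /** Lets tests assert which groups were registered. */
    public List<MetricGroup> getMetricGroups() {
        return Collections.unmodifiableList(metricGroups);
    }
}
```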
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -27,6 +27,7 @@
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.OperatorIDPair;
Review Comment:
These changes are not needed.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -290,7 +290,7 @@ private void updateTopology(final List<ExecutionJobVertex> newlyInitializedJobVe
private void initializeOperatorCoordinatorsFor(ExecutionJobVertex vertex) {
operatorCoordinatorHandler.registerAndStartNewCoordinators(
- vertex.getOperatorCoordinators(), getMainThreadExecutor());
+ vertex.getOperatorCoordinators(), getMainThreadExecutor(), this.jobManagerJobMetricGroup);
Review Comment:
Besides that, the line is too long and violates the Checkstyle rules.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java:
##########
@@ -466,6 +472,21 @@ public void testBatchGlobalFailoverDoesNotNotifyLocalRestore() throws Exception
assertThat(coordinator.getRestoredTasks()).isEmpty();
}
+ @Test
+ public void testAllOperatorCoordinatorRegisterMetrics() throws Exception {
+ final DefaultScheduler scheduler =
+ setupTestJobAndScheduler(
+ new TestingOperatorCoordinator.Provider(testOperatorId), null, null, true);
+ scheduleAllTasksToRunning(scheduler);
+ List<OperatorCoordinatorHolder> operatorCoordinatorHolderList = new ArrayList<>(scheduler.getExecutionGraph()
+ .getJobVertex(testVertexId).getOperatorCoordinators());
+ assertEquals(1, operatorCoordinatorHolderList.size());
Review Comment:
We should use AssertJ assertions in tests.
See
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing.