zhuzhurk commented on code in PR #21197:
URL: https://github.com/apache/flink/pull/21197#discussion_r1020004748


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -192,10 +193,13 @@ public int currentParallelism() {
     //  OperatorCoordinator Interface
     // ------------------------------------------------------------------------
 
-    public void start() throws Exception {
+    public void start(MetricGroup metricGroup) throws Exception {
         mainThreadExecutor.assertRunningInMainThread();
         checkState(context.isInitialized(), "Coordinator Context is not yet 
initialized");
         coordinator.start();
+        if (Objects.nonNull(metricGroup)) {

Review Comment:
   We should do `checkNotNull(metricGroup)` right after the above `checkState`.
   And the if can be removed. Because we never expect the `metricGroup` to be 
null.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -603,7 +603,7 @@ public final void startScheduling() {
                 executionGraph::registerJobStatusListener,
                 executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
                 jobStatusMetricsSettings);
-        operatorCoordinatorHandler.startAllOperatorCoordinators();
+        
operatorCoordinatorHandler.startAllOperatorCoordinators(this.jobManagerJobMetricGroup);

Review Comment:
   Flink codestyle prefers to avoid using `this.`, unless in the construction 
method. Because it unnecessarily complicated the code.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java:
##########
@@ -238,4 +240,13 @@ public OperatorCoordinator 
create(OperatorCoordinator.Context context) {
             return factory.apply(context);
         }
     }
+
+    @Override
+    public void registerMetrics(MetricGroup metricGroup) {
+       this.metricGroups.add(metricGroup);

Review Comment:
   ```suggestion
         metricGroups.add(metricGroup);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandlerTest.java:
##########
@@ -43,6 +46,7 @@
 import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
 import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.comparesEqualTo;

Review Comment:
   Looks to me the matcher is introduced without use?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java:
##########
@@ -259,6 +265,22 @@ public void setExpectedExecuting(Consumer<ExecutionGraph> 
asserter) {
             executingStateValidator.expectInput(asserter);
         }
 
+        @Override
+        public MetricGroup getJobMetricGroup() {
+            final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new 
CompletableFuture<>();

Review Comment:
   You can simply return an `UnregisteredJobManagerJobMetricGroup` for testing.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandlerTest.java:
##########
@@ -51,6 +55,12 @@ public class DefaultOperatorCoordinatorHandlerTest {
     public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
             TestingUtils.defaultExecutorResource();
 
+    private MetricRegistryImpl registry =
+            new MetricRegistryImpl(
+                    
MetricRegistryTestUtils.defaultMetricRegistryConfiguration());
+    private final JobManagerMetricGroup jobManagerMetricGroup =

Review Comment:
   You can simply return an `UnregisteredJobManagerJobMetricGroup` for testing.
   `MetricRegistryImpl` is heavy and must be closed after the testing.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java:
##########
@@ -57,6 +60,12 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
     private final GlobalFailureHandler globalFailureHandler = (t) -> 
globalFailure = t;
     private Throwable globalFailure;
 
+    private MetricRegistryImpl registry =

Review Comment:
   You can simply return an `UnregisteredJobManagerJobMetricGroup` for testing.
   `MetricRegistryImpl` is heavy and must be closed after the testing.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java:
##########
@@ -101,6 +102,15 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
     @Override
     void close() throws Exception;
 
+    /**
+     * Init job manager metric group for current operator coordinator for 
report metric.
+     * Called after start method.
+     * @throws Exception Any exception thrown from this method causes a full 
job failure.
+     */
+    default void registerMetrics(MetricGroup metricGroup) {
+        // do nothing default.

Review Comment:
   ```suggestion
           // do nothing by default
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java:
##########
@@ -78,5 +79,5 @@ CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(
      */
     void registerAndStartNewCoordinators(

Review Comment:
   The documentation is not updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1060,6 +1064,11 @@ public boolean canScaleUp(ExecutionGraph executionGraph) 
{
         return false;
     }
 
+    @Override
+    public MetricGroup getJobMetricGroup() {
+        return this.jobManagerJobMetricGroup;

Review Comment:
   ```suggestion
           return jobManagerJobMetricGroup;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -290,7 +290,7 @@ private void updateTopology(final List<ExecutionJobVertex> 
newlyInitializedJobVe
 
     private void initializeOperatorCoordinatorsFor(ExecutionJobVertex vertex) {
         operatorCoordinatorHandler.registerAndStartNewCoordinators(
-                vertex.getOperatorCoordinators(), getMainThreadExecutor());
+                vertex.getOperatorCoordinators(), getMainThreadExecutor(), 
this.jobManagerJobMetricGroup);

Review Comment:
   ```suggestion
                   vertex.getOperatorCoordinators(), getMainThreadExecutor(), 
jobManagerJobMetricGroup);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java:
##########
@@ -42,7 +43,7 @@ public interface OperatorCoordinatorHandler {
     void initializeOperatorCoordinators(ComponentMainThreadExecutor 
mainThreadExecutor);
 
     /** Start all operator coordinators. */

Review Comment:
   The documentation is not updated.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java:
##########
@@ -35,6 +36,7 @@
 /** A simple testing implementation of the {@link OperatorCoordinator}. */
 public class TestingOperatorCoordinator implements OperatorCoordinator {
 
+   private final List<MetricGroup> metricGroups = new ArrayList<>();

Review Comment:
   Wrong indentation.
   And it should be placed after the `static final` fields.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.testutils.ScheduledTask;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.OperatorIDPair;

Review Comment:
   These changes are not needed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -290,7 +290,7 @@ private void updateTopology(final List<ExecutionJobVertex> 
newlyInitializedJobVe
 
     private void initializeOperatorCoordinatorsFor(ExecutionJobVertex vertex) {
         operatorCoordinatorHandler.registerAndStartNewCoordinators(
-                vertex.getOperatorCoordinators(), getMainThreadExecutor());
+                vertex.getOperatorCoordinators(), getMainThreadExecutor(), 
this.jobManagerJobMetricGroup);

Review Comment:
   Besides that, the line is too long and violated the checkstyle.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java:
##########
@@ -466,6 +472,21 @@ public void 
testBatchGlobalFailoverDoesNotNotifyLocalRestore() throws Exception
         assertThat(coordinator.getRestoredTasks()).isEmpty();
     }
 
+    @Test
+    public void testAllOperatorCoordinatorRegisterMetrics() throws Exception {
+        final DefaultScheduler scheduler =
+                setupTestJobAndScheduler(
+                        new 
TestingOperatorCoordinator.Provider(testOperatorId), null, null, true);
+        scheduleAllTasksToRunning(scheduler);
+        List<OperatorCoordinatorHolder> operatorCoordinatorHolderList = new 
ArrayList<>(scheduler.getExecutionGraph()
+                .getJobVertex(testVertexId).getOperatorCoordinators());
+        assertEquals(1, operatorCoordinatorHolderList.size());

Review Comment:
   We should use `assertJ` assertions in tests.
   See 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to