KarmaGYZ commented on a change in pull request #18303:
URL: https://github.com/apache/flink/pull/18303#discussion_r780953755



##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/PeriodicTaskScheduledExecutor.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Scheduled executor for periodic tasks. When the specified time arrives, the 
executor will send
+ * the periodic tasks to gateway and execute them.
+ */
+public class PeriodicTaskScheduledExecutor implements ScheduledExecutor, 
Closeable {
+    private static final Logger log = 
LoggerFactory.getLogger(PeriodicTaskScheduledExecutor.class);
+
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final MainThreadExecutable gateway;
+
+    public PeriodicTaskScheduledExecutor(MainThreadExecutable gateway) {
+        this.gateway = gateway;
+        this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
+        if (scheduledExecutorService.isShutdown()) {
+            log.warn("The scheduled executor for periodic tasks is shutdown.");
+            return ThrowingScheduledFuture.create();
+        } else {
+            return this.scheduledExecutorService.schedule(
+                    () -> gateway.runAsync(command), delay, unit);
+        }
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
+        if (scheduledExecutorService.isShutdown()) {
+            log.warn("The scheduled executor for periodic tasks is shutdown.");
+            return ThrowingScheduledFuture.create();
+        } else {
+            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
+            FutureTask<V> ft = new FutureTask<>(callable);
+            this.scheduledExecutorService.schedule(() -> gateway.runAsync(ft), 
delay, unit);
+            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(
+            Runnable command, long initialDelay, long period, TimeUnit unit) {
+        throw new UnsupportedOperationException("Only support to schedule 
periodic tasks.");
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(
+            Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        throw new UnsupportedOperationException("Only support to schedule 
periodic tasks.");

Review comment:
       ```suggestion
           throw new UnsupportedOperationException("Not implemented because the 
method is currently not required.");
   ```

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/PeriodicTaskScheduledExecutor.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Scheduled executor for periodic tasks. When the specified time arrives, the 
executor will send
+ * the periodic tasks to gateway and execute them.
+ */
+public class PeriodicTaskScheduledExecutor implements ScheduledExecutor, 
Closeable {
+    private static final Logger log = 
LoggerFactory.getLogger(PeriodicTaskScheduledExecutor.class);
+
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final MainThreadExecutable gateway;
+
+    public PeriodicTaskScheduledExecutor(MainThreadExecutable gateway) {
+        this.gateway = gateway;
+        this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
+        if (scheduledExecutorService.isShutdown()) {
+            log.warn("The scheduled executor for periodic tasks is shutdown.");
+            return ThrowingScheduledFuture.create();
+        } else {
+            return this.scheduledExecutorService.schedule(
+                    () -> gateway.runAsync(command), delay, unit);
+        }
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
+        if (scheduledExecutorService.isShutdown()) {
+            log.warn("The scheduled executor for periodic tasks is shutdown.");
+            return ThrowingScheduledFuture.create();
+        } else {
+            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
+            FutureTask<V> ft = new FutureTask<>(callable);
+            this.scheduledExecutorService.schedule(() -> gateway.runAsync(ft), 
delay, unit);
+            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(
+            Runnable command, long initialDelay, long period, TimeUnit unit) {
+        throw new UnsupportedOperationException("Only support to schedule 
periodic tasks.");

Review comment:
       ```suggestion
           throw new UnsupportedOperationException("Not implemented because the 
method is currently not required.");
   ```

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -211,11 +231,29 @@ protected final void stop() {
      */
     public final CompletableFuture<Void> internalCallOnStop() {
         validateRunsInMainThread();
+        try {
+            resourceRegistry.close();
+        } catch (IOException e) {
+            throw new RuntimeException("Close resource registry fail", e);
+        }
         CompletableFuture<Void> stopFuture = onStop();
         isRunning = false;
         return stopFuture;
     }
 
+    protected void registerResource(Closeable closeableResource) {

Review comment:
       Missing javadoc.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -1811,6 +1834,12 @@ public void 
testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Excep
         schedulerTerminationFuture.complete(null);
 
         jobMasterTerminationFuture.get();
+        assertJobMasterResourceClosed(jobMaster);
+    }
+
+    private void assertJobMasterResourceClosed(JobMaster jobMaster) {

Review comment:
       I think we'd better test this in `RpcEndpointTest` and 
`FencedRpcEndpointTest`. It would be good to cover the scenario in which the 
fencing token has changed.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/PeriodicTaskScheduledExecutor.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Scheduled executor for periodic tasks. When the specified time arrives, the 
executor will send
+ * the periodic tasks to gateway and execute them.
+ */
+public class PeriodicTaskScheduledExecutor implements ScheduledExecutor, 
Closeable {

Review comment:
       nit: I think this executor is also used for scheduling tasks in the 
future, so the word "periodic" might not be good enough. I don't have a better 
idea, though.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ThrowingScheduledFuture.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Completed {@link ScheduledFuture} implementation.
+ *
+ * @param <T> type of the {@link ScheduledFuture}
+ */
+public final class ThrowingScheduledFuture<T> implements ScheduledFuture<T> {
+    private static final ThrowingScheduledFuture instance = new 
ThrowingScheduledFuture();
+
+    private ThrowingScheduledFuture() {}
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+        return Long.compare(getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return true;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return true;
+    }
+
+    @Override
+    public boolean isDone() {
+        return true;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit)

Review comment:
       ```suggestion
       public T get(long timeout, @Nonnull TimeUnit unit)
   ```

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -211,11 +231,29 @@ protected final void stop() {
      */
     public final CompletableFuture<Void> internalCallOnStop() {
         validateRunsInMainThread();
+        try {
+            resourceRegistry.close();
+        } catch (IOException e) {
+            throw new RuntimeException("Close resource registry fail", e);
+        }
         CompletableFuture<Void> stopFuture = onStop();
         isRunning = false;
         return stopFuture;
     }
 
+    protected void registerResource(Closeable closeableResource) {
+        try {
+            resourceRegistry.registerCloseable(closeableResource);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Registry closeable resource " + closeableResource + " 
fail", e);
+        }
+    }
+
+    protected boolean unregisterResource(Closeable closeableResource) {

Review comment:
       Missing javadoc.

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ThrowingScheduledFuture.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Completed {@link ScheduledFuture} implementation.
+ *
+ * @param <T> type of the {@link ScheduledFuture}
+ */
+public final class ThrowingScheduledFuture<T> implements ScheduledFuture<T> {
+    private static final ThrowingScheduledFuture instance = new 
ThrowingScheduledFuture();
+
+    private ThrowingScheduledFuture() {}
+
+    @Override
+    public long getDelay(TimeUnit unit) {

Review comment:
       ```suggestion
       public long getDelay(@Nonnull TimeUnit unit) {
   ```

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/PeriodicTaskScheduledExecutor.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Scheduled executor for periodic tasks. When the specified time arrives, the 
executor will send
+ * the periodic tasks to gateway and execute them.
+ */
+public class PeriodicTaskScheduledExecutor implements ScheduledExecutor, 
Closeable {
+    private static final Logger log = 
LoggerFactory.getLogger(PeriodicTaskScheduledExecutor.class);
+
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final MainThreadExecutable gateway;
+
+    public PeriodicTaskScheduledExecutor(MainThreadExecutable gateway) {
+        this.gateway = gateway;
+        this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
+        if (scheduledExecutorService.isShutdown()) {
+            log.warn("The scheduled executor for periodic tasks is shutdown.");
+            return ThrowingScheduledFuture.create();
+        } else {
+            return this.scheduledExecutorService.schedule(
+                    () -> gateway.runAsync(command), delay, unit);
+        }
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
+        if (scheduledExecutorService.isShutdown()) {
+            log.warn("The scheduled executor for periodic tasks is shutdown.");
+            return ThrowingScheduledFuture.create();
+        } else {
+            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
+            FutureTask<V> ft = new FutureTask<>(callable);
+            this.scheduledExecutorService.schedule(() -> gateway.runAsync(ft), 
delay, unit);
+            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(
+            Runnable command, long initialDelay, long period, TimeUnit unit) {
+        throw new UnsupportedOperationException("Only support to schedule 
periodic tasks.");
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(
+            Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        throw new UnsupportedOperationException("Only support to schedule 
periodic tasks.");
+    }
+
+    @Override
+    public void execute(Runnable command) {

Review comment:
       ```suggestion
       public void execute(@Nonnull Runnable command) {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to