wjf222 commented on code in PR #10981:
URL: https://github.com/apache/dolphinscheduler/pull/10981#discussion_r921884048


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdConnectionStateListener.java:
##########
@@ -0,0 +1,133 @@
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.etcd.jetcd.Client;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Get the connection status by listening to the Client's Channel
+ */
+public class EtcdConnectionStateListener implements AutoCloseable{
+    private final List<ConnectionListener> connectionListeners = 
Collections.synchronizedList(new ArrayList<>());
+    // A thread pool that periodically obtains connection status
+    private final ScheduledExecutorService scheduledExecutorService;
+    // Client's Channel
+    private AtomicReference<ManagedChannel> channel;
+    // monitored client
+    private Client client;
+    // The state of the last monitor
+    private ConnectionState connectionState;
+    private long initialDelay = 500L;
+    private long delay = 500L;
+    public EtcdConnectionStateListener(Client client) {
+        this.client = client;
+        channel = new AtomicReference<>();
+        this.scheduledExecutorService = Executors.newScheduledThreadPool(
+                1,
+                new 
ThreadFactoryBuilder().setNameFormat("EtcdConnectionStateListenerThread").setDaemon(true).build());
+    }
+
+    public void addConnectionListener(ConnectionListener connectionListener) {
+        connectionListeners.add(connectionListener);
+    }
+
+    @Override
+    public void close() throws Exception {
+        connectionListeners.clear();
+        scheduledExecutorService.shutdownNow();
+    }
+
+    /**
+     * try to get jetcd client ManagedChannel
+     * @param client the etcd client
+     * @return current connection channel
+     */
+    private ManagedChannel newChannel(Client client) {
+        try {
+            Field connectField 
=client.getClass().getDeclaredField("connectManager");
+            if(!connectField.isAccessible()){
+                connectField.setAccessible(true);
+            }
+            Object connection = connectField.get(client);
+            Method channel = 
connection.getClass().getDeclaredMethod("getChannel");
+            if (!channel.isAccessible()) {
+                channel.setAccessible(true);
+            }
+            return (ManagedChannel) channel.invoke(connection);
+        } catch (Exception e) {
+            throw new RegistryException("Failed to get the etcd client 
channel", e);
+        }

Review Comment:
   OK, I will try using the lease client's keepalive to listen for the
   connection status.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to