wjf222 commented on code in PR #10981:
URL: https://github.com/apache/dolphinscheduler/pull/10981#discussion_r921930820


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdConnectionStateListener.java:
##########
@@ -0,0 +1,133 @@
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.etcd.jetcd.Client;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Get the connection status by listening to the Client's Channel
+ */
+public class EtcdConnectionStateListener implements AutoCloseable{
+    private final List<ConnectionListener> connectionListeners = 
Collections.synchronizedList(new ArrayList<>());
+    // A thread pool that periodically obtains connection status
+    private final ScheduledExecutorService scheduledExecutorService;
+    // Client's Channel
+    private AtomicReference<ManagedChannel> channel;
+    // monitored client
+    private Client client;
+    // The state of the last monitor
+    private ConnectionState connectionState;
+    private long initialDelay = 500L;
+    private long delay = 500L;
+    public EtcdConnectionStateListener(Client client) {
+        this.client = client;
+        channel = new AtomicReference<>();
+        this.scheduledExecutorService = Executors.newScheduledThreadPool(
+                1,
+                new 
ThreadFactoryBuilder().setNameFormat("EtcdConnectionStateListenerThread").setDaemon(true).build());
+    }
+
+    public void addConnectionListener(ConnectionListener connectionListener) {
+        connectionListeners.add(connectionListener);
+    }
+
+    @Override
+    public void close() throws Exception {
+        connectionListeners.clear();
+        scheduledExecutorService.shutdownNow();
+    }
+
+    /**
+     * try to get jetcd client ManagedChannel
+     * @param client the etcd client
+     * @return current connection channel
+     */
+    private ManagedChannel newChannel(Client client) {
+        try {
+            Field connectField 
=client.getClass().getDeclaredField("connectManager");
+            if(!connectField.isAccessible()){
+                connectField.setAccessible(true);
+            }
+            Object connection = connectField.get(client);
+            Method channel = 
connection.getClass().getDeclaredMethod("getChannel");
+            if (!channel.isAccessible()) {
+                channel.setAccessible(true);
+            }
+            return (ManagedChannel) channel.invoke(connection);
+        } catch (Exception e) {
+            throw new RegistryException("Failed to get the etcd client 
channel", e);
+        }

Review Comment:
   > I don't think this is a good way to implement the connectivity state 
listener, why do you choose to use reflection instead of some native method of 
jetcd client?
   
   I made the problem complicated. I only need to periodically try to connect 
to Etcd, and the connection status can be judged by the connection result.
   I have removed the reflection related code. Now, i use the client to apply 
for a lease to determine whether the connection is successful in [Using the 
lease to listen connection 
state](https://github.com/apache/dolphinscheduler/pull/10981/commits/d3ecd2a6eed4af52395d2547ef7486cf435d03a1)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to