wjf222 commented on code in PR #10981:
URL: https://github.com/apache/dolphinscheduler/pull/10981#discussion_r921930820
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdConnectionStateListener.java:
##########
@@ -0,0 +1,133 @@
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.etcd.jetcd.Client;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Get the connection status by listening to the Client's Channel
+ */
+public class EtcdConnectionStateListener implements AutoCloseable{
+ private final List<ConnectionListener> connectionListeners =
Collections.synchronizedList(new ArrayList<>());
+ // A thread pool that periodically obtains connection status
+ private final ScheduledExecutorService scheduledExecutorService;
+ // Client's Channel
+ private AtomicReference<ManagedChannel> channel;
+ // monitored client
+ private Client client;
+ // The state of the last monitor
+ private ConnectionState connectionState;
+ private long initialDelay = 500L;
+ private long delay = 500L;
+ public EtcdConnectionStateListener(Client client) {
+ this.client = client;
+ channel = new AtomicReference<>();
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(
+ 1,
+ new
ThreadFactoryBuilder().setNameFormat("EtcdConnectionStateListenerThread").setDaemon(true).build());
+ }
+
+ public void addConnectionListener(ConnectionListener connectionListener) {
+ connectionListeners.add(connectionListener);
+ }
+
+ @Override
+ public void close() throws Exception {
+ connectionListeners.clear();
+ scheduledExecutorService.shutdownNow();
+ }
+
+ /**
+ * try to get jetcd client ManagedChannel
+ * @param client the etcd client
+ * @return current connection channel
+ */
+ private ManagedChannel newChannel(Client client) {
+ try {
+ Field connectField
=client.getClass().getDeclaredField("connectManager");
+ if(!connectField.isAccessible()){
+ connectField.setAccessible(true);
+ }
+ Object connection = connectField.get(client);
+ Method channel =
connection.getClass().getDeclaredMethod("getChannel");
+ if (!channel.isAccessible()) {
+ channel.setAccessible(true);
+ }
+ return (ManagedChannel) channel.invoke(connection);
+ } catch (Exception e) {
+ throw new RegistryException("Failed to get the etcd client
channel", e);
+ }
Review Comment:
> I don't think this is a good way to implement the connectivity state
listener, why do you choose to use reflection instead of some native method of
jetcd client?
I overcomplicated the problem. I only need to periodically try to connect
to etcd, and the connection status can be determined from the result of that
attempt. I have removed the reflection-related code. Now I use the client to
request a lease and treat the outcome as the connection state; see [Using the
lease to listen connection
state](https://github.com/apache/dolphinscheduler/pull/10981/commits/d3ecd2a6eed4af52395d2547ef7486cf435d03a1)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]