kezhenxu94 commented on code in PR #10981:
URL: https://github.com/apache/dolphinscheduler/pull/10981#discussion_r921754931


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/pom.xml:
##########
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   The Apache license header is missing from this file.



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/README.md:
##########
@@ -0,0 +1,28 @@
+# Introduction
+
+This module is the etcd registry plugin module, this plugin will use etcd as the registry center.
+
+# How to use
+
+If you want to set the registry center as mysql,You need to set the registry properties in master/worker/api's appplication.yml

Review Comment:
   ```suggestion
   If you want to set the registry center as mysql, you need to set the registry properties in master/worker/api's application.yml
   ```



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdConnectionStateListener.java:
##########
@@ -0,0 +1,133 @@
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.etcd.jetcd.Client;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Get the connection status by listening to the Client's Channel
+ */
+public class EtcdConnectionStateListener implements AutoCloseable{
+    private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
+    // A thread pool that periodically obtains connection status
+    private final ScheduledExecutorService scheduledExecutorService;
+    // Client's Channel
+    private AtomicReference<ManagedChannel> channel;
+    // monitored client
+    private Client client;
+    // The state of the last monitor
+    private ConnectionState connectionState;
+    private long initialDelay = 500L;
+    private long delay = 500L;
+    public EtcdConnectionStateListener(Client client) {
+        this.client = client;
+        channel = new AtomicReference<>();
+        this.scheduledExecutorService = Executors.newScheduledThreadPool(
+                1,
+                new ThreadFactoryBuilder().setNameFormat("EtcdConnectionStateListenerThread").setDaemon(true).build());
+    }
+
+    public void addConnectionListener(ConnectionListener connectionListener) {
+        connectionListeners.add(connectionListener);
+    }
+
+    @Override
+    public void close() throws Exception {
+        connectionListeners.clear();
+        scheduledExecutorService.shutdownNow();
+    }
+
+    /**
+     * try to get jetcd client ManagedChannel
+     * @param client the etcd client
+     * @return current connection channel
+     */
+    private ManagedChannel newChannel(Client client) {
+        try {
+            Field connectField =client.getClass().getDeclaredField("connectManager");
+            if(!connectField.isAccessible()){
+                connectField.setAccessible(true);
+            }
+            Object connection = connectField.get(client);
+            Method channel = connection.getClass().getDeclaredMethod("getChannel");
+            if (!channel.isAccessible()) {
+                channel.setAccessible(true);
+            }
+            return (ManagedChannel) channel.invoke(connection);
+        } catch (Exception e) {
+            throw new RegistryException("Failed to get the etcd client channel", e);
+        }

Review Comment:
   I don't think reflection is a good way to implement the connectivity state 
listener. Why did you choose reflection instead of a native method of the 
jetcd client?
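
To make the question concrete, here is a minimal sketch of a reflection-free listener. It assumes a health probe can be built from the client's public API (for example a cheap read with a timeout), which is not verified against jetcd here. `ProbeStateMonitor`, its `State` enum, and the probe are hypothetical names, not existing jetcd or DolphinScheduler API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical sketch: none of these names exist in jetcd or DolphinScheduler.
final class ProbeStateMonitor {

    enum State { CONNECTED, DISCONNECTED, RECONNECTED }

    // Assumed health check, e.g. a cheap read through the client's public API.
    private final BooleanSupplier probe;
    private final List<Consumer<State>> listeners = new CopyOnWriteArrayList<>();
    // Last state reported to listeners; null until the first check.
    private State last;

    ProbeStateMonitor(BooleanSupplier probe) {
        this.probe = probe;
    }

    void addListener(Consumer<State> listener) {
        listeners.add(listener);
    }

    // Runs one check and notifies listeners only when the state changes.
    synchronized State checkOnce() {
        boolean healthy;
        try {
            healthy = probe.getAsBoolean();
        } catch (RuntimeException e) {
            healthy = false; // a failing probe counts as disconnected
        }
        State next;
        if (!healthy) {
            next = State.DISCONNECTED;
        } else if (last == null) {
            next = State.CONNECTED;
        } else if (last == State.DISCONNECTED) {
            next = State.RECONNECTED;
        } else {
            next = last; // still connected, nothing to report
        }
        if (next != last) {
            last = next;
            for (Consumer<State> listener : listeners) {
                listener.accept(next);
            }
        }
        return next;
    }
}
```

The existing `ScheduledExecutorService` could call `checkOnce` at a fixed delay, so the polling structure of the class would stay the same while the private-field access goes away.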



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java:
##########
@@ -0,0 +1,301 @@
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import io.etcd.jetcd.*;
+import io.etcd.jetcd.options.DeleteOption;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
+import io.etcd.jetcd.options.WatchOption;
+import io.etcd.jetcd.support.Observers;
+import io.etcd.jetcd.watch.WatchEvent;
+import org.apache.dolphinscheduler.registry.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.temporal.ChronoUnit;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
+
+
+@Component
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "etcd")
+public class EtcdRegistry implements Registry {
+    private static Logger LOGGER = LoggerFactory.getLogger(EtcdRegistry.class);
+    private final Client client;
+    private EtcdConnectionStateListener etcdConnectionStateListener;
+    // save the lock info for thread
+    // key:lockKey Value:leaseId
+    private static final ThreadLocal<Map<String, Long>> threadLocalLockMap = new ThreadLocal<>();
+
+    private final Map<String, Watch.Watcher> watcherMap = new ConcurrentHashMap<>();
+
+    private static Long TIME_TO_LIVE_SECONDS=30L;
+    public EtcdRegistry(EtcdRegistryProperties registryProperties) {
+        LOGGER.info("Starting Etcd Registry...");
+        ClientBuilder clientBuilder = Client.builder()
+                .endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(registryProperties.getEndpoints())))
+                .namespace(byteSequence(registryProperties.getNamespace()))
+                .connectTimeout(registryProperties.getConnectionTimeout())
+                .retryChronoUnit(ChronoUnit.MILLIS)
+                .retryDelay(registryProperties.getRetryDelay())
+                .retryMaxDelay(registryProperties.getRetryMaxDelay())
+                .retryMaxDuration(registryProperties.getRetryMaxDuration());
+        if(!Strings.isNullOrEmpty(registryProperties.getUser())&&(!Strings.isNullOrEmpty(registryProperties.getPassword()))){
+            clientBuilder.user(byteSequence(registryProperties.getUser()));
+            clientBuilder.password(byteSequence(registryProperties.getPassword()));
+        }
+        if(!Strings.isNullOrEmpty(registryProperties.getLoadBalancerPolicy())){
+            clientBuilder.loadBalancerPolicy(registryProperties.getLoadBalancerPolicy());
+        }
+        if(!Strings.isNullOrEmpty(registryProperties.getAuthority())){
+            clientBuilder.authority(registryProperties.getAuthority());
+        }
+        client = clientBuilder.build();
+        LOGGER.info("Started Etcd Registry...");
+        etcdConnectionStateListener = new EtcdConnectionStateListener(client);
+    }
+
+    /**
+     * Start the etcd Connection stateListeer
+     */
+    @PostConstruct
+    public void start() {
+        LOGGER.info("Starting Etcd ConnectionListener...");
+        etcdConnectionStateListener.start();
+        LOGGER.info("Started Etcd ConnectionListener...");
+    }
+
+    /**
+     *
+     * @param path The prefix of the key being listened to
+     * @param listener
+     * @return if subcribe Returns true if no exception was thrown
+     */
+    @Override
+    public boolean subscribe(String path, SubscribeListener listener) {
+        try {
+            ByteSequence watchKey = byteSequence(path);
+            WatchOption watchOption = WatchOption.newBuilder().isPrefix(true).build();
+            watcherMap.computeIfAbsent(path, $ -> client.getWatchClient().watch(watchKey, watchOption,watchResponse -> {
+                for (WatchEvent event : watchResponse.getEvents()) {
+                    listener.notify(new EventAdaptor(event, path));
+                }
+            }));
+        } catch (Exception e){
+            throw new RegistryException("Failed to subscribe listener for key: " + path, e);
+        }
+        return true;
+    }
+
+    /**
+     * @throws throws an exception if the unsubscribe path does not exist
+     * @param path The prefix of the key being listened to
+     */
+    @Override
+    public void unsubscribe(String path) {
+        try {
+            watcherMap.get(path).close();
+            watcherMap.remove(path);
+        } catch (Exception e) {
+            throw new RegistryException("Failed to unsubscribe listener for key: " + path, e);
+        }
+    }
+
+    @Override
+    public void addConnectionStateListener(ConnectionListener listener) {
+        etcdConnectionStateListener.addConnectionListener(listener);
+    }
+
+    /**
+     *
+     * @param key
+     * @return Returns the value corresponding to the key
+     * @throws throws an exception if the key does not exist
+     */
+    @Override
+    public String get(String key) {
+        try {
+            List<KeyValue> keyValues = client.getKVClient().get(byteSequence(key)).get().getKvs();
+            return keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            throw new RegistryException("etcd get data error", e);
+        }
+    }
+
+    /**
+     *
+     * @param key
+     * @param value
+     * @param deleteOnDisconnect Does the put data disappear when the client disconnects
+     */
+    @Override
+    public void put(String key, String value, boolean deleteOnDisconnect) {
+        try{
+            if(deleteOnDisconnect) {
+                // keep the key by lease, if disconnected, the lease will ,the key will delete

Review Comment:
   This sentence is not complete...



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java:
##########
@@ -0,0 +1,31 @@
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import lombok.Data;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.time.Duration;
+
+@Data
+@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "etcd")
+@ConfigurationProperties(prefix = "registry")
+public class EtcdRegistryProperties {
+    private String endpoints;
+    private String namespace="dolphinscheduler";
+    private Duration connectionTimeout = Duration.ofSeconds(9);
+
+    // auth
+    private String user;
+    private String password;
+    private String authority;
+
+    // RetryPolicy
+    private Long retryDelay=60L;
+    private Long retryMaxDelay=300L;

Review Comment:
   > The time unit for these two parameters is set by the ChronUnitRetry in jetcd. The type of these two parameters is long, but the type of retryMaxDuration is Duration.
   > 
   > You can see the reference of these three parameters in EtcdRegistry.java line 51 to 54.
   
   Hi, you don't have to copy the types from the jetcd client. Using `Duration` 
as the type in our own configuration is for users' convenience: they can set 
something like `1s` or `500ms` without knowing the time unit of the config. 
Users also don't care what types we pass to the client in `EtcdRegistry.java`, 
so let's keep the configuration simple for them.
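
A minimal sketch of the idea, assuming the two values in the diff are meant as milliseconds (the builder sets `ChronoUnit.MILLIS`). `RetrySettings`, `parseSimple`, and the `*Millis` accessors are hypothetical names used only for illustration. `parseSimple` is a tiny stand-in for the `1s` / `500ms` style that Spring Boot can already bind to `Duration` fields.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical sketch; class and method names are illustrative only.
final class RetrySettings {

    // Defaults mirror the 60 / 300 values in the diff, read as milliseconds.
    private Duration retryDelay = Duration.ofMillis(60);
    private Duration retryMaxDelay = Duration.ofMillis(300);

    void setRetryDelay(Duration retryDelay) {
        this.retryDelay = retryDelay;
    }

    void setRetryMaxDelay(Duration retryMaxDelay) {
        this.retryMaxDelay = retryMaxDelay;
    }

    // The conversion to the client's unit happens in one place, hidden from users.
    long retryDelayMillis() {
        return retryDelay.toMillis();
    }

    long retryMaxDelayMillis() {
        return retryMaxDelay.toMillis();
    }

    // Stand-in parser for "500ms" and "1s" style values (only these two suffixes).
    static Duration parseSimple(String text) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        if (t.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(t.substring(0, t.length() - 2)));
        }
        if (t.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(t.substring(0, t.length() - 1)));
        }
        throw new IllegalArgumentException("Unsupported duration: " + text);
    }
}
```

With this shape, the registry would pass `retryDelayMillis()` and `retryMaxDelayMillis()` to the client builder, and users would only ever write values with an explicit unit.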



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
