kezhenxu94 commented on code in PR #10981:
URL: https://github.com/apache/dolphinscheduler/pull/10981#discussion_r922597330


##########
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java:
##########
@@ -20,9 +20,13 @@
 package org.apache.dolphinscheduler.registry.api;
 
 public class Event {
+    // The prefix which is watched
     private String key;
+    // The full path where the event was generated
     private String path;
+    // The value corresponding to the path
     private String data;
+    // The event type {ADD, REMOVE ,UPDATE}

Review Comment:
   ```suggestion
       // The event type {ADD, REMOVE, UPDATE}
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/README.md:
##########
@@ -0,0 +1,28 @@
+# Introduction
+
+This module is the etcd registry plugin module, this plugin will use etcd as 
the registry center.
+
+# How to use
+
+If you want to set the registry center as mysql,you need to set the registry 
properties in master/worker/api's appplication.yml

Review Comment:
   ```suggestion
   If you want to set the registry center as etcd, you need to set the registry 
properties in master/worker/api's appplication.yml
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java:
##########
@@ -20,27 +20,72 @@
 package org.apache.dolphinscheduler.registry.api;
 
 import java.io.Closeable;
-import java.time.Duration;
 import java.util.Collection;
 
+/**
+ * Registry
+ *
+ * <p>
+ * The implementation may throw RegistryException during function call
+ */
 public interface Registry extends Closeable {
+    /**
+     * Watch the change of this path and subpath.
+     * The type of change contains [ADD,DELETE,UPDATE]
+     * @return if there is not a Exception, the result is true.

Review Comment:
   ```suggestion
        * @return {@code true} if succeeded.
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java:
##########
@@ -20,27 +20,72 @@
 package org.apache.dolphinscheduler.registry.api;
 
 import java.io.Closeable;
-import java.time.Duration;
 import java.util.Collection;
 
+/**
+ * Registry
+ *
+ * <p>
+ * The implementation may throw RegistryException during function call
+ */
 public interface Registry extends Closeable {
+    /**
+     * Watch the change of this path and subpath.
+     * The type of change contains [ADD,DELETE,UPDATE]
+     * @return if there is not a Exception, the result is true.
+     */
     boolean subscribe(String path, SubscribeListener listener);
 
+    /**
+     * remove the SubscribeListener which subscribe this path

Review Comment:
   ```suggestion
        * Remove the path from the subscribe list.
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java:
##########
@@ -20,27 +20,72 @@
 package org.apache.dolphinscheduler.registry.api;
 
 import java.io.Closeable;
-import java.time.Duration;
 import java.util.Collection;
 
+/**
+ * Registry
+ *
+ * <p>
+ * The implementation may throw RegistryException during function call
+ */
 public interface Registry extends Closeable {
+    /**
+     * Watch the change of this path and subpath.
+     * The type of change contains [ADD,DELETE,UPDATE]
+     * @return if there is not a Exception, the result is true.
+     */
     boolean subscribe(String path, SubscribeListener listener);
 
+    /**
+     * remove the SubscribeListener which subscribe this path
+     */
     void unsubscribe(String path);
 
+    /**
+     * addd a connection listener to collection

Review Comment:
   ```suggestion
        * Add a connection listener to collection.
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java:
##########
@@ -19,6 +19,9 @@
 
 package org.apache.dolphinscheduler.registry.api;
 
+/**
+ * Connection State Between client and registry center(Etcd,MySql,Zookeeper)

Review Comment:
   ```suggestion
    * Connection state between client and registry center(Etcd, MySql, 
Zookeeper)
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java:
##########
@@ -20,27 +20,72 @@
 package org.apache.dolphinscheduler.registry.api;
 
 import java.io.Closeable;
-import java.time.Duration;
 import java.util.Collection;
 
+/**
+ * Registry
+ *
+ * <p>
+ * The implementation may throw RegistryException during function call
+ */
 public interface Registry extends Closeable {
+    /**
+     * Watch the change of this path and subpath.
+     * The type of change contains [ADD,DELETE,UPDATE]
+     * @return if there is not a Exception, the result is true.
+     */
     boolean subscribe(String path, SubscribeListener listener);
 
+    /**
+     * remove the SubscribeListener which subscribe this path
+     */
     void unsubscribe(String path);
 
+    /**
+     * addd a connection listener to collection
+     */
     void addConnectionStateListener(ConnectionListener listener);
 
+    /**
+     * @return the value
+     */
     String get(String key);
 
+    /**
+     *
+     * @param key
+     * @param value
+     * @param deleteOnDisconnect if true, when the connection state is 
disconnected, the key will be deleted
+     */
     void put(String key, String value, boolean deleteOnDisconnect);
 
+    /**
+     * This function will delete the keys whose prefix is {@param key}
+     * @param key the prefix of deleted key
+     * @throws if the key not exists, there is a registryException
+     */
     void delete(String key);
 
+    /**
+     * This function will get the subdirectory of {@param key}
+     * E.g: registry contains  the following keys:[/test/test1/test2,]
+     * if the key: /test
+     * Return: test1
+     */
     Collection<String> children(String key);
 
+    /**
+     * @return if key exists,return true

Review Comment:
   ```suggestion
        * @return {@code true} if key exists.
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdConnectionStateListener.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.etcd.jetcd.Client;
+import io.grpc.ManagedChannel;
+
+/**
+ * Get the connection status by listening to the Client's Channel
+ */
+public class EtcdConnectionStateListener implements AutoCloseable {
+    private final List<ConnectionListener> connectionListeners = 
Collections.synchronizedList(new ArrayList<>());
+    // A thread pool that periodically obtains connection status
+    private final ScheduledExecutorService scheduledExecutorService;
+    // Client's Channel
+    private AtomicReference<ManagedChannel> channel;
+    // monitored client
+    private Client client;
+    // The state of the last monitor
+    private ConnectionState connectionState;
+    private long initialDelay = 500L;
+    private long delay = 500L;
+    public EtcdConnectionStateListener(Client client) {
+        this.client = client;
+        channel = new AtomicReference<>();
+        this.scheduledExecutorService = Executors.newScheduledThreadPool(
+                1,
+                new 
ThreadFactoryBuilder().setNameFormat("EtcdConnectionStateListenerThread").setDaemon(true).build());
+    }
+
+    public void addConnectionListener(ConnectionListener connectionListener) {
+        connectionListeners.add(connectionListener);
+    }
+
+    @Override
+    public void close() throws Exception {
+        connectionListeners.clear();
+        scheduledExecutorService.shutdownNow();
+    }
+
+    /**
+     * Apply for a lease through the client, if there is no exception, the 
connection is normal
+     * @return the current connection state
+     * @throws if there is a exception, return is DISCONNECTED
+     */
+    private ConnectionState isConnected() {

Review Comment:
   ```suggestion
       private ConnectionState currentConnectivityState() {
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java:
##########
@@ -20,27 +20,72 @@
 package org.apache.dolphinscheduler.registry.api;
 
 import java.io.Closeable;
-import java.time.Duration;
 import java.util.Collection;
 
+/**
+ * Registry
+ *
+ * <p>
+ * The implementation may throw RegistryException during function call

Review Comment:
   This is verbose as the function signatures already indicate that. 
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdConnectionStateListener.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.etcd.jetcd.Client;
+import io.grpc.ManagedChannel;
+
+/**
+ * Get the connection status by listening to the Client's Channel
+ */
+public class EtcdConnectionStateListener implements AutoCloseable {
+    private final List<ConnectionListener> connectionListeners = 
Collections.synchronizedList(new ArrayList<>());
+    // A thread pool that periodically obtains connection status
+    private final ScheduledExecutorService scheduledExecutorService;
+    // Client's Channel
+    private AtomicReference<ManagedChannel> channel;
+    // monitored client
+    private Client client;
+    // The state of the last monitor
+    private ConnectionState connectionState;
+    private long initialDelay = 500L;
+    private long delay = 500L;
+    public EtcdConnectionStateListener(Client client) {
+        this.client = client;
+        channel = new AtomicReference<>();
+        this.scheduledExecutorService = Executors.newScheduledThreadPool(
+                1,
+                new 
ThreadFactoryBuilder().setNameFormat("EtcdConnectionStateListenerThread").setDaemon(true).build());
+    }
+
+    public void addConnectionListener(ConnectionListener connectionListener) {
+        connectionListeners.add(connectionListener);
+    }
+
+    @Override
+    public void close() throws Exception {
+        connectionListeners.clear();
+        scheduledExecutorService.shutdownNow();
+    }
+
+    /**
+     * Apply for a lease through the client, if there is no exception, the 
connection is normal
+     * @return the current connection state
+     * @throws if there is a exception, return is DISCONNECTED
+     */
+    private ConnectionState isConnected() {
+        try {
+            // Use Get() to ensure Future completes

Review Comment:
   Remove this. Don't explain codes if they are already very clear. 



##########
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java:
##########
@@ -19,6 +19,10 @@
 
 package org.apache.dolphinscheduler.registry.api;
 
+/**
+ * when the connect state between client and registry center changed,
+ * the onupdate function is triggered

Review Comment:
   ```suggestion
    * the {@code onUpdate} function is triggered
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdConnectionStateListener.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.etcd.jetcd.Client;
+import io.grpc.ManagedChannel;
+
+/**
+ * Get the connection status by listening to the Client's Channel

Review Comment:
   Reword this there is no channel anymore



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdConnectionStateListener.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.etcd.jetcd.Client;
+import io.grpc.ManagedChannel;
+
+/**
+ * Get the connection status by listening to the Client's Channel
+ */
+public class EtcdConnectionStateListener implements AutoCloseable {
+    private final List<ConnectionListener> connectionListeners = 
Collections.synchronizedList(new ArrayList<>());
+    // A thread pool that periodically obtains connection status
+    private final ScheduledExecutorService scheduledExecutorService;
+    // Client's Channel
+    private AtomicReference<ManagedChannel> channel;
+    // monitored client
+    private Client client;
+    // The state of the last monitor
+    private ConnectionState connectionState;
+    private long initialDelay = 500L;
+    private long delay = 500L;
+    public EtcdConnectionStateListener(Client client) {
+        this.client = client;
+        channel = new AtomicReference<>();
+        this.scheduledExecutorService = Executors.newScheduledThreadPool(
+                1,
+                new 
ThreadFactoryBuilder().setNameFormat("EtcdConnectionStateListenerThread").setDaemon(true).build());
+    }
+
+    public void addConnectionListener(ConnectionListener connectionListener) {
+        connectionListeners.add(connectionListener);
+    }
+
+    @Override
+    public void close() throws Exception {
+        connectionListeners.clear();
+        scheduledExecutorService.shutdownNow();
+    }
+
+    /**
+     * Apply for a lease through the client, if there is no exception, the 
connection is normal
+     * @return the current connection state
+     * @throws if there is a exception, return is DISCONNECTED
+     */
+    private ConnectionState isConnected() {
+        try {
+            // Use Get() to ensure Future completes
+            client.getLeaseClient().grant(1).get().getID();
+            return ConnectionState.CONNECTED;
+        } catch (Exception e) {
+            return ConnectionState.DISCONNECTED;
+        }
+    }
+
+    /**
+     * Periodically execute thread to get connection status
+     */
+    public void start() {
+        this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
+            ConnectionState currentConnectionState = isConnected();
+            if (currentConnectionState == connectionState) {
+                return;
+            }
+            if (connectionState == ConnectionState.CONNECTED) {
+                if (currentConnectionState == ConnectionState.DISCONNECTED) {
+                    connectionState = ConnectionState.DISCONNECTED;
+                    triggerListener(ConnectionState.DISCONNECTED);
+                }
+            } else if (connectionState == ConnectionState.DISCONNECTED) {
+                if (currentConnectionState == ConnectionState.CONNECTED) {
+                    connectionState = ConnectionState.CONNECTED;
+                    triggerListener(ConnectionState.RECONNECTED);
+                }
+            } else if (connectionState == null) {
+                connectionState = currentConnectionState;
+                triggerListener(connectionState);
+            }

Review Comment:
   Why can't we just simplify this to
   
   ```suggestion
               if (connectionState != ConnectionState.SUSPENDED) {
                   connectionState = currentConnectionState;
                   triggerListener(connectionState);
   ```
   
   @ruanwenjun I think the api should also notify even if the previous state is 
suspended and if the listener isn't interested in this they can just ignored. 



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdConnectionStateListener.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.etcd.jetcd.Client;
+import io.grpc.ManagedChannel;
+
+/**
+ * Get the connection status by listening to the Client's Channel
+ */
+public class EtcdConnectionStateListener implements AutoCloseable {
+    private final List<ConnectionListener> connectionListeners = 
Collections.synchronizedList(new ArrayList<>());
+    // A thread pool that periodically obtains connection status
+    private final ScheduledExecutorService scheduledExecutorService;
+    // Client's Channel
+    private AtomicReference<ManagedChannel> channel;

Review Comment:
   This is no needed right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to