This is an automated email from the ASF dual-hosted git repository.

weiraowang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3148dba7a5 [Improvement-14658][etcd] Distinguish between add and 
update by using pre kv (#14659)
3148dba7a5 is described below

commit 3148dba7a52f7917ca5c8eecd01157ab5b62c585
Author: eye-gu <[email protected]>
AuthorDate: Mon Aug 28 09:28:52 2023 +0800

    [Improvement-14658][etcd] Distinguish between add and update by using pre 
kv (#14659)
    
    Co-authored-by: eye <[email protected]>
---
 .../dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java      | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
index 21e9253529..57a17c6519 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
@@ -151,7 +151,8 @@ public class EtcdRegistry implements Registry {
     public boolean subscribe(String path, SubscribeListener listener) {
         try {
             ByteSequence watchKey = byteSequence(path);
-            WatchOption watchOption = 
WatchOption.newBuilder().isPrefix(true).build();
+            WatchOption watchOption =
+                    
WatchOption.newBuilder().withPrevKV(true).isPrefix(true).build();
             watcherMap.computeIfAbsent(path,
                     $ -> client.getWatchClient().watch(watchKey, watchOption, 
watchResponse -> {
                         for (WatchEvent event : watchResponse.getEvents()) {
@@ -352,7 +353,11 @@ public class EtcdRegistry implements Registry {
 
             switch (event.getEventType()) {
                 case PUT:
-                    type(Type.ADD);
+                    if (event.getPrevKV().getKey().isEmpty()) {
+                        type(Type.ADD);
+                    } else {
+                        type(Type.UPDATE);
+                    }
                     break;
                 case DELETE:
                     type(Type.REMOVE);

Reply via email to