chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r776124907



##########
File path: 
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener 
{
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, 
UnhandledErrorListener {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, 
newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) 
throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         
client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {

Review comment:
       Being a basic util class means that ZookeeperWatcherUtils is just a wrapper 
around Curator; no business code should be placed in it.
   The watching of the dataflow should be placed in ZookeeperMetaWatcher (in our 
case, the data of a dataflow is called meta).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to