luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r776135712



##########
File path: 
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener 
{
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, 
UnhandledErrorListener {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, 
newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) 
throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         
client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {
+            final String dataFlowsWatchingPath = 
getWatchingPathOfDataFlowsInCluster(
+                    config.getString(Constants.CLUSTER_ID));
+            DataFlowsChildrenWatcherListener dataFlowsChildrenWatcherListener 
= new DataFlowsChildrenWatcherListener(
+                    this.client, metaListener);
+            this.registerPathChildrenWatcher(
+                    dataFlowsWatchingPath, dataFlowsChildrenWatcherListener, 
true);
+            final List<ChildData> childData = 
this.getCurrentPathChildrenDatum(dataFlowsWatchingPath);
+            if (childData != null) {
+                dataFlowsChildrenWatcherListener.onInitialized(childData);
+            }
+        }
+    }
+
+    /**
+     * If you want to change the path rule here, please change 
ZkTools#getNodePathOfDataFlowStorageInfoInCluster in api
+     * too.
+     */
+    public static String getWatchingPathOfDataFlowsInCluster(String cluster) {

Review comment:
       ok, move those codes of about DataFlowInfo to ZookeeperMetaWatcher.

##########
File path: 
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -337,4 +377,95 @@ public void childEvent(
             }
         }
     }
+
+    private static class DataFlowsChildrenWatcherListener implements 
ChildrenWatcherListener {

Review comment:
       ok, move those codes of about DataFlowInfo to ZookeeperMetaWatcher.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to