chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775695298
##########
File path:
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void
registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
LOG.info("Register DataFlowInfoListener successfully");
}
+ /**
+ * getDataFlowInfo
+ *
+ * @param id
+ * @return
+ */
public DataFlowInfo getDataFlowInfo(long id) {
- synchronized (lock) {
+ synchronized (LOCK) {
return dataFlowInfoMap.get(id);
}
}
- public interface DataFlowInfoListener {
- void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
- }
-
- private class DataFlowsChildrenWatcherListener implements
ChildrenWatcherListener {
-
- @Override
- public void onChildAdded(ChildData childData) throws Exception {
- LOG.info("DataFlow Added event retrieved");
-
- final byte[] data = childData.getData();
- if (data == null) {
- return;
+ /**
+ * addDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ long dataFlowId = dataFlowInfo.getId();
+ DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
Review comment:
modification of dataFlowInfoMap should be under `LOCK`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]