This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ae984c1c8 [INLONG-6515][Manager] NPE is thrown in the cache configuration of DataProxy (#6520)
ae984c1c8 is described below

commit ae984c1c8b57580d3f885a8644eead6a1aec7839
Author: 卢春亮 <[email protected]>
AuthorDate: Tue Nov 15 19:31:46 2022 +0800

    [INLONG-6515][Manager] NPE is thrown in the cache configuration of DataProxy (#6520)
    
    Co-authored-by: healchow <[email protected]>
---
 .../repository/DataProxyConfigRepository.java      | 64 ++++++++++++----------
 .../repository/DataProxyConfigRepositoryTest.java  |  4 +-
 2 files changed, 38 insertions(+), 30 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 3e93612cb..adaa7c1f3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -21,7 +21,6 @@ import com.google.common.base.Splitter;
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -61,6 +60,7 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Repository;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.PostConstruct;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Date;
@@ -68,14 +68,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
-import javax.annotation.PostConstruct;
-
 /**
  * DataProxyConfigRepository
  */
@@ -90,14 +87,14 @@ public class DataProxyConfigRepository implements IRepository {
     public static final String KEY_BACKUP_TOPIC = "backup_topic";
     public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
     public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
-    public static final String KEY_SORT_CONSUEMER_GROUP = "defaultSortConsumerGroup";
+    public static final String KEY_SORT_CONSUMER_GROUP = "defaultSortConsumerGroup";
     public static final String KEY_SINK_NAME = "defaultSinkName";
 
     public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(SEPARATOR).trimResults()
             .withKeyValueSeparator(KEY_VALUE_SEPARATOR);
     public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
     public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
-    private static final Gson gson = new Gson();
+    private static final Gson GSON = new Gson();
 
     // key: proxyClusterName, value: jsonString
     private Map<String, String> proxyConfigJson = new ConcurrentHashMap<>();
@@ -214,7 +211,7 @@ public class DataProxyConfigRepository implements IRepository {
         if (proxyClusterMap.size() == 0) {
             return;
         }
-        // reoload cache cluster
+        // reload cache cluster
         this.reloadCacheCluster(proxyClusterMap);
         // reload inlong group id and inlong stream id
         this.reloadInlongId(proxyClusterMap);
@@ -284,7 +281,7 @@ public class DataProxyConfigRepository implements IRepository {
                             CacheClusterObject obj = new CacheClusterObject();
                             obj.setName(cacheCluster.getClusterName());
                             obj.setZone(cacheCluster.getExtTag());
-                            obj.setParams(fromJson(cacheCluster.getExtParams()));
+                            obj.setParams(fromJsonToMap(cacheCluster.getExtParams()));
                             cacheClusters.add(obj);
                         }
                     }
@@ -294,16 +291,15 @@ public class DataProxyConfigRepository implements IRepository {
     }
 
     /**
-     * fromJson
+     * Parse Json string to Java Map
      */
-    private Map<String, String> fromJson(String jsonString) {
+    private Map<String, String> fromJsonToMap(String jsonString) {
         Map<String, String> mapObj = new HashMap<>();
+        if (StringUtils.isBlank(jsonString)) {
+            return mapObj;
+        }
         try {
-            JsonObject obj = gson.fromJson(jsonString, JsonObject.class);
-            // when jsonString is null or empty str, The return value parameter will become null
-            if (Objects.isNull(obj)) {
-                return mapObj;
-            }
+            JsonObject obj = GSON.fromJson(jsonString, JsonObject.class);
             for (String key : obj.keySet()) {
                 JsonElement child = obj.get(key);
                 if (child.isJsonPrimitive()) {
@@ -313,11 +309,26 @@ public class DataProxyConfigRepository implements IRepository {
                 }
             }
         } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
+            LOGGER.error("parse json string to map error", e);
         }
         return mapObj;
     }
 
+    /**
+     * Parse Json string to JsonObject
+     */
+    private JsonObject fromJsonToJson(String jsonString) {
+        if (StringUtils.isBlank(jsonString)) {
+            return new JsonObject();
+        }
+        try {
+            return GSON.fromJson(jsonString, JsonObject.class);
+        } catch (Exception e) {
+            LOGGER.error("parse json string to json object error", e);
+            return new JsonObject();
+        }
+    }
+
     /**
      * reloadInlongId
      */
@@ -327,7 +338,7 @@ public class DataProxyConfigRepository implements IRepository {
         clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
         // reload inlong group ext params
         Map<String, Map<String, String>> groupParams = new HashMap<>();
-        groupIdMap.forEach((k, v) -> groupParams.put(k, fromJson(v.getExtParams())));
+        groupIdMap.forEach((k, v) -> groupParams.put(k, fromJsonToMap(v.getExtParams())));
         // reload inlong group ext
         List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader
                 .loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
@@ -339,7 +350,7 @@ public class DataProxyConfigRepository implements IRepository {
                 .forEach(v -> streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
         // reload inlong stream ext params
         Map<String, Map<String, String>> streamParams = new HashMap<>();
-        streamIdMap.forEach((k, v) -> streamParams.put(k, fromJson(v.getExtParams())));
+        streamIdMap.forEach((k, v) -> streamParams.put(k, fromJsonToMap(v.getExtParams())));
         // reload inlong stream ext
         List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
                 .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
@@ -435,14 +446,14 @@ public class DataProxyConfigRepository implements IRepository {
         for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) {
             DataProxyCluster proxyObj = entry.getValue();
             // json
-            String jsonDataProxyCluster = gson.toJson(proxyObj);
+            String jsonDataProxyCluster = GSON.toJson(proxyObj);
             String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
             DataProxyConfigResponse response = new DataProxyConfigResponse();
             response.setResult(true);
             response.setErrCode(DataProxyConfigResponse.SUCC);
             response.setMd5(md5);
             response.setData(proxyObj);
-            String jsonResponse = gson.toJson(response);
+            String jsonResponse = GSON.toJson(response);
             newProxyConfigJson.put(entry.getKey(), jsonResponse);
             newProxyMd5Map.put(entry.getKey(), md5);
         }
@@ -551,14 +562,13 @@ public class DataProxyConfigRepository implements IRepository {
             if (StringUtils.equals(clusterType, v.getType())
                     && StringUtils.equals(clusterTag, v.getClusterTags())) {
                 String newExtParams = v.getExtParams();
-                Gson gson = new Gson();
-                JsonObject extParams = gson.fromJson(newExtParams, JsonObject.class);
+                JsonObject extParams = fromJsonToJson(newExtParams);
                 if (extParams.has(KEY_SINK_NAME) && extParams.has(KEY_SORT_TASK_NAME)
-                        && extParams.has(KEY_DATA_NODE_NAME) && extParams.has(KEY_SORT_CONSUEMER_GROUP)) {
+                        && extParams.has(KEY_DATA_NODE_NAME) && extParams.has(KEY_SORT_CONSUMER_GROUP)) {
                     final String sinkName = extParams.get(KEY_SINK_NAME).getAsString();
                     final String sortTaskName = extParams.get(KEY_SORT_TASK_NAME).getAsString();
                     final String dataNodeName = extParams.get(KEY_DATA_NODE_NAME).getAsString();
-                    final String sortConsumerGroup = extParams.get(KEY_SORT_CONSUEMER_GROUP).getAsString();
+                    final String sortConsumerGroup = extParams.get(KEY_SORT_CONSUMER_GROUP).getAsString();
                     StreamSinkEntity newStreamSink = copyStreamSink(srcStreamSink);
                     newStreamSink.setInlongClusterName(v.getName());
                     newStreamSink.setSinkName(sinkName);
@@ -595,8 +605,7 @@ public class DataProxyConfigRepository implements IRepository {
             extParams = "{}";
         }
         // parse json
-        Gson gson = new Gson();
-        JsonObject extParamsObj = gson.fromJson(extParams, JsonObject.class);
+        JsonObject extParamsObj = fromJsonToJson(extParams);
         // change cluster tag
         extParamsObj.addProperty(KEY_BACKUP_CLUSTER_TAG, oldGroup.getInlongClusterTag());
         extParamsObj.addProperty(KEY_BACKUP_TOPIC, oldGroup.getMqResource());
@@ -627,8 +636,7 @@ public class DataProxyConfigRepository implements IRepository {
             return inlongGroupId;
         }
         // parse json
-        Gson gson = new Gson();
-        JsonObject extParamsObj = gson.fromJson(extParams, JsonObject.class);
+        JsonObject extParamsObj = fromJsonToJson(extParams);
         if (!extParamsObj.has(KEY_BACKUP_CLUSTER_TAG)) {
             return inlongGroupId;
         }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
index 87959e5bb..8f6371fc3 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
@@ -102,7 +102,7 @@ public class DataProxyConfigRepositoryTest {
                 DataProxyConfigRepository.KEY_SINK_NAME, CLS_DATA_NODE_OLD,
                 DataProxyConfigRepository.KEY_SORT_TASK_NAME, CLS_DATA_NODE_OLD,
                 DataProxyConfigRepository.KEY_DATA_NODE_NAME, CLS_DATA_NODE_OLD,
-                DataProxyConfigRepository.KEY_SORT_CONSUEMER_GROUP, CLS_DATA_NODE_OLD));
+                DataProxyConfigRepository.KEY_SORT_CONSUMER_GROUP, CLS_DATA_NODE_OLD));
         clusters.add(clsCluster);
         InlongClusterEntity kafkaCluster = new InlongClusterEntity();
         kafkaCluster.setName("kafka_1");
@@ -117,7 +117,7 @@ public class DataProxyConfigRepositoryTest {
                 DataProxyConfigRepository.KEY_SINK_NAME, CLS_DATA_NODE_NEW,
                 DataProxyConfigRepository.KEY_SORT_TASK_NAME, CLS_DATA_NODE_NEW,
                 DataProxyConfigRepository.KEY_DATA_NODE_NAME, CLS_DATA_NODE_NEW,
-                DataProxyConfigRepository.KEY_SORT_CONSUEMER_GROUP, CLS_DATA_NODE_NEW));
+                DataProxyConfigRepository.KEY_SORT_CONSUMER_GROUP, CLS_DATA_NODE_NEW));
         clusters.add(clsCluster2);
         InlongClusterEntityMapper mapper = PowerMockito.mock(InlongClusterEntityMapper.class);
         PowerMockito.when(mapper.selectByCondition(any())).thenReturn(clusters);

Reply via email to