This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ae984c1c8 [INLONG-6515][Manager] NPE is thrown in the cache
configuration of DataProxy (#6520)
ae984c1c8 is described below
commit ae984c1c8b57580d3f885a8644eead6a1aec7839
Author: 卢春亮 <[email protected]>
AuthorDate: Tue Nov 15 19:31:46 2022 +0800
[INLONG-6515][Manager] NPE is thrown in the cache configuration of
DataProxy (#6520)
Co-authored-by: healchow <[email protected]>
---
.../repository/DataProxyConfigRepository.java | 64 ++++++++++++----------
.../repository/DataProxyConfigRepositoryTest.java | 4 +-
2 files changed, 38 insertions(+), 30 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 3e93612cb..adaa7c1f3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -21,7 +21,6 @@ import com.google.common.base.Splitter;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
@@ -61,6 +60,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
+import javax.annotation.PostConstruct;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Date;
@@ -68,14 +68,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.PostConstruct;
-
/**
* DataProxyConfigRepository
*/
@@ -90,14 +87,14 @@ public class DataProxyConfigRepository implements
IRepository {
public static final String KEY_BACKUP_TOPIC = "backup_topic";
public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
- public static final String KEY_SORT_CONSUEMER_GROUP =
"defaultSortConsumerGroup";
+ public static final String KEY_SORT_CONSUMER_GROUP =
"defaultSortConsumerGroup";
public static final String KEY_SINK_NAME = "defaultSinkName";
public static final Splitter.MapSplitter MAP_SPLITTER =
Splitter.on(SEPARATOR).trimResults()
.withKeyValueSeparator(KEY_VALUE_SEPARATOR);
public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
- private static final Gson gson = new Gson();
+ private static final Gson GSON = new Gson();
// key: proxyClusterName, value: jsonString
private Map<String, String> proxyConfigJson = new ConcurrentHashMap<>();
@@ -214,7 +211,7 @@ public class DataProxyConfigRepository implements
IRepository {
if (proxyClusterMap.size() == 0) {
return;
}
- // reoload cache cluster
+ // reload cache cluster
this.reloadCacheCluster(proxyClusterMap);
// reload inlong group id and inlong stream id
this.reloadInlongId(proxyClusterMap);
@@ -284,7 +281,7 @@ public class DataProxyConfigRepository implements
IRepository {
CacheClusterObject obj = new CacheClusterObject();
obj.setName(cacheCluster.getClusterName());
obj.setZone(cacheCluster.getExtTag());
-
obj.setParams(fromJson(cacheCluster.getExtParams()));
+
obj.setParams(fromJsonToMap(cacheCluster.getExtParams()));
cacheClusters.add(obj);
}
}
@@ -294,16 +291,15 @@ public class DataProxyConfigRepository implements
IRepository {
}
/**
- * fromJson
+ * Parse Json string to Java Map
*/
- private Map<String, String> fromJson(String jsonString) {
+ private Map<String, String> fromJsonToMap(String jsonString) {
Map<String, String> mapObj = new HashMap<>();
+ if (StringUtils.isBlank(jsonString)) {
+ return mapObj;
+ }
try {
- JsonObject obj = gson.fromJson(jsonString, JsonObject.class);
- // when jsonString is null or empty str, The return value
parameter will become null
- if (Objects.isNull(obj)) {
- return mapObj;
- }
+ JsonObject obj = GSON.fromJson(jsonString, JsonObject.class);
for (String key : obj.keySet()) {
JsonElement child = obj.get(key);
if (child.isJsonPrimitive()) {
@@ -313,11 +309,26 @@ public class DataProxyConfigRepository implements
IRepository {
}
}
} catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
+ LOGGER.error("parse json string to map error", e);
}
return mapObj;
}
+ /**
+ * Parse Json string to JsonObject
+ */
+ private JsonObject fromJsonToJson(String jsonString) {
+ if (StringUtils.isBlank(jsonString)) {
+ return new JsonObject();
+ }
+ try {
+ return GSON.fromJson(jsonString, JsonObject.class);
+ } catch (Exception e) {
+ LOGGER.error("parse json string to json object error", e);
+ return new JsonObject();
+ }
+ }
+
/**
* reloadInlongId
*/
@@ -327,7 +338,7 @@ public class DataProxyConfigRepository implements
IRepository {
clusterSetMapper.selectInlongGroupId().forEach(value ->
groupIdMap.put(value.getInlongGroupId(), value));
// reload inlong group ext params
Map<String, Map<String, String>> groupParams = new HashMap<>();
- groupIdMap.forEach((k, v) -> groupParams.put(k,
fromJson(v.getExtParams())));
+ groupIdMap.forEach((k, v) -> groupParams.put(k,
fromJsonToMap(v.getExtParams())));
// reload inlong group ext
List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader
.loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
@@ -339,7 +350,7 @@ public class DataProxyConfigRepository implements
IRepository {
.forEach(v ->
streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v));
// reload inlong stream ext params
Map<String, Map<String, String>> streamParams = new HashMap<>();
- streamIdMap.forEach((k, v) -> streamParams.put(k,
fromJson(v.getExtParams())));
+ streamIdMap.forEach((k, v) -> streamParams.put(k,
fromJsonToMap(v.getExtParams())));
// reload inlong stream ext
List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
@@ -435,14 +446,14 @@ public class DataProxyConfigRepository implements
IRepository {
for (Entry<String, DataProxyCluster> entry :
proxyClusterMap.entrySet()) {
DataProxyCluster proxyObj = entry.getValue();
// json
- String jsonDataProxyCluster = gson.toJson(proxyObj);
+ String jsonDataProxyCluster = GSON.toJson(proxyObj);
String md5 = DigestUtils.md5Hex(jsonDataProxyCluster);
DataProxyConfigResponse response = new DataProxyConfigResponse();
response.setResult(true);
response.setErrCode(DataProxyConfigResponse.SUCC);
response.setMd5(md5);
response.setData(proxyObj);
- String jsonResponse = gson.toJson(response);
+ String jsonResponse = GSON.toJson(response);
newProxyConfigJson.put(entry.getKey(), jsonResponse);
newProxyMd5Map.put(entry.getKey(), md5);
}
@@ -551,14 +562,13 @@ public class DataProxyConfigRepository implements
IRepository {
if (StringUtils.equals(clusterType, v.getType())
&& StringUtils.equals(clusterTag, v.getClusterTags())) {
String newExtParams = v.getExtParams();
- Gson gson = new Gson();
- JsonObject extParams = gson.fromJson(newExtParams,
JsonObject.class);
+ JsonObject extParams = fromJsonToJson(newExtParams);
if (extParams.has(KEY_SINK_NAME) &&
extParams.has(KEY_SORT_TASK_NAME)
- && extParams.has(KEY_DATA_NODE_NAME) &&
extParams.has(KEY_SORT_CONSUEMER_GROUP)) {
+ && extParams.has(KEY_DATA_NODE_NAME) &&
extParams.has(KEY_SORT_CONSUMER_GROUP)) {
final String sinkName =
extParams.get(KEY_SINK_NAME).getAsString();
final String sortTaskName =
extParams.get(KEY_SORT_TASK_NAME).getAsString();
final String dataNodeName =
extParams.get(KEY_DATA_NODE_NAME).getAsString();
- final String sortConsumerGroup =
extParams.get(KEY_SORT_CONSUEMER_GROUP).getAsString();
+ final String sortConsumerGroup =
extParams.get(KEY_SORT_CONSUMER_GROUP).getAsString();
StreamSinkEntity newStreamSink =
copyStreamSink(srcStreamSink);
newStreamSink.setInlongClusterName(v.getName());
newStreamSink.setSinkName(sinkName);
@@ -595,8 +605,7 @@ public class DataProxyConfigRepository implements
IRepository {
extParams = "{}";
}
// parse json
- Gson gson = new Gson();
- JsonObject extParamsObj = gson.fromJson(extParams, JsonObject.class);
+ JsonObject extParamsObj = fromJsonToJson(extParams);
// change cluster tag
extParamsObj.addProperty(KEY_BACKUP_CLUSTER_TAG,
oldGroup.getInlongClusterTag());
extParamsObj.addProperty(KEY_BACKUP_TOPIC, oldGroup.getMqResource());
@@ -627,8 +636,7 @@ public class DataProxyConfigRepository implements
IRepository {
return inlongGroupId;
}
// parse json
- Gson gson = new Gson();
- JsonObject extParamsObj = gson.fromJson(extParams, JsonObject.class);
+ JsonObject extParamsObj = fromJsonToJson(extParams);
if (!extParamsObj.has(KEY_BACKUP_CLUSTER_TAG)) {
return inlongGroupId;
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
index 87959e5bb..8f6371fc3 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
@@ -102,7 +102,7 @@ public class DataProxyConfigRepositoryTest {
DataProxyConfigRepository.KEY_SINK_NAME, CLS_DATA_NODE_OLD,
DataProxyConfigRepository.KEY_SORT_TASK_NAME,
CLS_DATA_NODE_OLD,
DataProxyConfigRepository.KEY_DATA_NODE_NAME,
CLS_DATA_NODE_OLD,
- DataProxyConfigRepository.KEY_SORT_CONSUEMER_GROUP,
CLS_DATA_NODE_OLD));
+ DataProxyConfigRepository.KEY_SORT_CONSUMER_GROUP,
CLS_DATA_NODE_OLD));
clusters.add(clsCluster);
InlongClusterEntity kafkaCluster = new InlongClusterEntity();
kafkaCluster.setName("kafka_1");
@@ -117,7 +117,7 @@ public class DataProxyConfigRepositoryTest {
DataProxyConfigRepository.KEY_SINK_NAME, CLS_DATA_NODE_NEW,
DataProxyConfigRepository.KEY_SORT_TASK_NAME,
CLS_DATA_NODE_NEW,
DataProxyConfigRepository.KEY_DATA_NODE_NAME,
CLS_DATA_NODE_NEW,
- DataProxyConfigRepository.KEY_SORT_CONSUEMER_GROUP,
CLS_DATA_NODE_NEW));
+ DataProxyConfigRepository.KEY_SORT_CONSUMER_GROUP,
CLS_DATA_NODE_NEW));
clusters.add(clsCluster2);
InlongClusterEntityMapper mapper =
PowerMockito.mock(InlongClusterEntityMapper.class);
PowerMockito.when(mapper.selectByCondition(any())).thenReturn(clusters);