This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f8039b6a41 [INLONG-8564][Manager] Fix unable to issue tasks when
modifying data node info (#8565)
f8039b6a41 is described below
commit f8039b6a41d8174c8647fa90097ee735e1c3bd97
Author: fuweng11 <[email protected]>
AuthorDate: Wed Jul 19 18:58:54 2023 +0800
[INLONG-8564][Manager] Fix unable to issue tasks when modifying data node
info (#8565)
* [INLONG-8564][Manager] Fix modify dataNode information unable to issue
task
* [INLONG-8564][Manager] Improve the code readability
---------
Co-authored-by: healchow <[email protected]>
---
.../service/node/AbstractDataNodeOperator.java | 2 +-
.../manager/service/node/DataNodeOperator.java | 4 +-
.../manager/service/node/DataNodeServiceImpl.java | 56 +++++-----------------
.../service/node/mysql/MySQLDataNodeOperator.java | 14 +++---
.../manager/service/user/UserServiceImpl.java | 5 +-
5 files changed, 24 insertions(+), 57 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
index f5817d897e..c50207817a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -107,7 +107,7 @@ public abstract class AbstractDataNodeOperator implements
DataNodeOperator {
}
@Override
- public void updateRelatedStreamSource(DataNodeRequest request,
DataNodeEntity entity, String operator) {
+ public void updateRelatedStreamSource(DataNodeRequest request,
DataNodeEntity oldEntity, String operator) {
LOGGER.info("do nothing for the data node type ={}",
request.getType());
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
index 58a12b7321..1974b544aa 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -83,9 +83,9 @@ public interface DataNodeOperator {
* Update related stream source.
*
* @param request data node request
- * @param entity data node entity
+ * @param oldEntity old data node entity
* @param operator operator
*/
- void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity
entity, String operator);
+ void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity
oldEntity, String operator);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index 145435fef1..c950937dfe 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -21,6 +21,7 @@ import
org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
@@ -184,7 +185,7 @@ public class DataNodeServiceImpl implements DataNodeService
{
@Transactional(rollbackFor = Throwable.class)
public Boolean update(DataNodeRequest request, String operator) {
LOGGER.info("begin to update data node by id: {}", request);
- // check whether record existed
+ // check whether the record existed
DataNodeEntity curEntity = dataNodeMapper.selectById(request.getId());
if (curEntity == null) {
throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
@@ -192,25 +193,18 @@ public class DataNodeServiceImpl implements
DataNodeService {
}
userService.checkUser(curEntity.getInCharges(), operator,
"Current user does not have permission to update data node
info");
+
// check whether modify unmodifiable parameters
chkUnmodifiableParams(curEntity, request);
- // Check whether the data node name exists with the same name and type
- if (request.getName() != null) {
- if (StringUtils.isBlank(request.getName())) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "the name changed of data node is blank!");
- }
- DataNodeEntity existEntity =
- dataNodeMapper.selectByUniqueKey(request.getName(),
request.getType());
- if (existEntity != null &&
!existEntity.getId().equals(request.getId())) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("data node already exist for name=%s,
type=%s, required id=%s, exist id=%s",
- request.getName(), request.getType(),
request.getId(), existEntity.getId()));
- }
- }
+
+ // after the update operation, `curEntity` will be updated to the
latest info by the MyBatis cache mechanism,
+ // so we need to get an `oldEntity` by copying `curEntity` before the
update operation.
+ DataNodeEntity oldEntity = CommonBeanUtils.copyProperties(curEntity,
DataNodeEntity::new);
DataNodeOperator dataNodeOperator =
operatorFactory.getInstance(request.getType());
dataNodeOperator.updateOpt(request, operator);
- dataNodeOperator.updateRelatedStreamSource(request, curEntity,
operator);
+
+ // update the related stream sources if the request and old entity ha
+ dataNodeOperator.updateRelatedStreamSource(request, oldEntity,
operator);
LOGGER.info("success to update data node={}", request);
return true;
}
@@ -218,34 +212,8 @@ public class DataNodeServiceImpl implements
DataNodeService {
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean update(DataNodeRequest request, UserInfo opInfo) {
- // check the record existed
- DataNodeEntity curEntity = dataNodeMapper.selectById(request.getId());
- if (curEntity == null) {
- throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
- String.format("data node record not found by id=%d",
request.getId()));
- }
- userService.checkUser(curEntity.getInCharges(), opInfo.getName(),
- "Current user does not have permission to update data node
info");
- // check whether modify unmodifiable parameters
- chkUnmodifiableParams(curEntity, request);
- // Check whether the data node name exists with the same name and type
- if (request.getName() != null) {
- if (StringUtils.isBlank(request.getName())) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "the name changed of data node is blank!");
- }
- DataNodeEntity existEntity =
- dataNodeMapper.selectByUniqueKey(request.getName(),
request.getType());
- if (existEntity != null &&
!existEntity.getId().equals(request.getId())) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("data node already exist for name=%s,
type=%s, required id=%s, exist id=%s",
- request.getName(), request.getType(),
request.getId(), existEntity.getId()));
- }
- }
- DataNodeOperator dataNodeOperator =
operatorFactory.getInstance(request.getType());
- dataNodeOperator.updateOpt(request, opInfo.getName());
- dataNodeOperator.updateRelatedStreamSource(request, curEntity,
opInfo.getName());
- return true;
+ String operator = opInfo.getName();
+ return this.update(request, operator);
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index 8336bf6ed3..f55699febc 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -110,13 +110,13 @@ public class MySQLDataNodeOperator extends
AbstractDataNodeOperator {
}
@Override
- public void updateRelatedStreamSource(DataNodeRequest request,
DataNodeEntity entity, String operator) {
- MySQLDataNodeRequest mySQLDataNodeRequest = (MySQLDataNodeRequest)
request;
- MySQLDataNodeInfo mySQLDataNodeInfo = (MySQLDataNodeInfo)
this.getFromEntity(entity);
- boolean changed = !Objects.equals(mySQLDataNodeRequest.getUrl(),
mySQLDataNodeInfo.getUrl())
- || !Objects.equals(mySQLDataNodeRequest.getBackupUrl(),
mySQLDataNodeInfo.getBackupUrl())
- || !Objects.equals(mySQLDataNodeRequest.getUsername(),
mySQLDataNodeInfo.getUsername())
- || !Objects.equals(mySQLDataNodeRequest.getToken(),
mySQLDataNodeInfo.getToken());
+ public void updateRelatedStreamSource(DataNodeRequest request,
DataNodeEntity oldEntity, String operator) {
+ MySQLDataNodeRequest nodeRequest = (MySQLDataNodeRequest) request;
+ MySQLDataNodeInfo nodeInfo = (MySQLDataNodeInfo)
this.getFromEntity(oldEntity);
+ boolean changed = !Objects.equals(nodeRequest.getUrl(),
nodeInfo.getUrl())
+ || !Objects.equals(nodeRequest.getBackupUrl(),
nodeInfo.getBackupUrl())
+ || !Objects.equals(nodeRequest.getUsername(),
nodeInfo.getUsername())
+ || !Objects.equals(nodeRequest.getToken(),
nodeInfo.getToken());
if (changed) {
retryStreamSourceByDataNodeNameAndType(request.getName(),
SourceType.MYSQL_SQL, operator);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java
index 450699746f..a3f29d4494 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java
@@ -357,9 +357,8 @@ public class UserServiceImpl implements UserService {
public void checkUser(String inCharges, String user, String errMsg) {
UserEntity userEntity = userMapper.selectByName(user);
boolean isInCharge = Preconditions.inSeparatedString(user, inCharges,
InlongConstants.COMMA);
- Preconditions.expectTrue(
- isInCharge ||
TenantUserTypeEnum.TENANT_ADMIN.getCode().equals(userEntity.getAccountType()),
- errMsg);
+ Preconditions.expectTrue(isInCharge
+ ||
TenantUserTypeEnum.TENANT_ADMIN.getCode().equals(userEntity.getAccountType()),
errMsg);
}
public void removeInChargeForGroup(String user, String operator) {