This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6bad73345 [INLONG-6988][Manager] Use the data node info for StarRocks
(#6989)
6bad73345 is described below
commit 6bad73345a13c9f8f8ad7f2b83e73ccfd7fc4653
Author: fuweng11 <[email protected]>
AuthorDate: Wed Dec 21 10:34:12 2022 +0800
[INLONG-6988][Manager] Use the data node info for StarRocks (#6989)
---
.../inlong/manager/common/consts/DataNodeType.java | 1 +
.../pojo/node/starrocks/StarRocksDataNodeDTO.java | 64 ++++++++++++++++
.../pojo/node/starrocks/StarRocksDataNodeInfo.java | 51 +++++++++++++
.../node/starrocks/StarRocksDataNodeRequest.java | 46 ++++++++++++
.../node/starrocks/StarRocksDataNodeOperator.java | 86 ++++++++++++++++++++++
.../sink/starrocks/StarRocksResourceOperator.java | 24 +++++-
.../sink/starrocks/StarRocksSinkOperator.java | 10 +++
7 files changed, 281 insertions(+), 1 deletion(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 16fa92c77..a32adac37 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -29,5 +29,6 @@ public class DataNodeType {
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String MYSQL = "MYSQL";
+ public static final String STARROCKS = "STARROCKS";
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeDTO.java
new file mode 100644
index 000000000..c01d99c6b
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeDTO.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.starrocks;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * StarRocks data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("StarRocks data node info")
+public class StarRocksDataNodeDTO {
+
+ @ApiModelProperty("StarRocks FE http address")
+ private String loadUrl;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static StarRocksDataNodeDTO getFromRequest(StarRocksDataNodeRequest
request) throws Exception {
+ return CommonBeanUtils.copyProperties(request,
StarRocksDataNodeDTO::new, true);
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static StarRocksDataNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams,
StarRocksDataNodeDTO.class);
+ } catch (Exception e) {
+ throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeInfo.java
new file mode 100644
index 000000000..8595445e8
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.starrocks;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * StarRocks data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.STARROCKS)
+@ApiModel("StarRocks data node info")
+public class StarRocksDataNodeInfo extends DataNodeInfo {
+
+ @ApiModelProperty("StarRocks FE http address")
+ private String loadUrl;
+
+ public StarRocksDataNodeInfo() {
+ this.setType(DataNodeType.STARROCKS);
+ }
+
+ @Override
+ public StarRocksDataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this,
StarRocksDataNodeRequest::new);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeRequest.java
new file mode 100644
index 000000000..164b94479
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.starrocks;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * StarRocks data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.STARROCKS)
+@ApiModel("StarRocks data node request")
+public class StarRocksDataNodeRequest extends DataNodeRequest {
+
+ @ApiModelProperty("StarRocks FE http address")
+ private String loadUrl;
+
+ public StarRocksDataNodeRequest() {
+ this.setType(DataNodeType.STARROCKS);
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
new file mode 100644
index 000000000..fb82d894c
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.starrocks;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class StarRocksDataNodeOperator extends AbstractDataNodeOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StarRocksDataNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ return getDataNodeType().equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.STARROCKS;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+
+ StarRocksDataNodeInfo starRocksDataNodeInfo = new
StarRocksDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, starRocksDataNodeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ StarRocksDataNodeDTO dto =
StarRocksDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, starRocksDataNodeInfo);
+ }
+
+ LOGGER.debug("success to get starRocks data node from entity");
+ return starRocksDataNodeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity
targetEntity) {
+ StarRocksDataNodeRequest starRocksDataNodeRequest =
(StarRocksDataNodeRequest) request;
+ CommonBeanUtils.copyProperties(starRocksDataNodeRequest, targetEntity,
true);
+ try {
+ StarRocksDataNodeDTO dto =
StarRocksDataNodeDTO.getFromRequest(starRocksDataNodeRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ LOGGER.debug("success to set entity for starRocks data node");
+ } catch (Exception e) {
+ LOGGER.error("failed to set entity for starRocks data node: ", e);
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
index b657481b7..828bae027 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
@@ -24,12 +24,15 @@ import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkDTO;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import
org.apache.inlong.manager.service.resource.sink.mysql.MySQLResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
@@ -55,6 +58,9 @@ public class StarRocksResourceOperator implements
SinkResourceOperator {
@Autowired
private StreamSinkFieldEntityMapper fieldEntityMapper;
+ @Autowired
+ private DataNodeOperateHelper dataNodeHelper;
+
@Override
public Boolean accept(String sinkType) {
return SinkType.STARROCKS.equals(sinkType);
@@ -87,7 +93,7 @@ public class StarRocksResourceOperator implements
SinkResourceOperator {
// get columns
List<StarRocksColumnInfo> columnList =
getStarRocksColumnInfoFromSink(fieldList);
- StarRocksSinkDTO sinkDTO =
StarRocksSinkDTO.getFromJson(sinkInfo.getExtParams());
+ StarRocksSinkDTO sinkDTO = getStarRocksInfo(sinkInfo);
StarRocksTableInfo tableInfo = StarRocksSinkDTO.getTableInfo(sinkDTO,
columnList);
String url = sinkDTO.getJdbcUrl();
String username = sinkDTO.getUsername();
@@ -131,4 +137,20 @@ public class StarRocksResourceOperator implements
SinkResourceOperator {
}
return columnInfoList;
}
+
+ private StarRocksSinkDTO getStarRocksInfo(SinkInfo sinkInfo) {
+ StarRocksSinkDTO starRocksInfo =
StarRocksSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+ // read from data node if not supplied by user
+ if (StringUtils.isBlank(starRocksInfo.getJdbcUrl())) {
+ String dataNodeName = sinkInfo.getDataNodeName();
+ Preconditions.checkNotEmpty(dataNodeName, "starRocks jdbc url not
specified and data node is empty");
+ StarRocksDataNodeInfo dataNodeInfo = (StarRocksDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ dataNodeName, sinkInfo.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, starRocksInfo);
+ starRocksInfo.setJdbcUrl(dataNodeInfo.getUrl());
+ starRocksInfo.setPassword(dataNodeInfo.getToken());
+ }
+ return starRocksInfo;
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
index e9ff58717..5b690fc19 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -88,6 +89,15 @@ public class StarRocksSinkOperator extends
AbstractSinkOperator {
}
StarRocksSinkDTO dto =
StarRocksSinkDTO.getFromJson(entity.getExtParams());
+ if (StringUtils.isBlank(dto.getJdbcUrl())) {
+ Preconditions.checkNotEmpty(entity.getDataNodeName(),
+ "starRocks jdbc url unspecified and data node is empty");
+ StarRocksDataNodeInfo dataNodeInfo = (StarRocksDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ entity.getDataNodeName(), entity.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+ dto.setJdbcUrl(dataNodeInfo.getUrl());
+ dto.setPassword(dataNodeInfo.getToken());
+ }
Preconditions.checkNotEmpty(dto.getLoadUrl(), "StarRocks load url is
empty");
Preconditions.checkNotEmpty(dto.getJdbcUrl(), "StarRocks jdbc url is
empty");
CommonBeanUtils.copyProperties(entity, sink, true);