github-advanced-security[bot] commented on code in PR #15932: URL: https://github.com/apache/dolphinscheduler/pull/15932#discussion_r1582162206
########## dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AbstractEventPendingQueue.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.service; + +import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics; + +import java.util.concurrent.LinkedBlockingQueue; + +public abstract class AbstractEventPendingQueue<T> implements EventPendingQueue<T> { + + private final LinkedBlockingQueue<T> pendingAlertQueue; + + private final int capacity; + + public AbstractEventPendingQueue(int capacity) { + this.capacity = capacity; + this.pendingAlertQueue = new LinkedBlockingQueue<>(capacity); + AlertServerMetrics.registerPendingAlertGauge(this::size); + } + + public void put(T alert) throws InterruptedException { + pendingAlertQueue.put(alert); + } + + public T take() throws InterruptedException { + return pendingAlertQueue.take(); + } + + public int size() { Review Comment: ## Missing Override annotation This method overrides [EventPendingQueue<T>.size](1); it is advisable to add an Override annotation. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4124) ########## dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertSender.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.service; + +import org.apache.dolphinscheduler.alert.api.AlertData; +import org.apache.dolphinscheduler.alert.api.AlertResult; +import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.common.enums.AlertStatus; +import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.Alert; +import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; +import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class AlertSender extends AbstractEventSender<Alert> { + + private final AlertDao alertDao; + + public AlertSender(AlertDao alertDao, + AlertPluginManager alertPluginManager, + AlertConfig alertConfig) { + super(alertPluginManager, alertConfig.getWaitTimeout()); + this.alertDao = alertDao; + } + + /** + * sync send alert handler + * + * @param alertGroupId alertGroupId + * @param title title + * @param content content + * @return AlertSendResponseCommand + */ + public AlertSendResponse syncHandler(int alertGroupId, String title, String content, int warnType) { + List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId); + AlertData alertData = AlertData.builder() + .content(content) + .title(title) + .warnType(warnType) Review Comment: ## Deprecated method or constructor invocation Invoking [AlertDataBuilder.warnType](1) should be avoided because it has been deprecated. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4119) ########## dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AbstractEventSender.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.service; + +import org.apache.dolphinscheduler.alert.api.AlertChannel; +import org.apache.dolphinscheduler.alert.api.AlertConstants; +import org.apache.dolphinscheduler.alert.api.AlertData; +import org.apache.dolphinscheduler.alert.api.AlertInfo; +import org.apache.dolphinscheduler.alert.api.AlertResult; +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.common.enums.AlertStatus; +import org.apache.dolphinscheduler.common.enums.AlertType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; +import org.apache.dolphinscheduler.dao.entity.AlertSendStatus; +import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse; +import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class AbstractEventSender<T> implements EventSender<T> { + + protected final AlertPluginManager alertPluginManager; + + private final long sendEventTimeout; + + public AbstractEventSender(AlertPluginManager alertPluginManager, long sendEventTimeout) { + this.alertPluginManager = alertPluginManager; + this.sendEventTimeout = sendEventTimeout; + } + + @Override + public void sendEvent(T event) { + List<AlertPluginInstance> alertPluginInstanceList = getAlertPluginInstanceList(event); + if (CollectionUtils.isEmpty(alertPluginInstanceList)) { + onError(event, "No bind plugin instance found"); + return; + } + AlertData alertData = getAlertData(event); + List<AlertSendStatus> alertSendStatuses = new ArrayList<>(); + for (AlertPluginInstance instance : alertPluginInstanceList) { + AlertResult alertResult = doSendEvent(instance, alertData); + AlertStatus alertStatus = + alertResult.isSuccess() ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE; + AlertSendStatus alertSendStatus = AlertSendStatus.builder() + .alertId(getEventId(event)) + .alertPluginInstanceId(instance.getId()) + .sendStatus(alertStatus) + .log(JSONUtils.toJsonString(alertResult)) + .createTime(new Date()) + .build(); + alertSendStatuses.add(alertSendStatus); + } + long failureCount = alertSendStatuses.stream() + .map(alertSendStatus -> alertSendStatus.getSendStatus() == AlertStatus.EXECUTION_FAILURE) + .count(); + long successCount = alertSendStatuses.stream() + .map(alertSendStatus -> alertSendStatus.getSendStatus() == AlertStatus.EXECUTION_SUCCESS) + .count(); + if (successCount == 0) { + onError(event, JSONUtils.toJsonString(alertSendStatuses)); + } else { + if (failureCount > 0) { + onPartialSuccess(event, JSONUtils.toJsonString(alertSendStatuses)); + } else { + onSuccess(event, JSONUtils.toJsonString(alertSendStatuses)); + } + } + } + + public abstract List<AlertPluginInstance> getAlertPluginInstanceList(T event); + + public abstract AlertData getAlertData(T event); + + public abstract Integer getEventId(T event); + + public abstract void onError(T event, String log); + + public abstract void onPartialSuccess(T event, String log); + + public abstract void onSuccess(T event, String log); + + @Override + public AlertResult doSendEvent(AlertPluginInstance instance, AlertData alertData) { + String pluginInstanceName = instance.getInstanceName(); + int pluginDefineId = instance.getPluginDefineId(); + Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId()); + if (!alertChannelOptional.isPresent()) { + String message = String.format("Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s", + pluginInstanceName, + pluginDefineId); + log.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId); + return new AlertResult(false, message); + } + AlertChannel alertChannel = alertChannelOptional.get(); + + Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams()); + + AlertInfo alertInfo = AlertInfo.builder() + .alertData(alertData) + .alertParams(paramsMap) + .alertPluginInstanceId(instance.getId()) + .build(); + try { + AlertResult alertResult; + if (sendEventTimeout <= 0) { + if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) { + alertResult = alertChannel.closeAlert(alertInfo); + } else { + alertResult = alertChannel.process(alertInfo); + } + } else { + CompletableFuture<AlertResult> future; + if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) { + future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo)); + } else { + future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo)); + } + alertResult = future.get(sendEventTimeout, TimeUnit.MILLISECONDS); + } + if (alertResult == null) { + throw new RuntimeException("AlertResult cannot be null"); + } + return alertResult; + } catch (Exception e) { + log.error("Send alert data {} failed", alertData, e); + return new AlertResult(false, e.getMessage()); + } + } + + public AlertSendResponse syncTestSend(int pluginDefineId, String pluginInstanceParams) { + + boolean sendResponseStatus = true; + List<AlertSendResponse.AlertSendResponseResult> sendResponseResults = new ArrayList<>(); + + Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(pluginDefineId); + if (!alertChannelOptional.isPresent()) { + String message = String.format("Test send alert error: the channel doesn't exist, pluginDefineId: %s", + pluginDefineId); + AlertSendResponse.AlertSendResponseResult alertSendResponseResult = + new AlertSendResponse.AlertSendResponseResult(); + alertSendResponseResult.setSuccess(false); + alertSendResponseResult.setMessage(message); + sendResponseResults.add(alertSendResponseResult); + log.error("Test send alert error : not found plugin {}", pluginDefineId); + return new AlertSendResponse(false, sendResponseResults); + } + AlertChannel alertChannel = alertChannelOptional.get(); + + Map<String, String> paramsMap = PluginParamsTransfer.getPluginParamsMap(pluginInstanceParams); + + AlertData alertData = AlertData.builder() + .title(AlertConstants.TEST_TITLE) + .content(AlertConstants.TEST_CONTENT) + .warnType(WarningType.ALL.getCode()) Review Comment: ## Deprecated method or constructor invocation Invoking [AlertDataBuilder.warnType](1) should be avoided because it has been deprecated. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4122) ########## dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AbstractEventSender.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.service; + +import org.apache.dolphinscheduler.alert.api.AlertChannel; +import org.apache.dolphinscheduler.alert.api.AlertConstants; +import org.apache.dolphinscheduler.alert.api.AlertData; +import org.apache.dolphinscheduler.alert.api.AlertInfo; +import org.apache.dolphinscheduler.alert.api.AlertResult; +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.common.enums.AlertStatus; +import org.apache.dolphinscheduler.common.enums.AlertType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; +import org.apache.dolphinscheduler.dao.entity.AlertSendStatus; +import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse; +import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class AbstractEventSender<T> implements EventSender<T> { + + protected final AlertPluginManager alertPluginManager; + + private final long sendEventTimeout; + + public AbstractEventSender(AlertPluginManager alertPluginManager, long sendEventTimeout) { + this.alertPluginManager = alertPluginManager; + this.sendEventTimeout = sendEventTimeout; + } + + @Override + public void sendEvent(T event) { + List<AlertPluginInstance> alertPluginInstanceList = getAlertPluginInstanceList(event); + if (CollectionUtils.isEmpty(alertPluginInstanceList)) { + onError(event, "No bind plugin instance found"); + return; + } + AlertData alertData = getAlertData(event); + List<AlertSendStatus> alertSendStatuses = new ArrayList<>(); + for (AlertPluginInstance instance : alertPluginInstanceList) { + AlertResult alertResult = doSendEvent(instance, alertData); + AlertStatus alertStatus = + alertResult.isSuccess() ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE; + AlertSendStatus alertSendStatus = AlertSendStatus.builder() + .alertId(getEventId(event)) + .alertPluginInstanceId(instance.getId()) + .sendStatus(alertStatus) + .log(JSONUtils.toJsonString(alertResult)) + .createTime(new Date()) + .build(); + alertSendStatuses.add(alertSendStatus); + } + long failureCount = alertSendStatuses.stream() + .map(alertSendStatus -> alertSendStatus.getSendStatus() == AlertStatus.EXECUTION_FAILURE) + .count(); + long successCount = alertSendStatuses.stream() + .map(alertSendStatus -> alertSendStatus.getSendStatus() == AlertStatus.EXECUTION_SUCCESS) + .count(); + if (successCount == 0) { + onError(event, JSONUtils.toJsonString(alertSendStatuses)); + } else { + if (failureCount > 0) { + onPartialSuccess(event, JSONUtils.toJsonString(alertSendStatuses)); + } else { + onSuccess(event, JSONUtils.toJsonString(alertSendStatuses)); + } + } + } + + public abstract List<AlertPluginInstance> getAlertPluginInstanceList(T event); + + public abstract AlertData getAlertData(T event); + + public abstract Integer getEventId(T event); + + public abstract void onError(T event, String log); + + public abstract void onPartialSuccess(T event, String log); + + public abstract void onSuccess(T event, String log); + + @Override + public AlertResult doSendEvent(AlertPluginInstance instance, AlertData alertData) { + String pluginInstanceName = instance.getInstanceName(); + int pluginDefineId = instance.getPluginDefineId(); + Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId()); + if (!alertChannelOptional.isPresent()) { + String message = String.format("Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s", + pluginInstanceName, + pluginDefineId); + log.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId); + return new AlertResult(false, message); + } + AlertChannel alertChannel = alertChannelOptional.get(); + + Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams()); + + AlertInfo alertInfo = AlertInfo.builder() + .alertData(alertData) + .alertParams(paramsMap) + .alertPluginInstanceId(instance.getId()) + .build(); + try { + AlertResult alertResult; + if (sendEventTimeout <= 0) { + if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) { + alertResult = alertChannel.closeAlert(alertInfo); + } else { + alertResult = alertChannel.process(alertInfo); + } + } else { + CompletableFuture<AlertResult> future; + if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) { + future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo)); + } else { + future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo)); + } + alertResult = future.get(sendEventTimeout, TimeUnit.MILLISECONDS); + } + if (alertResult == null) { + throw new RuntimeException("AlertResult cannot be null"); + } + return alertResult; + } catch (Exception e) { + log.error("Send alert data {} failed", alertData, e); + return new AlertResult(false, e.getMessage()); + } + } + + public AlertSendResponse syncTestSend(int pluginDefineId, String pluginInstanceParams) { Review Comment: ## Missing Override annotation This method overrides [EventSender<T>.syncTestSend](1); it is advisable to add an Override annotation. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4127) ########## dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.registry.api.ha; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.registry.api.Event; +import org.apache.dolphinscheduler.registry.api.Registry; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class AbstractHAServer implements HAServer { + + private final Registry registry; + + private final String serverPath; + + private ServerStatus serverStatus; + + private final List<ServerStatusChangeListener> serverStatusChangeListeners; + + public AbstractHAServer(Registry registry, String serverPath) { + this.registry = registry; + this.serverPath = serverPath; + this.serverStatus = ServerStatus.STAND_BY; + this.serverStatusChangeListeners = new ArrayList<>(); + } + + public void start() { Review Comment: ## Missing Override annotation This method overrides [HAServer.start](1); it is advisable to add an Override annotation. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4129) ########## dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AbstractEventPendingQueue.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.service; + +import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics; + +import java.util.concurrent.LinkedBlockingQueue; + +public abstract class AbstractEventPendingQueue<T> implements EventPendingQueue<T> { + + private final LinkedBlockingQueue<T> pendingAlertQueue; + + private final int capacity; + + public AbstractEventPendingQueue(int capacity) { + this.capacity = capacity; + this.pendingAlertQueue = new LinkedBlockingQueue<>(capacity); + AlertServerMetrics.registerPendingAlertGauge(this::size); + } + + public void put(T alert) throws InterruptedException { Review Comment: ## Missing Override annotation This method overrides [EventPendingQueue<T>.put](1); it is advisable to add an Override annotation. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4126) ########## dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AbstractEventPendingQueue.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.service; + +import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics; + +import java.util.concurrent.LinkedBlockingQueue; + +public abstract class AbstractEventPendingQueue<T> implements EventPendingQueue<T> { + + private final LinkedBlockingQueue<T> pendingAlertQueue; + + private final int capacity; + + public AbstractEventPendingQueue(int capacity) { + this.capacity = capacity; + this.pendingAlertQueue = new LinkedBlockingQueue<>(capacity); + AlertServerMetrics.registerPendingAlertGauge(this::size); + } + + public void put(T alert) throws InterruptedException { + pendingAlertQueue.put(alert); + } + + public T take() throws InterruptedException { Review Comment: ## Missing Override annotation This method overrides [EventPendingQueue<T>.take](1); it is advisable to add an Override annotation. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4125) ########## dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AbstractEventPendingQueue.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.service; + +import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics; + +import java.util.concurrent.LinkedBlockingQueue; + +public abstract class AbstractEventPendingQueue<T> implements EventPendingQueue<T> { + + private final LinkedBlockingQueue<T> pendingAlertQueue; + + private final int capacity; + + public AbstractEventPendingQueue(int capacity) { + this.capacity = capacity; + this.pendingAlertQueue = new LinkedBlockingQueue<>(capacity); + AlertServerMetrics.registerPendingAlertGauge(this::size); + } + + public void put(T alert) throws InterruptedException { + pendingAlertQueue.put(alert); + } + + public T take() throws InterruptedException { + return pendingAlertQueue.take(); + } + + public int size() { + return pendingAlertQueue.size(); + } + + public int capacity() { Review Comment: ## Missing Override annotation This method overrides [EventPendingQueue<T>.capacity](1); it is advisable to add an Override annotation. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4123) ########## dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.registry.api.ha; + +public abstract class AbstractServerStatusChangeListener implements ServerStatusChangeListener { + + public void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus) { Review Comment: ## Missing Override annotation This method overrides [ServerStatusChangeListener.change](1); it is advisable to add an Override annotation. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4128) ########## dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/ListenerEventSender.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.service; + +import org.apache.dolphinscheduler.alert.api.AlertData; +import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.common.enums.AlertStatus; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; +import org.apache.dolphinscheduler.dao.entity.ListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.AbstractListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionCreatedListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionDeletedListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionUpdatedListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.ProcessEndListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.ProcessFailListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.ProcessStartListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.TaskEndListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent; +import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent; +import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.ListenerEventDao; + +import org.apache.curator.shaded.com.google.common.collect.Lists; + +import java.util.Date; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class ListenerEventSender extends AbstractEventSender<ListenerEvent> { + + private ListenerEventDao listenerEventDao; + + private AlertPluginInstanceMapper alertPluginInstanceMapper; + + private AlertPluginManager alertPluginManager; + + private AlertConfig alertConfig; + + public ListenerEventSender(ListenerEventDao listenerEventDao, + AlertPluginInstanceMapper alertPluginInstanceMapper, + AlertPluginManager alertPluginManager, + AlertConfig alertConfig) { + super(alertPluginManager, alertConfig.getWaitTimeout()); + this.listenerEventDao = listenerEventDao; + this.alertPluginInstanceMapper = alertPluginInstanceMapper; + } + + private AbstractListenerEvent generateEventFromContent(ListenerEvent listenerEvent) { + String content = listenerEvent.getContent(); + switch (listenerEvent.getEventType()) { + case SERVER_DOWN: + return JSONUtils.parseObject(content, ServerDownListenerEvent.class); + case PROCESS_DEFINITION_CREATED: + return JSONUtils.parseObject(content, ProcessDefinitionCreatedListenerEvent.class); + case PROCESS_DEFINITION_UPDATED: + return JSONUtils.parseObject(content, ProcessDefinitionUpdatedListenerEvent.class); + case PROCESS_DEFINITION_DELETED: + return JSONUtils.parseObject(content, ProcessDefinitionDeletedListenerEvent.class); + case PROCESS_START: + return JSONUtils.parseObject(content, ProcessStartListenerEvent.class); + case PROCESS_END: + return JSONUtils.parseObject(content, ProcessEndListenerEvent.class); + case PROCESS_FAIL: + return JSONUtils.parseObject(content, ProcessFailListenerEvent.class); + case TASK_START: + return JSONUtils.parseObject(content, TaskStartListenerEvent.class); + case TASK_END: + return JSONUtils.parseObject(content, TaskEndListenerEvent.class); + case TASK_FAIL: + return JSONUtils.parseObject(content, TaskFailListenerEvent.class); + default: + return null; + } + } + + @Override + public List<AlertPluginInstance> getAlertPluginInstanceList(ListenerEvent event) { + return alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList(); + } + + @Override + public AlertData getAlertData(ListenerEvent listenerEvent) { + AbstractListenerEvent event = generateEventFromContent(listenerEvent); + return AlertData.builder() + .id(listenerEvent.getId()) + .content(JSONUtils.toJsonString(Lists.newArrayList(event))) + .log(listenerEvent.getLog()) + .title(event.getTitle()) + .warnType(WarningType.GLOBAL.getCode()) Review Comment: ## Deprecated method or constructor invocation Invoking [AlertDataBuilder.warnType](1) should be avoided because it has been deprecated. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4121) ########## dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertSender.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.service; + +import org.apache.dolphinscheduler.alert.api.AlertData; +import org.apache.dolphinscheduler.alert.api.AlertResult; +import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.common.enums.AlertStatus; +import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.Alert; +import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; +import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class AlertSender extends AbstractEventSender<Alert> { + + private final AlertDao alertDao; + + public AlertSender(AlertDao alertDao, + AlertPluginManager alertPluginManager, + AlertConfig alertConfig) { + super(alertPluginManager, alertConfig.getWaitTimeout()); + this.alertDao = alertDao; + } + + /** + * sync send alert handler + * + * @param alertGroupId alertGroupId + * @param title title + * @param content content + * @return AlertSendResponseCommand + */ + public AlertSendResponse syncHandler(int alertGroupId, String title, String content, int warnType) { + List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId); + AlertData alertData = AlertData.builder() + .content(content) + .title(title) + .warnType(warnType) + .build(); + + boolean sendResponseStatus = true; + List<AlertSendResponse.AlertSendResponseResult> sendResponseResults = new ArrayList<>(); + + if (CollectionUtils.isEmpty(alertInstanceList)) { + AlertSendResponse.AlertSendResponseResult alertSendResponseResult = + new AlertSendResponse.AlertSendResponseResult(); + String message = String.format("Alert GroupId %s send error : not found alert instance", alertGroupId); + alertSendResponseResult.setSuccess(false); + alertSendResponseResult.setMessage(message); + sendResponseResults.add(alertSendResponseResult); + log.error("Alert GroupId {} send error : not found alert instance", alertGroupId); + return new AlertSendResponse(false, sendResponseResults); + } + + for (AlertPluginInstance instance : alertInstanceList) { + AlertResult alertResult = doSendEvent(instance, alertData); + if (alertResult != null) { + AlertSendResponse.AlertSendResponseResult alertSendResponseResult = + new AlertSendResponse.AlertSendResponseResult( + alertResult.isSuccess(), + alertResult.getMessage()); + sendResponseStatus = sendResponseStatus && alertSendResponseResult.isSuccess(); + sendResponseResults.add(alertSendResponseResult); + } + } + + return new AlertSendResponse(sendResponseStatus, sendResponseResults); + } + + @Override + public List<AlertPluginInstance> getAlertPluginInstanceList(Alert event) { + return alertDao.listInstanceByAlertGroupId(event.getAlertGroupId()); + } + + @Override + public AlertData getAlertData(Alert event) { + return AlertData.builder() + .id(event.getId()) + .content(event.getContent()) + .log(event.getLog()) + .title(event.getTitle()) + .warnType(event.getWarningType().getCode()) Review Comment: ## Deprecated method or constructor invocation Invoking [AlertDataBuilder.warnType](1) should be avoided because it has been deprecated. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4120) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
