This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9b5999764a [INLONG-9926][Audit] Audit-service support HA active and
backup (#9927)
9b5999764a is described below
commit 9b5999764a7ea21117e06ffc773ecf98f0912a18
Author: doleyzi <[email protected]>
AuthorDate: Mon Apr 8 10:11:36 2024 +0800
[INLONG-9926][Audit] Audit-service support HA active and backup (#9927)
Co-authored-by: doleyzi <[email protected]>
---
.../src/main/java/elector/api/Selector.java | 42 +++++++++++
.../java/elector/api/SelectorChangeListener.java | 26 +++++++
.../src/main/java/elector/api/SelectorConfig.java | 88 ++++++++++++++++++++++
.../src/main/java/elector/task/DBMonitorTask.java | 62 +++++++++++++++
4 files changed, 218 insertions(+)
diff --git a/inlong-audit/audit-service/src/main/java/elector/api/Selector.java
b/inlong-audit/audit-service/src/main/java/elector/api/Selector.java
new file mode 100644
index 0000000000..d4aa342dea
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/elector/api/Selector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package elector.api;
+
+/**
+ * Selector
+ */
+public abstract class Selector {
+
+ protected boolean isLeader;
+
+ public abstract void init() throws Exception;
+
+ public abstract boolean isLeader();
+
+ public abstract void releaseLeader() throws Exception;
+
+ public abstract void replaceLeader(String newLeaderId) throws Exception;
+
+ public abstract String getLeader(String serviceId);
+
+ public abstract void canSelect(boolean canSelect);
+
+ public abstract boolean rebuildSelectorDBSource();
+
+ public abstract boolean close();
+}
diff --git
a/inlong-audit/audit-service/src/main/java/elector/api/SelectorChangeListener.java
b/inlong-audit/audit-service/src/main/java/elector/api/SelectorChangeListener.java
new file mode 100644
index 0000000000..a4af6a9598
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/elector/api/SelectorChangeListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package elector.api;
+
+/**
+ * Selector change listener
+ */
+public abstract interface SelectorChangeListener {
+
+ public abstract void leaderChanged(boolean paramBoolean);
+}
\ No newline at end of file
diff --git
a/inlong-audit/audit-service/src/main/java/elector/api/SelectorConfig.java
b/inlong-audit/audit-service/src/main/java/elector/api/SelectorConfig.java
new file mode 100644
index 0000000000..d037985332
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/elector/api/SelectorConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package elector.api;
+
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+/**
+ * Selector config
+ */
+@Data
+public class SelectorConfig {
+
+ private static final Logger logger =
LoggerFactory.getLogger(SelectorConfig.class);
+ public static final String MONITOR_COMMON_NAME = "audit";
+ private final String serviceId;
+ private final String leaderId;
+ private String defaultLeaderId;
+ private boolean useDefaultLeader = false;
+ private String dbDriver;
+ private String dbUrl;
+ private String dbUser;
+ private String dbPasswd;
+ private String electorDbName = "leader_election";
+ private int leaderTimeout = 20;
+ private int tryToBeLeaderInterval = 5;
+ private int dbMonitorRunInterval = 20;
+ private int connectionTimeout = 10000;
+ private int idleTimeout = 60000;
+ private int maxLifetime = 1800000;
+ private int maximumPoolSize = 2;
+ private String cachePrepStmts = "true";
+ private int prepStmtCacheSize = 250;
+ private int prepStmtCacheSqlLimit = 2048;
+ private String monitorName = "elector_leader_state";
+ private String ip;
+ private SelectorChangeListener selectorChangeListener;
+
+ public SelectorConfig(String serviceId, String leaderId, String dbUrl,
String dbUser, String dbPasswd,
+ String dbDriver) {
+ assert (Objects.nonNull(serviceId)
+ && Objects.nonNull(leaderId)
+ && Objects.nonNull(dbUrl)
+ && Objects.nonNull(dbUser)
+ && Objects.nonNull(dbPasswd)
+ && Objects.nonNull(dbDriver));
+
+ this.serviceId = serviceId;
+ this.leaderId = leaderId;
+ this.dbUrl = dbUrl;
+ this.dbUser = dbUser;
+ this.dbPasswd = dbPasswd;
+ this.dbDriver = dbDriver;
+
+ }
+
+ public String getIp() {
+ if (StringUtils.isEmpty(ip))
+ try {
+ ip = InetAddress.getLocalHost().getHostAddress();
+ } catch (Exception e) {
+ logger.error("Get local ip has exception:{}", e.getMessage());
+ ip = "N/A";
+ }
+ return ip;
+ }
+
+}
diff --git
a/inlong-audit/audit-service/src/main/java/elector/task/DBMonitorTask.java
b/inlong-audit/audit-service/src/main/java/elector/task/DBMonitorTask.java
new file mode 100644
index 0000000000..8ccf92ab3b
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/elector/task/DBMonitorTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package elector.task;
+
+import elector.api.SelectorConfig;
+import elector.impl.DBDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * DB monitor task
+ */
+public class DBMonitorTask implements Runnable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(DBMonitorTask.class);
+ private SelectorConfig electorConfig;
+ private DBDataSource dbDataSource;
+ private int dbClosedTimes = 0;
+ private boolean replaced = true;
+
+ public DBMonitorTask(SelectorConfig electorConfig, DBDataSource
dbDataSource) {
+ this.electorConfig = electorConfig;
+ this.dbDataSource = dbDataSource;
+ }
+
+ public void run() {
+ try {
+ while (true) {
+ logger.info("DB monitor task run once");
+
TimeUnit.SECONDS.sleep(electorConfig.getDbMonitorRunInterval());
+
+ if (!(electorConfig.isUseDefaultLeader()))
+ break;
+ }
+ if (dbDataSource.isDBDataSourceClosed()) {
+ dbClosedTimes += 1;
+ logger.info("DB closed times :{}", dbClosedTimes);
+ } else {
+ dbClosedTimes = 0;
+ }
+ } catch (Exception e) {
+ logger.error("DB monitor task has exception {}", e.getMessage());
+ }
+ }
+}
\ No newline at end of file