This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6bf4f0b68bd [feature](doris compose) start node wait for ready service #42266 (#42540)
6bf4f0b68bd is described below
commit 6bf4f0b68bd4b3edf3aa9520f0b1036173114161
Author: yujun <[email protected]>
AuthorDate: Tue Oct 29 22:47:56 2024 +0800
[feature](doris compose) start node wait for ready service #42266 (#42540)
Cherry-picked from #42266.
---
docker/runtime/doris-compose/command.py | 102 +++++++++++++++------
docker/runtime/doris-compose/utils.py | 12 ++-
.../org/apache/doris/clone/TabletScheduler.java | 9 +-
.../apache/doris/journal/bdbje/BDBJEJournal.java | 4 +-
.../doris/regression/suite/SuiteCluster.groovy | 34 +++----
.../test_fe_tablet_same_backend.groovy | 8 +-
6 files changed, 109 insertions(+), 60 deletions(-)
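For readers skimming the diff: the core change teaches the `start` and `restart` subcommands to optionally block until every touched FE/BE is both registered as alive in the cluster metadata and accepting TCP connections on its service port. A minimal sketch of driving the new flag from a script follows; the `doris-compose.py` entry point path and the cluster name `mycluster` are assumptions for illustration, not taken from this patch:

    import subprocess

    # --wait-timeout semantics (per the patch): 0 = return immediately
    # (default), N > 0 = wait at most N seconds, -1 = wait without limit.
    subprocess.run(
        ["python", "docker/runtime/doris-compose/doris-compose.py",
         "start", "mycluster", "--fe-id", "1", "--wait-timeout", "60"],
        check=True)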
diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py
index 48863003223..604eb8f206e 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -30,6 +30,47 @@ import time
 LOG = utils.get_logger()
 
 
+def wait_ready_service(wait_timeout, cluster, fe_ids, be_ids):
+
+    def is_fe_ready_service(ip):
+        return utils.is_socket_avail(ip, CLUSTER.FE_QUERY_PORT)
+
+    def is_be_ready_service(ip):
+        return utils.is_socket_avail(ip, CLUSTER.BE_HEARTBEAT_PORT)
+
+    if wait_timeout == 0:
+        return
+    if wait_timeout == -1:
+        wait_timeout = 1000000000
+    expire_ts = time.time() + wait_timeout
+    while True:
+        db_mgr = database.get_db_mgr(cluster.name, False)
+        dead_frontends = []
+        for id in fe_ids:
+            fe = cluster.get_node(CLUSTER.Node.TYPE_FE, id)
+            fe_state = db_mgr.get_fe(id)
+            if not fe_state or not fe_state.alive or not is_fe_ready_service(
+                    fe.get_ip()):
+                dead_frontends.append(id)
+        dead_backends = []
+        for id in be_ids:
+            be = cluster.get_node(CLUSTER.Node.TYPE_BE, id)
+            be_state = db_mgr.get_be(id)
+            if not be_state or not be_state.alive or not is_be_ready_service(
+                    be.get_ip()):
+                dead_backends.append(id)
+        if not dead_frontends and not dead_backends:
+            break
+        if time.time() >= expire_ts:
+            err = ""
+            if dead_frontends:
+                err += "dead fe: " + str(dead_frontends) + ". "
+            if dead_backends:
+                err += "dead be: " + str(dead_backends) + ". "
+            raise Exception(err)
+        time.sleep(1)
+
+
 # return for_all, related_nodes, related_node_num
 def get_ids_related_nodes(cluster,
                           fe_ids,
@@ -156,10 +197,11 @@ class SimpleCommand(Command):
parser.add_argument("NAME", help="Specify cluster name.")
self._add_parser_ids_args(parser)
self._add_parser_common_args(parser)
+ return parser
def run(self, args):
cluster = CLUSTER.Cluster.load(args.NAME)
- _, related_nodes, related_node_num = get_ids_related_nodes(
+ for_all, related_nodes, related_node_num = get_ids_related_nodes(
cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id,
args.fdb_id)
utils.exec_docker_compose_command(cluster.get_compose_file(),
@@ -170,6 +212,31 @@ class SimpleCommand(Command):
             utils.render_green("{} succ, total related node num {}".format(
                 show_cmd, related_node_num)))
 
+        if for_all:
+            related_nodes = cluster.get_all_nodes()
+        return cluster, related_nodes
+
+
+class NeedStartCommand(SimpleCommand):
+
+    def add_parser(self, args_parsers):
+        parser = super().add_parser(args_parsers)
+        parser.add_argument(
+            "--wait-timeout",
+            type=int,
+            default=0,
+            help=
+            "Specify wait seconds for fe/be ready for service: 0 not wait (default), "\
+            "> 0 max wait seconds, -1 wait unlimited."
+        )
+        return parser
+
+    def run(self, args):
+        cluster, related_nodes = super().run(args)
+        fe_ids = [node.id for node in related_nodes if node.is_fe()]
+        be_ids = [node.id for node in related_nodes if node.is_be()]
+        wait_ready_service(args.wait_timeout, cluster, fe_ids, be_ids)
+
 
 class UpCommand(Command):
@@ -568,33 +635,8 @@ class UpCommand(Command):
                 db_mgr.create_default_storage_vault(cloud_store_config)
 
-        if args.wait_timeout != 0:
-            if args.wait_timeout == -1:
-                args.wait_timeout = 1000000000
-            expire_ts = time.time() + args.wait_timeout
-            while True:
-                db_mgr = database.get_db_mgr(args.NAME, False)
-                dead_frontends = []
-                for id in add_fe_ids:
-                    fe_state = db_mgr.get_fe(id)
-                    if not fe_state or not fe_state.alive:
-                        dead_frontends.append(id)
-                dead_backends = []
-                for id in add_be_ids:
-                    be_state = db_mgr.get_be(id)
-                    if not be_state or not be_state.alive:
-                        dead_backends.append(id)
-                if not dead_frontends and not dead_backends:
-                    break
-                if time.time() >= expire_ts:
-                    err = ""
-                    if dead_frontends:
-                        err += "dead fe: " + str(dead_frontends) + ". "
-                    if dead_backends:
-                        err += "dead be: " + str(dead_backends) + ". "
-                    raise Exception(err)
-                time.sleep(1)
-
+        wait_ready_service(args.wait_timeout, cluster, add_fe_ids,
+                           add_be_ids)
 
         LOG.info(
             utils.render_green(
                 "Up cluster {} succ, related node num {}".format(
@@ -1216,9 +1258,9 @@ class GetCloudIniCommand(Command):
 ALL_COMMANDS = [
     UpCommand("up"),
     DownCommand("down"),
-    SimpleCommand("start", "Start the doris containers. "),
+    NeedStartCommand("start", "Start the doris containers. "),
     SimpleCommand("stop", "Stop the doris containers. "),
-    SimpleCommand("restart", "Restart the doris containers. "),
+    NeedStartCommand("restart", "Restart the doris containers. "),
     SimpleCommand("pause", "Pause the doris containers. "),
     SimpleCommand("unpause", "Unpause the doris containers. "),
     GetCloudIniCommand("get-cloud-ini"),
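The command.py changes above fold UpCommand's old inline wait loop into the reusable wait_ready_service and expose it to start/restart via NeedStartCommand. The deadline handling reduces to the pattern below, a simplified sketch rather than code from the patch (`predicate` stands in for the combined FE/BE liveness checks):

    import time

    def wait_until(predicate, wait_timeout):
        # 0 skips waiting entirely; -1 is mapped to a practically
        # unlimited number of seconds; otherwise poll once per second
        # until the predicate holds or the deadline expires.
        if wait_timeout == 0:
            return
        if wait_timeout == -1:
            wait_timeout = 1000000000
        expire_ts = time.time() + wait_timeout
        while not predicate():
            if time.time() >= expire_ts:
                raise Exception("nodes not ready before deadline")
            time.sleep(1)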
diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py
index 735947e86bd..4332ae6cf48 100644
--- a/docker/runtime/doris-compose/utils.py
+++ b/docker/runtime/doris-compose/utils.py
@@ -15,11 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import contextlib
 import docker
-import json
+import jsonpickle
 import logging
 import os
 import pwd
+import socket
 import subprocess
 import time
 import yaml
@@ -278,6 +280,12 @@ def copy_image_directory(image, image_dir, local_dir):
         entrypoint="cp -r {} /opt/mount/".format(image_dir))
 
 
+def is_socket_avail(ip, port):
+    with contextlib.closing(socket.socket(socket.AF_INET,
+                                          socket.SOCK_STREAM)) as sock:
+        return sock.connect_ex((ip, port)) == 0
+
+
 def enable_dir_with_rw_perm(dir):
     if not os.path.exists(dir):
         return
@@ -313,7 +321,7 @@ def write_compose_file(file, compose):
 
 
 def pretty_json(json_data):
-    return json.dumps(json_data, indent=4, sort_keys=True)
+    return jsonpickle.dumps(json_data, indent=4)
 
 
 def is_true(val):
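The new is_socket_avail helper works because connect_ex, unlike connect, returns 0 on a successful TCP handshake and an errno code on failure instead of raising. Below is a self-contained probe built the same way; the 2-second timeout and the example port are illustrative additions, not part of the patch:

    import contextlib
    import socket

    def probe(ip, port):
        with contextlib.closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(2)  # illustrative; the patch keeps the OS default
            # connect_ex returns 0 on success, an errno value on failure
            return sock.connect_ex((ip, port)) == 0

    # e.g. probe a locally mapped FE query port (9030 is Doris's default)
    print(probe("127.0.0.1", 9030))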
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index ed2a8335db7..bb88afd2b6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -85,6 +85,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * TabletScheduler saved the tablets produced by TabletChecker and try to schedule them.
@@ -1535,9 +1536,13 @@ public class TabletScheduler extends MasterDaemon {
         List<BePathLoadStatPair> allFitPaths =
                 !allFitPathsSameMedium.isEmpty() ? allFitPathsSameMedium : allFitPathsDiffMedium;
         if (allFitPaths.isEmpty()) {
+            List<String> backendsInfo = Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values().stream()
+                    .filter(be -> be.getLocationTag() == tag)
+                    .map(Backend::getDetailsForCreateReplica)
+                    .collect(Collectors.toList());
             throw new SchedException(Status.UNRECOVERABLE,
                     String.format("unable to find dest path for new replica"
-                            + " for replica allocation { %s } with tag %s storage medium %s",
-                            tabletCtx.getReplicaAlloc(), tag, tabletCtx.getStorageMedium()));
+                            + " for replica allocation { %s } with tag %s storage medium %s, backends on this tag is: %s",
+                            tabletCtx.getReplicaAlloc(), tag, tabletCtx.getStorageMedium(), backendsInfo));
         }
         BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(allFitPaths);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 924f10b5ea1..1c1bcf6354c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -163,12 +163,12 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
                 MetricRepo.HISTO_JOURNAL_BATCH_DATA_SIZE.update(dataSize);
             }
 
-            if (entitySize > 32) {
+            if (entitySize > Config.batch_edit_log_max_item_num) {
                 LOG.warn("write bdb journal batch is too large, batch size {}, the first journal id {}, "
                         + "data size {}", entitySize, firstId, dataSize);
             }
 
-            if (dataSize > 640 * 1024) { // 640KB
+            if (dataSize > Config.batch_edit_log_max_byte_size) { // 640KB
                 LOG.warn("write bdb journal batch data is too large, data size {}, the first journal id {}, "
                         + "batch size {}", dataSize, firstId, entitySize);
             }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 2aaece2c678..159e622f454 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -522,40 +522,38 @@ class SuiteCluster {
         return this.isCloudMode
     }
 
+    int START_WAIT_TIMEOUT = 120
+
     // if not specific fe indices, then start all frontends
     void startFrontends(int... indices) {
-        runFrontendsCmd('start', indices)
-        waitHbChanged()
+        runFrontendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific be indices, then start all backends
     void startBackends(int... indices) {
-        runBackendsCmd('start', indices)
-        waitHbChanged()
+        runBackendsCmd(START_WAIT_TIMEOUT + 5, "start --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
    }
 
     // if not specific fe indices, then stop all frontends
     void stopFrontends(int... indices) {
-        runFrontendsCmd('stop', indices)
+        runFrontendsCmd(60, 'stop', indices)
         waitHbChanged()
     }
 
     // if not specific be indices, then stop all backends
     void stopBackends(int... indices) {
-        runBackendsCmd('stop', indices)
+        runBackendsCmd(60, 'stop', indices)
         waitHbChanged()
     }
 
     // if not specific fe indices, then restart all frontends
     void restartFrontends(int... indices) {
-        runFrontendsCmd('restart', indices)
-        waitHbChanged()
+        runFrontendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific be indices, then restart all backends
     void restartBackends(int... indices) {
-        runBackendsCmd('restart', indices)
-        waitHbChanged()
+        runBackendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout ${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
     // if not specific fe indices, then drop all frontends
@@ -564,7 +562,7 @@ class SuiteCluster {
         if (clean) {
             cmd += ' --clean'
         }
-        runFrontendsCmd(cmd, indices)
+        runFrontendsCmd(60, cmd, indices)
     }
 
     // if not specific be indices, then decommission all backends
@@ -582,7 +580,7 @@ class SuiteCluster {
         if (clean) {
             cmd += ' --clean'
         }
-        runBackendsCmd(cmd, indices)
+        runBackendsCmd(60, cmd, indices)
     }
 
     void checkFeIsAlive(int index, boolean isAlive) {
@@ -622,18 +620,14 @@ class SuiteCluster {
         Thread.sleep(7000)
     }
 
-    private void runFrontendsCmd(String op, int... indices) {
+    private void runFrontendsCmd(int timeoutSecond, String op, int... indices) {
         def cmd = op + ' ' + name + ' --fe-id ' + indices.join(' ')
-        runCmd(cmd)
+        runCmd(cmd, timeoutSecond)
     }
 
-    private void runBackendsCmd(Integer timeoutSecond = null, String op, int... indices) {
+    private void runBackendsCmd(int timeoutSecond, String op, int... indices) {
         def cmd = op + ' ' + name + ' --be-id ' + indices.join(' ')
-        if (timeoutSecond == null) {
-            runCmd(cmd)
-        } else {
-            runCmd(cmd, timeoutSecond)
-        }
+        runCmd(cmd, timeoutSecond)
     }
 
     private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception {
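Note how the framework pads its process-level timeout past the in-command wait (START_WAIT_TIMEOUT + 5): if a node never becomes ready, doris-compose's descriptive "dead fe/be" error surfaces before the harness kills the process. A sketch of the same relationship in Python; the entry point path is an assumption, not part of the patch:

    import subprocess

    START_WAIT_TIMEOUT = 120  # seconds doris-compose itself polls

    # Keep the outer timeout strictly greater than the inner wait so the
    # more informative in-command failure wins the race.
    subprocess.run(
        ["python", "docker/runtime/doris-compose/doris-compose.py",
         "start", "mycluster", "--wait-timeout", str(START_WAIT_TIMEOUT)],
        check=True, timeout=START_WAIT_TIMEOUT + 5)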
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
index 13e67bd0b48..973492bd4b5 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
@@ -98,7 +98,7 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
     }
 
     def checkAllTable = { isAllBeAliveOrDeadLong ->
-        dockerAwaitUntil(50) {
+        dockerAwaitUntil(60) {
             checkAllTableImpl(isAllBeAliveOrDeadLong, true)
         }
         checkAllTableImpl(isAllBeAliveOrDeadLong, false)
@@ -127,8 +127,8 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
         // all fe alive
         checkAllTable(true)
 
-        dockerAwaitUntil(30) {
         cluster.stopBackends(choseDeadBeIndex)
+        dockerAwaitUntil(60) {
             def chosenBe = cluster.getBeByIndex(choseDeadBeIndex)
             !chosenBe.alive
         }
@@ -144,13 +144,13 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
 
         def choseRestartFeIndex = cluster.getOneFollowerFe().index
         cluster.stopFrontends(choseRestartFeIndex)
-        dockerAwaitUntil(30) {
+        dockerAwaitUntil(60) {
             def chosenFe = cluster.getFeByIndex(choseRestartFeIndex)
             !chosenFe.alive
         }
 
         cluster.startFrontends(choseRestartFeIndex)
-        dockerAwaitUntil(30) {
+        dockerAwaitUntil(60) {
             def chosenFe = cluster.getFeByIndex(choseRestartFeIndex)
             chosenFe.alive
         }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]