QQxiaoyuyu commented on issue #210:
URL:
https://github.com/apache/kvrocks-controller/issues/210#issuecomment-2409663783
补充一下操作流程,但是不确定是否能够复现:
之前使用的是 kvrocks-controller 1.0 版本,迁移到 slot 16383 时迁移失败;随后重新编译了 unstable 版本并重启 kvrocks-controller,完成了 16383 的迁移。最后想再验证一下是否还有问题:通过接口删除了已经建立的集群,停止 kvrocks 集群并删除已有的数据文件,再通过接口重新创建 6 节点集群(一主一从模式),然后添加 2 组节点,继续测试迁移。
迁移的程序如下:
import requests
import json
#import rediscluster
import time
import logging
# Configure the root logger: records at INFO and above go to
# /tmp/migrate_kv_slog.log, each formatted as "<time> - <LEVEL> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename='/tmp/migrate_kv_slog.log',
)
def make_request(target, slotnum, timeout=30):
    """Submit a migrate request for one slot to the kvrocks controller.

    POSTs ``{"target": target, "slot": slotnum, "slot_only": False}`` to the
    demo-kvcluster ``/migrate`` endpoint, then prints and logs the resulting
    status code and returns it.

    Args:
        target: Destination shard index, sent as the ``target`` field.
        slotnum: Slot to migrate, sent as the ``slot`` field.
        timeout: Seconds to wait for the controller to respond. Without a
            timeout ``requests`` can block indefinitely on an unresponsive
            controller, which would hang the whole migration script.

    Returns:
        The HTTP status code of the controller's response.

    Raises:
        requests.exceptions.Timeout: If no response arrives within *timeout*.
    """
    url = (
        "http://192.168.1.189:19380/api/v1/namespaces/test-ns/"
        "clusters/demo-kvcluster/migrate"
    )
    payload = {
        "target": target,
        "slot": slotnum,
        # NOTE(review): presumably False means the slot's data is migrated as
        # well, not only the slot mapping — confirm against the controller API.
        "slot_only": False,
    }
    # json= serializes the payload and sets Content-Type: application/json,
    # equivalent to json.dumps(...) plus an explicit header.
    response = requests.post(url, json=payload, timeout=timeout)
    print(f"Slot: {slotnum}, Status Code: {response.status_code}")
    logging.info(f"Slot: {slotnum}, Status Code: {response.status_code}")
    if response.status_code != 200:
        # Keep the controller's explanation so a failed migration can be
        # diagnosed from the log, not just its status code.
        logging.warning(f"Slot: {slotnum}, migrate rejected: {response.text}")
    return response.status_code
def is_number_in_ranges(number, ranges):
    """Tell whether *number* is covered by any entry of *ranges*.

    Each entry is a string holding either one integer (e.g. "16383") or an
    inclusive "start-end" span (e.g. "0-5460"). An entry containing more than
    one '-' is printed and logged as invalid, then skipped. An entry whose
    parts are not integers raises ValueError from int().

    Returns:
        True as soon as a matching entry is found, otherwise False.
    """
    for entry in ranges:
        bounds = entry.split('-')
        if len(bounds) == 2:
            # Inclusive span "start-end".
            low, high = (int(b) for b in bounds)
            if low <= number <= high:
                return True
        elif len(bounds) == 1:
            # A single slot number.
            if int(bounds[0]) == number:
                return True
        else:
            print(f"无效的范围: {entry}")
            logging.info(f"无效的范围: {entry}")
    return False
def get_slot_status(shard_num, slot_num, timeout=10):
    """Return True if shard *shard_num* currently lists *slot_num* among its slots.

    Fetches the demo-kvcluster description from the controller and checks the
    ``slot_ranges`` of ``data.cluster.shards[shard_num]`` using
    ``is_number_in_ranges``.

    Args:
        shard_num: Index into the cluster's ``shards`` list.
        slot_num: Slot number to look for.
        timeout: Seconds to wait for the controller's response. This function
            is polled in a loop, so a hung request would stall the caller
            forever without it.

    Returns:
        True if the slot is in one of the shard's ranges, otherwise False.

    Raises:
        requests.exceptions.HTTPError: If the controller answers 4xx/5xx.
        requests.exceptions.Timeout: If no response arrives within *timeout*.
    """
    url2 = (
        "http://192.168.1.189:19380/api/v1/namespaces/test-ns/"
        "clusters/demo-kvcluster"
    )
    response = requests.get(url2, timeout=timeout)
    # Fail with a clear HTTP error instead of a KeyError/JSON error on the
    # lookups below when the controller returns an error page.
    response.raise_for_status()
    shard = response.json()['data']['cluster']['shards'][shard_num]
    # NOTE(review): a shard that owns no slots yet (e.g. a freshly added
    # target) may report slot_ranges as null/missing; treat that as "no slots"
    # rather than crashing with TypeError while iterating None.
    slot_range = shard.get('slot_ranges') or []
    return is_number_in_ranges(slot_num, slot_range)
# Destination shard indexes; shard target[i] receives chunks[i].
target = [3, 4]
request_count = 0
# Slots whose migrate request the controller did not accept with HTTP 200.
failed_slots = []
# Slots to migrate: three end-exclusive ranges, concatenated in ascending order.
data_source = (
    list(range(3277, 5461))
    + list(range(8738, 10922))
    + list(range(14199, 16384))
)
data_source.sort()
chunk_size = 3277
chunks = [
    data_source[i:i + chunk_size]
    for i in range(0, len(data_source), chunk_size)
]
start_time = time.time()
# logging formats messages lazily as msg % args. Passing a value with no
# placeholder in msg makes logging report a formatting error instead of
# writing the line, so use an explicit %s.
logging.info("开始时间: %s", start_time)
print("开始迁移")
logging.info("开始迁移")
# zip stops at the shorter input: chunks beyond len(target) are not migrated.
for target_name, slot_nums in zip(target, chunks):
    print(target_name, slot_nums)
    for slotnum in slot_nums:
        request_code = make_request(target_name, slotnum)
        if request_code != 200:
            # Previously a rejected request was skipped silently; remember it
            # so the failed slots can be reported and retried.
            failed_slots.append(slotnum)
            logging.warning(
                f"Slot: {slotnum}, migrate request failed, "
                f"Status Code: {request_code}"
            )
            continue
        request_count += 1
        # 判断slotnum是否迁移完成,迁移完成后再迁移下一个
        # (Wait until the target shard lists this slot before migrating the
        # next one.)
        while not get_slot_status(target_name, slotnum):
            print(f"{slotnum} slot 还未迁移完成,等待1秒后重试...")
            logging.info(f"{slotnum} slot 还未迁移完成,等待1秒后重试...")
            time.sleep(1)
end_time = time.time()
logging.info("结束时间: %s", end_time)
print("迁移结束")
logging.info("迁移结束")
total_time = end_time - start_time
print(f"总用时: {total_time:.2f} 秒")
logging.info(f"总用时: {total_time:.2f} 秒")
print(f"Accepted migrate requests: {request_count}")
logging.info(f"Accepted migrate requests: {request_count}")
if failed_slots:
    print(f"Failed slots: {failed_slots}")
    logging.warning(f"Failed slots: {failed_slots}")
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]