WJSGDBZ opened a new pull request, #2305: URL: https://github.com/apache/incubator-pegasus/pull/2305
### What problem does this PR solve? <!--add issue link with summary if exists--> Add support for hashkey and sortkey scan filters ### What is changed and how does it work? 1. In the original logic, the `generate_next_bytes` function has two problems: a. The input buff (i.e. hashkey) can be either `str` or `bytearray`. If it's a `str`, in-place modification like `buff[pos] += 1` won't work since strings are immutable. b. The pos variable was initialized to a fixed index (len(buff) - 1), which is counterintuitive and could lead to an infinite loop. https://github.com/apache/incubator-pegasus/blob/44400f6e3ca1fb1ce48d04d1c9145aad7ac4e991/python-client/pypegasus/pgclient.py#L613-L624 ### Checklist <!--REMOVE the items that are not applicable--> ##### Tests <!-- At least one of them must be included. --> - Manual test (add detailed scripts or steps below) The test script is as follows: ``` from twisted.internet.defer import inlineCallbacks from twisted.internet import reactor from pypegasus.pgclient import * from pypegasus.rrdb.ttypes import filter_type @inlineCallbacks def test_prefix_match_with_boundaries_full_scan(): print("Start test_prefix_match_with_boundaries_full_scan") c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'full_scan_test') try: suc = yield c.init() if not suc: print("Failed to connect to Pegasus server") reactor.stop() return except Exception as e: print(f"Init failed: {e}") reactor.stop() return hkey = 'hkey1' for i in range(110): skey = f'skey{i}' yield c.set(hkey, skey, str(i), 0) yield c.set(hkey, 'aa', 'aa', 0) yield c.set(hkey, 'b', 'b', 0) yield c.set(hkey, 'z', 'z', 0) hkey = 'hkey2' for i in range(110): skey = f'skey{i}' yield c.set(hkey, skey, str(i), 0) yield c.set(hkey, 'aa', 'aa', 0) yield c.set(hkey, 'b', 'b', 0) yield c.set(hkey, 'z', 'z', 0) test_cases = [ { 'skey_filter_type':filter_type.FT_MATCH_PREFIX, 'skey_pattern': b'skey9', 'hkey_filter_type':filter_type.FT_MATCH_PREFIX, 'hkey_pattern': 'hkey1', 'batch_size': 2, 
'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99'] }, { 'skey_filter_type':filter_type.FT_MATCH_PREFIX, 'skey_pattern': b'', 'hkey_filter_type':filter_type.FT_MATCH_PREFIX, 'hkey_pattern': b'', 'batch_size': 2, 'expected': 2*['aa', 'b', 'z'] + 2*[f'skey{i}' for i in range(0, 110)] }, { 'skey_filter_type':filter_type.FT_MATCH_PREFIX, 'skey_pattern': b'skey', 'hkey_filter_type':filter_type.FT_MATCH_PREFIX, 'hkey_pattern': 'hkey2', 'batch_size': 2, 'expected': [f'skey{i}' for i in range(0, 110)] }, { 'skey_filter_type': filter_type.FT_MATCH_PREFIX, 'skey_pattern': b'skey10', 'hkey_filter_type': filter_type.FT_MATCH_PREFIX, 'hkey_pattern': b'hkey1', 'batch_size': 2, 'expected': ['skey10', 'skey100', 'skey101', 'skey102', 'skey103', 'skey104', 'skey105', 'skey106', 'skey107', 'skey108', 'skey109'] }, # 后缀测试 { 'skey_filter_type': filter_type.FT_MATCH_POSTFIX, 'skey_pattern': b'9', 'hkey_filter_type': filter_type.FT_MATCH_PREFIX, 'hkey_pattern': b'hkey1', 'batch_size': 2, 'expected': ['skey9', 'skey19', 'skey29', 'skey39', 'skey49', 'skey59', 'skey69', 'skey79', 'skey89', 'skey99', 'skey109'] }, { 'skey_filter_type': filter_type.FT_MATCH_POSTFIX, 'skey_pattern': b'z', 'hkey_filter_type': filter_type.FT_MATCH_PREFIX, 'hkey_pattern': b'hkey1', 'batch_size': 2, 'expected': ['z'] }, { 'skey_filter_type': filter_type.FT_MATCH_POSTFIX, 'skey_pattern': b'', 'hkey_filter_type': filter_type.FT_MATCH_PREFIX, 'hkey_pattern': b'hkey1', 'batch_size': 2, 'expected': ['aa', 'b', 'z'] + [f'skey{i}' for i in range(0, 110)] }, # 任意匹配测试 { 'skey_filter_type': filter_type.FT_MATCH_ANYWHERE, 'skey_pattern': b'9', 'hkey_filter_type': filter_type.FT_MATCH_PREFIX, 'hkey_pattern': b'hkey1', 'batch_size': 2, 'expected': ['skey9', 'skey19', 'skey29', 'skey39', 'skey49', 'skey59', 'skey69', 'skey79', 'skey89', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99', 'skey109'] }, { 
'skey_filter_type': filter_type.FT_MATCH_ANYWHERE, 'skey_pattern': b'key', 'hkey_filter_type': filter_type.FT_MATCH_PREFIX, 'hkey_pattern': b'hkey1', 'batch_size': 2, 'expected': [f'skey{i}' for i in range(110)] }, { 'skey_filter_type': filter_type.FT_MATCH_ANYWHERE, 'skey_pattern': b'99', 'hkey_filter_type': filter_type.FT_MATCH_PREFIX, 'hkey_pattern': b'hkey1', 'batch_size': 2, 'expected': ['skey99'] }, { 'skey_filter_type': filter_type.FT_MATCH_ANYWHERE, 'skey_pattern': b'xyz', 'hkey_filter_type': filter_type.FT_MATCH_PREFIX, 'hkey_pattern': b'hkey1', 'batch_size': 2, 'expected': [] } ] for idx, case in enumerate(test_cases): print(f"\nRunning test case {idx + 1}...") o = ScanOptions() o.sortkey_filter_type = case['skey_filter_type'] o.sortkey_filter_pattern = case['skey_pattern'] o.batch_size = case['batch_size'] o.hashkey_filter_type = case['hkey_filter_type'] o.hashkey_filter_pattern = case['hkey_pattern'] ss = c.get_unordered_scanners(3, o) results = [] for s in ss: while True: try: ret = yield s.get_next() if not ret: break results.append(ret) except Exception as e: print(f"Exception during scan: {e}") break s.close() actual_keys = [k[1] for k, _ in results] expected_keys = case['expected'] assert len(actual_keys) == len(expected_keys) and set(actual_keys) == set(expected_keys), \ f"Test case {idx + 1} failed: Expected {expected_keys} \n got {actual_keys}" print(f"✅ Test case {idx + 1} passed: {len(actual_keys)} keys matched") @inlineCallbacks def test_prefix_match_with_boundaries(): print("Start test_prefix_match_with_boundaries") c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'scan_test') try: suc = yield c.init() if not suc: print("Failed to connect to Pegasus server") reactor.stop() return except Exception as e: print(f"Init failed: {e}") reactor.stop() return hkey = 'hkey1' # 写入测试数据:skey0 ~ skey109 for i in range(110): skey = f'skey{i}' yield c.set(hkey, skey, str(i), 0) yield c.set(hkey, 'aa', 'aa', 0) yield c.set(hkey, 'b', 
'b', 0) yield c.set(hkey, 'z', 'z', 0) test_cases = [ { 'pattern': b'', 'start': b'aa', 'stop': b'b', 'start_inclusive': True, 'stop_inclusive': True, 'batch_size': 1, 'expected': ['aa', 'b'] }, { 'pattern': b'skey9', 'start': b'skey50', 'stop': b'skey99', 'start_inclusive': True, 'stop_inclusive': True, 'batch_size': 1, 'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99'] }, { 'pattern': b'skey9', 'start': b'skey50', 'stop': b'skey99', 'start_inclusive': False, 'stop_inclusive': False, 'batch_size': 10, 'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98'] }, { 'pattern': b'skey9', 'start': b'skey9', 'stop': b'skey99', 'start_inclusive': True, 'stop_inclusive': False, 'batch_size': 50, 'expected': ['skey9'] + [f'skey{i}' for i in range(90, 99)] }, { 'pattern': b'skey9', 'start': b'skey9', 'stop': b'skey99', 'start_inclusive': False, 'stop_inclusive': True, 'batch_size': 1, 'expected': [f'skey{i}' for i in range(90, 100)] }, { 'pattern': b'skey', 'start': b'skey9', 'stop': b'skey99', 'start_inclusive': False, 'stop_inclusive': True, 'batch_size': 1, 'expected': [f'skey{i}' for i in range(90, 100)] }, { 'pattern': b'skey9', 'start': b'skey9', 'stop': b'skey99', 'start_inclusive': False, 'stop_inclusive': True, 'batch_size': 1, 'expected': [f'skey{i}' for i in range(90, 100)] }, { 'pattern': b'skey9', 'start': b'skey100', 'stop': b'skey105', 'start_inclusive': True, 'stop_inclusive': True, 'batch_size': 10, 'expected': [] }, { 'pattern': b'skey9', 'start': b'skey9', 'stop': b'skey9', 'start_inclusive': True, 'stop_inclusive': True, 'batch_size': 1, 'expected': ['skey9'] }, { 'pattern': b'skey9', 'start': b'skey91', 'stop': b'skey9', 'start_inclusive': True, 'stop_inclusive': True, 'batch_size': 1, 'expected': [] }, { 'pattern': b'', 'start': b'skey91', 'stop': b'', 'start_inclusive': True, 'stop_inclusive': True, 'batch_size': 1, 'expected': 
['z']+[f'skey{i}' for i in range(91, 100)] }, { 'pattern': b'skey', 'start': b'skey88', 'stop': b'skey98', 'start_inclusive': True, 'stop_inclusive': True, 'batch_size': 1, 'expected': ['skey9'] + [f'skey{i}' for i in range(88, 99)] }, { 'pattern': b'skey', 'start': b'skey88', 'stop': b'skey98', 'start_inclusive': False, 'stop_inclusive': False, 'batch_size': 1, 'expected': ['skey9'] + [f'skey{i}' for i in range(89, 98)] }, { 'pattern': b'skey1', 'start': b'aa', 'stop': b'', 'start_inclusive': True, 'stop_inclusive': False, 'batch_size': 1, 'expected': ['skey1', 'skey10', 'skey100', 'skey101', 'skey102', 'skey103', 'skey104', 'skey105', 'skey106', 'skey107', 'skey108', 'skey109', 'skey11', 'skey12', 'skey13', 'skey14', 'skey15', 'skey16', 'skey17', 'skey18', 'skey19'] } ] for idx, case in enumerate(test_cases): print(f"\nRunning test case {idx + 1}...") o = ScanOptions() o.sortkey_filter_type = filter_type.FT_MATCH_PREFIX o.sortkey_filter_pattern = case['pattern'] o.start_inclusive = case['start_inclusive'] o.stop_inclusive = case['stop_inclusive'] o.batch_size = case['batch_size'] s = c.get_scanner(hkey, case['start'], case['stop'], o) results = [] while True: try: ret = yield s.get_next() if not ret: break results.append(ret) except Exception as e: print(f"Exception during scan: {e}") break s.close() actual_keys = [k[1] for k, _ in results] expected_keys = case['expected'] assert set(actual_keys) == set(expected_keys), \ f"Test case {idx + 1} failed: \n Expected {expected_keys} \n got {actual_keys}" print(f"✅ Test case {idx + 1} passed: {len(actual_keys)} keys matched") print("All test cases passed!") if __name__ == "__main__": tests = [ test_prefix_match_with_boundaries, test_prefix_match_with_boundaries_full_scan ] deferreds = [defer.maybeDeferred(test) for test in tests] d = defer.gatherResults(deferreds, consumeErrors=False) d.addBoth(lambda _: reactor.stop()) reactor.run() ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
