Copilot commented on code in PR #2306:
URL:
https://github.com/apache/incubator-pegasus/pull/2306#discussion_r2436532783
##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +248,148 @@ class MetaSessionManager(SessionManager):
def __init__(self, table_name, timeout):
SessionManager.__init__(self, table_name, timeout)
self.addr_list = []
-
- def add_meta_server(self, meta_addr):
- rpc_addr = rpc_address()
- if rpc_addr.from_string(meta_addr):
- ip_port = meta_addr.split(':')
- if not len(ip_port) == 2:
+ self.host_ports = []
+ self.query_times = 0
+ self.solved_times = 0
+
+ # validate if the given string is a valid IP address
+ def is_valid_ip(self, address):
+ try:
+ socket.inet_pton(socket.AF_INET, address)
+ return True
+ except Exception as e:
+ # maybe ipv6
+ try:
+ socket.inet_pton(socket.AF_INET6, address)
+ return True
+ except Exception as e:
return False
+
+ def get_host_type(self, ip_list):
+ has_ipv4 = False
+ has_ipv6 = False
+
+ for ip_str in ip_list:
+ try:
+ ip = ipaddress.ip_address(ip_str)
+ if ip.version == 4:
+ has_ipv4 = True
+ elif ip.version == 6:
+ has_ipv6 = True
+ except ValueError:
+ continue
+
+ if has_ipv4 and has_ipv6:
+ return host_port_types.kHostTypeGroup
+ elif has_ipv4:
+ return host_port_types.kHostTypeIpv4
+ elif has_ipv6:
+ return host_port_types.kHostTypeIpv6
+ else:
+ return host_port_types.kHostTypeInvalid
+
+ def resolve_all_ips(self, hostname):
+
+ def extract_ips(result, record_type):
+ answers, _, _ = result
+ ips = []
+ for answer in answers:
+ if record_type == dns.A and isinstance(answer.payload,
dns.Record_A):
+ ips.append(answer.payload.dottedQuad())
+ elif record_type == dns.AAAA and isinstance(answer.payload,
dns.Record_AAAA):
+ if isinstance(answer.payload.address, bytes):
+ ip6_bytes = answer.payload.address
+ if len(ip6_bytes) == 16:
+ ip_str = socket.inet_ntop(socket.AF_INET6,
ip6_bytes)
+ ips.append(ip_str)
+ else:
+ print(f"Invalid IPv6 bytes length:
{len(ip6_bytes)}")
Review Comment:
Using print() for error reporting is inconsistent with the rest of the
codebase, which uses the module logger. Replace it with logger.warning() or
logger.error() to keep logging practices consistent.
##########
python-client/pypegasus/pgclient.py:
##########
@@ -276,8 +407,16 @@ def got_results(self, res):
self.name, result)
return result
- logger.error('query partition info err. table: %s err: %s',
- self.name, res)
+ # all meta server queries failed, maybe need to re-resolve hostname
+ # to get new IP addresses
+ self.query_times += 1
+ logger.error('query partition info err(%s) table: %s, query_times: %d,
err: %s',
+ res.err.errno, self.name, self.query_times, res)
+
Review Comment:
The code attempts to access res.err.errno, but res is the list of
(success, result) tuples produced by DeferredList, not an error object with an
err attribute. This will raise an AttributeError as soon as the error-logging
path is reached.
```suggestion
# Collect error info from failed results
error_details = []
for (suc, result) in res:
if not suc:
# result is a Failure instance from Twisted
err_type = getattr(result.value, '__class__', type(result.value)).__name__ if hasattr(result, 'value') else type(result).__name__
err_msg = str(result.value) if hasattr(result, 'value') else str(result)
error_details.append(f"{err_type}: {err_msg}")
logger.error('query partition info failed table: %s, query_times: %d, errors: %s',
self.name, self.query_times, "; ".join(error_details))
```
##########
python-client/pypegasus/pgclient.py:
##########
@@ -276,8 +407,16 @@ def got_results(self, res):
self.name, result)
return result
- logger.error('query partition info err. table: %s err: %s',
- self.name, res)
+ # all meta server queries failed, maybe need to re-resolve hostname
+ # to get new IP addresses
+ self.query_times += 1
+ logger.error('query partition info err(%s) table: %s, query_times: %d,
err: %s',
+ res.err.errno, self.name, self.query_times, res)
+
+ if self.query_times >= MAX_META_QUERY_THRESHOLD and self.solved_times
<= MAX_SOLVED_THRESHOLD:
Review Comment:
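The logic uses '<=' for MAX_SOLVED_THRESHOLD but '>=' for
MAX_META_QUERY_THRESHOLD, which is inconsistent. Consider using the same
comparison operator ('>=' for both) to improve code clarity, or document why
different operators are needed. Note that the two checks may serve different
purposes. query_times looks like a trigger (re-resolve once enough queries have
failed), while solved_times looks like a cap on re-resolution attempts. If
that is the intent, switching the second check to '>=' would invert the cap,
because solved_times starts at 0. Documenting the intent is the safer option.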
```suggestion
if self.query_times >= MAX_META_QUERY_THRESHOLD and self.solved_times >= MAX_SOLVED_THRESHOLD:
```
##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +248,148 @@ class MetaSessionManager(SessionManager):
def __init__(self, table_name, timeout):
SessionManager.__init__(self, table_name, timeout)
self.addr_list = []
-
- def add_meta_server(self, meta_addr):
- rpc_addr = rpc_address()
- if rpc_addr.from_string(meta_addr):
- ip_port = meta_addr.split(':')
- if not len(ip_port) == 2:
+ self.host_ports = []
+ self.query_times = 0
+ self.solved_times = 0
+
+ # validate if the given string is a valid IP address
+ def is_valid_ip(self, address):
+ try:
+ socket.inet_pton(socket.AF_INET, address)
+ return True
+ except Exception as e:
+ # maybe ipv6
+ try:
+ socket.inet_pton(socket.AF_INET6, address)
+ return True
+ except Exception as e:
return False
+
+ def get_host_type(self, ip_list):
+ has_ipv4 = False
+ has_ipv6 = False
+
+ for ip_str in ip_list:
+ try:
+ ip = ipaddress.ip_address(ip_str)
+ if ip.version == 4:
+ has_ipv4 = True
+ elif ip.version == 6:
+ has_ipv6 = True
+ except ValueError:
+ continue
+
+ if has_ipv4 and has_ipv6:
+ return host_port_types.kHostTypeGroup
+ elif has_ipv4:
+ return host_port_types.kHostTypeIpv4
+ elif has_ipv6:
+ return host_port_types.kHostTypeIpv6
+ else:
+ return host_port_types.kHostTypeInvalid
+
+ def resolve_all_ips(self, hostname):
+
+ def extract_ips(result, record_type):
+ answers, _, _ = result
+ ips = []
+ for answer in answers:
+ if record_type == dns.A and isinstance(answer.payload,
dns.Record_A):
+ ips.append(answer.payload.dottedQuad())
+ elif record_type == dns.AAAA and isinstance(answer.payload,
dns.Record_AAAA):
+ if isinstance(answer.payload.address, bytes):
+ ip6_bytes = answer.payload.address
+ if len(ip6_bytes) == 16:
+ ip_str = socket.inet_ntop(socket.AF_INET6,
ip6_bytes)
+ ips.append(ip_str)
+ else:
+ print(f"Invalid IPv6 bytes length:
{len(ip6_bytes)}")
+ else:
+ ips.append(str(answer.payload.address))
+ return ips
+
+ resolver = client.getResolver()
+
+ # query A record (ipv4)
+ d_a = resolver.query(dns.Query(hostname, dns.A, dns.IN))
+ d_a_ips = d_a.addCallback(lambda res: extract_ips(res, dns.A))
+
+ # query AAAA record (ipv6)
+ d_aaaa = resolver.query(dns.Query(hostname, dns.AAAA, dns.IN))
+ d_aaaa_ips = d_aaaa.addCallback(lambda res: extract_ips(res, dns.AAAA))
+
+ # gather A record and AAAA record
+ return defer.gatherResults([d_a_ips, d_aaaa_ips], consumeErrors=True) \
+ .addCallback(lambda results: results[0] + results[1])
+
+ @inlineCallbacks
+ def add_meta_server(self, meta_addr):
+ if not isinstance(meta_addr, str):
+ logger.error("meta_addr must be a string: %s", meta_addr)
+ returnValue(False)
+
+ try:
+ host, port_str = meta_addr.split(':', 1)
+ except ValueError:
+ logger.error("invalid address format (expected host:port): %s",
meta_addr)
+ returnValue(False)
+
+ ips = []
+ if self.is_valid_ip(host):
+ ips = [host]
+ else:
+ try:
+ ips = yield self.resolve_all_ips(host)
+ hp = host_port()
+ if hp.from_string(meta_addr):
+ hp.type = self.get_host_type(ips)
+ self.host_ports.append(hp)
+ logger.info("resolved hostname %s to IP type:%s, addr:%s",
hp.host, hp.type, ips)
+ except Exception as e:
+ logger.error("failed to resolve hostname %s: %s", host, e)
+ returnValue(False)
+
+ for ip in ips:
+ self.addr_list.append((ip, int(port_str)))
+
+ returnValue(True)
+
+ @inlineCallbacks
+ def resolve_host(self, err):
+ if not self.host_ports:
+ returnValue(None)
+
+ new_addr_list = []
+ for host_port in self.host_ports:
+ host, port = host_port.to_host_port()
+ try:
+ ips = yield self.resolve_all_ips(host)
+ host_port.type = self.get_host_type(ips)
+ for ip in ips:
+ new_addr_list.append((ip, port))
+ logger.info("resolved hostname %s to IP type:%s, addr:%s",
host_port.host, host_port.type, ips)
+ except Exception as e:
+ logger.error("failed to resolve hostname %s: %s", host, e)
+ continue
Review Comment:
Inconsistent indentation: line 373 has excessive indentation (20 spaces
instead of 16). It should be aligned with the logger.error() call in the
except block above it.
```suggestion
logger.error("failed to resolve hostname %s: %s", host, e)
continue
```
##########
python-client/pypegasus/pgclient.py:
##########
@@ -641,23 +781,34 @@ def __init__(self, meta_addrs=None, table_name='',
self.name = table_name
self.table = Table(table_name, self, timeout)
self.meta_session_manager = MetaSessionManager(table_name, timeout)
- if isinstance(meta_addrs, list):
- for meta_addr in meta_addrs:
- self.meta_session_manager.add_meta_server(meta_addr)
+ self.initial_meta = False
+ self.meta_addrs = meta_addrs if isinstance(meta_addrs, list) else []
PegasusHash.populate_table()
self.timeout_times = 0
self.update_partition = False
self.timer = reactor.callLater(META_CHECK_INTERVAL, self.check_state)
+ @inlineCallbacks
def init(self):
"""
Initialize the client before you can use it.
:return: (DeferredList) True when initialized succeed, others when
failed.
"""
+ if not self.initial_meta:
+ deferreds = []
+ self.initial_meta = True
+ for host_port in self.meta_addrs:
+ d = self.meta_session_manager.add_meta_server(host_port)
+ deferreds.append(d)
+ results = yield defer.DeferredList(deferreds, consumeErrors=True)
+ if not any(success for success, _ in results):
+ raise Exception("all meta servers failed to initialize")
Review Comment:
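The error message 'all meta servers failed to initialize' doesn't provide
actionable information. Consider including details about which addresses failed
or the specific errors encountered, to aid troubleshooting. Also note that
add_meta_server() reports most failures via returnValue(False) rather than by
raising, so DeferredList records them as (True, False). Both the
any(success for success, _ in results) check and the failure collection in the
suggestion below only detect Deferreds that raised. The returned value should
be checked as well.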
```suggestion
# Collect details about failed meta servers
failed_details = []
for idx, (success, result) in enumerate(results):
if not success:
addr = self.meta_addrs[idx] if idx < len(self.meta_addrs) else "<unknown>"
error_str = str(result)
failed_details.append(f"{addr}: {error_str}")
details_msg = "; ".join(failed_details) if failed_details else "No details available"
raise Exception(f"all meta servers failed to initialize. Details: {details_msg}")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]