Copilot commented on code in PR #2306:
URL: 
https://github.com/apache/incubator-pegasus/pull/2306#discussion_r2436532783


##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +248,148 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        self.solved_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)
+            return True
+        except Exception as e:
+            # maybe ipv6
+            try:
+                socket.inet_pton(socket.AF_INET6, address)
+                return True
+            except Exception as e:
                 return False
+            
+    def get_host_type(self, ip_list):
+        has_ipv4 = False
+        has_ipv6 = False
+
+        for ip_str in ip_list:
+            try:
+                ip = ipaddress.ip_address(ip_str)
+                if ip.version == 4:
+                    has_ipv4 = True
+                elif ip.version == 6:
+                    has_ipv6 = True
+            except ValueError:
+                continue 
+
+        if has_ipv4 and has_ipv6:
+            return host_port_types.kHostTypeGroup
+        elif has_ipv4:
+            return host_port_types.kHostTypeIpv4
+        elif has_ipv6:
+            return host_port_types.kHostTypeIpv6
+        else:
+            return host_port_types.kHostTypeInvalid
+    
+    def resolve_all_ips(self, hostname):
+
+        def extract_ips(result, record_type):
+            answers, _, _ = result
+            ips = []
+            for answer in answers:
+                if record_type == dns.A and isinstance(answer.payload, 
dns.Record_A):
+                    ips.append(answer.payload.dottedQuad())
+                elif record_type == dns.AAAA and isinstance(answer.payload, 
dns.Record_AAAA):
+                    if isinstance(answer.payload.address, bytes):
+                        ip6_bytes = answer.payload.address
+                        if len(ip6_bytes) == 16:
+                            ip_str = socket.inet_ntop(socket.AF_INET6, 
ip6_bytes)
+                            ips.append(ip_str)
+                        else:
+                            print(f"Invalid IPv6 bytes length: 
{len(ip6_bytes)}")

Review Comment:
   Using print() for error reporting is inconsistent with the rest of the 
codebase, which uses the logger. Replace it with logger.warning() or 
logger.error() to keep logging consistent.



##########
python-client/pypegasus/pgclient.py:
##########
@@ -276,8 +407,16 @@ def got_results(self, res):
                             self.name, result)
                 return result
 
-        logger.error('query partition info err. table: %s err: %s',
-                     self.name, res)
+        # all meta server queries failed, maybe need to re-resolve hostname 
+        # to get new IP addresses
+        self.query_times += 1
+        logger.error('query partition info err(%s) table: %s, query_times: %d, 
err: %s',
+                    res.err.errno, self.name, self.query_times, res)
+        

Review Comment:
   The code accesses res.err.errno, but res is the list of (success, result) 
tuples produced by DeferredList, not an error object with an err attribute. 
This will raise an AttributeError whenever this error-logging path is reached.
   ```suggestion
           # Collect error info from failed results
           error_details = []
           for (suc, result) in res:
               if not suc:
                   # result is a Failure instance from Twisted
                   err_type = getattr(result.value, '__class__', 
type(result.value)).__name__ if hasattr(result, 'value') else 
type(result).__name__
                   err_msg = str(result.value) if hasattr(result, 'value') else 
str(result)
                   error_details.append(f"{err_type}: {err_msg}")
           logger.error('query partition info failed table: %s, query_times: 
%d, errors: %s',
                        self.name, self.query_times, "; ".join(error_details))
   ```



##########
python-client/pypegasus/pgclient.py:
##########
@@ -276,8 +407,16 @@ def got_results(self, res):
                             self.name, result)
                 return result
 
-        logger.error('query partition info err. table: %s err: %s',
-                     self.name, res)
+        # all meta server queries failed, maybe need to re-resolve hostname 
+        # to get new IP addresses
+        self.query_times += 1
+        logger.error('query partition info err(%s) table: %s, query_times: %d, 
err: %s',
+                    res.err.errno, self.name, self.query_times, res)
+        
+        if self.query_times >= MAX_META_QUERY_THRESHOLD and self.solved_times 
<= MAX_SOLVED_THRESHOLD:

Review Comment:
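   The logic uses '<=' for MAX_SOLVED_THRESHOLD but '>=' for 
MAX_META_QUERY_THRESHOLD, which is inconsistent. Consider using the same 
comparison operator ('>=' for both) to improve code clarity, or document why 
different operators are needed. Caution: the two comparisons play different 
roles. The first triggers re-resolution once enough queries have failed; the 
second caps how many times re-resolution may happen. Simply switching the 
second to '>=' would invert that cap: solved_times starts at 0, so the branch 
would likely never be taken. Documenting the intent is the safer option.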
   ```suggestion
           if self.query_times >= MAX_META_QUERY_THRESHOLD and 
self.solved_times >= MAX_SOLVED_THRESHOLD:
   ```



##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +248,148 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        self.solved_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)
+            return True
+        except Exception as e:
+            # maybe ipv6
+            try:
+                socket.inet_pton(socket.AF_INET6, address)
+                return True
+            except Exception as e:
                 return False
+            
+    def get_host_type(self, ip_list):
+        has_ipv4 = False
+        has_ipv6 = False
+
+        for ip_str in ip_list:
+            try:
+                ip = ipaddress.ip_address(ip_str)
+                if ip.version == 4:
+                    has_ipv4 = True
+                elif ip.version == 6:
+                    has_ipv6 = True
+            except ValueError:
+                continue 
+
+        if has_ipv4 and has_ipv6:
+            return host_port_types.kHostTypeGroup
+        elif has_ipv4:
+            return host_port_types.kHostTypeIpv4
+        elif has_ipv6:
+            return host_port_types.kHostTypeIpv6
+        else:
+            return host_port_types.kHostTypeInvalid
+    
+    def resolve_all_ips(self, hostname):
+
+        def extract_ips(result, record_type):
+            answers, _, _ = result
+            ips = []
+            for answer in answers:
+                if record_type == dns.A and isinstance(answer.payload, 
dns.Record_A):
+                    ips.append(answer.payload.dottedQuad())
+                elif record_type == dns.AAAA and isinstance(answer.payload, 
dns.Record_AAAA):
+                    if isinstance(answer.payload.address, bytes):
+                        ip6_bytes = answer.payload.address
+                        if len(ip6_bytes) == 16:
+                            ip_str = socket.inet_ntop(socket.AF_INET6, 
ip6_bytes)
+                            ips.append(ip_str)
+                        else:
+                            print(f"Invalid IPv6 bytes length: 
{len(ip6_bytes)}")
+                    else:
+                        ips.append(str(answer.payload.address))
+            return ips
+
+        resolver = client.getResolver()
+
+        # query A record (ipv4)
+        d_a = resolver.query(dns.Query(hostname, dns.A, dns.IN))
+        d_a_ips = d_a.addCallback(lambda res: extract_ips(res, dns.A))
+
+        # query AAAA record (ipv6)
+        d_aaaa = resolver.query(dns.Query(hostname, dns.AAAA, dns.IN))
+        d_aaaa_ips = d_aaaa.addCallback(lambda res: extract_ips(res, dns.AAAA))
+
+        # gather A record and AAAA record
+        return defer.gatherResults([d_a_ips, d_aaaa_ips], consumeErrors=True) \
+                .addCallback(lambda results: results[0] + results[1])
+        
+    @inlineCallbacks
+    def add_meta_server(self, meta_addr):
+        if not isinstance(meta_addr, str):
+            logger.error("meta_addr must be a string: %s", meta_addr)
+            returnValue(False)
+        
+        try:
+            host, port_str = meta_addr.split(':', 1)
+        except ValueError:
+            logger.error("invalid address format (expected host:port): %s", 
meta_addr)
+            returnValue(False)
+
+        ips = []
+        if self.is_valid_ip(host):
+            ips = [host]
+        else:
+            try:
+                ips = yield self.resolve_all_ips(host)
+                hp = host_port()
+                if hp.from_string(meta_addr):
+                    hp.type = self.get_host_type(ips)
+                    self.host_ports.append(hp)
+                    logger.info("resolved hostname %s to IP type:%s, addr:%s", 
hp.host, hp.type, ips)
+            except Exception as e:
+                logger.error("failed to resolve hostname %s: %s", host, e)
+                returnValue(False)
+            
+        for ip in ips:
+            self.addr_list.append((ip, int(port_str)))
+
+        returnValue(True)
+
+    @inlineCallbacks
+    def resolve_host(self, err):
+        if not self.host_ports:
+            returnValue(None) 
+        
+        new_addr_list = []
+        for host_port in self.host_ports:
+            host, port = host_port.to_host_port()
+            try:
+                ips = yield self.resolve_all_ips(host)
+                host_port.type = self.get_host_type(ips)
+                for ip in ips:
+                    new_addr_list.append((ip, port))
+                logger.info("resolved hostname %s to IP type:%s, addr:%s", 
host_port.host, host_port.type, ips)
+            except Exception as e:
+                    logger.error("failed to resolve hostname %s: %s", host, e)
+                    continue

Review Comment:
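   Inconsistent indentation: the body of the except clause is indented 20 
spaces instead of 16. This covers the logger.error() call and the continue 
statement, around line 373. It should be indented one level under 
`except Exception as e:`, matching the try body above it.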
   ```suggestion
                   logger.error("failed to resolve hostname %s: %s", host, e)
                   continue
   ```



##########
python-client/pypegasus/pgclient.py:
##########
@@ -641,23 +781,34 @@ def __init__(self, meta_addrs=None, table_name='',
         self.name = table_name
         self.table = Table(table_name, self, timeout)
         self.meta_session_manager = MetaSessionManager(table_name, timeout)
-        if isinstance(meta_addrs, list):
-            for meta_addr in meta_addrs:
-                self.meta_session_manager.add_meta_server(meta_addr)
+        self.initial_meta = False
+        self.meta_addrs = meta_addrs if isinstance(meta_addrs, list) else []
         PegasusHash.populate_table()
         self.timeout_times = 0
         self.update_partition = False
         self.timer = reactor.callLater(META_CHECK_INTERVAL, self.check_state)
 
+    @inlineCallbacks
     def init(self):
         """
         Initialize the client before you can use it.
 
         :return: (DeferredList) True when initialized succeed, others when 
failed.
         """
+        if not self.initial_meta:
+            deferreds = []
+            self.initial_meta = True
+            for host_port in self.meta_addrs:
+                d = self.meta_session_manager.add_meta_server(host_port)
+                deferreds.append(d)
+            results = yield defer.DeferredList(deferreds, consumeErrors=True)
+            if not any(success for success, _ in results):
+                raise Exception("all meta servers failed to initialize")

Review Comment:
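   The error message 'all meta servers failed to initialize' doesn't provide 
actionable information. Consider including which addresses failed and the 
specific errors encountered, to aid troubleshooting. Note also that 
add_meta_server() reports failure by returning False rather than raising. As a 
result, each entry in results will normally be (True, False), and this check 
will not fire. It should inspect the returned value as well, e.g. 
`any(success and ok for success, ok in results)`.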
   ```suggestion
                   # Collect details about failed meta servers
                   failed_details = []
                   for idx, (success, result) in enumerate(results):
                       if not success:
                           addr = self.meta_addrs[idx] if idx < 
len(self.meta_addrs) else "<unknown>"
                           error_str = str(result)
                           failed_details.append(f"{addr}: {error_str}")
                   details_msg = "; ".join(failed_details) if failed_details 
else "No details available"
                   raise Exception(f"all meta servers failed to initialize. 
Details: {details_msg}")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to