Copilot commented on code in PR #2306:
URL: 
https://github.com/apache/incubator-pegasus/pull/2306#discussion_r2514974424


##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +247,146 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)
+            return True
+        except Exception as e:
+            # maybe ipv6
+            try:
+                socket.inet_pton(socket.AF_INET6, address)
+                return True
+            except Exception as e:
                 return False
+            
+    def get_host_type(self, ip_list):
+        has_ipv4 = False
+        has_ipv6 = False
+
+        for ip_str in ip_list:
+            try:
+                ip = ipaddress.ip_address(ip_str)
+                if ip.version == 4:
+                    has_ipv4 = True
+                elif ip.version == 6:
+                    has_ipv6 = True
+            except ValueError:
+                continue 
+
+        if has_ipv4 and has_ipv6:
+            return host_port_types.kHostTypeGroup
+        elif has_ipv4:
+            return host_port_types.kHostTypeIpv4
+        elif has_ipv6:
+            return host_port_types.kHostTypeIpv6
+        else:
+            return host_port_types.kHostTypeInvalid
+    
+    def resolve_all_ips(self, hostname):

Review Comment:
   Missing docstrings for newly added methods. Add docstrings for 
`get_host_type`, `resolve_all_ips`, and `resolve_host` methods explaining their 
purpose, parameters, and return values. This is especially important for 
complex asynchronous methods that perform DNS resolution.



##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +247,146 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)
+            return True
+        except Exception as e:
+            # maybe ipv6
+            try:
+                socket.inet_pton(socket.AF_INET6, address)
+                return True
+            except Exception as e:
                 return False

Review Comment:
   Code duplication and inconsistency: The code uses both `socket.inet_pton()` 
in `is_valid_ip` (lines 256, 261) and `ipaddress.ip_address()` in 
`get_host_type` (line 272) to validate IP addresses. Consider using a 
consistent approach throughout. The `ipaddress` module is more Pythonic and 
provides better error handling. Consider refactoring `is_valid_ip` to use 
`ipaddress.ip_address()` instead.
   ```suggestion
               ipaddress.ip_address(address)
               return True
           except ValueError:
               return False
   ```



##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +247,146 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)
+            return True
+        except Exception as e:
+            # maybe ipv6
+            try:
+                socket.inet_pton(socket.AF_INET6, address)
+                return True
+            except Exception as e:
                 return False
+            
+    def get_host_type(self, ip_list):
+        has_ipv4 = False
+        has_ipv6 = False
+
+        for ip_str in ip_list:
+            try:
+                ip = ipaddress.ip_address(ip_str)
+                if ip.version == 4:
+                    has_ipv4 = True
+                elif ip.version == 6:
+                    has_ipv6 = True
+            except ValueError:
+                continue 
+
+        if has_ipv4 and has_ipv6:
+            return host_port_types.kHostTypeGroup
+        elif has_ipv4:
+            return host_port_types.kHostTypeIpv4
+        elif has_ipv6:
+            return host_port_types.kHostTypeIpv6
+        else:
+            return host_port_types.kHostTypeInvalid
+    
+    def resolve_all_ips(self, hostname):
+
+        def extract_ips(result, record_type):
+            answers, _, _ = result
+            ips = []
+            for answer in answers:
+                if record_type == dns.A and isinstance(answer.payload, 
dns.Record_A):
+                    ips.append(answer.payload.dottedQuad())
+                elif record_type == dns.AAAA and isinstance(answer.payload, 
dns.Record_AAAA):
+                    if isinstance(answer.payload.address, bytes):
+                        ip6_bytes = answer.payload.address
+                        if len(ip6_bytes) == 16:
+                            ip_str = socket.inet_ntop(socket.AF_INET6, 
ip6_bytes)
+                            ips.append(ip_str)
+                        else:
+                            logger.error('Invalid IPv6 bytes length: %s', 
len(ip6_bytes))
+                    else:
+                        ips.append(str(answer.payload.address))
+            return ips
+
+        resolver = client.getResolver()
+
+        # query A record (ipv4)
+        d_a = resolver.query(dns.Query(hostname, dns.A, dns.IN))
+        d_a_ips = d_a.addCallback(lambda res: extract_ips(res, dns.A))
+
+        # query AAAA record (ipv6)
+        d_aaaa = resolver.query(dns.Query(hostname, dns.AAAA, dns.IN))
+        d_aaaa_ips = d_aaaa.addCallback(lambda res: extract_ips(res, dns.AAAA))
+
+        # gather A record and AAAA record
+        return defer.gatherResults([d_a_ips, d_aaaa_ips], consumeErrors=True) \
+                .addCallback(lambda results: results[0] + results[1])
+        
+    @inlineCallbacks
+    def add_meta_server(self, meta_addr):
+        if not isinstance(meta_addr, str):
+            logger.error("meta_addr must be a string: %s", meta_addr)
+            returnValue(False)
+        
+        try:
+            host, port_str = meta_addr.split(':', 1)
+        except ValueError:
+            logger.error("invalid address format (expected host:port): %s", 
meta_addr)
+            returnValue(False)
+
+        ips = []
+        if self.is_valid_ip(host):
+            ips = [host]
+        else:
+            try:
+                ips = yield self.resolve_all_ips(host)
+                hp = host_port()
+                if hp.from_string(meta_addr):
+                    hp.type = self.get_host_type(ips)
+                    self.host_ports.append(hp)
+                    logger.info("resolved hostname %s to IP type:%s, addr:%s", 
hp.host, hp.type, ips)
+            except Exception as e:
+                logger.error("failed to resolve hostname %s: %s", host, e)
+                returnValue(False)
+            
+        for ip in ips:
+            self.addr_list.append((ip, int(port_str)))
+
+        returnValue(True)
 
-            ip, port = ip_port[0], int(ip_port[1])
-            self.addr_list.append((ip, port))
+    @inlineCallbacks
+    def resolve_host(self, err):
+        if not self.host_ports:
+            returnValue(None) 
+        
+        new_addr_list = []
+        for host_port in self.host_ports:
+            host, port = host_port.to_host_port()
+            try:
+                ips = yield self.resolve_all_ips(host)
+                host_port.type = self.get_host_type(ips)
+                for ip in ips:
+                    new_addr_list.append((ip, port))
+                logger.info("resolved hostname %s to IP type:%s, addr:%s", 
host_port.host, host_port.type, ips)
+            except Exception as e:
+                logger.error("failed to resolve hostname %s: %s", host, e)
+                continue
+                
+        if not new_addr_list or new_addr_list == self.addr_list:
+            returnValue(None)
+     
+        stale_sessions = []
+        for rpc_addr in list(self.session_dict):
+            ip, port = rpc_addr.to_ip_port()
+            if (ip, port) not in new_addr_list:
+                stale_sessions.append(self.session_dict.pop(rpc_addr))
+                logger.info("removed stale server: %s:%s", ip, port)
 
-            return True
-        else:
-            return False
+        self.addr_list = new_addr_list
+
+        for session in stale_sessions:
+            if session: 

Review Comment:
   The `session` loop variable at line 386 is checked with `if session:`, but
session objects may have custom `__bool__` implementations, so a truthiness
test is not a reliable "is there a session" check.
Use `if session is not None:` for more explicit and reliable checking.
Additionally, using `reactor.callFromThread` suggests potential thread-safety
issues: ensure that session closing is thread-safe, or document why
`callFromThread` is necessary here.
   ```suggestion
               if session is not None:
                   # Ensure session.close is called in the reactor thread for thread safety.
   ```



##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +247,146 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)
+            return True
+        except Exception as e:
+            # maybe ipv6
+            try:
+                socket.inet_pton(socket.AF_INET6, address)
+                return True
+            except Exception as e:
                 return False
+            
+    def get_host_type(self, ip_list):
+        has_ipv4 = False
+        has_ipv6 = False
+
+        for ip_str in ip_list:
+            try:
+                ip = ipaddress.ip_address(ip_str)
+                if ip.version == 4:
+                    has_ipv4 = True
+                elif ip.version == 6:
+                    has_ipv6 = True
+            except ValueError:
+                continue 
+
+        if has_ipv4 and has_ipv6:
+            return host_port_types.kHostTypeGroup
+        elif has_ipv4:
+            return host_port_types.kHostTypeIpv4
+        elif has_ipv6:
+            return host_port_types.kHostTypeIpv6
+        else:
+            return host_port_types.kHostTypeInvalid
+    
+    def resolve_all_ips(self, hostname):
+
+        def extract_ips(result, record_type):
+            answers, _, _ = result
+            ips = []
+            for answer in answers:
+                if record_type == dns.A and isinstance(answer.payload, 
dns.Record_A):
+                    ips.append(answer.payload.dottedQuad())
+                elif record_type == dns.AAAA and isinstance(answer.payload, 
dns.Record_AAAA):
+                    if isinstance(answer.payload.address, bytes):
+                        ip6_bytes = answer.payload.address
+                        if len(ip6_bytes) == 16:
+                            ip_str = socket.inet_ntop(socket.AF_INET6, 
ip6_bytes)
+                            ips.append(ip_str)
+                        else:
+                            logger.error('Invalid IPv6 bytes length: %s', 
len(ip6_bytes))
+                    else:
+                        ips.append(str(answer.payload.address))
+            return ips
+
+        resolver = client.getResolver()
+
+        # query A record (ipv4)
+        d_a = resolver.query(dns.Query(hostname, dns.A, dns.IN))
+        d_a_ips = d_a.addCallback(lambda res: extract_ips(res, dns.A))
+
+        # query AAAA record (ipv6)
+        d_aaaa = resolver.query(dns.Query(hostname, dns.AAAA, dns.IN))
+        d_aaaa_ips = d_aaaa.addCallback(lambda res: extract_ips(res, dns.AAAA))
+
+        # gather A record and AAAA record
+        return defer.gatherResults([d_a_ips, d_aaaa_ips], consumeErrors=True) \
+                .addCallback(lambda results: results[0] + results[1])
+        
+    @inlineCallbacks
+    def add_meta_server(self, meta_addr):
+        if not isinstance(meta_addr, str):
+            logger.error("meta_addr must be a string: %s", meta_addr)
+            returnValue(False)
+        
+        try:
+            host, port_str = meta_addr.split(':', 1)
+        except ValueError:
+            logger.error("invalid address format (expected host:port): %s", 
meta_addr)
+            returnValue(False)
+
+        ips = []
+        if self.is_valid_ip(host):
+            ips = [host]
+        else:
+            try:
+                ips = yield self.resolve_all_ips(host)
+                hp = host_port()
+                if hp.from_string(meta_addr):
+                    hp.type = self.get_host_type(ips)
+                    self.host_ports.append(hp)
+                    logger.info("resolved hostname %s to IP type:%s, addr:%s", 
hp.host, hp.type, ips)
+            except Exception as e:
+                logger.error("failed to resolve hostname %s: %s", host, e)
+                returnValue(False)
+            
+        for ip in ips:
+            self.addr_list.append((ip, int(port_str)))
+
+        returnValue(True)
 
-            ip, port = ip_port[0], int(ip_port[1])
-            self.addr_list.append((ip, port))
+    @inlineCallbacks
+    def resolve_host(self, err):

Review Comment:
   The `err` parameter in `resolve_host` is declared but never used. If this 
parameter is intended to receive error information from the errback chain, 
document its purpose and consider logging or handling the error. If it's not 
needed, consider removing it from the function signature.
   ```suggestion
       def resolve_host(self):
   ```



##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +247,146 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)
+            return True
+        except Exception as e:
+            # maybe ipv6
+            try:
+                socket.inet_pton(socket.AF_INET6, address)
+                return True
+            except Exception as e:

Review Comment:
   The variable `e` in the exception handler is unused. Consider either 
removing it (`except Exception:`) or using it in the error handling logic if 
additional debugging information is needed.
   ```suggestion
           except Exception:
               # maybe ipv6
               try:
                   socket.inet_pton(socket.AF_INET6, address)
                   return True
               except Exception:
   ```



##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +247,146 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)
+            return True
+        except Exception as e:
+            # maybe ipv6
+            try:
+                socket.inet_pton(socket.AF_INET6, address)
+                return True
+            except Exception as e:

Review Comment:
   The variable `e` in the exception handler is unused. Consider either 
removing it (`except Exception:`) or using it in the error handling logic.
   ```suggestion
               except Exception:
   ```



##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +247,146 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)

Review Comment:
   Missing import for the `socket` module. While `socket` is imported in 
`pypegasus.base.ttypes`, relying on transitive imports through wildcard imports 
(`from pypegasus.base.ttypes import *`) is fragile and not a best practice. Add 
an explicit `import socket` at the top of the file.



##########
python-client/pypegasus/pgclient.py:
##########
@@ -245,20 +247,146 @@ class MetaSessionManager(SessionManager):
     def __init__(self, table_name, timeout):
         SessionManager.__init__(self, table_name, timeout)
         self.addr_list = []
-
-    def add_meta_server(self, meta_addr):
-        rpc_addr = rpc_address()
-        if rpc_addr.from_string(meta_addr):
-            ip_port = meta_addr.split(':')
-            if not len(ip_port) == 2:
+        self.host_ports = []
+        self.query_times = 0
+        
+    # validate if the given string is a valid IP address     
+    def is_valid_ip(self, address):
+        try:
+            socket.inet_pton(socket.AF_INET, address)
+            return True
+        except Exception as e:
+            # maybe ipv6
+            try:
+                socket.inet_pton(socket.AF_INET6, address)
+                return True
+            except Exception as e:
                 return False
+            
+    def get_host_type(self, ip_list):
+        has_ipv4 = False
+        has_ipv6 = False
+
+        for ip_str in ip_list:
+            try:
+                ip = ipaddress.ip_address(ip_str)
+                if ip.version == 4:
+                    has_ipv4 = True
+                elif ip.version == 6:
+                    has_ipv6 = True
+            except ValueError:
+                continue 
+
+        if has_ipv4 and has_ipv6:
+            return host_port_types.kHostTypeGroup
+        elif has_ipv4:
+            return host_port_types.kHostTypeIpv4
+        elif has_ipv6:
+            return host_port_types.kHostTypeIpv6
+        else:
+            return host_port_types.kHostTypeInvalid
+    
+    def resolve_all_ips(self, hostname):
+
+        def extract_ips(result, record_type):
+            answers, _, _ = result
+            ips = []
+            for answer in answers:
+                if record_type == dns.A and isinstance(answer.payload, 
dns.Record_A):
+                    ips.append(answer.payload.dottedQuad())
+                elif record_type == dns.AAAA and isinstance(answer.payload, 
dns.Record_AAAA):
+                    if isinstance(answer.payload.address, bytes):
+                        ip6_bytes = answer.payload.address
+                        if len(ip6_bytes) == 16:
+                            ip_str = socket.inet_ntop(socket.AF_INET6, 
ip6_bytes)
+                            ips.append(ip_str)
+                        else:
+                            logger.error('Invalid IPv6 bytes length: %s', 
len(ip6_bytes))
+                    else:
+                        ips.append(str(answer.payload.address))
+            return ips
+
+        resolver = client.getResolver()
+
+        # query A record (ipv4)
+        d_a = resolver.query(dns.Query(hostname, dns.A, dns.IN))
+        d_a_ips = d_a.addCallback(lambda res: extract_ips(res, dns.A))
+
+        # query AAAA record (ipv6)
+        d_aaaa = resolver.query(dns.Query(hostname, dns.AAAA, dns.IN))
+        d_aaaa_ips = d_aaaa.addCallback(lambda res: extract_ips(res, dns.AAAA))
+
+        # gather A record and AAAA record
+        return defer.gatherResults([d_a_ips, d_aaaa_ips], consumeErrors=True) \
+                .addCallback(lambda results: results[0] + results[1])
+        
+    @inlineCallbacks
+    def add_meta_server(self, meta_addr):
+        if not isinstance(meta_addr, str):
+            logger.error("meta_addr must be a string: %s", meta_addr)
+            returnValue(False)
+        
+        try:
+            host, port_str = meta_addr.split(':', 1)
+        except ValueError:
+            logger.error("invalid address format (expected host:port): %s", 
meta_addr)
+            returnValue(False)
+
+        ips = []
+        if self.is_valid_ip(host):
+            ips = [host]
+        else:
+            try:
+                ips = yield self.resolve_all_ips(host)
+                hp = host_port()
+                if hp.from_string(meta_addr):
+                    hp.type = self.get_host_type(ips)
+                    self.host_ports.append(hp)
+                    logger.info("resolved hostname %s to IP type:%s, addr:%s", 
hp.host, hp.type, ips)
+            except Exception as e:
+                logger.error("failed to resolve hostname %s: %s", host, e)
+                returnValue(False)
+            
+        for ip in ips:
+            self.addr_list.append((ip, int(port_str)))
+
+        returnValue(True)
 
-            ip, port = ip_port[0], int(ip_port[1])
-            self.addr_list.append((ip, port))
+    @inlineCallbacks
+    def resolve_host(self, err):
+        if not self.host_ports:
+            returnValue(None) 
+        
+        new_addr_list = []
+        for host_port in self.host_ports:
+            host, port = host_port.to_host_port()
+            try:
+                ips = yield self.resolve_all_ips(host)
+                host_port.type = self.get_host_type(ips)
+                for ip in ips:
+                    new_addr_list.append((ip, port))
+                logger.info("resolved hostname %s to IP type:%s, addr:%s", 
host_port.host, host_port.type, ips)
+            except Exception as e:
+                logger.error("failed to resolve hostname %s: %s", host, e)
+                continue
+                
+        if not new_addr_list or new_addr_list == self.addr_list:

Review Comment:
   Potential logic issue: The comparison `new_addr_list == self.addr_list` at 
line 373 compares lists of tuples. However, the order of addresses might differ 
even if the same servers are present, leading to unnecessary re-initialization. 
Consider using set comparison (`set(new_addr_list) == set(self.addr_list)`) to 
check if the addresses have actually changed, regardless of order.
   ```suggestion
           if not new_addr_list or set(new_addr_list) == set(self.addr_list):
   ```



##########
python-client/pypegasus/pgclient.py:
##########
@@ -21,12 +21,14 @@
 import os
 import logging.config
 import six
+import ipaddress
 
 from thrift.Thrift import TMessageType, TApplicationException
 from twisted.internet import defer
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, succeed, fail
+from twisted.internet.defer import inlineCallbacks, succeed, fail, returnValue

Review Comment:
   Import of 'succeed' is not used.
   Import of 'fail' is not used.
   ```suggestion
   from twisted.internet.defer import inlineCallbacks, returnValue
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to