Susmit07 opened a new issue, #44560:
URL: https://github.com/apache/arrow/issues/44560

   ### Describe the usage question you have. Please include as many useful 
details as  possible.
   
   
   ```
   # cef_flight_sdk/client/cef_flight_data_client.py
   
   import logging
   
   import pyarrow.flight as flight
   
   from cef_flight_sdk.exceptions.sdk_exceptions import CefFlightException
   from cef_flight_sdk.model.object_store_details import ObjectStoreDetails
   
   # Configure root logging once at import time: INFO level, timestamped lines.
   logging.basicConfig(
       level=logging.INFO,
       format='%(asctime)s - %(levelname)s - %(message)s',
   )
   
   def build_ticket_str(details: ObjectStoreDetails) -> str:
       """
       Build the ticket string used to request data from the Flight server.

       The ticket is a ``|``-separated string assembled from these
       ObjectStoreDetails fields, in this order:
       - Access key
       - Secret key
       - S3 path
       - Data retrieval mode (lower-cased)
       - Batch size (appended in "batch" mode only)

       NOTE: the ticket embeds the access and secret keys in clear text,
       so callers should avoid logging the returned string.

       :param details: ObjectStoreDetails with all necessary fields.
       :return: Formatted ticket string.
       :raises ValueError: If batch size is provided in 'full' mode or an
           invalid mode is specified.
       """
       # Fallback used when no batch size is set; because of the ``or``
       # below, a batch size of 0 also falls back to this value.
       default_batch_size = 100000

       mode = details.data_retrieval_mode.lower()
       base_ticket_str = (
           f"{details.access_key}|{details.secret_key}"
           f"|{details.s3_path}|{mode}"
       )

       if mode == "batch":
           batch_size = details.batch_size or default_batch_size
           return f"{base_ticket_str}|{batch_size}"

       if mode == "full":
           if details.batch_size is not None:
               raise ValueError(
                   "Batch size cannot be provided when mode is 'full'."
               )
           return base_ticket_str

       raise ValueError(f"Invalid data retrieval mode: {mode}")
   
   class CefFlightDataClient:
       """
       A client for connecting to an Apache Arrow Flight server to retrieve
       data streams from an S3-compatible object store.

       A new FlightClient connection is created for each stream request and
       closed when that request finishes, so no connection is shared or
       reused between requests.

       Attributes:
           host (str): The Flight server hostname or IP address.
           port (int): The Flight server port number.
           use_tls (bool): Specifies if TLS should be used for secure
               connections.
           tls_roots_certs_path (str): Path to trusted TLS certificates if
               TLS is enabled.
       """

       def __init__(
           self,
           host: str,
           port: int,
           use_tls: bool = False,
           tls_roots_certs_path: str = None,
       ):
           """
           Initialize the client with server host, port, and optional TLS
           settings. No connection is opened here.

           :param host: Hostname or IP address of the Flight server.
           :param port: Port number of the Flight server.
           :param use_tls: Whether to use TLS for secure connections.
           :param tls_roots_certs_path: Path to trusted TLS certificates if
               TLS is enabled.
           """
           self.host = host
           self.port = port
           self.use_tls = use_tls
           self.tls_roots_certs_path = tls_roots_certs_path

       def _connect_to_flight_server(self):
           """
           Create a new FlightClient for the configured host and port.

           The chosen location is also stored on ``self.location``.

           :return: A new ``flight.FlightClient`` instance.
           """
           connection_args = {}

           # Pick the gRPC scheme based on whether TLS is requested.
           if self.use_tls:
               self.location = flight.Location.for_grpc_tls(
                   self.host, self.port
               )
           else:
               self.location = flight.Location.for_grpc_tcp(
                   self.host, self.port
               )

           # FlightClient expects the PEM bytes under ``tls_root_certs``.
           # Previously the bytes were read but never handed to the client,
           # so a custom trust store was silently ignored.
           if self.use_tls and self.tls_roots_certs_path:
               with open(self.tls_roots_certs_path, "rb") as root_certs_file:
                   connection_args["tls_root_certs"] = root_certs_file.read()

           return flight.FlightClient(self.location, **connection_args)

       def get_stream_iterator(self, object_store_details: ObjectStoreDetails):
           """
           Retrieve a data stream from the Flight server as an iterator of
           RecordBatch objects.

           This is a generator: nothing is sent to the server until the
           first item is requested. A new FlightClient is created for each
           call and closed when the generator finishes, fails, or is closed
           early by the consumer. If the stream was not read to the end, it
           is cancelled before the client is closed.

           :param object_store_details: ObjectStoreDetails with access key,
               secret key, S3 path, data retrieval mode, and optional batch
               size.
           :yield: The ``data`` of each chunk read from the Flight stream.
           :raises ValueError: If the ticket cannot be built from
               ``object_store_details``.
           :raises CefFlightException: If there is an error in fetching the
               data stream; the original error is attached as the cause.
           """
           ticket_str = build_ticket_str(object_store_details)
           ticket = flight.Ticket(ticket_str.encode('utf-8'))
           client = None
           data_stream = None
           stream_finished = False
           try:
               client = self._connect_to_flight_server()
               data_stream = client.do_get(ticket)
               for chunk in data_stream:
                   yield chunk.data
               stream_finished = True
           except Exception as e:
               logging.error("Error processing stream: %s", e)
               raise CefFlightException("Error processing stream", e) from e
           finally:
               # Cancel a stream that was abandoned or failed part-way so
               # the call is not left open when the client is closed.
               if data_stream is not None and not stream_finished:
                   try:
                       data_stream.cancel()
                   except Exception as cancel_error:
                       # Best effort only: never mask the original error.
                       logging.warning(
                           "Could not cancel Flight stream: %s", cancel_error
                       )
               if client is not None:
                   client.close()
                   logging.info("Flight client closed after streaming.")
   ```
   
   Main code to interact with get_stream_iterator
   
   ```
   from cef_flight_sdk.builder.object_store_details_builder import 
ObjectStoreDetailsBuilder
   from cef_flight_sdk.client.cef_flight_client import CefFlightDataClient
   from cef_flight_sdk.config.config_loader import ConfigLoader
   from cef_flight_sdk.model.data_retrieval_mode import DataRetrievalMode
   
   def main():
       """
       Main entry point for processing data from an Apache Arrow Flight
       server.

       This script:
         1. Loads configuration settings.
         2. Initializes a Flight client to connect to the Flight server.
         3. Builds ObjectStoreDetails for specific and wildcard S3 paths.
         4. Retrieves and displays data streams for both specific and
            wildcard paths.

       Important:
       - Ensure the configuration file is correctly set up with appropriate
         S3 and Flight server details.
       """
       # Load configuration
       ConfigLoader.load_config()
       host = ConfigLoader.get_property("flight_server.host")
       port = int(ConfigLoader.get_property("flight_server.port"))

       # Initialize the Flight client
       client = CefFlightDataClient(host=host, port=port)

       # Build ObjectStoreDetails for non-wildcard and wildcard paths
       object_store_details_non_wildcard = build_object_store_details(
           s3_path=ConfigLoader.get_property("s3.non_wildcard_path"),
           mode=DataRetrievalMode.BATCH
       )
       object_store_details_wildcard = build_object_store_details(
           s3_path=ConfigLoader.get_property("s3.wildcard_path"),
           mode=DataRetrievalMode.BATCH
       )

       # Process and print the row count and top 10 rows for the
       # non-wildcard path first, then the wildcard path.
       print("\n--- Processing Non-Wildcard Path ---")
       process_stream(client, object_store_details_non_wildcard)

       print("\n--- Processing Wildcard Path ---")
       process_stream(client, object_store_details_wildcard)

       # Optionally, write the non-wildcard data to a local Parquet file:
       # output_file_path = ConfigLoader.get_property("output.file_path")
       # write_data_to_parquet(
       #     client,
       #     object_store_details_non_wildcard,
       #     output_file_path=output_file_path,
       # )
   
   
   def build_object_store_details(s3_path: str, mode: DataRetrievalMode):
       """
       Assemble an ObjectStoreDetails via ObjectStoreDetailsBuilder.

       The access and secret keys are read from the loaded configuration
       ("s3.access_key" and "s3.secret_key"); the path and mode come from
       the arguments.

       :param s3_path: Path in the S3-compatible storage for the target
           data.
       :param mode: Data retrieval mode, either 'batch' or 'full'.
       :return: Configured ObjectStoreDetails instance.
       """
       # Each step feeds the next, in the same order as a fluent chain.
       builder = ObjectStoreDetailsBuilder()
       builder = builder.with_access_key(
           ConfigLoader.get_property("s3.access_key")
       )
       builder = builder.with_secret_key(
           ConfigLoader.get_property("s3.secret_key")
       )
       builder = builder.with_s3_path(s3_path)
       builder = builder.with_data_retrieval_mode(mode)
       return builder.build()
   
   
   def process_stream(client, object_store_details):
       """
       Read every batch of the stream and print its row count followed by
       its first 10 rows.

       :param client: CefFlightDataClient instance to interact with the
           Flight server.
       :param object_store_details: ObjectStoreDetails instance containing
           the access and retrieval details.
       """
       stream = client.get_stream_iterator(object_store_details)
       for record_batch in stream:
           print("Number of rows:", record_batch.num_rows)

           # Convert to pandas so the preview can be taken with head().
           preview = record_batch.to_pandas().head(10)
           print("Top 10 rows:\n", preview)
   
   
   # Run main() only when this file is executed as a script, not on import.
   if __name__ == "__main__":
       main()
   ```
   
   Hello developers and community members, I am seeing an issue: the Flight 
client very often gets stuck while reading the streams. I have never seen this 
issue with Scala, but I am seeing it with Python.
   
   Requesting your guidance
   
   
   
   
   ### Component(s)
   
   FlightRPC


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to