Susmit07 opened a new issue, #44560:
URL: https://github.com/apache/arrow/issues/44560
### Describe the usage question you have. Please include as many useful details as possible.
```
# cef_flight_sdk/client/cef_flight_data_client.py
import logging
import pyarrow.flight as flight
from cef_flight_sdk.exceptions.sdk_exceptions import CefFlightException
from cef_flight_sdk.model.object_store_details import ObjectStoreDetails
# Root-logger setup performed at import time: INFO level, timestamped lines.
# NOTE(review): basicConfig in a library module configures the root logger for
# any application that imports this module before setting up logging itself;
# consider moving this call to the application entry point.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def build_ticket_str(details: ObjectStoreDetails) -> str:
    """
    Assemble the pipe-delimited ticket string sent to the Flight server.

    The ticket has the form ``access_key|secret_key|s3_path|mode``. The mode
    is lower-cased first. In "batch" mode a trailing ``|batch_size`` field is
    appended, falling back to 100000 when the batch size is falsy
    (None or 0).

    NOTE(review): the secret key travels inside the ticket in plain text, so
    anything that can observe the ticket can read it.

    :param details: ObjectStoreDetails with all necessary fields.
    :return: Formatted ticket string.
    :raises ValueError: If a batch size is given in "full" mode, or the mode
        is neither "batch" nor "full".
    """
    mode = details.data_retrieval_mode.lower()
    parts = [details.access_key, details.secret_key, details.s3_path, mode]

    if mode == "full":
        if details.batch_size is not None:
            raise ValueError("Batch size cannot be provided when mode is 'full'.")
    elif mode == "batch":
        # Use 100k as the default batch size when none (or 0) is set.
        parts.append(details.batch_size or 100000)
    else:
        raise ValueError(f"Invalid data retrieval mode: {mode}")

    return "|".join(f"{part}" for part in parts)
class CefFlightDataClient:
    """
    A client for connecting to an Apache Arrow Flight server to retrieve
    data streams from an S3-compatible object store.

    Each call to ``get_stream_iterator`` creates its own FlightClient. That
    client is closed when the stream is exhausted, fails, or the returned
    generator is closed, so no connection is shared between requests.

    Attributes:
        host (str): The Flight server hostname or IP address.
        port (int): The Flight server port number.
        use_tls (bool): Whether to connect via ``grpc+tls`` rather than
            ``grpc+tcp``.
        tls_roots_certs_path (str): Path to a file of trusted root
            certificates, passed to FlightClient when TLS is enabled.
        timeout (float): Optional deadline in seconds for each DoGet call;
            ``None`` means no deadline.
        location (pyarrow.flight.Location): Set by
            ``_connect_to_flight_server`` to the location last connected to.
    """

    def __init__(self, host: str, port: int, use_tls: bool = False,
                 tls_roots_certs_path: str = None, timeout: float = None):
        """
        Initialize the client with server host, port, and optional TLS and
        timeout settings. No connection is opened here.

        :param host: Hostname or IP address of the Flight server.
        :param port: Port number of the Flight server.
        :param use_tls: Whether to use TLS for secure connections.
        :param tls_roots_certs_path: Path to trusted TLS root certificates;
            only used when ``use_tls`` is True.
        :param timeout: Optional deadline in seconds for each DoGet call.
            The default ``None`` keeps the no-deadline behavior. Setting it
            makes a stream that stalls indefinitely fail instead of hanging.
            gRPC applies the deadline to the whole call, including reading
            every batch, so choose a value longer than the largest expected
            transfer.
        """
        self.host = host
        self.port = port
        self.use_tls = use_tls
        self.tls_roots_certs_path = tls_roots_certs_path
        self.timeout = timeout

    def _connect_to_flight_server(self):
        """
        Create a new FlightClient for the configured host and port.

        When TLS is enabled and ``tls_roots_certs_path`` is set, the file's
        contents are handed to FlightClient as ``tls_root_certs``.

        :return: A new ``pyarrow.flight.FlightClient``; the caller must close it.
        :raises OSError: If the root certificate file cannot be read.
        """
        self.location = (
            flight.Location.for_grpc_tls(self.host, self.port)
            if self.use_tls
            else flight.Location.for_grpc_tcp(self.host, self.port)
        )
        tls_root_certs = None
        if self.use_tls and self.tls_roots_certs_path:
            # The certificate bytes must be passed to the FlightClient
            # constructor; merely reading the file has no effect.
            with open(self.tls_roots_certs_path, "rb") as root_certs_file:
                tls_root_certs = root_certs_file.read()
        return flight.FlightClient(self.location, tls_root_certs=tls_root_certs)

    def get_stream_iterator(self, object_store_details: ObjectStoreDetails):
        """
        Retrieve a data stream from the Flight server as an iterator of
        RecordBatch objects.

        A new FlightClient is created for each call, so connections are never
        reused across requests. The client is released only when this
        generator is exhausted, raises, or is closed. Callers that stop
        iterating early should call ``close()`` on the generator (for example
        via ``contextlib.closing``). Otherwise the connection stays open until
        the generator is garbage-collected.

        :param object_store_details: ObjectStoreDetails with access key,
            secret key, S3 path, data retrieval mode, and optional batch size.
        :yield: pyarrow.RecordBatch from each FlightStreamChunk in the stream.
        :raises ValueError: If the ticket cannot be built (bad mode/batch size).
        :raises CefFlightException: If connecting or reading the stream fails,
            including when the configured ``timeout`` expires.
        """
        ticket_str = build_ticket_str(object_store_details)
        ticket = flight.Ticket(ticket_str.encode('utf-8'))
        options = flight.FlightCallOptions(timeout=self.timeout)
        client = None
        reader = None
        completed = False
        try:
            client = self._connect_to_flight_server()  # fresh client per call
            reader = client.do_get(ticket, options=options)
            for chunk in reader:
                yield chunk.data
            completed = True
        except Exception as e:
            logging.error("Error processing stream: %s", e)
            raise CefFlightException("Error processing stream", e) from e
        finally:
            if reader is not None and not completed:
                # The consumer stopped early (generator closed) or the read
                # failed. Cancel the call so the server stops sending and the
                # client is not closed with a call still in flight.
                try:
                    reader.cancel()
                except Exception as cancel_error:
                    logging.warning("Failed to cancel Flight stream: %s", cancel_error)
            if client is not None:
                client.close()
                logging.info("Flight client closed after streaming.")
```
Main code that calls `get_stream_iterator`:
```
from cef_flight_sdk.builder.object_store_details_builder import
ObjectStoreDetailsBuilder
from cef_flight_sdk.client.cef_flight_client import CefFlightDataClient
from cef_flight_sdk.config.config_loader import ConfigLoader
from cef_flight_sdk.model.data_retrieval_mode import DataRetrievalMode
def main():
    """
    Entry point: stream and preview data from an Apache Arrow Flight server.

    Steps:
    1. Load configuration via ConfigLoader.
    2. Create a CefFlightDataClient for the configured Flight host and port.
    3. Build batch-mode ObjectStoreDetails for the configured non-wildcard
       and wildcard S3 paths.
    4. For each path, print the row count and first 10 rows of every batch.

    Reads the properties ``flight_server.host``, ``flight_server.port``,
    ``s3.non_wildcard_path`` and ``s3.wildcard_path``. It also reads, via
    build_object_store_details, ``s3.access_key`` and ``s3.secret_key``.
    """
    ConfigLoader.load_config()
    flight_host = ConfigLoader.get_property("flight_server.host")
    flight_port = int(ConfigLoader.get_property("flight_server.port"))

    data_client = CefFlightDataClient(host=flight_host, port=flight_port)

    # Both detail objects are built before any streaming starts.
    runs = [
        (
            "\n--- Processing Non-Wildcard Path ---",
            build_object_store_details(
                s3_path=ConfigLoader.get_property("s3.non_wildcard_path"),
                mode=DataRetrievalMode.BATCH,
            ),
        ),
        (
            "\n--- Processing Wildcard Path ---",
            build_object_store_details(
                s3_path=ConfigLoader.get_property("s3.wildcard_path"),
                mode=DataRetrievalMode.BATCH,
            ),
        ),
    ]

    for header, details in runs:
        print(header)
        process_stream(data_client, details)

    # To also save the non-wildcard data locally as Parquet, something like:
    # output_file_path = ConfigLoader.get_property("output.file_path")
    # write_data_to_parquet(client, object_store_details_non_wildcard,
    #                       output_file_path=output_file_path)
def build_object_store_details(s3_path: str, mode: DataRetrievalMode):
    """
    Build an ObjectStoreDetails for ``s3_path`` with ObjectStoreDetailsBuilder.

    The access and secret keys come from the ``s3.access_key`` and
    ``s3.secret_key`` configuration properties.

    :param s3_path: Path in the S3-compatible storage for the target data.
    :param mode: Data retrieval mode, either 'batch' or 'full'.
    :return: Configured ObjectStoreDetails instance.
    """
    builder = ObjectStoreDetailsBuilder()
    builder = builder.with_access_key(ConfigLoader.get_property("s3.access_key"))
    builder = builder.with_secret_key(ConfigLoader.get_property("s3.secret_key"))
    builder = builder.with_s3_path(s3_path)
    builder = builder.with_data_retrieval_mode(mode)
    return builder.build()
def process_stream(client, object_store_details):
    """
    Stream all batches for ``object_store_details`` and print a preview of each.

    For every batch yielded by ``client.get_stream_iterator``, prints its row
    count and then the first 10 rows of its ``to_pandas()`` conversion. The
    iterator is consumed fully.

    :param client: CefFlightDataClient instance to interact with the Flight server.
    :param object_store_details: ObjectStoreDetails instance containing the
        access and retrieval details.
    """
    batches = client.get_stream_iterator(object_store_details)
    for record_batch in batches:
        row_count = record_batch.num_rows
        print("Number of rows:", row_count)
        # pandas conversion is only for a readable tabular preview.
        frame = record_batch.to_pandas()
        preview = frame.head(10)
        print("Top 10 rows:\n", preview)
if __name__ == "__main__":
    # Run the demo only when this file is executed directly, not when imported.
    main()
```
Hello developers and community members, I am seeing an issue: the Flight
client very often gets stuck while reading streams. I have never seen this
issue with the Scala client, but I see it with the Python client.
I would appreciate your guidance.
### Component(s)
FlightRPC
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]