This thread is getting rather long. I think you should open a case. That's a permanent place where you can attach screenshots and stack traces, and it will also be found by anyone who runs into the same problem in the future.
Julian

On May 7, 2025, at 2:51 AM, David Li <lidav...@apache.org> wrote:

Thanks for reporting back. (Note: the attachment didn't make it.) Hmm, I'd probably need to poke at it in a debugger, but it sounds like the callbacks are getting lost somehow. (I'm not a fan of how the Java API tries to adapt an async API into a synchronous one, not least because it leads to weird issues like this.)

On Wed, May 7, 2025, at 14:42, Susmit Sarkar wrote:

Hi David,

1. Does the Python server think the RPC is complete? Yes.
2. Did the Java client get any (or all) of the data before getting stuck? It depends: with a batch size of 1000, which is quite low, we see data being streamed until roughly 15,000-18,000 rows and then it gets stuck; with a batch size of 100K the client is stuck indefinitely.
3. Sharing the debug screenshot: the method was invoked as expected.

The issue is only seen when the client runs on a local machine, irrespective of the OS (both Windows and Mac). We don't see the issue when the same client code runs in a VM pointing to the Arrow Flight server in another VM.

Thanks,
Susmit

On Sat, May 3, 2025 at 12:17 PM David Li <lidav...@apache.org> wrote:

Thanks Susmit. I can't quite see a bug, but I'm willing to believe that Flight is buggy in Java (I have never understood why the code tries to manually handle flow control, and without my memories of six years ago the code seems suspect to me now).

Do you know if (1) the Python server thinks the RPC is complete, and (2) the Java client got any (or all) of the data before getting stuck? It may also be interesting to step through the Java code with a debugger attached, see what the values of `pending` and `completed` are in the FlightStream instance, and check whether the methods here [1] are all being hit as expected.

[1] https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356
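A practical way to answer question (2) is to instrument the client's read loop and log batch and row counts as they arrive, so the last line printed shows roughly where the stream stalls. Below is a minimal Scala sketch of that idea; the host, port, and ticket payload are placeholders rather than values from this thread, and it is a diagnostic aid only, not a fix.

import java.nio.charset.StandardCharsets
import org.apache.arrow.flight.{FlightClient, Location, Ticket}
import org.apache.arrow.memory.RootAllocator

object ReadLoopProbe {
  def main(args: Array[String]): Unit = {
    // Placeholders: point these at the real Flight server and ticket payload.
    val allocator = new RootAllocator(Long.MaxValue)
    val client = FlightClient.builder(allocator, Location.forGrpcInsecure("localhost", 8815)).build()
    val stream = client.getStream(new Ticket("my-ticket".getBytes(StandardCharsets.UTF_8)))

    var batches = 0L
    var rows = 0L
    // Log progress on every batch so the last line printed shows where the hang occurs.
    while (stream.next()) {
      batches += 1
      rows += stream.getRoot.getRowCount
      println(s"[${java.time.Instant.now()}] batch=$batches totalRows=$rows")
    }
    println(s"Stream finished: batches=$batches totalRows=$rows")

    stream.close()
    client.close()
    allocator.close()
  }
}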
On Fri, May 2, 2025, at 14:56, Susmit Sarkar wrote:

Hi James,

The Flight server is written in Python; the code snippet is below. We have overridden the do_get method only; getStream() and getFlightInfo() are left at the default implementation.

def do_get(self, context, ticket):
    """
    Handles client requests for data. The ticket will contain:
    - access_key: S3 access key
    - secret_key: S3 secret key
    - s3_path: Full S3 path (e.g., bucket_name/object_key)
    - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
    """
    try:
        # Parse the ticket to extract credentials, S3 path, and mode
        access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
    except InvalidTicketFormatError as e:
        logging.error(str(e))
        raise
    except InvalidModeError as e:
        logging.error(str(e))
        raise

    # s3fs doesn't need the s3:// protocol prefix.
    if s3_path.startswith("s3://"):
        s3_path = s3_path.replace("s3://", "", 1)

    logging.info(f"Cloudian S3 override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
    logging.info(f"Cloudian S3 region: {Config.S3_REGION}")
    logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")

    # Initialize the S3 handler with credentials
    try:
        s3_handler = S3Handler(
            endpoint=Config.S3_ENDPOINT_OVERRIDE,
            region=Config.S3_REGION,
            access_key=access_key,
            secret_key=secret_key
        )
    except Exception as e:
        logging.error(f"Error initializing S3 handler: {str(e)}")
        raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e

    if mode == DataRetrievalMode.BATCH:
        try:
            # Use the get_parquet_data method for both wildcard and non-wildcard cases
            parquet_data = s3_handler.get_parquet_data(s3_path)
            # parquet_data.schema: used when parquet_data is a ds.Dataset
            # (i.e., when multiple Parquet files are processed as a dataset).
            # parquet_data.schema_arrow: used when parquet_data is a pq.ParquetFile;
            # a single Parquet file exposes its schema via schema_arrow in PyArrow.
            schema = parquet_data.schema if isinstance(parquet_data, ds.Dataset) else parquet_data.schema_arrow
            return flight.GeneratorStream(schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error streaming Parquet data: {str(e)}")
            raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e

    # Handle 'full' mode to load the entire dataset
    elif mode == DataRetrievalMode.FULL:
        try:
            # Warn if the S3 path contains a wildcard while the mode is FULL
            if "*" in s3_path:
                logging.warning(
                    f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
                    f"This may put pressure on memory as all files will be loaded into memory at once."
                )
            # Use the same get_parquet_data method for both wildcard and non-wildcard cases
            parquet_data = s3_handler.get_parquet_data(s3_path)
            # Load the entire dataset into memory (risk of OOM), handling Dataset vs. ParquetFile
            if isinstance(parquet_data, ds.Dataset):
                table = parquet_data.to_table()
            else:
                table = parquet_data.read()
            return flight.RecordBatchStream(table)
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error loading full Parquet dataset: {str(e)}")
            raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e

    else:
        logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
        raise InvalidModeError()
# Helper functions

def get_parquet_data(self, s3_path):
    """
    Retrieves Parquet data from S3. If the path contains a wildcard pattern,
    it lists all matching files manually; if it's a single file, it reads the
    file directly.

    :param s3_path: The S3 path, which can be a wildcard pattern or a direct file path.
    :return: A PyArrow Dataset object for a wildcard, or a ParquetFile object for a single file.
    """
    try:
        # Check if the path contains a wildcard
        if "*" in s3_path:
            # Split the directory and pattern (e.g., `*.parquet`)
            directory, pattern = s3_path.rsplit("/", 1)

            # List all files in the directory and filter using the pattern
            logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
            files = self.s3_fs.get_file_info(fs.FileSelector(directory))

            # Filter files matching the pattern (e.g., *.parquet) and sort them by modification time (mtime_ns)
            sorted_file_paths = [
                file.path
                for file in sorted(files, key=lambda file: file.mtime_ns)
                if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")
            ]

            if not sorted_file_paths:
                raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}")

            logging.info(f"Sorted files: {sorted_file_paths}")

            # Validate schemas across all files
            if not validate_schemas(sorted_file_paths, self.s3_fs):
                raise ValueError("Schema mismatch detected across files.")

            # Create a dataset from the matching files
            dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
            return dataset
        else:
            # Handle the single-file case: read the specific Parquet file
            logging.info(f"Fetching single Parquet file: {s3_path}")
            parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
            return parquet_file
    except Exception as e:
        logging.error(f"Error fetching Parquet data from S3: {e}")
        raise

@staticmethod
def stream_parquet_batches(parquet_data, batch_size=None):
    """
    Stream the Parquet data in batches. Supports both datasets (multiple files)
    and single Parquet files.

    :param parquet_data: The Dataset or ParquetFile object to stream data from.
    :param batch_size: The size of the batches to stream. Defaults to 100,000 if not provided.
    :return: Generator for streaming Parquet batches.
    """
    try:
        # Ensure batch_size is an integer; set the default if None
        if batch_size is None or not isinstance(batch_size, int):
            batch_size = 100000

        if isinstance(parquet_data, ds.Dataset):
            # If it's a dataset (multiple files), stream dataset batches via to_batches
            logging.info("Streaming Parquet data in batches from a dataset")
            for batch in parquet_data.to_batches(batch_size=batch_size):
                yield batch
        else:
            # If it's a single file (ParquetFile), stream file batches via iter_batches
            logging.info("Streaming Parquet data in batches from a single file")
            for batch in parquet_data.iter_batches(batch_size=batch_size):
                yield batch
    except Exception as e:
        logging.error(f"Error streaming Parquet batches: {e}")
        raise

On Wed, Apr 30, 2025 at 11:14 PM James Duong <james.du...@improving.com.invalid> wrote:

Would you be able to share the server's getStream() and getFlightInfo() implementations?

Note that getStream() should be written so that it doesn't block the gRPC thread.
From: Susmit Sarkar <susmitsir...@gmail.com>
Date: Wednesday, April 30, 2025 at 2:59 AM
To: David Li <lidav...@apache.org>
Cc: nik.9...@gmail.com, dev@arrow.apache.org
Subject: Re: Query on stuck Arrow Flight Client while interacting from local workstation (mac)

Hi David,

Sharing the Arrow client thread dump for reference. Strangely, if we pass a dummy, non-existent S3 path we get a proper error back from the server:

cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2] Path does not exist 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'. Detail: [errno 2] No such file or directory

which means the server is reachable, and we do see the logs on the server side as well.

It works fine if we call the client from within a VM; the issue arises on a local workstation, where it is stuck indefinitely.

Thanks,
Susmit

On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org> wrote:

This is not specific to Flight; use jstack or your favorite instrumentation tool (VisualVM, etc.).

On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:

Hi David,

How do we enable thread dump logs for both the client and the server code? As of now, I don't see any error on either the client side or the server side; it just hangs/gets stuck.

Thanks,
Nikhil
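On the thread-dump question: running jstack <pid> against the stuck client JVM is the simplest route, as suggested above, but a dump can also be captured programmatically from inside the process. The following Scala sketch is illustrative only, uses standard JDK management APIs, and is not taken from the code in this thread.

import java.lang.management.ManagementFactory

object ThreadDumpUtil {
  /** Returns a jstack-like summary of every JVM thread and the locks it is blocked on. */
  def dump(): String = {
    val mx = ManagementFactory.getThreadMXBean
    // dumpAllThreads(lockedMonitors, lockedSynchronizers) reports what each thread is waiting on.
    mx.dumpAllThreads(true, true).map(_.toString).mkString("\n")
  }
}

// Example: print a dump from a separate monitoring thread while the Flight read loop appears hung.
// println(ThreadDumpUtil.dump())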
On Thu, 24 Apr 2025 at 14:39, Susmit Sarkar <susmitsir...@gmail.com> wrote:

Hi Team,

We are using the code snippet below in Scala to query the Flight server, but it seems to be stuck indefinitely. The issue is seen when we test from our local workstation (a Mac, to be precise).

Another interesting thing: it is able to propagate error messages correctly, but not the FlightStream data. The same code works fine when we run it inside a Linux VM.

Do you folks see any issue in the code?

def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
  logger.info(s"Fetching data for details: ${details.toString}")
  val ticketStr = buildTicketStr(details)
  logger.info(s"Generated ticket string: $ticketStr")

  val allocator = new RootAllocator(Long.MaxValue)
  val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()

  try {
    val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
    val stream = client.getStream(ticket)

    Iterator.continually {
      if (stream.next()) Some(stream)
      else {
        // Cleanup when no more batches
        close(stream, client)
        None
      }
    }.takeWhile(_.isDefined).flatten
  } catch {
    case e: FlightRuntimeException =>
      logger.error(s"Error communicating with Flight server: ${e.getMessage}")
      throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
    case e: Exception =>
      logger.error(s"Failed to fetch data: ${e.getMessage}")
      throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
  }
}

Thanks,

Susmit
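One observation about the snippet above: because the iterator is lazy, the stream and client appear to be closed only when the iterator is fully drained, the allocator does not appear to be closed at all, and any FlightRuntimeException thrown while batches are being pulled surfaces at the caller, outside this try/catch. That does not by itself explain the hang, but a loan-pattern variant that guarantees cleanup can make experiments easier to reason about. The following is only a sketch under those assumptions; the host, port, and ticket payload are placeholders, and it is not the author's code.

import java.nio.charset.StandardCharsets
import org.apache.arrow.flight.{FlightClient, FlightStream, Location, Ticket}
import org.apache.arrow.memory.RootAllocator

object FlightFetch {
  /**
   * Loan pattern: opens the stream, hands it to `consume`, and always closes
   * the stream, client, and allocator afterwards, even if the consumer stops
   * early or an exception is thrown while batches are being pulled.
   */
  def withFlightStream[T](host: String, port: Int, ticketStr: String)(consume: FlightStream => T): T = {
    val allocator = new RootAllocator(Long.MaxValue)
    val client = FlightClient.builder(allocator, Location.forGrpcInsecure(host, port)).build()
    try {
      val stream = client.getStream(new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8)))
      try consume(stream)
      finally stream.close()
    } finally {
      client.close()
      allocator.close()
    }
  }
}

// Usage sketch: count rows as batches arrive (placeholder host, port, and ticket).
// val rows = FlightFetch.withFlightStream("flight-server.example.com", 8815, "ticket-payload") { stream =>
//   var n = 0L
//   while (stream.next()) n += stream.getRoot.getRowCount
//   n
// }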