We have opened a new issue: https://github.com/apache/arrow-java/issues/753
We have attached all screenshots and thread dumps there; requesting assistance if possible. Thanks David, James and Julian for the help so far.

@David Li <lidav...@apache.org>: I hope you will be able to view the attachment in the issue mentioned above.

Regards,
Susmit

On Thu, May 8, 2025 at 8:50 AM Susmit Sarkar <susmitsir...@gmail.com> wrote:

> Let me open a thread, it's a valid suggestion, will attach the discussions
>
> Thanks,
> Susmit
>
> On Wed, May 7, 2025 at 8:34 PM Julian Hyde <jhyde.apa...@gmail.com> wrote:
>
>> This thread is getting rather long. I think you should open a case.
>> That's a permanent place where you can attach screenshots and stack traces, and
>> it will also be found by anyone who has the same problem in the future.
>>
>> Julian
>>
>> > On May 7, 2025, at 2:51 AM, David Li <lidav...@apache.org> wrote:
>> >
>> > Thanks for reporting back. (Note, the attachment didn't make it.) Hmm,
>> > I'd probably need to poke at it in a debugger. But it sounds like the
>> > callbacks are getting lost somehow. (I'm not a fan of how the Java API
>> > tries to adapt an async API into a synchronous one, not least because it
>> > leads to weird issues like this.)
>> >
>> >> On Wed, May 7, 2025, at 14:42, Susmit Sarkar wrote:
>> >> Hi David
>> >>
>> >> 1. The Python server thinks the RPC is complete: Yes
>> >> 2. The Java client got any (or all) of the data before getting stuck?
>> >> It depends: if we are reading in batches of 1000, which is a pretty low
>> >> number, we see data being streamed until 15000-18000 rows and then it is
>> >> ultimately stuck; if we select a batch size of 100K the client is stuck
>> >> indefinitely.
>> >> 3. Sharing the debug screenshot; the method was invoked as expected.
>> >>
>> >> image.png
>> >>
>> >> The issue is only seen when the client runs on a local machine,
>> >> irrespective of the OS (both Windows and Mac). We don't see the issue if
>> >> the same client code runs in a VM pointing to the Arrow Flight server in
>> >> another VM.
>> >> Thanks,
>> >> Susmit
>> >>
>> >>> On Sat, May 3, 2025 at 12:17 PM David Li <lidav...@apache.org> wrote:
>> >>> Thanks Susmit. I can't quite see a bug, but I'm willing to believe
>> >>> that Flight is buggy in Java (I have never understood why the code tries to
>> >>> manually handle flow control, and without my memories of 6 years ago the
>> >>> code seems suspect to me now).
>> >>>
>> >>> Do you know if (1) the Python server thinks the RPC is complete, (2)
>> >>> the Java client got any (or all) of the data before getting stuck? It may
>> >>> also be interesting to step through the Java code with a debugger attached,
>> >>> and see what the values of `pending` and `completed` are in the
>> >>> FlightStream instance, and if the methods here[1] are all being hit as
>> >>> expected.
>> >>>
>> >>> [1] https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356
>> >>>
>> >>> On Fri, May 2, 2025, at 14:56, Susmit Sarkar wrote:
>> >>>> Hi James,
>> >>>>
>> >>>> The Flight server is written in Python. Here is the code snippet below.
>> >>>>
>> >>>> We have overridden the *do_get* method only.
>> >>>>
>> >>>> getStream() and getFlightInfo() are left untouched / default implementation.
>> >>>>
>> >>>> def do_get(self, context, ticket):
>> >>>>     """
>> >>>>     Handles client requests for data.
>> >>>>     The ticket will contain:
>> >>>>     - access_key: S3 access key
>> >>>>     - secret_key: S3 secret key
>> >>>>     - s3_path: Full S3 path (e.g., bucket_name/object_key)
>> >>>>     - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
>> >>>>     """
>> >>>>     try:
>> >>>>         # Parse the ticket to extract credentials, S3 path, and mode
>> >>>>         access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
>> >>>>     except InvalidTicketFormatError as e:
>> >>>>         logging.error(str(e))
>> >>>>         raise
>> >>>>     except InvalidModeError as e:
>> >>>>         logging.error(str(e))
>> >>>>         raise
>> >>>>
>> >>>>     # s3fs doesn't need the s3a protocol.
>> >>>>     if s3_path.startswith("s3://"):
>> >>>>         s3_path = s3_path.replace("s3://", "", 1)
>> >>>>
>> >>>>     logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
>> >>>>     logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
>> >>>>     logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")
>> >>>>
>> >>>>     # Initialize the S3 handler with credentials
>> >>>>     try:
>> >>>>         s3_handler = S3Handler(
>> >>>>             endpoint=Config.S3_ENDPOINT_OVERRIDE,
>> >>>>             region=Config.S3_REGION,
>> >>>>             access_key=access_key,
>> >>>>             secret_key=secret_key
>> >>>>         )
>> >>>>     except Exception as e:
>> >>>>         logging.error(f"Error initializing S3 handler: {str(e)}")
>> >>>>         raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e
>> >>>>
>> >>>>     if mode == DataRetrievalMode.BATCH:
>> >>>>         try:
>> >>>>             # Use the get_parquet_data method for both wildcard and non-wildcard cases
>> >>>>             parquet_data = s3_handler.get_parquet_data(s3_path)
>> >>>>             # parquet_data.schema: This is used when parquet_data is an instance of ds.Dataset
>> >>>>             # (i.e., when multiple Parquet files are being processed as a dataset).
>> >>>>             #
>> >>>>             # parquet_data.schema_arrow: This is used when parquet_data is a pq.ParquetFile.
>> >>>>             # A single Parquet file has its own schema, accessible via schema_arrow in PyArrow.
>> >>>>             schema = parquet_data.schema if isinstance(parquet_data, ds.Dataset) else parquet_data.schema_arrow
>> >>>>             return flight.GeneratorStream(schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
>> >>>>         except OSError as e:
>> >>>>             logging.error(f"AWS S3 access error: {str(e)}")
>> >>>>             raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>> >>>>         except Exception as e:
>> >>>>             logging.error(f"Error streaming Parquet data: {str(e)}")
>> >>>>             raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e
>> >>>>
>> >>>>     # Handle 'full' mode to load the entire dataset
>> >>>>     elif mode == DataRetrievalMode.FULL:
>> >>>>         try:
>> >>>>             # Check if the S3 path contains a wildcard and the mode is FULL
>> >>>>             if "*" in s3_path:
>> >>>>                 logging.warning(
>> >>>>                     f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
>> >>>>                     f"This may put pressure on memory as all files will be loaded into memory at once."
>> >>>>                 )
>> >>>>             # Use the same get_parquet_data method for both wildcard and non-wildcard cases
>> >>>>             parquet_data = s3_handler.get_parquet_data(s3_path)
>> >>>>             # Load the entire dataset into memory / chance of OOM.
>> >>>>             # table = parquet_data.to_table() if isinstance(parquet_data, ds.Dataset) else parquet_data.read_table()
>> >>>>             # Load the entire dataset into memory, with consideration for Dataset vs. ParquetFile
>> >>>>             if isinstance(parquet_data, ds.Dataset):
>> >>>>                 table = parquet_data.to_table()
>> >>>>             else:
>> >>>>                 table = parquet_data.read()
>> >>>>             return flight.RecordBatchStream(table)
>> >>>>         except OSError as e:
>> >>>>             logging.error(f"AWS S3 access error: {str(e)}")
>> >>>>             raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>> >>>>         except Exception as e:
>> >>>>             logging.error(f"Error loading full Parquet dataset: {str(e)}")
>> >>>>             raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e
>> >>>>
>> >>>>     else:
>> >>>>         logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
>> >>>>         raise InvalidModeError()
>> >>>>
>> >>>>
>> >>>> // Helper functions.
>> >>>>
>> >>>> def get_parquet_data(self, s3_path):
>> >>>>     """
>> >>>>     Retrieves Parquet data from S3. If the path contains a wildcard pattern, it lists all matching files manually.
>> >>>>     If it's a single file, it reads the file directly.
>> >>>>
>> >>>>     :param s3_path: The S3 path, which could be a wildcard pattern or a direct file path.
>> >>>>     :return: PyArrow Dataset object if it's a wildcard, or a ParquetFile object for a single file.
>> >>>>     """
>> >>>>     try:
>> >>>>         # Check if the path contains a wildcard
>> >>>>         if "*" in s3_path:
>> >>>>             # Split the directory and pattern (e.g., `*.parquet`)
>> >>>>             directory, pattern = s3_path.rsplit("/", 1)
>> >>>>
>> >>>>             # List all files in the directory and filter using the pattern
>> >>>>             logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
>> >>>>             files = self.s3_fs.get_file_info(fs.FileSelector(directory))
>> >>>>
>> >>>>             # Filter files matching the pattern (e.g., *.parquet) and sort them by modification time (mtime_ns)
>> >>>>             sorted_file_paths = [file.path for file in sorted(files, key=lambda file: file.mtime_ns)
>> >>>>                                  if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")]
>> >>>>
>> >>>>             if not sorted_file_paths:
>> >>>>                 raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}")
>> >>>>
>> >>>>             logging.info(f"Sorted files: {sorted_file_paths}")
>> >>>>
>> >>>>             # Validate schemas across all files
>> >>>>             if not validate_schemas(sorted_file_paths, self.s3_fs):
>> >>>>                 raise ValueError("Schema mismatch detected across files.")
>> >>>>
>> >>>>             # Create a dataset from the matching files
>> >>>>             dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
>> >>>>             return dataset
>> >>>>         else:
>> >>>>             # Handle single file case: read the specific Parquet file
>> >>>>             logging.info(f"Fetching single Parquet file: {s3_path}")
>> >>>>             parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
>> >>>>             return parquet_file
>> >>>>     except Exception as e:
>> >>>>         logging.error(f"Error fetching Parquet data from S3: {e}")
>> >>>>         raise e
>> >>>>
>> >>>> @staticmethod
>> >>>> def stream_parquet_batches(parquet_data, batch_size=None):
>> >>>>     """
>> >>>>     Stream the Parquet data in batches. Supports both datasets (multiple files) and single Parquet files.
>> >>>>
>> >>>>     :param parquet_data: The Dataset or ParquetFile object to stream data from.
>> >>>>     :param batch_size: The size of the batches to stream. Default is 100,000 if not provided.
>> >>>>     :return: Generator for streaming Parquet batches.
>> >>>>     """
>> >>>>     try:
>> >>>>         # Ensure batch_size is an integer, set default if None
>> >>>>         if batch_size is None or not isinstance(batch_size, int):
>> >>>>             batch_size = 100000
>> >>>>
>> >>>>         if isinstance(parquet_data, ds.Dataset):
>> >>>>             # If it's a dataset (multiple files), stream dataset batches using `batch_size`
>> >>>>             logging.info(f"Streaming Parquet data in batches from a dataset")
>> >>>>             for batch in parquet_data.to_batches(batch_size=batch_size):
>> >>>>                 yield batch
>> >>>>         else:
>> >>>>             # If it's a single file (ParquetFile), stream file batches (iter_batches)
>> >>>>             logging.info(f"Streaming Parquet data in batches from a single file")
>> >>>>             for batch in parquet_data.iter_batches(batch_size=batch_size):
>> >>>>                 yield batch
>> >>>>     except Exception as e:
>> >>>>         logging.error(f"Error streaming Parquet batches: {e}")
>> >>>>         raise e
>> >>>>
>> >>>>
>> >>>> On Wed, Apr 30, 2025 at 11:14 PM James Duong <james.du...@improving.com.invalid> wrote:
>> >>>>
>> >>>>> Would you be able to share the server's getStream() and getFlightInfo()
>> >>>>> implementations?
>> >>>>>
>> >>>>> Note that getStream() should be written such that it doesn't block
>> >>>>> the gRPC thread.
>> >>>>>
>> >>>>> From: Susmit Sarkar <susmitsir...@gmail.com>
>> >>>>> Date: Wednesday, April 30, 2025 at 2:59 AM
>> >>>>> To: David Li <lidav...@apache.org>
>> >>>>> Cc: nik.9...@gmail.com, dev@arrow.apache.org
>> >>>>> Subject: Re: Query on stuck Arrow Flight Client while interacting from
>> >>>>> local workstation (mac)
>> >>>>>
>> >>>>> Hi David
>> >>>>>
>> >>>>> Sharing the Arrow client thread dump for reference. Strangely, if we pass a
>> >>>>> dummy non-existent S3 path we get a proper error from the server:
>> >>>>>
>> >>>>> cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2]
>> >>>>> Path does not exist
>> >>>>> 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'.
>> >>>>> Detail: [errno 2] No such file or directory
>> >>>>>
>> >>>>> This means the server is reachable, and we do see the logs on the server
>> >>>>> as well.
>> >>>>>
>> >>>>> It works fine if we call the client from within a VM; the issue arises on a
>> >>>>> local workstation, where it's stuck indefinitely.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Susmit
>> >>>>>
>> >>>>> On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org> wrote:
>> >>>>> This is not specific to Flight; use jstack or your favorite
>> >>>>> instrumentation tool (VisualVM etc.)
>> >>>>>
>> >>>>> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:
>> >>>>>> Hi David,
>> >>>>>>
>> >>>>>> How do we enable thread dump logs for both the client and the server code?
>> >>>>>>
>> >>>>>> As of now, I don't see any error on either the client side or the server side. It
>> >>>>>> just hangs/gets stuck.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Nikhil
>> >>>>>>
>> >>>>>> On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <susmitsir...@gmail.com> wrote:
>> >>>>>>
>> >>>>>>> Hi Team,
>> >>>>>>>
>> >>>>>>> We are using the below code snippet in Scala to query the Flight server,
>> >>>>>>> but it seems to be stuck indefinitely. This issue is seen when we are testing
>> >>>>>>> from our local workstation (Mac, to be precise).
>> >>>>>>>
>> >>>>>>> Another interesting thing: it's able to propagate the error message
>> >>>>>>> correctly, but not the FlightStream data. The same code works fine when we
>> >>>>>>> run inside a Linux VM.
>> >>>>>>>
>> >>>>>>> Do you folks see any issue in the code?
>> >>>>>>>
>> >>>>>>> def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
>> >>>>>>>   logger.info(s"Fetching data for details: ${details.toString}")
>> >>>>>>>   val ticketStr = buildTicketStr(details)
>> >>>>>>>   logger.info(s"Generated ticket string: $ticketStr")
>> >>>>>>>
>> >>>>>>>   val allocator = new RootAllocator(Long.MaxValue)
>> >>>>>>>   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
>> >>>>>>>
>> >>>>>>>   try {
>> >>>>>>>     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
>> >>>>>>>     val stream = client.getStream(ticket)
>> >>>>>>>
>> >>>>>>>     Iterator.continually {
>> >>>>>>>       if (stream.next()) Some(stream) else {
>> >>>>>>>         // Cleanup when no more batches
>> >>>>>>>         close(stream, client)
>> >>>>>>>         None
>> >>>>>>>       }
>> >>>>>>>     }.takeWhile(_.isDefined).flatten
>> >>>>>>>   } catch {
>> >>>>>>>     case e: FlightRuntimeException =>
>> >>>>>>>       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
>> >>>>>>>       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
>> >>>>>>>     case e: Exception =>
>> >>>>>>>       logger.error(s"Failed to fetch data: ${e.getMessage}")
>> >>>>>>>       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
>> >>>>>>>   }
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> Susmit
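For reference, below is a minimal, self-contained sketch of the client-side read loop with eager draining and explicit cleanup. It is not taken from the thread: the host, port, and ticket payload are placeholders, and it simply shows one way to consume each batch via getRoot() before advancing, and to close the stream, client, and allocator in a finally block.

import java.nio.charset.StandardCharsets

import org.apache.arrow.flight.{FlightClient, FlightStream, Location, Ticket}
import org.apache.arrow.memory.RootAllocator

object FlightDrainSketch {
  // Placeholder endpoint and ticket payload; substitute your own values.
  val serverHost = "flight-server.example.com"
  val serverPort = 32010

  def main(args: Array[String]): Unit = {
    val allocator = new RootAllocator(Long.MaxValue)
    val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
    val ticket = new Ticket("""{"mode": "batch"}""".getBytes(StandardCharsets.UTF_8))

    var stream: FlightStream = null
    try {
      stream = client.getStream(ticket)
      var totalRows = 0L
      // The VectorSchemaRoot returned by getRoot is reused for every batch,
      // so consume it before calling next() again.
      while (stream.next()) {
        val root = stream.getRoot
        totalRows += root.getRowCount
        println(s"Received batch with ${root.getRowCount} rows (total so far: $totalRows)")
      }
    } finally {
      // Close in reverse order of creation so resources are released even if
      // the loop exits early or an exception is thrown mid-stream.
      if (stream != null) stream.close()
      client.close()
      allocator.close()
    }
  }
}

If the hang reproduces with a plain loop like this, a jstack dump (as David suggested) should show where the reading thread is parked; if it is inside stream.next(), that would be consistent with David's suspicion that the flow-control callbacks are getting lost, rather than a problem in the surrounding iterator plumbing.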