Thanks for reporting back. (Note, the attachment didn't make it.) Hmm, I'd probably need to poke at it in a debugger. But it sounds like the callbacks are getting lost somehow. (I'm not a fan of how the Java API tries to adapt an async API into a synchronous one, not least because it leads to weird issues like this.)
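If the callbacks really are getting dropped somewhere in that adaptation, one way to make the hang visible instead of waiting forever is to put a deadline on the call from the client side. Below is a minimal sketch, not a fix: it assumes the CallOptions.timeout option is available in the Arrow Flight Java version in use, and the endpoint and ticket payload are placeholders to be replaced with the real ones.

    import java.nio.charset.StandardCharsets
    import java.util.concurrent.TimeUnit

    import org.apache.arrow.flight.{CallOptions, FlightClient, Location, Ticket}
    import org.apache.arrow.memory.RootAllocator

    object FlightTimeoutProbe {
      def main(args: Array[String]): Unit = {
        val allocator = new RootAllocator(Long.MaxValue)
        // Placeholder endpoint and ticket payload -- substitute the real ones.
        val location = Location.forGrpcInsecure("flight-server-host", 8815)
        val client = FlightClient.builder(allocator, location).build()
        val ticket = new Ticket("<ticket payload>".getBytes(StandardCharsets.UTF_8))
        try {
          // Deadline on the whole call: if the stream stalls, next() fails
          // instead of blocking forever.
          val stream = client.getStream(ticket, CallOptions.timeout(60, TimeUnit.SECONDS))
          try {
            var batches = 0
            while (stream.next()) {
              batches += 1
              println(s"batch $batches: ${stream.getRoot.getRowCount} rows")
            }
            println(s"stream completed after $batches batches")
          } finally stream.close()
        } finally {
          client.close()
          allocator.close()
        }
      }
    }

With the deadline in place, a stalled stream should surface as a DEADLINE_EXCEEDED FlightRuntimeException from next() rather than an indefinite hang, and the printed batch count shows how far the client got before stalling.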
On Wed, May 7, 2025, at 14:42, Susmit Sarkar wrote:
> Hi David
>
> 1. the python server thinks the RPC is complete: Yes
> 2. the Java client got any (or all) of the data before getting stuck?
>    Depends: if we are reading in batches of 1000, which is a pretty low number,
>    we see data being streamed until 15000-18000 rows and then it ultimately gets
>    stuck; if we select a batch size of 100K the client is stuck indefinitely
> 3. Sharing the debug screenshot; the method was invoked as expected
>
> image.png
>
> The issue is only seen when the client runs on a local machine, irrespective
> of the OS (both Windows and Mac). We don't see the issue if the same client
> code runs in a VM pointing to the Arrow Flight server in another VM
>
> Thanks,
> Susmit
>
> On Sat, May 3, 2025 at 12:17 PM David Li <lidav...@apache.org> wrote:
>> Thanks Susmit. I can't quite see a bug, but I'm willing to believe that
>> Flight is buggy in Java (I have never understood why the code tries to
>> manually handle flow control, and without my memories of 6 years ago the
>> code seems suspect to me now).
>>
>> Do you know if (1) the python server thinks the RPC is complete, (2) the
>> Java client got any (or all) of the data before getting stuck? It may also
>> be interesting to step through the Java code with a debugger attached, and
>> see what the values of `pending` and `completed` are in the FlightStream
>> instance, and if the methods here[1] are all being hit as expected.
>>
>> [1] https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356
>>
>> On Fri, May 2, 2025, at 14:56, Susmit Sarkar wrote:
>> > Hi James,
>> >
>> > The flight server is written in Python. Here is the code snippet below.
>> >
>> > We have overridden the *do_get* method only.
>> >
>> > getStream() and getFlightInfo() are left untouched (default implementation).
>> >
>> > def do_get(self, context, ticket):
>> >     """
>> >     Handles client requests for data. The ticket will contain:
>> >     - access_key: S3 access key
>> >     - secret_key: S3 secret key
>> >     - s3_path: Full S3 path (e.g., bucket_name/object_key)
>> >     - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
>> >     """
>> >     try:
>> >         # Parse the ticket to extract credentials, S3 path, and mode
>> >         access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
>> >     except InvalidTicketFormatError as e:
>> >         logging.error(str(e))
>> >         raise
>> >     except InvalidModeError as e:
>> >         logging.error(str(e))
>> >         raise
>> >
>> >     # s3fs doesn't need the s3a protocol.
>> >     if s3_path.startswith("s3://"):
>> >         s3_path = s3_path.replace("s3://", "", 1)
>> >
>> >     logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
>> >     logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
>> >     logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")
>> >
>> >     # Initialize the S3 handler with credentials
>> >     try:
>> >         s3_handler = S3Handler(
>> >             endpoint=Config.S3_ENDPOINT_OVERRIDE,
>> >             region=Config.S3_REGION,
>> >             access_key=access_key,
>> >             secret_key=secret_key
>> >         )
>> >     except Exception as e:
>> >         logging.error(f"Error initializing S3 handler: {str(e)}")
>> >         raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e
>> >
>> >     if mode == DataRetrievalMode.BATCH:
>> >         try:
>> >             # Use the get_parquet_data method for both wildcard and non-wildcard cases
>> >             parquet_data = s3_handler.get_parquet_data(s3_path)
>> >             # parquet_data.schema: used when parquet_data is an instance of ds.Dataset
>> >             # (i.e., when multiple Parquet files are being processed as a dataset).
>> >             #
>> >             # parquet_data.schema_arrow: used when parquet_data is a pyarrow.parquet
>> >             # ParquetFile. A single Parquet file has its own schema, accessible via
>> >             # schema_arrow in PyArrow.
>> >             schema = parquet_data.schema if isinstance(parquet_data, ds.Dataset) else parquet_data.schema_arrow
>> >             return flight.GeneratorStream(schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
>> >         except OSError as e:
>> >             logging.error(f"AWS S3 access error: {str(e)}")
>> >             raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>> >         except Exception as e:
>> >             logging.error(f"Error streaming Parquet data: {str(e)}")
>> >             raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e
>> >
>> >     # Handle 'full' mode to load the entire dataset
>> >     elif mode == DataRetrievalMode.FULL:
>> >         try:
>> >             # Check if the S3 path contains a wildcard and the mode is FULL
>> >             if "*" in s3_path:
>> >                 logging.warning(
>> >                     f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
>> >                     f"This may put pressure on memory as all files will be loaded into memory at once."
>> >                 )
>> >             # Use the same get_parquet_data method for both wildcard and non-wildcard cases
>> >             parquet_data = s3_handler.get_parquet_data(s3_path)
>> >             # Load the entire dataset into memory (chance of OOM),
>> >             # with consideration for Dataset vs. ParquetFile.
>> >             if isinstance(parquet_data, ds.Dataset):
>> >                 table = parquet_data.to_table()
>> >             else:
>> >                 table = parquet_data.read()
>> >             return flight.RecordBatchStream(table)
>> >         except OSError as e:
>> >             logging.error(f"AWS S3 access error: {str(e)}")
>> >             raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>> >         except Exception as e:
>> >             logging.error(f"Error loading full Parquet dataset: {str(e)}")
>> >             raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e
>> >
>> >     else:
>> >         logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
>> >         raise InvalidModeError()
>> >
>> > Helper functions:
>> >
>> > def get_parquet_data(self, s3_path):
>> >     """
>> >     Retrieves Parquet data from S3.
>> >     If the path contains a wildcard pattern, it lists all matching files manually.
>> >     If it's a single file, it reads the file directly.
>> >
>> >     :param s3_path: The S3 path, which could be a wildcard pattern or a direct file path.
>> >     :return: PyArrow Dataset object if it's a wildcard, or a ParquetFile object for a single file.
>> >     """
>> >     try:
>> >         # Check if the path contains a wildcard
>> >         if "*" in s3_path:
>> >             # Split the directory and pattern (e.g., `*.parquet`)
>> >             directory, pattern = s3_path.rsplit("/", 1)
>> >
>> >             # List all files in the directory and filter using the pattern
>> >             logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
>> >             files = self.s3_fs.get_file_info(fs.FileSelector(directory))
>> >
>> >             # Filter files matching the pattern (e.g., *.parquet) and
>> >             # sort them by modification time (mtime_ns)
>> >             sorted_file_paths = [
>> >                 file.path
>> >                 for file in sorted(files, key=lambda file: file.mtime_ns)
>> >                 if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")
>> >             ]
>> >
>> >             if not sorted_file_paths:
>> >                 raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}")
>> >
>> >             logging.info(f"Sorted files: {sorted_file_paths}")
>> >
>> >             # Validate schemas across all files
>> >             if not validate_schemas(sorted_file_paths, self.s3_fs):
>> >                 raise ValueError("Schema mismatch detected across files.")
>> >
>> >             # Create a dataset from the matching files
>> >             dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
>> >             return dataset
>> >         else:
>> >             # Handle single file case: read the specific Parquet file
>> >             logging.info(f"Fetching single Parquet file: {s3_path}")
>> >             parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
>> >             return parquet_file
>> >     except Exception as e:
>> >         logging.error(f"Error fetching Parquet data from S3: {e}")
>> >         raise e
>> >
>> > @staticmethod
>> > def stream_parquet_batches(parquet_data, batch_size=None):
>> >     """
>> >     Stream the Parquet data in batches. Supports both datasets (multiple files)
>> >     and single Parquet files.
>> >
>> >     :param parquet_data: The Dataset or ParquetFile object to stream data from.
>> >     :param batch_size: The size of the batches to stream. Default is 100,000 if not provided.
>> >     :return: Generator for streaming Parquet batches.
>> >     """
>> >     try:
>> >         # Ensure batch_size is an integer, set default if None
>> >         if batch_size is None or not isinstance(batch_size, int):
>> >             batch_size = 100000
>> >
>> >         if isinstance(parquet_data, ds.Dataset):
>> >             # If it's a dataset (multiple files), stream dataset batches using `batch_size`
>> >             logging.info("Streaming Parquet data in batches from a dataset")
>> >             for batch in parquet_data.to_batches(batch_size=batch_size):
>> >                 yield batch
>> >         else:
>> >             # If it's a single file (ParquetFile), stream file batches (iter_batches)
>> >             logging.info("Streaming Parquet data in batches from a single file")
>> >             for batch in parquet_data.iter_batches(batch_size=batch_size):
>> >                 yield batch
>> >     except Exception as e:
>> >         logging.error(f"Error streaming Parquet batches: {e}")
>> >         raise e
>> >
>> > On Wed, Apr 30, 2025 at 11:14 PM James Duong
>> > <james.du...@improving.com.invalid> wrote:
>> >
>> >> Would you be able to share the server's getStream() and getFlightInfo()
>> >> implementations?
>> >>
>> >> Note that getStream() should be written such that it doesn't block
>> >> the gRPC thread.
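Short of stepping through FlightClient.java with a debugger to watch `pending` and `completed`, a lightweight way to see where the consumer thread is parked once the hang happens is to dump every thread's stack from inside the client process, i.e. the programmatic equivalent of running jstack against it. A minimal sketch, assuming a Scala 2.13+ client; the object name and the 30-second delay are arbitrary choices, not part of any Arrow API.

    import scala.jdk.CollectionConverters._

    object StackDumpWatchdog {
      // Prints every live thread's stack after delayMillis. Start it just before
      // the getStream()/next() loop; when the client appears stuck, look for the
      // consumer thread and note where it is blocked (e.g. inside FlightStream.next).
      def scheduleDump(delayMillis: Long = 30000L): Unit = {
        val watchdog = new Thread(() => {
          Thread.sleep(delayMillis)
          Thread.getAllStackTraces.asScala.foreach { case (t, frames) =>
            println(s"--- ${t.getName} (${t.getState}) ---")
            frames.foreach(frame => println(s"    at $frame"))
          }
        })
        watchdog.setDaemon(true)
        watchdog.start()
      }
    }

Calling StackDumpWatchdog.scheduleDump() before fetching the stream and then reproducing the hang should show whether the consumer thread is blocked inside FlightStream.next() waiting on a batch that never arrives.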
>> >> From: Susmit Sarkar <susmitsir...@gmail.com>
>> >> Date: Wednesday, April 30, 2025 at 2:59 AM
>> >> To: David Li <lidav...@apache.org>
>> >> Cc: nik.9...@gmail.com, dev@arrow.apache.org
>> >> Subject: Re: Query on stuck Arrow Flight Client while interacting from
>> >> local workstation (mac)
>> >>
>> >> Hi David
>> >>
>> >> Sharing the Arrow client thread dump for reference. Strangely, if we pass a
>> >> dummy non-existent S3 path we get a proper error from the server:
>> >>
>> >> cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2]
>> >> Path does not exist
>> >> 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'.
>> >> Detail: [errno 2] No such file or directory
>> >>
>> >> This means the server is reachable, and we do see the logs on the server
>> >> side as well.
>> >>
>> >> It works fine if we call the client from within a VM; the issue arises on a
>> >> local workstation, where it's stuck indefinitely.
>> >>
>> >> Thanks,
>> >> Susmit
>> >>
>> >> On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org> wrote:
>> >> This is not specific to Flight; use jstack or your favorite
>> >> instrumentation tool (VisualVM etc.)
>> >>
>> >> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:
>> >> > Hi David,
>> >> >
>> >> > How do we enable thread dump logs for both the client and server code?
>> >> >
>> >> > As of now, I don't see any error on either the client side or the server
>> >> > side. It just hangs/gets stuck.
>> >> >
>> >> > Thanks,
>> >> > Nikhil
>> >> >
>> >> > On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <susmitsir...@gmail.com> wrote:
>> >> >
>> >> >> Hi Team,
>> >> >>
>> >> >> We are using the code snippet below in Scala to query the Flight server,
>> >> >> but it seems to be stuck indefinitely. This issue is seen when we are
>> >> >> testing from our local workstation (Mac, to be precise).
>> >> >>
>> >> >> Another interesting thing: it's able to propagate error messages
>> >> >> correctly, but not the FlightStream data. The same code works fine when
>> >> >> we run inside a Linux VM.
>> >> >>
>> >> >> Do you folks see any issue in the code?
>> >> >>
>> >> >> def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
>> >> >>   logger.info(s"Fetching data for details: ${details.toString}")
>> >> >>   val ticketStr = buildTicketStr(details)
>> >> >>   logger.info(s"Generated ticket string: $ticketStr")
>> >> >>
>> >> >>   val allocator = new RootAllocator(Long.MaxValue)
>> >> >>   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
>> >> >>
>> >> >>   try {
>> >> >>     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
>> >> >>     val stream = client.getStream(ticket)
>> >> >>
>> >> >>     Iterator.continually {
>> >> >>       if (stream.next()) Some(stream) else {
>> >> >>         // Cleanup when no more batches
>> >> >>         close(stream, client)
>> >> >>         None
>> >> >>       }
>> >> >>     }.takeWhile(_.isDefined).flatten
>> >> >>   } catch {
>> >> >>     case e: FlightRuntimeException =>
>> >> >>       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
>> >> >>       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
>> >> >>     case e: Exception =>
>> >> >>       logger.error(s"Failed to fetch data: ${e.getMessage}")
>> >> >>       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
>> >> >>   }
>> >> >> }
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Susmit
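Probably unrelated to the hang, but worth noting about the snippet above: if the caller stops consuming the iterator early, or an exception is thrown after getStream() succeeds, the stream, client, and allocator are never closed (the allocator is not closed on the happy path either). Below is a sketch of one way to guarantee cleanup on every path; withFlightStream is a hypothetical helper, and AutoCloseables comes from org.apache.arrow.util.

    import java.nio.charset.StandardCharsets

    import org.apache.arrow.flight.{FlightClient, FlightStream, Location, Ticket}
    import org.apache.arrow.memory.RootAllocator
    import org.apache.arrow.util.AutoCloseables

    object FlightStreamSupport {
      // Hypothetical helper: host, port, and ticket string come from the caller.
      def withFlightStream[T](host: String, port: Int, ticketStr: String)(f: FlightStream => T): T = {
        val allocator = new RootAllocator(Long.MaxValue)
        val client = FlightClient.builder(allocator, Location.forGrpcInsecure(host, port)).build()
        try {
          val stream = client.getStream(new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8)))
          try f(stream)
          finally stream.close()
        } finally {
          // Closed even if getStream() or the callback throws.
          AutoCloseables.close(client, allocator)
        }
      }
    }

Usage would look something like:

    FlightStreamSupport.withFlightStream(serverHost, serverPort, ticketStr) { stream =>
      while (stream.next()) {
        // consume stream.getRoot here
      }
    }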