Thanks for reporting back. (Note, the attachment didn't make it.) Hmm, I'd 
probably need to poke at it in a debugger. But it sounds like the callbacks are 
getting lost somehow. (I'm not a fan of how the Java API tries to adapt an 
async API into a synchronous one, not least because it leads to weird issues 
like this.)
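
If it helps to narrow it down before anyone gets a debugger attached, here is a
rough diagnostic sketch (Scala, untested; it assumes the same FlightClient /
Ticket setup as in the snippet further down the thread). Wrapping each
stream.next() call in a future with a timeout at least tells you whether the
hang is inside next() itself, i.e. the client is waiting on a batch or
end-of-stream signal that never arrives:

// Diagnostic sketch only: time out stream.next() so a lost callback shows up
// as an exception instead of an indefinite hang.
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}
import org.apache.arrow.flight.FlightStream

def nextWithTimeout(stream: FlightStream, seconds: Long = 60): Boolean = {
  val exec = Executors.newSingleThreadExecutor()
  try {
    val future = exec.submit(new Callable[java.lang.Boolean] {
      override def call(): java.lang.Boolean = stream.next()
    })
    try future.get(seconds, TimeUnit.SECONDS)
    catch {
      case _: TimeoutException =>
        throw new IllegalStateException(s"stream.next() did not return within ${seconds}s")
    }
  } finally exec.shutdownNow()
}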

On Wed, May 7, 2025, at 14:42, Susmit Sarkar wrote:
> Hi David
> 
> 1. The Python server thinks the RPC is complete: Yes.
> 2. Did the Java client get any (or all) of the data before getting stuck? 
> It depends: if we are reading in batches of 1000, which is a pretty low number, 
> we see data being streamed until 15,000-18,000 rows before it ultimately gets 
> stuck; if we select a batch size of 100K, the client is stuck indefinitely.
> 3. Sharing the debug screenshot; the method was invoked as expected.
> 
> [attachment: image.png]
> 
> The issue is only seen when the client runs on a local machine, irrespective 
> of the OS (both Windows and Mac). We don't see the issue if the same client 
> code runs in a VM server pointing to the Arrow Flight server in another VM.
> Thanks,
> Susmit
> 
> On Sat, May 3, 2025 at 12:17 PM David Li <lidav...@apache.org> wrote:
>> Thanks Susmit. I can't quite see a bug, but I'm willing to believe that 
>> Flight is buggy in Java (I have never understood why the code tries to 
>> manually handle flow control, and without my memories of 6 years ago the 
>> code seems suspect to me now).
>> 
>> Do you know if (1) the python server thinks the RPC is complete, (2) the 
>> Java client got any (or all) of the data before getting stuck? It may also 
>> be interesting to step through the Java code with a debugger attached, and 
>> see what the values of `pending` and `completed` are in the FlightStream 
>> instance, and if the methods here[1] are all being hit as expected.
>> 
>> [1] 
>> https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356
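>> 
>> If a debugger is awkward to attach, a reflection hack along these lines could
>> also log those two values while the stream is being consumed. Untested Scala
>> sketch; `pending` and `completed` are private, unversioned internals of
>> FlightStream, so the names may need adjusting to the version you're running:
>> 
>> import org.apache.arrow.flight.FlightStream
>> 
>> // Read a private field of the FlightStream instance by name (diagnostics only).
>> def internalField(stream: FlightStream, name: String): Any = {
>>   val field = classOf[FlightStream].getDeclaredField(name)
>>   field.setAccessible(true)
>>   field.get(stream)
>> }
>> 
>> // Drain the stream, logging row counts plus the two internal counters.
>> def consumeAndLog(stream: FlightStream): Unit = {
>>   while (stream.next()) {
>>     println("rows=" + stream.getRoot.getRowCount +
>>       " pending=" + internalField(stream, "pending") +
>>       " completed=" + internalField(stream, "completed"))
>>   }
>> }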
>> 
>> On Fri, May 2, 2025, at 14:56, Susmit Sarkar wrote:
>> > Hi James,
>> >
>> > The Flight server is written in Python. Here is the code snippet below.
>> >
>> > We have overridden the *do_get* method only
>> >
>> > getStream() and getFlightInfo() are left untouched (default implementation).
>> >
>> > def do_get(self, context, ticket):
>> >     """
>> >         Handles client requests for data. The ticket will contain:
>> >         - access_key: S3 access key
>> >         - secret_key: S3 secret key
>> >         - s3_path: Full S3 path (e.g., bucket_name/object_key)
>> >         - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
>> >     """
>> >     try:
>> >         # Parse the ticket to extract credentials, S3 path, and mode
>> >         access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
>> >     except InvalidTicketFormatError as e:
>> >         logging.error(str(e))
>> >         raise
>> >     except InvalidModeError as e:
>> >         logging.error(str(e))
>> >         raise
>> >
>> >     # s3fs doesn't need the s3a protocol.
>> >     if s3_path.startswith("s3://"):
>> >         s3_path = s3_path.replace("s3://", "", 1)
>> >
>> >     logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
>> >     logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
>> >     logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")
>> >
>> >     # Initialize the S3 handler with credentials
>> >     try:
>> >         s3_handler = S3Handler(
>> >             endpoint=Config.S3_ENDPOINT_OVERRIDE,
>> >             region=Config.S3_REGION,
>> >             access_key=access_key,
>> >             secret_key=secret_key
>> >         )
>> >     except Exception as e:
>> >         logging.error(f"Error initializing S3 handler: {str(e)}")
>> >         raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e
>> >
>> >     if mode == DataRetrievalMode.BATCH:
>> >         try:
>> >             # Use the get_parquet_data method for both wildcard and non-wildcard cases
>> >             parquet_data = s3_handler.get_parquet_data(s3_path)
>> >             # parquet_data.schema: used when parquet_data is an instance of ds.Dataset
>> >             # (i.e., when multiple Parquet files are being processed as a dataset).
>> >             #
>> >             # parquet_data.schema_arrow: used when parquet_data is a pq.ParquetFile;
>> >             # a single Parquet file has its own schema, accessible via schema_arrow in PyArrow.
>> >             schema = (parquet_data.schema if isinstance(parquet_data, ds.Dataset)
>> >                       else parquet_data.schema_arrow)
>> >             return flight.GeneratorStream(
>> >                 schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
>> >         except OSError as e:
>> >             logging.error(f"AWS S3 access error: {str(e)}")
>> >             raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>> >         except Exception as e:
>> >             logging.error(f"Error streaming Parquet data: {str(e)}")
>> >             raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e
>> >
>> >     # Handle 'full' mode to load the entire dataset
>> >     elif mode == DataRetrievalMode.FULL:
>> >         try:
>> >             # Check if the S3 path contains a wildcard and the mode is FULL
>> >             if "*" in s3_path:
>> >                 logging.warning(
>> >                     f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
>> >                     f"This may put pressure on memory as all files will be loaded into memory at once."
>> >                 )
>> >             # Use the same get_parquet_data method for both wildcard and non-wildcard cases
>> >             parquet_data = s3_handler.get_parquet_data(s3_path)
>> >             # Load the entire dataset into memory / Chance of OOM.
>> >             # table = parquet_data.to_table() if isinstance(parquet_data, ds.Dataset) else parquet_data.read_table()
>> >             # Load the entire dataset into memory, with consideration for Dataset vs. ParquetFile
>> >             if isinstance(parquet_data, ds.Dataset):
>> >                 table = parquet_data.to_table()
>> >             else:
>> >                 table = parquet_data.read()
>> >             return flight.RecordBatchStream(table)
>> >         except OSError as e:
>> >             logging.error(f"AWS S3 access error: {str(e)}")
>> >             raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>> >         except Exception as e:
>> >             logging.error(f"Error loading full Parquet dataset: {str(e)}")
>> >             raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e
>> >
>> >     else:
>> >         logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
>> >         raise InvalidModeError()
>> >
>> >
>> > # Helper functions.
>> >
>> > def get_parquet_data(self, s3_path):
>> >     """
>> >         Retrieves Parquet data from S3. If the path contains a wildcard
>> >         pattern, it lists all matching files manually.
>> >         If it's a single file, it reads the file directly.
>> >
>> >         :param s3_path: The S3 path, which could be a wildcard pattern or a direct file path.
>> >         :return: A PyArrow Dataset object if it's a wildcard, or a ParquetFile object for a single file.
>> >     """
>> >     try:
>> >         # Check if the path contains a wildcard
>> >         if "*" in s3_path:
>> >             # Split the directory and pattern (e.g., `*.parquet`)
>> >             directory, pattern = s3_path.rsplit("/", 1)
>> >
>> >             # List all files in the directory and filter using the pattern
>> >             logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
>> >             files = self.s3_fs.get_file_info(fs.FileSelector(directory))
>> >
>> >             # Filter files matching the pattern (e.g., *.parquet) and sort them
>> >             # by modification time (mtime_ns)
>> >             sorted_file_paths = [
>> >                 file.path
>> >                 for file in sorted(files, key=lambda file: file.mtime_ns)
>> >                 if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")
>> >             ]
>> >
>> >             if not sorted_file_paths:
>> >                 raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}")
>> >
>> >             logging.info(f"Sorted files: {sorted_file_paths}")
>> >
>> >             # Validate schemas across all files
>> >             if not validate_schemas(sorted_file_paths, self.s3_fs):
>> >                 raise ValueError("Schema mismatch detected across files.")
>> >
>> >             # Create a dataset from the matching files
>> >             dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
>> >             return dataset
>> >         else:
>> >             # Handle single file case: read the specific Parquet file
>> >             logging.info(f"Fetching single Parquet file: {s3_path}")
>> >             parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
>> >             return parquet_file
>> >     except Exception as e:
>> >         logging.error(f"Error fetching Parquet data from S3: {e}")
>> >         raise e
>> >
>> > @staticmethod
>> > def stream_parquet_batches(parquet_data, batch_size=None):
>> >     """
>> >         Stream the Parquet data in batches. Supports both datasets (multiple files)
>> >         and single Parquet files.
>> >
>> >         :param parquet_data: The Dataset or ParquetFile object to stream data from.
>> >         :param batch_size: The size of the batches to stream. Default is 100,000 if not provided.
>> >         :return: Generator for streaming Parquet batches.
>> >     """
>> >     try:
>> >         # Ensure batch_size is an integer, set default if None
>> >         if batch_size is None or not isinstance(batch_size, int):
>> >             batch_size = 100000
>> >
>> >         if isinstance(parquet_data, ds.Dataset):
>> >             # If it's a dataset (multiple files), stream dataset batches using `batch_size`
>> >             logging.info("Streaming Parquet data in batches from a dataset")
>> >             for batch in parquet_data.to_batches(batch_size=batch_size):
>> >                 yield batch
>> >         else:
>> >             # If it's a single file (ParquetFile), stream file batches (iter_batches)
>> >             logging.info("Streaming Parquet data in batches from a single file")
>> >             for batch in parquet_data.iter_batches(batch_size=batch_size):
>> >                 yield batch
>> >     except Exception as e:
>> >         logging.error(f"Error streaming Parquet batches: {e}")
>> >         raise e
>> >
>> >
>> > On Wed, Apr 30, 2025 at 11:14 PM James Duong
>> > <james.du...@improving.com.invalid> wrote:
>> >
>> >> Would you be able to share the server’s getStream() and getFlightInfo()
>> >> implementations?
>> >>
>> >> Note that getStream() should be written such that it doesn't block
>> >> the gRPC thread.
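>> >>
>> >> For reference, a rough sketch of what that looks like against the Java
>> >> FlightProducer API (written in Scala, with hypothetical names like
>> >> NonBlockingProducer and loadRoot; on the Python side, returning a
>> >> GeneratorStream from do_get keeps things lazy in a similar way):
>> >> getStream() only schedules the work and returns, and the batches are
>> >> pushed through the listener from a separate executor.
>> >>
>> >> import java.util.concurrent.Executors
>> >> import org.apache.arrow.flight.{FlightProducer, NoOpFlightProducer, Ticket}
>> >> import org.apache.arrow.vector.VectorSchemaRoot
>> >>
>> >> class NonBlockingProducer extends NoOpFlightProducer {
>> >>   private val worker = Executors.newSingleThreadExecutor()
>> >>
>> >>   override def getStream(context: FlightProducer.CallContext, ticket: Ticket,
>> >>                          listener: FlightProducer.ServerStreamListener): Unit = {
>> >>     // Hand the work off so the gRPC thread is not blocked producing data.
>> >>     worker.submit(new Runnable {
>> >>       override def run(): Unit = {
>> >>         val root: VectorSchemaRoot = loadRoot(ticket)  // placeholder for real batch production
>> >>         try {
>> >>           listener.start(root)
>> >>           listener.putNext()     // one putNext() per populated batch
>> >>           listener.completed()
>> >>         } catch {
>> >>           case e: Exception => listener.error(e)
>> >>         } finally root.close()
>> >>       }
>> >>     })
>> >>   }
>> >>
>> >>   private def loadRoot(ticket: Ticket): VectorSchemaRoot = ???  // hypothetical helper
>> >> }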
>> >>
>> >>
>> >>
>> >> From: Susmit Sarkar <susmitsir...@gmail.com>
>> >> Date: Wednesday, April 30, 2025 at 2:59 AM
>> >> To: David Li <lidav...@apache.org>
>> >> Cc: nik.9...@gmail.com <nik.9...@gmail.com>, dev@arrow.apache.org <
>> >> dev@arrow.apache.org>
>> >> Subject: Re: Query on stuck Arrow Flight Client while interacting from
>> >> local workstation (mac)
>> >>
>> >> Hi David
>> >>
>> >> Sharing the Arrow client thread dump for reference. Strangely, if we pass a
>> >> dummy non-existent S3 path, we get a proper error from the server:
>> >>
>> >> cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2]
>> >> Path does not exist
>> >> 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'.
>> >> Detail: [errno 2] No such file or directory
>> >>
>> >> This means the server is reachable, and we do see the logs on the server
>> >> as well.
>> >>
>> >> It works fine if we call the client within a VM; the issue arises on a local
>> >> workstation, where it's stuck indefinitely.
>> >>
>> >> Thanks,
>> >> Susmit
>> >>
>> >> On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org> wrote:
>> >> This is not specific to Flight; use jstack or your favorite
>> >> instrumentation tool (VisualVM etc.)
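>> >>
>> >> For example, `jstack <pid>` against the hung client JVM. If attaching a
>> >> tool is awkward, a rough in-process alternative (untested Scala sketch) is
>> >> to dump every thread's stack from within the client while it hangs:
>> >>
>> >> // Print the name, state, and stack of every live thread in this JVM.
>> >> def dumpThreads(): Unit =
>> >>   Thread.getAllStackTraces.forEach { (thread, frames) =>
>> >>     println("\n\"" + thread.getName + "\" state=" + thread.getState)
>> >>     frames.foreach(frame => println("    at " + frame))
>> >>   }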
>> >>
>> >> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:
>> >> > Hi David,
>> >> >
>> >> > How do we enable thread dump logs for both the client and server code?
>> >> >
>> >> > As of now, I don't see any error on either the client side or the server
>> >> > side. It just hangs/gets stuck.
>> >> >
>> >> > Thanks,
>> >> > Nikhil
>> >> >
>> >> > On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <susmitsir...@gmail.com> wrote:
>> >> >
>> >> >> Hi Team,
>> >> >>
>> >> >> We are using the below code snippet in Scala to query the Flight server,
>> >> >> but it seems to be stuck indefinitely. This issue is seen when we are
>> >> >> testing from our local workstation (Mac, to be precise).
>> >> >>
>> >> >> Another interesting thing: it's able to propagate the error message
>> >> >> correctly, but not the FlightStream data. The same code works fine when
>> >> >> we run inside a Linux VM.
>> >> >>
>> >> >> Do you folks see any issue in the code?
>> >> >>
>> >> >> def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
>> >> >>   logger.info(s"Fetching data for details: ${details.toString}")
>> >> >>   val ticketStr = buildTicketStr(details)
>> >> >>   logger.info(s"Generated ticket string: $ticketStr")
>> >> >>
>> >> >>   val allocator = new RootAllocator(Long.MaxValue)
>> >> >>   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
>> >> >>
>> >> >>   try {
>> >> >>     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
>> >> >>     val stream = client.getStream(ticket)
>> >> >>
>> >> >>     Iterator.continually {
>> >> >>       if (stream.next()) Some(stream) else {
>> >> >>         // Cleanup when no more batches
>> >> >>         close(stream, client)
>> >> >>         None
>> >> >>       }
>> >> >>     }.takeWhile(_.isDefined).flatten
>> >> >>   } catch {
>> >> >>     case e: FlightRuntimeException =>
>> >> >>       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
>> >> >>       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
>> >> >>     case e: Exception =>
>> >> >>       logger.error(s"Failed to fetch data: ${e.getMessage}")
>> >> >>       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
>> >> >>   }
>> >> >> }
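>> >> >>
>> >> >> A minimal, hypothetical way to consume the returned iterator, for context:
>> >> >> each element is the same FlightStream after a successful next(), so if the
>> >> >> hang is in next() it happens inside Iterator.continually above.
>> >> >>
>> >> >> val details: BaseDataAccessDetails = ???  // built elsewhere
>> >> >> fetchDataStreamIterator(details).foreach { stream =>
>> >> >>   println(s"received batch with ${stream.getRoot.getRowCount} rows")
>> >> >> }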
>> >> >>
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Susmit
>> >> >>
>> >> >>
>> >>
