This thread is getting rather long. I think you should open a case. That’s a 
permanent place where you can attach screenshots and stack traces, and it can 
be found by anyone who hits the same problem in the future. 

Julian

> On May 7, 2025, at 2:51 AM, David Li <lidav...@apache.org> wrote:
> 
> Thanks for reporting back. (Note, the attachment didn't make it.) Hmm, I'd 
> probably need to poke at it in a debugger. But it sounds like the callbacks 
> are getting lost somehow. (I'm not a fan of how the Java API tries to adapt 
> an async API into a synchronous one, not least because it leads to weird 
> issues like this.)
> 
>> On Wed, May 7, 2025, at 14:42, Susmit Sarkar wrote:
>> Hi David
>> 
>> 1. Does the Python server think the RPC is complete? Yes.
>> 2. Did the Java client get any (or all) of the data before getting stuck? 
>> It depends. With a batch size of 1000, which is fairly low, we see data 
>> being streamed until about 15000-18000 rows before it gets stuck. With a 
>> batch size of 100K, the client is stuck indefinitely.
>> 3. Sharing the debug screenshot; the method was invoked as expected.
>> 
>> image.png
>> 
>> The issue is only seen when the client runs on a local machine, irrespective 
>> of the OS (both Windows and Mac). We don't see the issue if the same client 
>> code runs on a VM pointing to the Arrow Flight server on another VM.
>> Thanks,
>> Susmit
>> 
>>> On Sat, May 3, 2025 at 12:17 PM David Li <lidav...@apache.org> wrote:
>>> Thanks Susmit. I can't quite see a bug but I'm willing to believe that 
>>> Flight is buggy in Java (I have never understood why the code tries to 
>>> manually handle flow control and without my memories of 6 years ago the 
>>> code seems suspect to me now)
>>> 
>>> Do you know if (1) the python server thinks the RPC is complete, (2) the 
>>> Java client got any (or all) of the data before getting stuck? It may also 
>>> be interesting to step through the Java code with a debugger attached, and 
>>> see what the values of `pending` and `completed` are in the FlightStream 
>>> instance, and if the methods here[1] are all being hit as expected.
>>> 
>>> [1] 
>>> https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356
>>> 
>>> On Fri, May 2, 2025, at 14:56, Susmit Sarkar wrote:
>>>> Hi James,
>>>> 
>>>> The Flight server is written in Python. Here is the code snippet.
>>>> 
>>>> We have overridden only the *do_get* method.
>>>> 
>>>> getStream() and getFlightInfo() are left untouched (default implementation).
>>>> 
>>>> def do_get(self, context, ticket):
>>>>    """
>>>>        Handles client requests for data. The ticket will contain:
>>>>        - access_key: S3 access key
>>>>        - secret_key: S3 secret key
>>>>        - s3_path: Full S3 path (e.g., bucket_name/object_key)
>>>>        - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
>>>>    """
>>>>    try:
>>>>        # Parse the ticket to extract credentials, S3 path, and mode
>>>>        access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
>>>>    except InvalidTicketFormatError as e:
>>>>        logging.error(str(e))
>>>>        raise
>>>>    except InvalidModeError as e:
>>>>        logging.error(str(e))
>>>>        raise
>>>> 
>>>>    # Strip the s3:// scheme; the filesystem expects bucket/key paths.
>>>>    if s3_path.startswith("s3://"):
>>>>        s3_path = s3_path.replace("s3://", "", 1)
>>>> 
>>>>    logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
>>>>    logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
>>>>    logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")
>>>> 
>>>>    # Initialize the S3 handler with credentials
>>>>    try:
>>>>        s3_handler = S3Handler(
>>>>            endpoint=Config.S3_ENDPOINT_OVERRIDE,
>>>>            region=Config.S3_REGION,
>>>>            access_key=access_key,
>>>>            secret_key=secret_key
>>>>        )
>>>>    except Exception as e:
>>>>        logging.error(f"Error initializing S3 handler: {str(e)}")
>>>>        raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e
>>>> 
>>>>    if mode == DataRetrievalMode.BATCH:
>>>>        try:
>>>>            # Use the get_parquet_data method for both wildcard and non-wildcard cases
>>>>            parquet_data = s3_handler.get_parquet_data(s3_path)
>>>>            # parquet_data.schema: used when parquet_data is an instance of ds.Dataset
>>>>            # (i.e., when multiple Parquet files are being processed as a dataset).
>>>>            #
>>>>            # parquet_data.schema_arrow: used when parquet_data is a pq.ParquetFile
>>>>            # (pyarrow.parquet). A single Parquet file exposes its schema via schema_arrow.
>>>>            schema = parquet_data.schema if isinstance(parquet_data, ds.Dataset) else parquet_data.schema_arrow
>>>>            return flight.GeneratorStream(schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
>>>>        except OSError as e:
>>>>            logging.error(f"AWS S3 access error: {str(e)}")
>>>>            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>>>>        except Exception as e:
>>>>            logging.error(f"Error streaming Parquet data: {str(e)}")
>>>>            raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e
>>>> 
>>>>    # Handle 'full' mode to load the entire dataset
>>>>    elif mode == DataRetrievalMode.FULL:
>>>>        try:
>>>>            # Check if the S3 path contains a wildcard and the mode is FULL
>>>>            if "*" in s3_path:
>>>>                logging.warning(
>>>>                    f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
>>>>                    f"This may put pressure on memory as all files will be loaded into memory at once."
>>>>                )
>>>>            # Use the same get_parquet_data method for both wildcard and non-wildcard cases
>>>>            parquet_data = s3_handler.get_parquet_data(s3_path)
>>>>            # Load the entire dataset into memory / Chance of OOM.
>>>>            # table = parquet_data.to_table() if isinstance(parquet_data, ds.Dataset) else parquet_data.read_table()
>>>>            # Load the entire dataset into memory, with consideration for Dataset vs. ParquetFile
>>>>            if isinstance(parquet_data, ds.Dataset):
>>>>                table = parquet_data.to_table()
>>>>            else:
>>>>                table = parquet_data.read()
>>>>            return flight.RecordBatchStream(table)
>>>>        except OSError as e:
>>>>            logging.error(f"AWS S3 access error: {str(e)}")
>>>>            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>>>>        except Exception as e:
>>>>            logging.error(f"Error loading full Parquet dataset: {str(e)}")
>>>>            raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e
>>>> 
>>>>    else:
>>>>        logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
>>>>        raise InvalidModeError()
>>>> 
>>>> 
>>>> # Helper functions.
>>>> 
>>>> def get_parquet_data(self, s3_path):
>>>>    """
>>>>        Retrieves Parquet data from S3. If the path contains a wildcard pattern, it lists all matching files manually.
>>>>        If it's a single file, it reads the file directly.
>>>> 
>>>>        :param s3_path: The S3 path, which could be a wildcard pattern or a direct file path.
>>>>        :return: PyArrow Dataset object if it's a wildcard, or a ParquetFile object for a single file.
>>>>    """
>>>>    try:
>>>>        # Check if the path contains a wildcard
>>>>        if "*" in s3_path:
>>>>            # Split the directory and pattern (e.g., `*.parquet`)
>>>>            directory, pattern = s3_path.rsplit("/", 1)
>>>> 
>>>>            # List all files in the directory and filter using the pattern
>>>>            logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
>>>>            files = self.s3_fs.get_file_info(fs.FileSelector(directory))
>>>> 
>>>>            # Filter files matching the pattern (e.g., *.parquet) and sort them by modification time (mtime_ns)
>>>>            sorted_file_paths = [
>>>>                file.path
>>>>                for file in sorted(files, key=lambda file: file.mtime_ns)
>>>>                if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")
>>>>            ]
>>>> 
>>>>            if not sorted_file_paths:
>>>>                raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}")
>>>> 
>>>>            logging.info(f"Sorted files: {sorted_file_paths}")
>>>> 
>>>>            # Validate schemas across all files
>>>>            if not validate_schemas(sorted_file_paths, self.s3_fs):
>>>>                raise ValueError("Schema mismatch detected across files.")
>>>> 
>>>>            # Create a dataset from the matching files
>>>>            dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
>>>>            return dataset
>>>>        else:
>>>>            # Handle single file case: read the specific Parquet file
>>>>            logging.info(f"Fetching single Parquet file: {s3_path}")
>>>>            parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
>>>>            return parquet_file
>>>>    except Exception as e:
>>>>        logging.error(f"Error fetching Parquet data from S3: {e}")
>>>>        raise e
>>>> 
>>>> @staticmethod
>>>> def stream_parquet_batches(parquet_data, batch_size=None):
>>>>    """
>>>>        Stream the Parquet data in batches. Supports both datasets (multiple files) and single Parquet files.
>>>> 
>>>>        :param parquet_data: The Dataset or ParquetFile object to stream data from.
>>>>        :param batch_size: The size of the batches to stream. Default is 100,000 if not provided.
>>>>        :return: Generator for streaming Parquet batches.
>>>>    """
>>>>    try:
>>>>        # Ensure batch_size is an integer, set default if None
>>>>        if batch_size is None or not isinstance(batch_size, int):
>>>>            batch_size = 100000
>>>> 
>>>>        if isinstance(parquet_data, ds.Dataset):
>>>>            # If it's a dataset (multiple files), stream dataset batches using `batch_size`
>>>>            logging.info("Streaming Parquet data in batches from a dataset")
>>>>            for batch in parquet_data.to_batches(batch_size=batch_size):
>>>>                yield batch
>>>>        else:
>>>>            # If it's a single file (ParquetFile), stream file batches (iter_batches)
>>>>            logging.info("Streaming Parquet data in batches from a single file")
>>>>            for batch in parquet_data.iter_batches(batch_size=batch_size):
>>>>                yield batch
>>>>    except Exception as e:
>>>>        logging.error(f"Error streaming Parquet batches: {e}")
>>>>        raise e
>>>> 
>>>> 
>>>> On Wed, Apr 30, 2025 at 11:14 PM James Duong
>>>> <james.du...@improving.com.invalid> wrote:
>>>> 
>>>>> Would you be able to share the server’s getStream() and getFlightInfo()
>>>>> implementations?
>>>>> 
>>>>> Note that getStream() should be written such that it doesn’t block
>>>>> the gRPC thread.
>>>>> 
>>>>> 
>>>>> From: Susmit Sarkar <susmitsir...@gmail.com>
>>>>> Date: Wednesday, April 30, 2025 at 2:59 AM
>>>>> To: David Li <lidav...@apache.org>
>>>>> Cc: nik.9...@gmail.com <nik.9...@gmail.com>, dev@arrow.apache.org <
>>>>> dev@arrow.apache.org>
>>>>> Subject: Re: Query on stuck Arrow Flight Client while interacting from
>>>>> local workstation (mac)
>>>>> 
>>>>> Hi David
>>>>> 
>>>>> Sharing the Arrow client thread dump for reference. Strangely, if we pass a
>>>>> dummy, nonexistent S3 path, we get a proper error from the server:
>>>>> 
>>>>> cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2]
>>>>> Path does not exist
>>>>> 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'.
>>>>> Detail: [errno 2] No such file or directory
>>>>> 
>>>>> This means the server is reachable, and we do see the logs on the server
>>>>> as well.
>>>>> 
>>>>> It works fine if we call the client from within a VM. The issue arises on a
>>>>> local workstation, where it's stuck indefinitely.
>>>>> 
>>>>> Thanks,
>>>>> Susmit
>>>>> 
>>>>> On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org> wrote:
>>>>> This is not specific to Flight; use jstack or your favorite
>>>>> instrumentation tool (VisualVM etc.)
>>>>> 
>>>>> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:
>>>>>> Hi David,
>>>>>> 
>>>>>> How do we enable thread dump logs for both the client and server code?
>>>>>> 
>>>>>> As of now, I don't see any error on either client side or server side. It
>>>>>> just hangs/gets stuck.
>>>>>> 
>>>>>> Thanks,
>>>>>> Nikhil
>>>>>> 
>>>>>> On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <susmitsir...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi Team,
>>>>>>> 
>>>>>>> We are using the code snippet below in Scala to query the Flight server,
>>>>>>> but it seems to get stuck indefinitely. The issue is seen when we test
>>>>>>> from our local workstation (a Mac, to be precise).
>>>>>>> 
>>>>>>> Another interesting thing: it is able to propagate the error message
>>>>>>> correctly, but not the FlightStream data. The same code works fine when we
>>>>>>> run it inside a Linux VM.
>>>>>>> 
>>>>>>> Do you folks see any issue in the code?
>>>>>>> 
>>>>>>> def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
>>>>>>>  logger.info(s"Fetching data for details: ${details.toString}")
>>>>>>>  val ticketStr = buildTicketStr(details)
>>>>>>>  logger.info(s"Generated ticket string: $ticketStr")
>>>>>>> 
>>>>>>>  val allocator = new RootAllocator(Long.MaxValue)
>>>>>>>  val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
>>>>>>> 
>>>>>>>  try {
>>>>>>>    val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
>>>>>>>    val stream = client.getStream(ticket)
>>>>>>> 
>>>>>>>    Iterator.continually {
>>>>>>>      if (stream.next()) Some(stream) else {
>>>>>>>        // Cleanup when no more batches
>>>>>>>        close(stream, client)
>>>>>>>        None
>>>>>>>      }
>>>>>>>    }.takeWhile(_.isDefined).flatten
>>>>>>>  } catch {
>>>>>>>    case e: FlightRuntimeException =>
>>>>>>>      logger.error(s"Error communicating with Flight server: ${e.getMessage}")
>>>>>>>      throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
>>>>>>>    case e: Exception =>
>>>>>>>      logger.error(s"Failed to fetch data: ${e.getMessage}")
>>>>>>>      throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
>>>>>>>  }
>>>>>>> }
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Susmit
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
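
On the thread-dump question raised earlier in the thread: jstack covers the Java client, and for the Python Flight server a rough counterpart could be the standard-library faulthandler module. Below is a minimal sketch of that idea, not something taken from the server code posted here. The names dump_all_threads and stuck_worker are hypothetical, and stuck_worker only stands in for a handler that is blocked.

```python
import faulthandler
import tempfile
import threading
import time


def dump_all_threads() -> str:
    """Return the current Python stack of every thread as text."""
    # faulthandler writes straight to a file descriptor, so a real
    # temporary file is used instead of an in-memory buffer.
    with tempfile.TemporaryFile(mode="w+b") as out:
        faulthandler.dump_traceback(file=out, all_threads=True)
        out.seek(0)
        return out.read().decode("utf-8", errors="replace")


def stuck_worker(release: threading.Event) -> None:
    # Hypothetical stand-in for a request handler that never returns.
    release.wait()


if __name__ == "__main__":
    release = threading.Event()
    worker = threading.Thread(target=stuck_worker, args=(release,), daemon=True)
    worker.start()
    time.sleep(0.1)  # give the worker time to block
    print(dump_all_threads())  # shows where each thread is waiting
    release.set()
```

This only shows Python-level frames; time spent inside native gRPC or Arrow code appears as the Python call that entered it.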
