We have opened a new issue

https://github.com/apache/arrow-java/issues/753

Attached all screenshots and thread dumps, requesting for the assistance if
possible.

Thanks David, James and Julian for the help till now

@David Li <lidav...@apache.org> : I hope you will be able to view the
attachment in the issue mentioned above

Regards,
Susmit

On Thu, May 8, 2025 at 8:50 AM Susmit Sarkar <susmitsir...@gmail.com> wrote:

> Let me open a thread, its a valid suggestion, will attach the discussions
>
> Thanks,
> Susmit
>
> On Wed, May 7, 2025 at 8:34 PM Julian Hyde <jhyde.apa...@gmail.com> wrote:
>
>> This thread is getting rather long. I think you should open a case.
>> That’s a permanent place that you can attach screenshots, stack traces. And
>> it will also be found by anyone who has the same problem in future.
>>
>> Julian
>>
>> > On May 7, 2025, at 2:51 AM, David Li <lidav...@apache.org> wrote:
>> >
>> > Thanks for reporting back. (Note, the attachment didn't make it.) Hmm,
>> I'd probably need to poke at it in a debugger. But it sounds like the
>> callbacks are getting lost somehow. (I'm not a fan of how the Java API
>> tries to adapt an async API into a synchronous one, not least because it
>> leads to weird issues like this.)
>> >
>> >> On Wed, May 7, 2025, at 14:42, Susmit Sarkar wrote:
>> >> Hi David
>> >>
>> >> 1.  the python server thinks the RPC is complete: Yes
>> >> 2. the Java client got any (or all) of the data before getting stuck?
>> Depends, if we are reading in batches of 1000 which is pretty low number we
>> see data being streamed till 15000-18000 rows ultimately stuck, if we
>> select a batch size of 100K client is stuck indefinitely
>> >> 3. Sharing the debug screenshot , the method was invoked as expected
>> >>
>> >> image.png
>> >>
>> >> The issue is only seen when the client runs on a local machine
>> irrespective of the OS (both windows and Mac). We don't see the issue if
>> the same client code runs in a VM server pointing to the Arrow flight
>> server in another VM
>> >> Thanks,
>> >> Susmit
>> >>
>> >>> On Sat, May 3, 2025 at 12:17 PM David Li <lidav...@apache.org> wrote:
>> >>> Thanks Susmit. I can't quite see a bug but I'm willing to believe
>> that Flight is buggy in Java (I have never understood why the code tries to
>> manually handle flow control and without my memories of 6 years ago the
>> code seems suspect to me now)
>> >>>
>> >>> Do you know if (1) the python server thinks the RPC is complete, (2)
>> the Java client got any (or all) of the data before getting stuck? It may
>> also be interesting to step though the Java code with a debugger attached,
>> and see what the values of `pending` and `completed` are in the
>> FlightStream instance, and if the methods here[1] are all being hit as
>> expected.
>> >>>
>> >>> [1]
>> https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356
>> >>>
>> >>> On Fri, May 2, 2025, at 14:56, Susmit Sarkar wrote:
>> >>>> Hi James,
>> >>>>
>> >>>> The flight server is written in python. Here is the below code
>> snippet.
>> >>>>
>> >>>> We have overridden the *do_get* method only
>> >>>>
>> >>>> getStream() and getFlightInfo() is left untouched / default
>> implementation.
>> >>>>
>> >>>> def do_get(self, context, ticket):
>> >>>>    """
>> >>>>        Handles client requests for data. The ticket will contain:
>> >>>>        - access_key: S3 access key
>> >>>>        - secret_key: S3 secret key
>> >>>>        - s3_path: Full S3 path (e.g., bucket_name/object_key)
>> >>>>        - mode: 'batch' (for batch streaming) or 'full' (for loading
>> >>>> the entire dataset)
>> >>>>    """
>> >>>>    try:
>> >>>>        # Parse the ticket to extract credentials, S3 path, and mode
>> >>>>        access_key, secret_key, s3_path, mode, batch_size =
>> parse_ticket(ticket)
>> >>>>    except InvalidTicketFormatError as e:
>> >>>>        logging.error(str(e))
>> >>>>        raise
>> >>>>    except InvalidModeError as e:
>> >>>>        logging.error(str(e))
>> >>>>        raise
>> >>>>
>> >>>>    # s3fs dont need s3a protocol.
>> >>>>    if s3_path.startswith("s3://"):
>> >>>>        s3_path = s3_path.replace("s3://", "", 1)
>> >>>>
>> >>>>    logging.info(f"Cloudian S3 Override endpoint:
>> >>>> {Config.S3_ENDPOINT_OVERRIDE}")
>> >>>>    logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
>> >>>>    logging.info(f"Fetching Parquet data from S3: {s3_path} in mode:
>> {mode}")
>> >>>>
>> >>>>    # Initialize the S3 handler with credentials
>> >>>>    try:
>> >>>>        s3_handler = S3Handler(
>> >>>>            endpoint=Config.S3_ENDPOINT_OVERRIDE,
>> >>>>            region=Config.S3_REGION,
>> >>>>            access_key=access_key,
>> >>>>            secret_key=secret_key
>> >>>>        )
>> >>>>    except Exception as e:
>> >>>>        logging.error(f"Error initializing S3 handler: {str(e)}")
>> >>>>        raise S3AccessError(f"Error initializing S3 handler:
>> {str(e)}") from e
>> >>>>
>> >>>>    if mode == DataRetrievalMode.BATCH:
>> >>>>        try:
>> >>>>            # Use the get_parquet_data method for both wildcard and
>> >>>> non-wildcard cases
>> >>>>            parquet_data = s3_handler.get_parquet_data(s3_path)
>> >>>>            # parquet_data.schema:  This is used when parquet_data is
>> >>>> an instance of ds.Dataset
>> >>>>            # (i.e., when multiple Parquet files are being processed
>> >>>> as a dataset).
>> >>>>            #
>> >>>>            # parquet_data.schema_arrow: This is used when
>> >>>> parquet_data is an instance of pq (pyarrow.parquet) module.
>> >>>>            #  A single Parquet file has its own schema, accessible
>> >>>> via schema_arrow in PyArrow
>> >>>>            schema = parquet_data.schema if isinstance(parquet_data,
>> >>>> ds.Dataset) else parquet_data.schema_arrow
>> >>>>            return flight.GeneratorStream(schema,
>> >>>> s3_handler.stream_parquet_batches(parquet_data, batch_size))
>> >>>>        except OSError as e:
>> >>>>            logging.error(f"AWS S3 access error: {str(e)}")
>> >>>>            raise S3AccessError(f"Failed to access S3: {str(e)}")
>> from e
>> >>>>        except Exception as e:
>> >>>>            logging.error(f"Error streaming Parquet data: {str(e)}")
>> >>>>            raise DataProcessingError(f"Error streaming Parquet data:
>> >>>> {str(e)}") from e
>> >>>>
>> >>>>    # Handle 'full' mode to load the entire dataset
>> >>>>    elif mode == DataRetrievalMode.FULL:
>> >>>>        try:
>> >>>>            # Check if the S3 path contains a wildcard and the mode
>> is FULL
>> >>>>            if "*" in s3_path:
>> >>>>                logging.warning(
>> >>>>                    f"Wildcard pattern detected in S3 path '{s3_path}'
>> >>>> with FULL data retrieval mode. "
>> >>>>                    f"This may put pressure on memory as all files
>> >>>> will be loaded into memory at once."
>> >>>>                )
>> >>>>            # Use the same get_parquet_data method for both wildcard
>> >>>> and non-wildcard cases
>> >>>>            parquet_data = s3_handler.get_parquet_data(s3_path)
>> >>>>            # Load the entire dataset into memory / Chance of OOM.
>> >>>>            # table = parquet_data.to_table() if
>> >>>> isinstance(parquet_data, ds.Dataset) else parquet_data.read_table()
>> >>>>            # Load the entire dataset into memory, with consideration
>> >>>> for Dataset vs. ParquetFile
>> >>>>            if isinstance(parquet_data, ds.Dataset):
>> >>>>                table = parquet_data.to_table()
>> >>>>            else:
>> >>>>                table = parquet_data.read()
>> >>>>            return flight.RecordBatchStream(table)
>> >>>>        except OSError as e:
>> >>>>            logging.error(f"AWS S3 access error: {str(e)}")
>> >>>>            raise S3AccessError(f"Failed to access S3: {str(e)}")
>> from e
>> >>>>        except Exception as e:
>> >>>>            logging.error(f"Error loading full Parquet dataset:
>> {str(e)}")
>> >>>>            raise DataProcessingError(f"Error loading full Parquet
>> >>>> dataset: {str(e)}") from e
>> >>>>
>> >>>>    else:
>> >>>>        logging.error(f"Invalid mode:
>> >>>> {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
>> >>>>        raise InvalidModeError()
>> >>>>
>> >>>>
>> >>>> // Helper functions.
>> >>>>
>> >>>> def get_parquet_data(self, s3_path):
>> >>>>    """
>> >>>>        Retrieves Parquet data from S3. If the path contains a
>> >>>> wildcard pattern, it lists all matching files manually.
>> >>>>        If it's a single file, it reads the file directly.
>> >>>>
>> >>>>        :param s3_path: The S3 path, which could be a wildcard pattern
>> >>>> or a direct file path.
>> >>>>        :return: PyArrow Dataset object if it's a wildcard, or a
>> >>>> ParquetFile object for a single file.
>> >>>>    """
>> >>>>    try:
>> >>>>        # Check if the path contains a wildcard
>> >>>>        if "*" in s3_path:
>> >>>>            # Split the directory and pattern (e.g., `*.parquet`)
>> >>>>            directory, pattern = s3_path.rsplit("/", 1)
>> >>>>
>> >>>>            # List all files in the directory and filter using the
>> pattern
>> >>>>            logging.info(f"Fetching Parquet files matching wildcard:
>> {s3_path}")
>> >>>>            files =
>> self.s3_fs.get_file_info(fs.FileSelector(directory))
>> >>>>
>> >>>>            # Filter files matching the pattern (e.g., *.parquet) and
>> >>>> sort them by modification time (mtime_ns)
>> >>>>            sorted_file_paths = [file.path for file in sorted(files,
>> >>>> key=lambda file: file.mtime_ns) if fnmatch.fnmatch(file.path,
>> >>>> f"{directory}/{pattern}")]
>> >>>>
>> >>>>            if not sorted_file_paths:
>> >>>>                raise FileNotFoundError(f"No files matching pattern
>> >>>> {pattern} found in {directory}")
>> >>>>
>> >>>>            logging.info(f"Sorted files: {sorted_file_paths}")
>> >>>>
>> >>>>            # Validate schemas across all files
>> >>>>            if not validate_schemas(sorted_file_paths, self.s3_fs):
>> >>>>                raise ValueError("Schema mismatch detected across
>> files.")
>> >>>>
>> >>>>            # Create a dataset from the matching files
>> >>>>            dataset = ds.dataset(sorted_file_paths, format="parquet",
>> >>>> filesystem=self.s3_fs)
>> >>>>            return dataset
>> >>>>        else:
>> >>>>            # Handle single file case: read the specific Parquet file
>> >>>>            logging.info(f"Fetching single Parquet file: {s3_path}")
>> >>>>            parquet_file =
>> pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
>> >>>>            return parquet_file
>> >>>>    except Exception as e:
>> >>>>        logging.error(f"Error fetching Parquet data from S3: {e}")
>> >>>>        raise e
>> >>>>
>> >>>> @staticmethod
>> >>>> def stream_parquet_batches(parquet_data, batch_size=None):
>> >>>>    """
>> >>>>        Stream the Parquet data in batches. Supports both datasets
>> >>>> (multiple files) and single Parquet files.
>> >>>>
>> >>>>        :param parquet_data: The Dataset or ParquetFile object to
>> >>>> stream data from.
>> >>>>        :param batch_size: The size of the batches to stream. Default
>> >>>> is 100,000 if not provided.
>> >>>>        :return: Generator for streaming Parquet batches.
>> >>>>    """
>> >>>>    try:
>> >>>>        # Ensure batch_size is an integer, set default if None
>> >>>>        if batch_size is None or not isinstance(batch_size, int):
>> >>>>            batch_size = 100000
>> >>>>
>> >>>>        if isinstance(parquet_data, ds.Dataset):
>> >>>>            # If it's a dataset (multiple files), stream dataset
>> >>>> batches using `int_batch_size`
>> >>>>            logging.info(f"Streaming Parquet data in batches from a
>> dataset")
>> >>>>            for batch in
>> parquet_data.to_batches(batch_size=batch_size):
>> >>>>                yield batch
>> >>>>        else:
>> >>>>            # If it's a single file (ParquetFile), stream file batches
>> >>>> (iter_batches)
>> >>>>            logging.info(f"Streaming Parquet data in batches from a
>> >>>> single file")
>> >>>>            for batch in
>> parquet_data.iter_batches(batch_size=batch_size):
>> >>>>                yield batch
>> >>>>    except Exception as e:
>> >>>>        logging.error(f"Error streaming Parquet batches: {e}")
>> >>>>        raise e
>> >>>>
>> >>>>
>> >>>> On Wed, Apr 30, 2025 at 11:14 PM James Duong
>> >>>> <james.du...@improving.com.invalid> wrote:
>> >>>>
>> >>>>> Would you be able to share the server’s getStream() and
>> getFlightInfo()
>> >>>>> implementations?
>> >>>>>
>> >>>>> Note that getStream() needs should be written such that it doesn’t
>> block
>> >>>>> the grpc thread.
>> >>>>>
>> >>>>>
>> >>>>> Get Outlook for Mac <https://aka.ms/GetOutlookForMac>
>> >>>>>
>> >>>>> From: Susmit Sarkar <susmitsir...@gmail.com>
>> >>>>> Date: Wednesday, April 30, 2025 at 2:59 AM
>> >>>>> To: David Li <lidav...@apache.org>
>> >>>>> Cc: nik.9...@gmail.com <nik.9...@gmail.com>, dev@arrow.apache.org <
>> >>>>> dev@arrow.apache.org>
>> >>>>> Subject: Re: Query on stuck Arrow Flight Client while interacting
>> from
>> >>>>> local workstation (mac)
>> >>>>>
>> >>>>> Hi David
>> >>>>>
>> >>>>> Sharing the arrow client thread dump for reference. Strangely if we
>> pass a
>> >>>>> dummy non existent s3 path we are getting proper error from server
>> >>>>>
>> >>>>> cef_flight_server.exceptions.S3AccessError: Failed to access S3:
>> [Errno 2]
>> >>>>> Path does not exist
>> >>>>>
>> 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'.
>> >>>>> Detail: [errno 2] No such file or directory
>> >>>>>
>> >>>>> Which translates the server is reachable and we do see the logs in
>> server
>> >>>>> as well
>> >>>>>
>> >>>>> It works fine if we call the client within a VM issue arises in
>> local
>> >>>>> workstation, where its stuck indefinitely.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Susmit
>> >>>>>
>> >>>>> On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org
>> <mailto:
>> >>>>> lidav...@apache.org>> wrote:
>> >>>>> This is not specific to Flight; use jstack or your favorite
>> >>>>> instrumentation tool (VisualVM etc.)
>> >>>>>
>> >>>>> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:
>> >>>>>> Hi David,
>> >>>>>>
>> >>>>>> How to enable thread dump logs for both client and server code.
>> >>>>>>
>> >>>>>> As of now, I don't see any error on either client side or server
>> side. It
>> >>>>>> just hangs/gets stuck.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Nikhil
>> >>>>>>
>> >>>>>> On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <susmitsir...@gmail.com
>> >>>>> <mailto:susmitsir...@gmail.com>> wrote:
>> >>>>>>
>> >>>>>>> Hi Team,
>> >>>>>>>
>> >>>>>>> We are using this below code snippet in Scala to query the flight
>> >>>>> server,
>> >>>>>>> but seems to be stuck indefinitely, this issue is seen when we are
>> >>>>> testing
>> >>>>>>> from our local workstation (Mac to be precise)
>> >>>>>>>
>> >>>>>>> Another interesting thing, it's able to propagate the error
>> message
>> >>>>>>> correctly but not the FlightStream data, the same code works fine
>> when
>> >>>>> we
>> >>>>>>> run inside a linux VM.
>> >>>>>>>
>> >>>>>>> Do you folks see any issue in the code?
>> >>>>>>>
>> >>>>>>> def fetchDataStreamIterator(details: BaseDataAccessDetails):
>> >>>>> Iterator[FlightStream] = {
>> >>>>>>>  logger.info<http://logger.info>(s"Fetching data for details:
>> >>>>> ${details.toString}")
>> >>>>>>>  val ticketStr = buildTicketStr(details)
>> >>>>>>>  logger.info<http://logger.info>(s"Generated ticket string:
>> >>>>> $ticketStr")
>> >>>>>>>
>> >>>>>>>  val allocator = new RootAllocator(Long.MaxValue)
>> >>>>>>>  val client = FlightClient.builder(allocator,
>> >>>>> Location.forGrpcInsecure(serverHost, serverPort)).build()
>> >>>>>>>
>> >>>>>>>  try {
>> >>>>>>>    val ticket = new
>> Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
>> >>>>>>>    val stream = client.getStream(ticket)
>> >>>>>>>
>> >>>>>>>    Iterator.continually {
>> >>>>>>>      if (stream.next()) Some(stream) else {
>> >>>>>>>        // Cleanup when no more batches
>> >>>>>>>        close(stream, client)
>> >>>>>>>        None
>> >>>>>>>      }
>> >>>>>>>    }.takeWhile(_.isDefined).flatten
>> >>>>>>>  } catch {
>> >>>>>>>    case e: FlightRuntimeException =>
>> >>>>>>>      logger.error(s"Error communicating with Flight server:
>> >>>>> ${e.getMessage}")
>> >>>>>>>      throw new CefFlightServerException(s"Error communicating with
>> >>>>> Flight server: ${e.getMessage}", e)
>> >>>>>>>    case e: Exception =>
>> >>>>>>>      logger.error(s"Failed to fetch data: ${e.getMessage}")
>> >>>>>>>      throw new CefFlightServerException("Failed to fetch data from
>> >>>>> Flight Server", e)
>> >>>>>>>  }
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> Susmit
>> >>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>> Warning: The sender of this message could not be validated and may
>> not be
>> >>>>> the actual sender.
>> >>>>>
>>
>

Reply via email to