Let me open a thread, its a valid suggestion, will attach the discussions

Thanks,
Susmit

On Wed, May 7, 2025 at 8:34 PM Julian Hyde <jhyde.apa...@gmail.com> wrote:

> This thread is getting rather long. I think you should open a case. That’s
> a permanent place that you can attach screenshots, stack traces. And it
> will also be found by anyone who has the same problem in future.
>
> Julian
>
> > On May 7, 2025, at 2:51 AM, David Li <lidav...@apache.org> wrote:
> >
> > Thanks for reporting back. (Note, the attachment didn't make it.) Hmm,
> I'd probably need to poke at it in a debugger. But it sounds like the
> callbacks are getting lost somehow. (I'm not a fan of how the Java API
> tries to adapt an async API into a synchronous one, not least because it
> leads to weird issues like this.)
> >
> >> On Wed, May 7, 2025, at 14:42, Susmit Sarkar wrote:
> >> Hi David
> >>
> >> 1.  the python server thinks the RPC is complete: Yes
> >> 2. the Java client got any (or all) of the data before getting stuck?
> Depends, if we are reading in batches of 1000 which is pretty low number we
> see data being streamed till 15000-18000 rows ultimately stuck, if we
> select a batch size of 100K client is stuck indefinitely
> >> 3. Sharing the debug screenshot , the method was invoked as expected
> >>
> >> image.png
> >>
> >> The issue is only seen when the client runs on a local machine
> irrespective of the OS (both windows and Mac). We don't see the issue if
> the same client code runs in a VM server pointing to the Arrow flight
> server in another VM
> >> Thanks,
> >> Susmit
> >>
> >>> On Sat, May 3, 2025 at 12:17 PM David Li <lidav...@apache.org> wrote:
> >>> Thanks Susmit. I can't quite see a bug but I'm willing to believe that
> Flight is buggy in Java (I have never understood why the code tries to
> manually handle flow control and without my memories of 6 years ago the
> code seems suspect to me now)
> >>>
> >>> Do you know if (1) the python server thinks the RPC is complete, (2)
> the Java client got any (or all) of the data before getting stuck? It may
> also be interesting to step though the Java code with a debugger attached,
> and see what the values of `pending` and `completed` are in the
> FlightStream instance, and if the methods here[1] are all being hit as
> expected.
> >>>
> >>> [1]
> https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356
> >>>
> >>> On Fri, May 2, 2025, at 14:56, Susmit Sarkar wrote:
> >>>> Hi James,
> >>>>
> >>>> The flight server is written in python. Here is the below code
> snippet.
> >>>>
> >>>> We have overridden the *do_get* method only
> >>>>
> >>>> getStream() and getFlightInfo() is left untouched / default
> implementation.
> >>>>
> >>>> def do_get(self, context, ticket):
> >>>>    """
> >>>>        Handles client requests for data. The ticket will contain:
> >>>>        - access_key: S3 access key
> >>>>        - secret_key: S3 secret key
> >>>>        - s3_path: Full S3 path (e.g., bucket_name/object_key)
> >>>>        - mode: 'batch' (for batch streaming) or 'full' (for loading
> >>>> the entire dataset)
> >>>>    """
> >>>>    try:
> >>>>        # Parse the ticket to extract credentials, S3 path, and mode
> >>>>        access_key, secret_key, s3_path, mode, batch_size =
> parse_ticket(ticket)
> >>>>    except InvalidTicketFormatError as e:
> >>>>        logging.error(str(e))
> >>>>        raise
> >>>>    except InvalidModeError as e:
> >>>>        logging.error(str(e))
> >>>>        raise
> >>>>
> >>>>    # s3fs dont need s3a protocol.
> >>>>    if s3_path.startswith("s3://"):
> >>>>        s3_path = s3_path.replace("s3://", "", 1)
> >>>>
> >>>>    logging.info(f"Cloudian S3 Override endpoint:
> >>>> {Config.S3_ENDPOINT_OVERRIDE}")
> >>>>    logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
> >>>>    logging.info(f"Fetching Parquet data from S3: {s3_path} in mode:
> {mode}")
> >>>>
> >>>>    # Initialize the S3 handler with credentials
> >>>>    try:
> >>>>        s3_handler = S3Handler(
> >>>>            endpoint=Config.S3_ENDPOINT_OVERRIDE,
> >>>>            region=Config.S3_REGION,
> >>>>            access_key=access_key,
> >>>>            secret_key=secret_key
> >>>>        )
> >>>>    except Exception as e:
> >>>>        logging.error(f"Error initializing S3 handler: {str(e)}")
> >>>>        raise S3AccessError(f"Error initializing S3 handler:
> {str(e)}") from e
> >>>>
> >>>>    if mode == DataRetrievalMode.BATCH:
> >>>>        try:
> >>>>            # Use the get_parquet_data method for both wildcard and
> >>>> non-wildcard cases
> >>>>            parquet_data = s3_handler.get_parquet_data(s3_path)
> >>>>            # parquet_data.schema:  This is used when parquet_data is
> >>>> an instance of ds.Dataset
> >>>>            # (i.e., when multiple Parquet files are being processed
> >>>> as a dataset).
> >>>>            #
> >>>>            # parquet_data.schema_arrow: This is used when
> >>>> parquet_data is an instance of pq (pyarrow.parquet) module.
> >>>>            #  A single Parquet file has its own schema, accessible
> >>>> via schema_arrow in PyArrow
> >>>>            schema = parquet_data.schema if isinstance(parquet_data,
> >>>> ds.Dataset) else parquet_data.schema_arrow
> >>>>            return flight.GeneratorStream(schema,
> >>>> s3_handler.stream_parquet_batches(parquet_data, batch_size))
> >>>>        except OSError as e:
> >>>>            logging.error(f"AWS S3 access error: {str(e)}")
> >>>>            raise S3AccessError(f"Failed to access S3: {str(e)}") from
> e
> >>>>        except Exception as e:
> >>>>            logging.error(f"Error streaming Parquet data: {str(e)}")
> >>>>            raise DataProcessingError(f"Error streaming Parquet data:
> >>>> {str(e)}") from e
> >>>>
> >>>>    # Handle 'full' mode to load the entire dataset
> >>>>    elif mode == DataRetrievalMode.FULL:
> >>>>        try:
> >>>>            # Check if the S3 path contains a wildcard and the mode is
> FULL
> >>>>            if "*" in s3_path:
> >>>>                logging.warning(
> >>>>                    f"Wildcard pattern detected in S3 path '{s3_path}'
> >>>> with FULL data retrieval mode. "
> >>>>                    f"This may put pressure on memory as all files
> >>>> will be loaded into memory at once."
> >>>>                )
> >>>>            # Use the same get_parquet_data method for both wildcard
> >>>> and non-wildcard cases
> >>>>            parquet_data = s3_handler.get_parquet_data(s3_path)
> >>>>            # Load the entire dataset into memory / Chance of OOM.
> >>>>            # table = parquet_data.to_table() if
> >>>> isinstance(parquet_data, ds.Dataset) else parquet_data.read_table()
> >>>>            # Load the entire dataset into memory, with consideration
> >>>> for Dataset vs. ParquetFile
> >>>>            if isinstance(parquet_data, ds.Dataset):
> >>>>                table = parquet_data.to_table()
> >>>>            else:
> >>>>                table = parquet_data.read()
> >>>>            return flight.RecordBatchStream(table)
> >>>>        except OSError as e:
> >>>>            logging.error(f"AWS S3 access error: {str(e)}")
> >>>>            raise S3AccessError(f"Failed to access S3: {str(e)}") from
> e
> >>>>        except Exception as e:
> >>>>            logging.error(f"Error loading full Parquet dataset:
> {str(e)}")
> >>>>            raise DataProcessingError(f"Error loading full Parquet
> >>>> dataset: {str(e)}") from e
> >>>>
> >>>>    else:
> >>>>        logging.error(f"Invalid mode:
> >>>> {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
> >>>>        raise InvalidModeError()
> >>>>
> >>>>
> >>>> // Helper functions.
> >>>>
> >>>> def get_parquet_data(self, s3_path):
> >>>>    """
> >>>>        Retrieves Parquet data from S3. If the path contains a
> >>>> wildcard pattern, it lists all matching files manually.
> >>>>        If it's a single file, it reads the file directly.
> >>>>
> >>>>        :param s3_path: The S3 path, which could be a wildcard pattern
> >>>> or a direct file path.
> >>>>        :return: PyArrow Dataset object if it's a wildcard, or a
> >>>> ParquetFile object for a single file.
> >>>>    """
> >>>>    try:
> >>>>        # Check if the path contains a wildcard
> >>>>        if "*" in s3_path:
> >>>>            # Split the directory and pattern (e.g., `*.parquet`)
> >>>>            directory, pattern = s3_path.rsplit("/", 1)
> >>>>
> >>>>            # List all files in the directory and filter using the
> pattern
> >>>>            logging.info(f"Fetching Parquet files matching wildcard:
> {s3_path}")
> >>>>            files =
> self.s3_fs.get_file_info(fs.FileSelector(directory))
> >>>>
> >>>>            # Filter files matching the pattern (e.g., *.parquet) and
> >>>> sort them by modification time (mtime_ns)
> >>>>            sorted_file_paths = [file.path for file in sorted(files,
> >>>> key=lambda file: file.mtime_ns) if fnmatch.fnmatch(file.path,
> >>>> f"{directory}/{pattern}")]
> >>>>
> >>>>            if not sorted_file_paths:
> >>>>                raise FileNotFoundError(f"No files matching pattern
> >>>> {pattern} found in {directory}")
> >>>>
> >>>>            logging.info(f"Sorted files: {sorted_file_paths}")
> >>>>
> >>>>            # Validate schemas across all files
> >>>>            if not validate_schemas(sorted_file_paths, self.s3_fs):
> >>>>                raise ValueError("Schema mismatch detected across
> files.")
> >>>>
> >>>>            # Create a dataset from the matching files
> >>>>            dataset = ds.dataset(sorted_file_paths, format="parquet",
> >>>> filesystem=self.s3_fs)
> >>>>            return dataset
> >>>>        else:
> >>>>            # Handle single file case: read the specific Parquet file
> >>>>            logging.info(f"Fetching single Parquet file: {s3_path}")
> >>>>            parquet_file =
> pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
> >>>>            return parquet_file
> >>>>    except Exception as e:
> >>>>        logging.error(f"Error fetching Parquet data from S3: {e}")
> >>>>        raise e
> >>>>
> >>>> @staticmethod
> >>>> def stream_parquet_batches(parquet_data, batch_size=None):
> >>>>    """
> >>>>        Stream the Parquet data in batches. Supports both datasets
> >>>> (multiple files) and single Parquet files.
> >>>>
> >>>>        :param parquet_data: The Dataset or ParquetFile object to
> >>>> stream data from.
> >>>>        :param batch_size: The size of the batches to stream. Default
> >>>> is 100,000 if not provided.
> >>>>        :return: Generator for streaming Parquet batches.
> >>>>    """
> >>>>    try:
> >>>>        # Ensure batch_size is an integer, set default if None
> >>>>        if batch_size is None or not isinstance(batch_size, int):
> >>>>            batch_size = 100000
> >>>>
> >>>>        if isinstance(parquet_data, ds.Dataset):
> >>>>            # If it's a dataset (multiple files), stream dataset
> >>>> batches using `int_batch_size`
> >>>>            logging.info(f"Streaming Parquet data in batches from a
> dataset")
> >>>>            for batch in
> parquet_data.to_batches(batch_size=batch_size):
> >>>>                yield batch
> >>>>        else:
> >>>>            # If it's a single file (ParquetFile), stream file batches
> >>>> (iter_batches)
> >>>>            logging.info(f"Streaming Parquet data in batches from a
> >>>> single file")
> >>>>            for batch in
> parquet_data.iter_batches(batch_size=batch_size):
> >>>>                yield batch
> >>>>    except Exception as e:
> >>>>        logging.error(f"Error streaming Parquet batches: {e}")
> >>>>        raise e
> >>>>
> >>>>
> >>>> On Wed, Apr 30, 2025 at 11:14 PM James Duong
> >>>> <james.du...@improving.com.invalid> wrote:
> >>>>
> >>>>> Would you be able to share the server’s getStream() and
> getFlightInfo()
> >>>>> implementations?
> >>>>>
> >>>>> Note that getStream() needs should be written such that it doesn’t
> block
> >>>>> the grpc thread.
> >>>>>
> >>>>>
> >>>>> Get Outlook for Mac <https://aka.ms/GetOutlookForMac>
> >>>>>
> >>>>> From: Susmit Sarkar <susmitsir...@gmail.com>
> >>>>> Date: Wednesday, April 30, 2025 at 2:59 AM
> >>>>> To: David Li <lidav...@apache.org>
> >>>>> Cc: nik.9...@gmail.com <nik.9...@gmail.com>, dev@arrow.apache.org <
> >>>>> dev@arrow.apache.org>
> >>>>> Subject: Re: Query on stuck Arrow Flight Client while interacting
> from
> >>>>> local workstation (mac)
> >>>>>
> >>>>> Hi David
> >>>>>
> >>>>> Sharing the arrow client thread dump for reference. Strangely if we
> pass a
> >>>>> dummy non existent s3 path we are getting proper error from server
> >>>>>
> >>>>> cef_flight_server.exceptions.S3AccessError: Failed to access S3:
> [Errno 2]
> >>>>> Path does not exist
> >>>>>
> 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'.
> >>>>> Detail: [errno 2] No such file or directory
> >>>>>
> >>>>> Which translates the server is reachable and we do see the logs in
> server
> >>>>> as well
> >>>>>
> >>>>> It works fine if we call the client within a VM issue arises in local
> >>>>> workstation, where its stuck indefinitely.
> >>>>>
> >>>>> Thanks,
> >>>>> Susmit
> >>>>>
> >>>>> On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org
> <mailto:
> >>>>> lidav...@apache.org>> wrote:
> >>>>> This is not specific to Flight; use jstack or your favorite
> >>>>> instrumentation tool (VisualVM etc.)
> >>>>>
> >>>>> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:
> >>>>>> Hi David,
> >>>>>>
> >>>>>> How to enable thread dump logs for both client and server code.
> >>>>>>
> >>>>>> As of now, I don't see any error on either client side or server
> side. It
> >>>>>> just hangs/gets stuck.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Nikhil
> >>>>>>
> >>>>>> On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <susmitsir...@gmail.com
> >>>>> <mailto:susmitsir...@gmail.com>> wrote:
> >>>>>>
> >>>>>>> Hi Team,
> >>>>>>>
> >>>>>>> We are using this below code snippet in Scala to query the flight
> >>>>> server,
> >>>>>>> but seems to be stuck indefinitely, this issue is seen when we are
> >>>>> testing
> >>>>>>> from our local workstation (Mac to be precise)
> >>>>>>>
> >>>>>>> Another interesting thing, it's able to propagate the error message
> >>>>>>> correctly but not the FlightStream data, the same code works fine
> when
> >>>>> we
> >>>>>>> run inside a linux VM.
> >>>>>>>
> >>>>>>> Do you folks see any issue in the code?
> >>>>>>>
> >>>>>>> def fetchDataStreamIterator(details: BaseDataAccessDetails):
> >>>>> Iterator[FlightStream] = {
> >>>>>>>  logger.info<http://logger.info>(s"Fetching data for details:
> >>>>> ${details.toString}")
> >>>>>>>  val ticketStr = buildTicketStr(details)
> >>>>>>>  logger.info<http://logger.info>(s"Generated ticket string:
> >>>>> $ticketStr")
> >>>>>>>
> >>>>>>>  val allocator = new RootAllocator(Long.MaxValue)
> >>>>>>>  val client = FlightClient.builder(allocator,
> >>>>> Location.forGrpcInsecure(serverHost, serverPort)).build()
> >>>>>>>
> >>>>>>>  try {
> >>>>>>>    val ticket = new
> Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
> >>>>>>>    val stream = client.getStream(ticket)
> >>>>>>>
> >>>>>>>    Iterator.continually {
> >>>>>>>      if (stream.next()) Some(stream) else {
> >>>>>>>        // Cleanup when no more batches
> >>>>>>>        close(stream, client)
> >>>>>>>        None
> >>>>>>>      }
> >>>>>>>    }.takeWhile(_.isDefined).flatten
> >>>>>>>  } catch {
> >>>>>>>    case e: FlightRuntimeException =>
> >>>>>>>      logger.error(s"Error communicating with Flight server:
> >>>>> ${e.getMessage}")
> >>>>>>>      throw new CefFlightServerException(s"Error communicating with
> >>>>> Flight server: ${e.getMessage}", e)
> >>>>>>>    case e: Exception =>
> >>>>>>>      logger.error(s"Failed to fetch data: ${e.getMessage}")
> >>>>>>>      throw new CefFlightServerException("Failed to fetch data from
> >>>>> Flight Server", e)
> >>>>>>>  }
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Susmit
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>> Warning: The sender of this message could not be validated and may
> not be
> >>>>> the actual sender.
> >>>>>
>

Reply via email to