Let me open a case; it's a valid suggestion. I will attach the discussions there.

Thanks,
Susmit
On Wed, May 7, 2025 at 8:34 PM Julian Hyde <jhyde.apa...@gmail.com> wrote:

This thread is getting rather long. I think you should open a case. That's a permanent place where you can attach screenshots and stack traces, and it will also be found by anyone who hits the same problem in the future.

Julian

On May 7, 2025, at 2:51 AM, David Li <lidav...@apache.org> wrote:

Thanks for reporting back. (Note: the attachment didn't make it.) Hmm, I'd probably need to poke at it in a debugger, but it sounds like the callbacks are getting lost somehow. (I'm not a fan of how the Java API tries to adapt an async API into a synchronous one, not least because it leads to weird issues like this.)

On Wed, May 7, 2025, at 14:42, Susmit Sarkar wrote:

Hi David,

1. Does the Python server think the RPC is complete: yes.
2. Did the Java client get any (or all) of the data before getting stuck? It depends: if we read in batches of 1,000, which is a fairly low number, we see data being streamed until roughly 15,000-18,000 rows before it gets stuck; if we select a batch size of 100K, the client is stuck indefinitely.
3. Sharing the debug screenshot; the method was invoked as expected.

image.png

The issue is only seen when the client runs on a local machine, irrespective of the OS (both Windows and Mac). We don't see the issue if the same client code runs in a VM pointing to the Arrow Flight server in another VM.

Thanks,
Susmit

On Sat, May 3, 2025 at 12:17 PM David Li <lidav...@apache.org> wrote:

Thanks Susmit. I can't quite see a bug, but I'm willing to believe that Flight is buggy in Java (I have never understood why the code tries to manually handle flow control, and without my memories of 6 years ago the code seems suspect to me now).

Do you know if (1) the Python server thinks the RPC is complete, and (2) the Java client got any (or all) of the data before getting stuck? It may also be interesting to step through the Java code with a debugger attached, and see what the values of `pending` and `completed` are in the FlightStream instance, and whether the methods here [1] are all being hit as expected.

[1] https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356

On Fri, May 2, 2025, at 14:56, Susmit Sarkar wrote:

Hi James,

The Flight server is written in Python; the code snippet is below.

We have overridden the do_get method only. getStream() and getFlightInfo() are left untouched (default implementation).

def do_get(self, context, ticket):
    """
    Handles client requests for data. The ticket will contain:
    - access_key: S3 access key
    - secret_key: S3 secret key
    - s3_path: Full S3 path (e.g., bucket_name/object_key)
    - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
    """
    try:
        # Parse the ticket to extract credentials, S3 path, and mode
        access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
    except InvalidTicketFormatError as e:
        logging.error(str(e))
        raise
    except InvalidModeError as e:
        logging.error(str(e))
        raise

    # s3fs doesn't need the s3a protocol.
    if s3_path.startswith("s3://"):
        s3_path = s3_path.replace("s3://", "", 1)

    logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
    logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
    logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")

    # Initialize the S3 handler with credentials
    try:
        s3_handler = S3Handler(
            endpoint=Config.S3_ENDPOINT_OVERRIDE,
            region=Config.S3_REGION,
            access_key=access_key,
            secret_key=secret_key
        )
    except Exception as e:
        logging.error(f"Error initializing S3 handler: {str(e)}")
        raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e

    if mode == DataRetrievalMode.BATCH:
        try:
            # Use the get_parquet_data method for both wildcard and non-wildcard cases
            parquet_data = s3_handler.get_parquet_data(s3_path)
            # parquet_data.schema: used when parquet_data is a ds.Dataset
            # (i.e., when multiple Parquet files are being processed as a dataset).
            # parquet_data.schema_arrow: used when parquet_data is a pyarrow.parquet.ParquetFile.
            # A single Parquet file has its own schema, accessible via schema_arrow in PyArrow.
            schema = parquet_data.schema if isinstance(parquet_data, ds.Dataset) else parquet_data.schema_arrow
            return flight.GeneratorStream(schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error streaming Parquet data: {str(e)}")
            raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e

    # Handle 'full' mode to load the entire dataset
    elif mode == DataRetrievalMode.FULL:
        try:
            # Warn if the S3 path contains a wildcard and the mode is FULL
            if "*" in s3_path:
                logging.warning(
                    f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
                    f"This may put pressure on memory as all files will be loaded into memory at once."
                )
            # Use the same get_parquet_data method for both wildcard and non-wildcard cases
            parquet_data = s3_handler.get_parquet_data(s3_path)
            # Load the entire dataset into memory (chance of OOM), with
            # consideration for Dataset vs. ParquetFile
            if isinstance(parquet_data, ds.Dataset):
                table = parquet_data.to_table()
            else:
                table = parquet_data.read()
            return flight.RecordBatchStream(table)
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error loading full Parquet dataset: {str(e)}")
            raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e

    else:
        logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
        raise InvalidModeError()
# Helper functions.

def get_parquet_data(self, s3_path):
    """
    Retrieves Parquet data from S3. If the path contains a wildcard pattern,
    it lists all matching files manually. If it's a single file, it reads the
    file directly.

    :param s3_path: The S3 path, which could be a wildcard pattern or a direct file path.
    :return: PyArrow Dataset object if it's a wildcard, or a ParquetFile object for a single file.
    """
    try:
        # Check if the path contains a wildcard
        if "*" in s3_path:
            # Split the directory and pattern (e.g., `*.parquet`)
            directory, pattern = s3_path.rsplit("/", 1)

            # List all files in the directory and filter using the pattern
            logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
            files = self.s3_fs.get_file_info(fs.FileSelector(directory))

            # Filter files matching the pattern (e.g., *.parquet) and sort them
            # by modification time (mtime_ns)
            sorted_file_paths = [
                file.path
                for file in sorted(files, key=lambda file: file.mtime_ns)
                if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")
            ]

            if not sorted_file_paths:
                raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}")

            logging.info(f"Sorted files: {sorted_file_paths}")

            # Validate schemas across all files
            if not validate_schemas(sorted_file_paths, self.s3_fs):
                raise ValueError("Schema mismatch detected across files.")

            # Create a dataset from the matching files
            dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
            return dataset
        else:
            # Handle single file case: read the specific Parquet file
            logging.info(f"Fetching single Parquet file: {s3_path}")
            parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
            return parquet_file
    except Exception as e:
        logging.error(f"Error fetching Parquet data from S3: {e}")
        raise

@staticmethod
def stream_parquet_batches(parquet_data, batch_size=None):
    """
    Stream the Parquet data in batches. Supports both datasets (multiple files)
    and single Parquet files.

    :param parquet_data: The Dataset or ParquetFile object to stream data from.
    :param batch_size: The size of the batches to stream. Default is 100,000 if not provided.
    :return: Generator for streaming Parquet batches.
    """
> >>>> """ > >>>> try: > >>>> # Ensure batch_size is an integer, set default if None > >>>> if batch_size is None or not isinstance(batch_size, int): > >>>> batch_size = 100000 > >>>> > >>>> if isinstance(parquet_data, ds.Dataset): > >>>> # If it's a dataset (multiple files), stream dataset > >>>> batches using `int_batch_size` > >>>> logging.info(f"Streaming Parquet data in batches from a > dataset") > >>>> for batch in > parquet_data.to_batches(batch_size=batch_size): > >>>> yield batch > >>>> else: > >>>> # If it's a single file (ParquetFile), stream file batches > >>>> (iter_batches) > >>>> logging.info(f"Streaming Parquet data in batches from a > >>>> single file") > >>>> for batch in > parquet_data.iter_batches(batch_size=batch_size): > >>>> yield batch > >>>> except Exception as e: > >>>> logging.error(f"Error streaming Parquet batches: {e}") > >>>> raise e > >>>> > >>>> > >>>> On Wed, Apr 30, 2025 at 11:14 PM James Duong > >>>> <james.du...@improving.com.invalid> wrote: > >>>> > >>>>> Would you be able to share the server’s getStream() and > getFlightInfo() > >>>>> implementations? > >>>>> > >>>>> Note that getStream() needs should be written such that it doesn’t > block > >>>>> the grpc thread. > >>>>> > >>>>> > >>>>> Get Outlook for Mac <https://aka.ms/GetOutlookForMac> > >>>>> > >>>>> From: Susmit Sarkar <susmitsir...@gmail.com> > >>>>> Date: Wednesday, April 30, 2025 at 2:59 AM > >>>>> To: David Li <lidav...@apache.org> > >>>>> Cc: nik.9...@gmail.com <nik.9...@gmail.com>, dev@arrow.apache.org < > >>>>> dev@arrow.apache.org> > >>>>> Subject: Re: Query on stuck Arrow Flight Client while interacting > from > >>>>> local workstation (mac) > >>>>> > >>>>> Hi David > >>>>> > >>>>> Sharing the arrow client thread dump for reference. Strangely if we > pass a > >>>>> dummy non existent s3 path we are getting proper error from server > >>>>> > >>>>> cef_flight_server.exceptions.S3AccessError: Failed to access S3: > [Errno 2] > >>>>> Path does not exist > >>>>> > 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'. > >>>>> Detail: [errno 2] No such file or directory > >>>>> > >>>>> Which translates the server is reachable and we do see the logs in > server > >>>>> as well > >>>>> > >>>>> It works fine if we call the client within a VM issue arises in local > >>>>> workstation, where its stuck indefinitely. > >>>>> > >>>>> Thanks, > >>>>> Susmit > >>>>> > >>>>> On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org > <mailto: > >>>>> lidav...@apache.org>> wrote: > >>>>> This is not specific to Flight; use jstack or your favorite > >>>>> instrumentation tool (VisualVM etc.) > >>>>> > >>>>> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote: > >>>>>> Hi David, > >>>>>> > >>>>>> How to enable thread dump logs for both client and server code. > >>>>>> > >>>>>> As of now, I don't see any error on either client side or server > side. It > >>>>>> just hangs/gets stuck. 
On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar <susmitsir...@gmail.com> wrote:

Hi Team,

We are using the code snippet below in Scala to query the Flight server, but it seems to be stuck indefinitely. This issue is seen when we are testing from our local workstation (a Mac, to be precise).

Another interesting thing: it's able to propagate error messages correctly, but not the FlightStream data, and the same code works fine when we run it inside a Linux VM.

Do you folks see any issue in the code?

def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
  logger.info(s"Fetching data for details: ${details.toString}")
  val ticketStr = buildTicketStr(details)
  logger.info(s"Generated ticket string: $ticketStr")

  val allocator = new RootAllocator(Long.MaxValue)
  val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()

  try {
    val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
    val stream = client.getStream(ticket)

    Iterator.continually {
      if (stream.next()) Some(stream) else {
        // Cleanup when no more batches
        close(stream, client)
        None
      }
    }.takeWhile(_.isDefined).flatten
  } catch {
    case e: FlightRuntimeException =>
      logger.error(s"Error communicating with Flight server: ${e.getMessage}")
      throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
    case e: Exception =>
      logger.error(s"Failed to fetch data: ${e.getMessage}")
      throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
  }
}

Thanks,

Susmit
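Since error messages propagate but batch data does not, and the hang only happens from local workstations, one way to narrow it down is to read the same ticket with the Python Flight client from the affected machine: if the Python client streams all batches, the problem is more likely in the Java client's flow-control handling than in the server or the network path. A minimal sketch follows; the host, port, and ticket payload are placeholders, and the real ticket format is whatever parse_ticket on the server expects:

import pyarrow.flight as flight

# Placeholder endpoint and ticket; substitute the real server address and
# the same ticket string the Scala client builds with buildTicketStr().
client = flight.FlightClient("grpc://flight-server-host:8815")
ticket = flight.Ticket(b"<same ticket payload the Scala client sends>")

reader = client.do_get(ticket)
total_rows = 0
for chunk in reader:
    # Each chunk wraps one RecordBatch; count rows to confirm the full
    # stream arrives on this workstation.
    if chunk.data is not None:
        total_rows += chunk.data.num_rows
print(f"Read {total_rows} rows")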