I believe I figured it out. I changed the logic slightly, and after the changes
the issue no longer occurs.

/**
 * Opens an Arrow Flight stream for the object described by `details` and exposes it as a
 * lazy iterator over the stream's batches.
 *
 * Every element is the same [[FlightStream]] instance. Advancing the iterator calls
 * `FlightStream.next()`, which (per Arrow's API) loads the next batch into the stream's root.
 * Read `stream.getRoot` for the current element before asking for the next one.
 *
 * Resource ownership: the stream, the client and the `RootAllocator` created here are owned by
 * the returned iterator. They are released at most once, in any of these cases:
 *  - the iterator is exhausted;
 *  - advancing the iterator throws (the original exception is rethrown unchanged);
 *  - creating the client or opening the stream fails.
 *
 * A caller that stops iterating early never triggers the release, so callers should drain the
 * iterator. Closing the root allocator fails loudly (Arrow leak detection) if buffers allocated
 * from it are still held when the stream ends.
 *
 * @param details object-store details; used to build the Flight ticket and for logging
 * @return an iterator yielding the shared stream once per successfully loaded batch
 * @throws CefFlightServerException if the client cannot be created or the stream cannot be opened
 */
def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
  import scala.util.control.NonFatal

  logger.info(s"Fetching data for S3 path: ${details.s3Path}")
  val ticketStr = buildTicketStr(details)
  logger.info(s"Generated ticket string: $ticketStr")

  val allocator = new RootAllocator(Long.MaxValue)
  var client: Option[FlightClient] = None
  // Stays null until getStream succeeds; the `close` helper received a possibly-null stream
  // in the original error paths as well.
  var stream: FlightStream = null
  var released = false

  // Closes the stream and client via the `close` helper, then the root allocator.
  // The client only closes the child allocator it creates ("flight-client" in the leak report),
  // so the RootAllocator handed to the builder must be closed separately. Safe to call repeatedly.
  def releaseResources(): Unit =
    if (!released) {
      released = true
      try client.foreach(c => close(stream, c))
      finally allocator.close()
    }

  // Cleanup on a failure path: an error raised while closing is attached to the original
  // failure instead of replacing it.
  def releaseAfterFailure(cause: Throwable): Unit =
    try releaseResources()
    catch { case NonFatal(closeError) => cause.addSuppressed(closeError) }

  try {
    val flightClient = FlightClient
      .builder(allocator, Location.forGrpcInsecure(serverHost, serverPort))
      .build()
    client = Some(flightClient)

    val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
    stream = flightClient.getStream(ticket)
    val flightStream = stream

    // The iterator is lazy: this predicate only runs while the caller consumes the result,
    // after this method and its catch clauses have returned. Failures here need their own cleanup.
    Iterator.continually(flightStream).takeWhile { current =>
      if (released) {
        // Resources were already released (e.g. an earlier next() threw); never touch them again.
        false
      } else {
        val hasNext =
          try current.next()
          catch {
            case NonFatal(e) =>
              releaseAfterFailure(e)
              throw e
          }
        if (!hasNext) {
          // Close everything explicitly once data is fully read.
          releaseResources()
          if (logger.isDebugEnabled()) {
            logger.debug("Closed FlightStream and client..")
          }
        }
        hasNext
      }
    }
  } catch {
    case e: FlightRuntimeException =>
      logger.error(s"Error communicating with Flight server: ${e.getMessage}")
      releaseAfterFailure(e)
      throw new CefFlightServerException(
        s"Error communicating with Flight server: ${e.getMessage}", e)
    case e: Exception =>
      logger.error(s"Failed to fetch data: ${e.getMessage}")
      releaseAfterFailure(e)
      throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
  }
}

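The `finally` block was being executed first: `fetchDataStream` returns a lazy
iterator, so `finally` closed the client and allocator as soon as the method
returned, before any batch had been read. I changed the approach a bit and no
longer see any memory leaks. Closing the client also closes the child allocator
it creates (`flight-client` in the log below). Note that the `RootAllocator`
passed to the builder is separate and still needs to be closed.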




On Fri, Oct 25, 2024 at 4:09 PM Susmit Sarkar <susmitsir...@gmail.com>
wrote:

> Hi Team,
>
> We are seeing the issue often with Memory Leak:
>
> *JDK 11*
>
> "org.apache.arrow" % "arrow-flight" % "17.0.0",
> "org.apache.arrow" % "arrow-vector" % "17.0.0",
> "org.apache.arrow" % "flight-core" % "17.0.0",
>
>
> 4-10-25 15:25:06.394 [main] ERROR o.apache.arrow.memory.BaseAllocator -
> Memory was leaked by query. Memory leaked: (10485760)
> Allocator(flight-client) 0/10485760/10485760/9223372036854775807
> (res/actual/peak/limit) 2024-10-25 15:25:06.395 [main] ERROR
> o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory
> leaked: (10485760) Outstanding child allocators : Allocator(flight-client)
> 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> Allocator(ROOT) 0/10485760/10485760/9223372036854775807
> (res/actual/peak/limit) 2024-10-25 15:25:06.396 [main] ERROR
> c.t.c.d.ArrowFlightDataFetcher - Failed to fetch data: Memory was leaked by
> query. Memory leaked: (10485760) Allocator(flight-client)
> 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit) Exception
> in thread "main" com.tesco.cef.exceptions.CefFlightServerException: Failed
> to fetch data from Flight Server at
> com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:47)
> at
> com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:41)
> at scala.util.Failure.recover(Try.scala:233) at
> com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:47)
> at
> com.tesco.cef.client.CefFlightDataClient.getStream(CefFlightDataClient.scala:27)
> at com.tesco.cef.samples.client.Main$.processStream(Main.scala:120) at
> com.tesco.cef.samples.client.Main$.main(Main.scala:60) at
> com.tesco.cef.samples.client.Main.main(Main.scala) Caused by:
> java.lang.IllegalStateException: Memory was leaked by query. Memory leaked:
> (10485760) Allocator(flight-client) 0/10485760/10485760/9223372036854775807
> (res/actual/peak/limit) at
> org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:501) at
> org.apache.arrow.flight.FlightClient.close(FlightClient.java:754) at
> scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:392)
> at
> scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:391)
> at scala.util.Using$.$anonfun$apply$1(Using.scala:268) at
> scala.util.Using$.apply(Using.scala:113) at
> com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream$$anonfun$2(ArrowFlightDataFetcher.scala:40)
> at scala.util.Using$.$anonfun$apply$1(Using.scala:262) at
> scala.util.Using$.apply(Using.scala:113) at
> com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:41)
> ... 4 more Suppressed: java.lang.IllegalStateException: Memory was leaked
> by query. Memory leaked: (10485760)
>
> def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
>   logger.info(s"Fetching data for S3 path: ${details.s3Path}")
>   val ticketStr = buildTicketStr(details)
>   logger.info(s"Generated ticket string: $ticketStr")
>
>   val allocator = new RootAllocator(Long.MaxValue)
>   val client = FlightClient.builder(allocator, 
> Location.forGrpcInsecure(serverHost, serverPort)).build()
>   val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
>   // Using Try for proper resource cleanup
>   try {
>     val stream = client.getStream(ticket)
>     // Collect and return an iterator for each batch
>     Iterator.continually(stream).takeWhile(_.next())
>   } catch {
>     case e: FlightRuntimeException =>
>       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
>       throw new CefFlightServerException(s"Error communicating with Flight 
> server: ${e.getMessage}", e)
>     case e: Exception =>
>       logger.error(s"Failed to fetch data: ${e.getMessage}")
>       throw new CefFlightServerException("Failed to fetch data from Flight 
> Server", e)
>   } finally {
>     if (client != null) client.close()
>     if (allocator != null) allocator.close()
>   }
> }
>
> Sharing the above code snippet for reference
> Are we doing anything wrong here? how to avoid memory spillage issue?
>
> Thanks,
> Susmit
>
>
>

Reply via email to