This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c1fc1002 [KYUUBI #2378] Implement BatchesResource GET 
/batches/${batchId}/log
8c1fc1002 is described below

commit 8c1fc100252ff2d5e32b3f3773f90696bc03ff67
Author: ulysses-you <[email protected]>
AuthorDate: Mon May 9 13:52:06 2022 +0800

    [KYUUBI #2378] Implement BatchesResource GET /batches/${batchId}/log
    
    ### _Why are the changes needed?_
    
    Support fetching the operation log with REST batch mode.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #2567 from ulysses-you/KYUUBI-2378.
    
    Closes #2378
    
    d4dd5352 [ulysses-you] comment
    fd08362c [ulysses-you] desc
    37b6cc83 [ulysses-you] seek
    78fb6391 [ulysses-you] rebase
    d6b838ac [ulysses-you] style
    0440ec29 [ulysses-you] fix
    b07a81ce [ulysses-you] Implement BatchesResource GET /batches/${batchId}/log
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../apache/kyuubi/operation/log/OperationLog.scala | 49 ++++++++++++++++--
 .../operation/log/SeekableBufferedReader.scala     |  8 +--
 .../kyuubi/operation/log/OperationLogSuite.scala   | 30 ++++++++++-
 .../engine/spark/SparkBatchProcessBuilder.scala    |  2 +-
 .../kyuubi/operation/BatchJobSubmission.scala      | 14 ++++-
 .../kyuubi/server/api/v1/BatchesResource.scala     | 59 +++++++++++++++++-----
 .../kyuubi/session/KyuubiSessionManager.scala      |  8 +++
 .../server/api/v1/BatchesResourceSuite.scala       | 34 ++++++++++++-
 8 files changed, 178 insertions(+), 26 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 63cc0ded8..84ee117f3 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -21,8 +21,9 @@ import java.io.{BufferedReader, IOException}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Path, Paths}
-import java.util.{ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, List => JList}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, 
TStringColumn}
@@ -86,11 +87,15 @@ class OperationLog(path: Path) {
   private lazy val writer = Files.newBufferedWriter(path, 
StandardCharsets.UTF_8)
   private lazy val reader = Files.newBufferedReader(path, 
StandardCharsets.UTF_8)
 
+  private lazy val extraPaths: ListBuffer[Path] = ListBuffer()
   private lazy val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
+  private var lastSeekReadPos = 0
+  private var seekableReader: SeekableBufferedReader = _
 
   def addExtraLog(path: Path): Unit = synchronized {
     try {
       extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8)
+      extraPaths += path
     } catch {
       case _: IOException =>
     }
@@ -130,6 +135,13 @@ class OperationLog(path: Path) {
     }
   }
 
+  private def toRowSet(logs: JList[String]): TRowSet = {
+    val tColumn = TColumn.stringVal(new TStringColumn(logs, 
ByteBuffer.allocate(0)))
+    val tRow = new TRowSet(0, new JArrayList[TRow](logs.size()))
+    tRow.addToColumns(tColumn)
+    tRow
+  }
+
   /**
    * Read to log file line by line
    *
@@ -144,10 +156,30 @@ class OperationLog(path: Path) {
       logs.addAll(extraLogs)
     }
 
-    val tColumn = TColumn.stringVal(new TStringColumn(logs, 
ByteBuffer.allocate(0)))
-    val tRow = new TRowSet(0, new JArrayList[TRow](logs.size()))
-    tRow.addToColumns(tColumn)
-    tRow
+    toRowSet(logs)
+  }
+
+  def read(from: Int, size: Int): TRowSet = synchronized {
+    var pos = from
+    if (pos < 0) {
+      // just fetch forward
+      pos = lastSeekReadPos
+    }
+    if (seekableReader == null) {
+      seekableReader = new SeekableBufferedReader(Seq(path) ++ extraPaths)
+    } else {
+      // if from < last pos, we should reload the reader
+      // otherwise, we can reuse the existed reader for better performance
+      if (pos < lastSeekReadPos) {
+        seekableReader.close()
+        seekableReader = new SeekableBufferedReader(Seq(path) ++ extraPaths)
+      }
+    }
+
+    val it = seekableReader.readLine(pos, size)
+    val res = it.toList.asJava
+    lastSeekReadPos = pos + res.size()
+    toRowSet(res)
   }
 
   def close(): Unit = synchronized {
@@ -160,6 +192,13 @@ class OperationLog(path: Path) {
       writer.close()
     }
 
+    if (seekableReader != null) {
+      lastSeekReadPos = 0
+      trySafely {
+        seekableReader.close()
+      }
+    }
+
     trySafely {
       Files.delete(path)
     }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
index 4b1f76328..7c22b94d6 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
@@ -35,7 +35,6 @@ class SeekableBufferedReader(paths: Seq[Path]) extends 
Closeable {
   }
 
   private var linePos = 0L
-  private var numLines = 0L
   private var readerIndex = 0
   private val numReaders = bufferedReaders.length
   private var currentReader = bufferedReaders.head
@@ -48,6 +47,9 @@ class SeekableBufferedReader(paths: Seq[Path]) extends 
Closeable {
       currentReader = bufferedReaders(readerIndex)
       currentValue = currentReader.readLine()
     }
+    if (currentValue != null) {
+      linePos += 1
+    }
   }
 
   /**
@@ -58,14 +60,14 @@ class SeekableBufferedReader(paths: Seq[Path]) extends 
Closeable {
     if (from < 0) throw new IOException("Negative seek offset")
 
     new Iterator[String] {
+      private var numLines = 0L
       override def hasNext: Boolean = {
         if (numLines >= limit) {
           false
         } else {
           nextLine()
-          while (linePos < from && currentValue != null) {
+          while (linePos <= from && currentValue != null) {
             nextLine()
-            linePos += 1
           }
           numLines += 1
           currentValue != null
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index b3b4b5fe6..7523e005b 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -18,11 +18,12 @@
 package org.apache.kyuubi.operation.log
 
 import java.io.File
+import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 
 import scala.collection.JavaConverters._
 
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet}
 
 import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
@@ -211,4 +212,31 @@ class OperationLogSuite extends KyuubiFunSuite {
     operationLog.close()
     tempDir.toFile.delete()
   }
+
+  test("test seek reader") {
+    val file = Utils.createTempDir().resolve("f")
+    val writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)
+    try {
+      0.until(10).foreach(x => writer.write(s"$x\n"))
+      writer.flush()
+      writer.close()
+
+      def compareResult(rows: TRowSet, expected: Seq[String]): Unit = {
+        val res = rows.getColumns.get(0).getStringVal.getValues.asScala
+        assert(res.size == expected.size)
+        res.zip(expected).foreach { case (l, r) =>
+          assert(l == r)
+        }
+      }
+
+      val log = new OperationLog(file)
+      compareResult(log.read(-1, 1), Seq("0"))
+      compareResult(log.read(-1, 1), Seq("1"))
+      compareResult(log.read(0, 1), Seq("0"))
+      compareResult(log.read(0, 2), Seq("0", "1"))
+      compareResult(log.read(5, 10), Seq("5", "6", "7", "8", "9"))
+    } finally {
+      Utils.deleteDirectoryRecursively(file.toFile)
+    }
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 5a5c23392..49313ed3e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -28,7 +28,7 @@ class SparkBatchProcessBuilder(
     override val conf: KyuubiConf,
     batchId: String,
     batchRequest: BatchRequest,
-    override val extraEngineLog: Option[OperationLog] = None)
+    override val extraEngineLog: Option[OperationLog])
   extends SparkProcessBuilder(proxyUser, conf, extraEngineLog) {
   import SparkProcessBuilder._
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index daf4c7dba..42ff2e0b8 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.hive.service.rpc.thrift._
 
-import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse, 
ProcBuilder}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
@@ -42,7 +42,7 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, 
batchRequest: BatchReq
 
   override def shouldRunAsync: Boolean = true
 
-  private lazy val _operationLog = OperationLog.createOperationLog(session, 
getHandle)
+  private val _operationLog = OperationLog.createOperationLog(session, 
getHandle)
 
   private val applicationManager =
     
session.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager
@@ -133,6 +133,16 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, 
batchRequest: BatchReq
     }
   }
 
+  def getOperationLogRowSet(
+      order: FetchOrientation,
+      from: Int,
+      size: Int): TRowSet = {
+    val operationLog = getOperationLog
+    operationLog.map(_.read(from, size)).getOrElse {
+      throw KyuubiSQLException(s"Batch ID: $batchId, failed to generate 
operation log")
+    }
+  }
+
   override val getResultSetSchema: TTableSchema = {
     val schema = new TTableSchema()
     Seq("key", "value").zipWithIndex.foreach { case (colName, position) =>
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 6f5208fb0..35834a977 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.server.api.v1
 import javax.ws.rs._
 import javax.ws.rs.core.{MediaType, Response}
 
+import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import io.swagger.v3.oas.annotations.media.{Content, Schema}
@@ -28,11 +29,12 @@ import io.swagger.v3.oas.annotations.tags.Tag
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.Logging
+import org.apache.kyuubi.operation.FetchOrientation
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.server.api.v1.BatchesResource.REST_BATCH_PROTOCOL
 import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
 import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, 
KyuubiSessionManager, SessionHandle}
 
 @Tag(name = "Batch")
 @Produces(Array(MediaType.APPLICATION_JSON))
@@ -40,6 +42,20 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
 
   private def sessionManager = 
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
 
+  private def buildBatch(sessionHandle: SessionHandle): Batch = {
+    buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
+  }
+
+  private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
+    val batchOp = session.batchJobSubmissionOp
+    Batch(
+      batchOp.batchId,
+      batchOp.batchType,
+      batchOp.currentApplicationState.getOrElse(Map.empty),
+      fe.connectionUrl,
+      batchOp.getStatus.state.toString)
+  }
+
   @ApiResponse(
     responseCode = "200",
     content = Array(new Content(
@@ -58,8 +74,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
       ipAddress,
       Option(request.conf).getOrElse(Map()),
       request)
-    val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
-    buildBatch(session)
+    buildBatch(sessionHandle)
   }
 
   @ApiResponse(
@@ -73,8 +88,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
   def batchInfo(@PathParam("batchId") batchId: String): Batch = {
     try {
       val sessionHandle = sessionManager.getBatchSessionHandle(batchId, 
REST_BATCH_PROTOCOL)
-      val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
-      buildBatch(session)
+      buildBatch(sessionHandle)
     } catch {
       case NonFatal(e) =>
         error(s"Invalid batchId: $batchId", e)
@@ -102,14 +116,33 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
     GetBatchesResponse(from, batches.size, batches)
   }
 
-  private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
-    val batchOp = session.batchJobSubmissionOp
-    Batch(
-      batchOp.batchId,
-      batchOp.batchType,
-      batchOp.currentApplicationState.getOrElse(Map.empty),
-      fe.connectionUrl,
-      batchOp.getStatus.state.toString)
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[OperationLog]))),
+    description = "get the log lines from this batch")
+  @GET
+  @Path("{batchId}/log")
+  def getBatchLog(
+      @PathParam("batchId") batchId: String,
+      @QueryParam("from") @DefaultValue("-1") from: Int,
+      @QueryParam("size") size: Int): OperationLog = {
+    try {
+      val submissionOpt = sessionManager.getBatchSessionImpl(batchId, 
REST_BATCH_PROTOCOL)
+        .batchJobSubmissionOp
+      val rowSet = submissionOpt.getOperationLogRowSet(
+        FetchOrientation.FETCH_NEXT,
+        from,
+        size)
+      val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
+      OperationLog(logRowSet, logRowSet.size)
+    } catch {
+      case NonFatal(e) =>
+        val errorMsg = s"Error getting operation log for batchId: $batchId"
+        error(errorMsg, e)
+        throw new NotFoundException(errorMsg)
+    }
   }
 
   @ApiResponse(
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 1a15831e2..9e215168e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -155,6 +155,14 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     SessionHandle(HandleIdentifier(UUID.fromString(batchId), 
STATIC_BATCH_SECRET_UUID), protocol)
   }
 
+  def getBatchSessionImpl(batchId: String, protocol: TProtocolVersion): 
KyuubiBatchSessionImpl = {
+    getSession(getBatchSessionHandle(batchId, 
protocol)).asInstanceOf[KyuubiBatchSessionImpl]
+  }
+
+  def getBatchSessionImpl(sessionHandle: SessionHandle): 
KyuubiBatchSessionImpl = {
+    getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+  }
+
   def getBatchSessionList(batchType: String, from: Int, size: Int): 
Seq[Session] = {
     val sessions =
       if (batchType == null) {
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index a6d7189da..7f396a2e0 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -23,6 +23,9 @@ import java.util.UUID
 import javax.ws.rs.client.Entity
 import javax.ws.rs.core.MediaType
 
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.DurationInt
+
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
@@ -35,11 +38,12 @@ import org.apache.kyuubi.session.KyuubiSessionManager
 class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
   test("open batch session") {
     val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+    val appName = "spark-batch-submission"
     val requestObj = BatchRequest(
       "spark",
       sparkProcessBuilder.mainResource.get,
       sparkProcessBuilder.mainClass,
-      "spark-batch-submission",
+      appName,
       Map(
         "spark.master" -> "local",
         s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
@@ -75,6 +79,34 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper {
       .get()
     assert(404 == getBatchResponse.getStatus)
 
+    // get batch log
+    var logResponse = webTarget.path(s"api/v1/batches/${batch.id}/log")
+      .queryParam("from", "0")
+      .queryParam("size", "1")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .get()
+    var log = logResponse.readEntity(classOf[OperationLog])
+    val head = log.logRowSet.head
+    assert(log.rowCount == 1)
+
+    val logs = new ArrayBuffer[String]
+    logs.append(head)
+    eventually(timeout(10.seconds), interval(1.seconds)) {
+      logResponse = webTarget.path(s"api/v1/batches/${batch.id}/log")
+        .queryParam("from", "0")
+        .queryParam("size", "100")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .get()
+      log = logResponse.readEntity(classOf[OperationLog])
+      if (log.rowCount > 0) {
+        log.logRowSet.foreach(logs.append(_))
+      }
+
+      // check both kyuubi log and engine log
+      assert(logs.exists(_.contains("/bin/spark-submit")) && logs.exists(
+        _.contains(s"spark.SparkContext: Submitted application: $appName")))
+    }
+
     // invalid user name
     val encodeAuthorization = new 
String(Base64.getEncoder.encode(batch.id.getBytes()), "UTF-8")
     var deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")

Reply via email to