This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f398dc216 [KYUUBI #4813] Add the fetchorientation parameter to the 
/v1/operations/:operationId/log interface
f398dc216 is described below

commit f398dc2165749ba816d29fe20b2647aef2c12007
Author: huangzhir <[email protected]>
AuthorDate: Tue Jun 6 09:41:24 2023 +0800

    [KYUUBI #4813] Add the fetchorientation parameter to the 
/v1/operations/:operationId/log interface
    
    ### _Why are the changes needed?_
    
    to close https://github.com/apache/kyuubi/issues/4732
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [X] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #4813 from huangzhir/operation_log.
    
    Closes #4813
    
    39dcab61d [huangzhir] remove unrelated code
    545efaee6 [huangzhir] remove object lock
    f0d090f9a [huangzhir] throw exception if user requests FETCH_PRIOR
    162e008a1 [huangzhir] remove FETCH_PRIOR test
    3b40f6bac [huangzhir] fix style
    78e49698c [huangzhir] add extra log test
    68154fecb [huangzhir] Merge remote-tracking branch 'origin/master' into 
operation_log
    21c46c06c [huangzhir] code rewritten, fetch log only supports FETCH_NEXT and 
FETCH_FIRST
    cbd714a2b [huangzhir] Add the operationHandle parameter to the 
/v1/operations/:operationId/log interface.
    
    Authored-by: huangzhir <[email protected]>
    Signed-off-by: ulyssesyou <[email protected]>
---
 .../apache/kyuubi/operation/OperationManager.scala |  2 +-
 .../apache/kyuubi/operation/log/OperationLog.scala | 60 ++++++++++++++++++----
 .../kyuubi/operation/log/OperationLogSuite.scala   | 43 +++++++++++++++-
 .../kyuubi/operation/KyuubiOperationManager.scala  |  2 +-
 .../kyuubi/server/api/v1/OperationsResource.scala  | 13 +++--
 .../server/api/v1/OperationsResourceSuite.scala    | 41 +++++++++++++++
 6 files changed, 144 insertions(+), 17 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index df45e6dee..3093bcfbe 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -146,7 +146,7 @@ abstract class OperationManager(name: String) extends 
AbstractService(name) {
       order: FetchOrientation,
       maxRows: Int): TRowSet = {
     val operationLog = getOperation(opHandle).getOperationLog
-    operationLog.map(_.read(maxRows)).getOrElse {
+    operationLog.map(_.read(order, maxRows)).getOrElse {
       throw KyuubiSQLException(s"$opHandle failed to generate operation log")
     }
   }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index c25874b20..a68b7441a 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.ListBuffer
 import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, 
TStringColumn}
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, 
FetchOrientation}
 import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session.Session
 import org.apache.kyuubi.util.ThriftUtils
@@ -86,7 +87,7 @@ object OperationLog extends Logging {
 class OperationLog(path: Path) {
 
   private lazy val writer = Files.newBufferedWriter(path, 
StandardCharsets.UTF_8)
-  private lazy val reader = Files.newBufferedReader(path, 
StandardCharsets.UTF_8)
+  private var reader: BufferedReader = _
 
   @volatile private var initialized: Boolean = false
 
@@ -95,6 +96,15 @@ class OperationLog(path: Path) {
   private var lastSeekReadPos = 0
   private var seekableReader: SeekableBufferedReader = _
 
+  def getReader(): BufferedReader = {
+    if (reader == null) {
+      try {
+        reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
+      } catch handleFileNotFound
+    }
+    reader
+  }
+
   def addExtraLog(path: Path): Unit = synchronized {
     try {
       extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8)
@@ -138,13 +148,15 @@ class OperationLog(path: Path) {
           i += 1
         }
       } while ((i < lastRows || maxRows <= 0) && line != null)
-      (logs, i)
-    } catch {
-      case e: IOException =>
-        val absPath = path.toAbsolutePath
-        val opHandle = absPath.getFileName
-        throw KyuubiSQLException(s"Operation[$opHandle] log file $absPath is 
not found", e)
-    }
+    } catch handleFileNotFound
+    (logs, i)
+  }
+
+  private def handleFileNotFound: PartialFunction[Throwable, Unit] = {
+    case e: IOException =>
+      val absPath = path.toAbsolutePath
+      val opHandle = absPath.getFileName
+      throw KyuubiSQLException(s"Operation[$opHandle] log file $absPath is not 
found", e)
   }
 
   private def toRowSet(logs: JList[String]): TRowSet = {
@@ -154,14 +166,25 @@ class OperationLog(path: Path) {
     tRow
   }
 
+  def read(maxRows: Int): TRowSet = synchronized {
+    read(FETCH_NEXT, maxRows)
+  }
+
   /**
    * Read to log file line by line
    *
    * @param maxRows maximum result number can reach
+   * @param order   the  fetch orientation of the result, can be FETCH_NEXT, 
FETCH_FIRST
    */
-  def read(maxRows: Int): TRowSet = synchronized {
+  def read(order: FetchOrientation = FETCH_NEXT, maxRows: Int): TRowSet = 
synchronized {
     if (!initialized) return ThriftUtils.newEmptyRowSet
-    val (logs, lines) = readLogs(reader, maxRows, maxRows)
+    if (order != FETCH_NEXT && order != FETCH_FIRST) {
+      throw KyuubiSQLException(s"$order in operation log is not supported")
+    }
+    if (order == FETCH_FIRST) {
+      resetReader()
+    }
+    val (logs, lines) = readLogs(getReader(), maxRows, maxRows)
     var lastRows = maxRows - lines
     for (extraReader <- extraReaders if lastRows > 0 || maxRows <= 0) {
       val (extraLogs, extraRows) = readLogs(extraReader, lastRows, maxRows)
@@ -172,6 +195,19 @@ class OperationLog(path: Path) {
     toRowSet(logs)
   }
 
+  private def resetReader(): Unit = {
+    trySafely {
+      if (reader != null) {
+        reader.close()
+      }
+    }
+    reader = null
+    closeExtraReaders()
+    extraReaders.clear()
+    extraPaths.foreach(path =>
+      extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8))
+  }
+
   def read(from: Int, size: Int): TRowSet = synchronized {
     if (!initialized) return ThriftUtils.newEmptyRowSet
     var pos = from
@@ -202,7 +238,9 @@ class OperationLog(path: Path) {
     closeExtraReaders()
 
     trySafely {
-      reader.close()
+      if (reader != null) {
+        reader.close()
+      }
     }
     trySafely {
       writer.close()
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index edc6b3375..570a8159b 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -27,7 +27,7 @@ import org.apache.hive.service.rpc.thrift.{TProtocolVersion, 
TRowSet}
 
 import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.operation.OperationHandle
+import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle}
 import org.apache.kyuubi.session.NoopSessionManager
 import org.apache.kyuubi.util.ThriftUtils
 
@@ -237,6 +237,47 @@ class OperationLogSuite extends KyuubiFunSuite {
     }
   }
 
+  test("test fetchOrientation read") {
+    val file = Utils.createTempDir().resolve("f")
+    val file2 = Utils.createTempDir().resolve("extra")
+    val writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)
+    val writer2 = Files.newBufferedWriter(file2, StandardCharsets.UTF_8)
+    try {
+      0.until(10).foreach(x => writer.write(s"$x\n"))
+      writer.flush()
+      writer.close()
+      10.until(20).foreach(x => writer2.write(s"$x\n"))
+      writer2.flush()
+      writer2.close()
+
+      def compareResult(rows: TRowSet, expected: Seq[String]): Unit = {
+        val res = rows.getColumns.get(0).getStringVal.getValues.asScala
+        assert(res.size == expected.size)
+        res.zip(expected).foreach { case (l, r) =>
+          assert(l == r)
+        }
+      }
+
+      val log = new OperationLog(file)
+      log.addExtraLog(file2)
+      // The operation log file is created externally and should be 
initialized actively.
+      log.initOperationLogIfNecessary()
+
+      compareResult(
+        log.read(FetchOrientation.FETCH_NEXT, 10),
+        Seq("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"))
+      compareResult(log.read(FetchOrientation.FETCH_NEXT, 5), Seq("10", "11", 
"12", "13", "14"))
+      compareResult(log.read(FetchOrientation.FETCH_FIRST, 5), Seq("0", "1", 
"2", "3", "4"))
+      compareResult(
+        log.read(FetchOrientation.FETCH_NEXT, 10),
+        Seq("5", "6", "7", "8", "9", "10", "11", "12", "13", "14"))
+      compareResult(log.read(FetchOrientation.FETCH_NEXT, 10), Seq("15", "16", 
"17", "18", "19"))
+    } finally {
+      Utils.deleteDirectoryRecursively(file.toFile)
+      Utils.deleteDirectoryRecursively(file2.toFile)
+    }
+  }
+
   test("[KYUUBI #3511] Reading an uninitialized log should return empty 
rowSet") {
     val sessionManager = new NoopSessionManager
     sessionManager.initialize(KyuubiConf())
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 6846d0316..4730d1618 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -219,7 +219,7 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
     val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
     val operationLog = operation.getOperationLog
     operationLog match {
-      case Some(log) => log.read(maxRows)
+      case Some(log) => log.read(order, maxRows)
       case None =>
         val remoteHandle = operation.remoteOpHandle()
         val client = operation.client
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
index 70a6d3a28..b55719749 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.server.api.v1
 
-import javax.ws.rs._
+import javax.ws.rs.{BadRequestException, _}
 import javax.ws.rs.core.{MediaType, Response}
 
 import scala.collection.JavaConverters._
@@ -140,15 +140,22 @@ private[v1] class OperationsResource extends 
ApiRequestContext with Logging {
   @Path("{operationHandle}/log")
   def getOperationLog(
       @PathParam("operationHandle") operationHandleStr: String,
-      @QueryParam("maxrows") maxRows: Int): OperationLog = {
+      @QueryParam("maxrows") @DefaultValue("100") maxRows: Int,
+      @QueryParam("fetchorientation") @DefaultValue("FETCH_NEXT")
+      fetchOrientation: String): OperationLog = {
     try {
+      if (fetchOrientation != "FETCH_NEXT" && fetchOrientation != 
"FETCH_FIRST") {
+        throw new BadRequestException(s"$fetchOrientation in operation log is 
not supported")
+      }
       val rowSet = fe.be.sessionManager.operationManager.getOperationLogRowSet(
         OperationHandle(operationHandleStr),
-        FetchOrientation.FETCH_NEXT,
+        FetchOrientation.withName(fetchOrientation),
         maxRows)
       val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
       new OperationLog(logRowSet.asJava, logRowSet.size)
     } catch {
+      case e: BadRequestException =>
+        throw e
       case NonFatal(e) =>
         val errorMsg = s"Error getting operation log for operation handle 
$operationHandleStr"
         error(errorMsg, e)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
index 51701b231..72cd4d87d 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
@@ -102,6 +102,47 @@ class OperationsResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper
     val logRowSet = response.readEntity(classOf[OperationLog])
     assert(logRowSet.getLogRowSet.asScala.exists(_.contains("show tables")))
     assert(logRowSet.getRowCount === 10)
+
+    val response2 = webTarget.path(
+      s"api/v1/operations/$opHandleStr/log")
+      .queryParam("maxrows", "1000")
+      .queryParam("fetchorientation", "FETCH_NEXT")
+      .request(MediaType.APPLICATION_JSON).get()
+    assert(200 == response2.getStatus)
+    val logCount = response2.readEntity(classOf[OperationLog]).getRowCount
+    val totalLogCoung = logCount + 10
+    assert(logCount > 0)
+
+    val response3 = webTarget.path(
+      s"api/v1/operations/$opHandleStr/log")
+      .queryParam("maxrows", "1000")
+      .request(MediaType.APPLICATION_JSON).get()
+    assert(200 == response3.getStatus)
+    assert(response3.readEntity(classOf[OperationLog]).getRowCount == 0)
+
+    val response4 = webTarget.path(
+      s"api/v1/operations/$opHandleStr/log")
+      .queryParam("maxrows", "10")
+      .queryParam("fetchorientation", "FETCH_FIRST")
+      .request(MediaType.APPLICATION_JSON).get()
+    assert(200 == response4.getStatus)
+    assert(response4.readEntity(classOf[OperationLog]).getRowCount == 10)
+
+    val response5 = webTarget.path(
+      s"api/v1/operations/$opHandleStr/log")
+      .queryParam("maxrows", "10")
+      .queryParam("fetchorientation", "FETCH_PRIOR")
+      .request(MediaType.APPLICATION_JSON).get()
+    assert(400 == response5.getStatus)
+    assert(response5.getStatusInfo.getReasonPhrase == "Bad Request")
+
+    val response6 = webTarget.path(
+      s"api/v1/operations/$opHandleStr/log")
+      .queryParam("maxrows", "1000")
+      .queryParam("fetchorientation", "FETCH_NEXT")
+      .request(MediaType.APPLICATION_JSON).get()
+    assert(200 == response6.getStatus)
+    assert(response6.readEntity(classOf[OperationLog]).getRowCount == 
totalLogCoung - 10)
   }
 
   test("test get result row set") {

Reply via email to