This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f398dc216 [KYUUBI #4813] Add the fetchorientation parameter to the
/v1/operations/:operationId/log interface
f398dc216 is described below
commit f398dc2165749ba816d29fe20b2647aef2c12007
Author: huangzhir <[email protected]>
AuthorDate: Tue Jun 6 09:41:24 2023 +0800
[KYUUBI #4813] Add the fetchorientation parameter to the
/v1/operations/:operationId/log interface
### _Why are the changes needed?_
to close https://github.com/apache/kyuubi/issues/4732
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [X] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #4813 from huangzhir/operation_log.
Closes #4813
39dcab61d [huangzhir] remove unrelated code
545efaee6 [huangzhir] remove object lock
f0d090f9a [huangzhir] throw exception if user requests FETCH_PRIOR
162e008a1 [huangzhir] remove FETCH_PRIOR test
3b40f6bac [huangzhir] fix style
78e49698c [huangzhir] add extra log test
68154fecb [huangzhir] Merge remote-tracking branch 'origin/master' into
operation_log
21c46c06c [huangzhir] code rewritten ,fetch log only support FETCH_NEXT and
FETCH_FIRST
cbd714a2b [huangzhir] Add the operationHandle parameter to the
/v1/operations/:operationId/log interface.
Authored-by: huangzhir <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
---
.../apache/kyuubi/operation/OperationManager.scala | 2 +-
.../apache/kyuubi/operation/log/OperationLog.scala | 60 ++++++++++++++++++----
.../kyuubi/operation/log/OperationLogSuite.scala | 43 +++++++++++++++-
.../kyuubi/operation/KyuubiOperationManager.scala | 2 +-
.../kyuubi/server/api/v1/OperationsResource.scala | 13 +++--
.../server/api/v1/OperationsResourceSuite.scala | 41 +++++++++++++++
6 files changed, 144 insertions(+), 17 deletions(-)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index df45e6dee..3093bcfbe 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -146,7 +146,7 @@ abstract class OperationManager(name: String) extends
AbstractService(name) {
order: FetchOrientation,
maxRows: Int): TRowSet = {
val operationLog = getOperation(opHandle).getOperationLog
- operationLog.map(_.read(maxRows)).getOrElse {
+ operationLog.map(_.read(order, maxRows)).getOrElse {
throw KyuubiSQLException(s"$opHandle failed to generate operation log")
}
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index c25874b20..a68b7441a 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet,
TStringColumn}
import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT,
FetchOrientation}
import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThriftUtils
@@ -86,7 +87,7 @@ object OperationLog extends Logging {
class OperationLog(path: Path) {
private lazy val writer = Files.newBufferedWriter(path,
StandardCharsets.UTF_8)
- private lazy val reader = Files.newBufferedReader(path,
StandardCharsets.UTF_8)
+ private var reader: BufferedReader = _
@volatile private var initialized: Boolean = false
@@ -95,6 +96,15 @@ class OperationLog(path: Path) {
private var lastSeekReadPos = 0
private var seekableReader: SeekableBufferedReader = _
+ def getReader(): BufferedReader = {
+ if (reader == null) {
+ try {
+ reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
+ } catch handleFileNotFound
+ }
+ reader
+ }
+
def addExtraLog(path: Path): Unit = synchronized {
try {
extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8)
@@ -138,13 +148,15 @@ class OperationLog(path: Path) {
i += 1
}
} while ((i < lastRows || maxRows <= 0) && line != null)
- (logs, i)
- } catch {
- case e: IOException =>
- val absPath = path.toAbsolutePath
- val opHandle = absPath.getFileName
- throw KyuubiSQLException(s"Operation[$opHandle] log file $absPath is
not found", e)
- }
+ } catch handleFileNotFound
+ (logs, i)
+ }
+
+ private def handleFileNotFound: PartialFunction[Throwable, Unit] = {
+ case e: IOException =>
+ val absPath = path.toAbsolutePath
+ val opHandle = absPath.getFileName
+ throw KyuubiSQLException(s"Operation[$opHandle] log file $absPath is not
found", e)
}
private def toRowSet(logs: JList[String]): TRowSet = {
@@ -154,14 +166,25 @@ class OperationLog(path: Path) {
tRow
}
+ def read(maxRows: Int): TRowSet = synchronized {
+ read(FETCH_NEXT, maxRows)
+ }
+
/**
* Read to log file line by line
*
* @param maxRows maximum result number can reach
+ * @param order the fetch orientation of the result, can be FETCH_NEXT,
FETCH_FIRST
*/
- def read(maxRows: Int): TRowSet = synchronized {
+ def read(order: FetchOrientation = FETCH_NEXT, maxRows: Int): TRowSet =
synchronized {
if (!initialized) return ThriftUtils.newEmptyRowSet
- val (logs, lines) = readLogs(reader, maxRows, maxRows)
+ if (order != FETCH_NEXT && order != FETCH_FIRST) {
+ throw KyuubiSQLException(s"$order in operation log is not supported")
+ }
+ if (order == FETCH_FIRST) {
+ resetReader()
+ }
+ val (logs, lines) = readLogs(getReader(), maxRows, maxRows)
var lastRows = maxRows - lines
for (extraReader <- extraReaders if lastRows > 0 || maxRows <= 0) {
val (extraLogs, extraRows) = readLogs(extraReader, lastRows, maxRows)
@@ -172,6 +195,19 @@ class OperationLog(path: Path) {
toRowSet(logs)
}
+ private def resetReader(): Unit = {
+ trySafely {
+ if (reader != null) {
+ reader.close()
+ }
+ }
+ reader = null
+ closeExtraReaders()
+ extraReaders.clear()
+ extraPaths.foreach(path =>
+ extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8))
+ }
+
def read(from: Int, size: Int): TRowSet = synchronized {
if (!initialized) return ThriftUtils.newEmptyRowSet
var pos = from
@@ -202,7 +238,9 @@ class OperationLog(path: Path) {
closeExtraReaders()
trySafely {
- reader.close()
+ if (reader != null) {
+ reader.close()
+ }
}
trySafely {
writer.close()
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index edc6b3375..570a8159b 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -27,7 +27,7 @@ import org.apache.hive.service.rpc.thrift.{TProtocolVersion,
TRowSet}
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.operation.OperationHandle
+import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle}
import org.apache.kyuubi.session.NoopSessionManager
import org.apache.kyuubi.util.ThriftUtils
@@ -237,6 +237,47 @@ class OperationLogSuite extends KyuubiFunSuite {
}
}
+ test("test fetchOrientation read") {
+ val file = Utils.createTempDir().resolve("f")
+ val file2 = Utils.createTempDir().resolve("extra")
+ val writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)
+ val writer2 = Files.newBufferedWriter(file2, StandardCharsets.UTF_8)
+ try {
+ 0.until(10).foreach(x => writer.write(s"$x\n"))
+ writer.flush()
+ writer.close()
+ 10.until(20).foreach(x => writer2.write(s"$x\n"))
+ writer2.flush()
+ writer2.close()
+
+ def compareResult(rows: TRowSet, expected: Seq[String]): Unit = {
+ val res = rows.getColumns.get(0).getStringVal.getValues.asScala
+ assert(res.size == expected.size)
+ res.zip(expected).foreach { case (l, r) =>
+ assert(l == r)
+ }
+ }
+
+ val log = new OperationLog(file)
+ log.addExtraLog(file2)
+ // The operation log file is created externally and should be
initialized actively.
+ log.initOperationLogIfNecessary()
+
+ compareResult(
+ log.read(FetchOrientation.FETCH_NEXT, 10),
+ Seq("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"))
+ compareResult(log.read(FetchOrientation.FETCH_NEXT, 5), Seq("10", "11",
"12", "13", "14"))
+ compareResult(log.read(FetchOrientation.FETCH_FIRST, 5), Seq("0", "1",
"2", "3", "4"))
+ compareResult(
+ log.read(FetchOrientation.FETCH_NEXT, 10),
+ Seq("5", "6", "7", "8", "9", "10", "11", "12", "13", "14"))
+ compareResult(log.read(FetchOrientation.FETCH_NEXT, 10), Seq("15", "16",
"17", "18", "19"))
+ } finally {
+ Utils.deleteDirectoryRecursively(file.toFile)
+ Utils.deleteDirectoryRecursively(file2.toFile)
+ }
+ }
+
test("[KYUUBI #3511] Reading an uninitialized log should return empty
rowSet") {
val sessionManager = new NoopSessionManager
sessionManager.initialize(KyuubiConf())
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 6846d0316..4730d1618 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -219,7 +219,7 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
val operationLog = operation.getOperationLog
operationLog match {
- case Some(log) => log.read(maxRows)
+ case Some(log) => log.read(order, maxRows)
case None =>
val remoteHandle = operation.remoteOpHandle()
val client = operation.client
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
index 70a6d3a28..b55719749 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.server.api.v1
-import javax.ws.rs._
+import javax.ws.rs.{BadRequestException, _}
import javax.ws.rs.core.{MediaType, Response}
import scala.collection.JavaConverters._
@@ -140,15 +140,22 @@ private[v1] class OperationsResource extends
ApiRequestContext with Logging {
@Path("{operationHandle}/log")
def getOperationLog(
@PathParam("operationHandle") operationHandleStr: String,
- @QueryParam("maxrows") maxRows: Int): OperationLog = {
+ @QueryParam("maxrows") @DefaultValue("100") maxRows: Int,
+ @QueryParam("fetchorientation") @DefaultValue("FETCH_NEXT")
+ fetchOrientation: String): OperationLog = {
try {
+ if (fetchOrientation != "FETCH_NEXT" && fetchOrientation !=
"FETCH_FIRST") {
+ throw new BadRequestException(s"$fetchOrientation in operation log is
not supported")
+ }
val rowSet = fe.be.sessionManager.operationManager.getOperationLogRowSet(
OperationHandle(operationHandleStr),
- FetchOrientation.FETCH_NEXT,
+ FetchOrientation.withName(fetchOrientation),
maxRows)
val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
new OperationLog(logRowSet.asJava, logRowSet.size)
} catch {
+ case e: BadRequestException =>
+ throw e
case NonFatal(e) =>
val errorMsg = s"Error getting operation log for operation handle
$operationHandleStr"
error(errorMsg, e)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
index 51701b231..72cd4d87d 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
@@ -102,6 +102,47 @@ class OperationsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper
val logRowSet = response.readEntity(classOf[OperationLog])
assert(logRowSet.getLogRowSet.asScala.exists(_.contains("show tables")))
assert(logRowSet.getRowCount === 10)
+
+ val response2 = webTarget.path(
+ s"api/v1/operations/$opHandleStr/log")
+ .queryParam("maxrows", "1000")
+ .queryParam("fetchorientation", "FETCH_NEXT")
+ .request(MediaType.APPLICATION_JSON).get()
+ assert(200 == response2.getStatus)
+ val logCount = response2.readEntity(classOf[OperationLog]).getRowCount
+ val totalLogCoung = logCount + 10
+ assert(logCount > 0)
+
+ val response3 = webTarget.path(
+ s"api/v1/operations/$opHandleStr/log")
+ .queryParam("maxrows", "1000")
+ .request(MediaType.APPLICATION_JSON).get()
+ assert(200 == response3.getStatus)
+ assert(response3.readEntity(classOf[OperationLog]).getRowCount == 0)
+
+ val response4 = webTarget.path(
+ s"api/v1/operations/$opHandleStr/log")
+ .queryParam("maxrows", "10")
+ .queryParam("fetchorientation", "FETCH_FIRST")
+ .request(MediaType.APPLICATION_JSON).get()
+ assert(200 == response4.getStatus)
+ assert(response4.readEntity(classOf[OperationLog]).getRowCount == 10)
+
+ val response5 = webTarget.path(
+ s"api/v1/operations/$opHandleStr/log")
+ .queryParam("maxrows", "10")
+ .queryParam("fetchorientation", "FETCH_PRIOR")
+ .request(MediaType.APPLICATION_JSON).get()
+ assert(400 == response5.getStatus)
+ assert(response5.getStatusInfo.getReasonPhrase == "Bad Request")
+
+ val response6 = webTarget.path(
+ s"api/v1/operations/$opHandleStr/log")
+ .queryParam("maxrows", "1000")
+ .queryParam("fetchorientation", "FETCH_NEXT")
+ .request(MediaType.APPLICATION_JSON).get()
+ assert(200 == response6.getStatus)
+ assert(response6.readEntity(classOf[OperationLog]).getRowCount ==
totalLogCoung - 10)
}
test("test get result row set") {