This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8c1fc1002 [KYUUBI #2378] Implement BatchesResource GET
/batches/${batchId}/log
8c1fc1002 is described below
commit 8c1fc100252ff2d5e32b3f3773f90696bc03ff67
Author: ulysses-you <[email protected]>
AuthorDate: Mon May 9 13:52:06 2022 +0800
[KYUUBI #2378] Implement BatchesResource GET /batches/${batchId}/log
### _Why are the changes needed?_
Support fetch operation log with rest batch mode.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2567 from ulysses-you/KYUUBI-2378.
Closes #2378
d4dd5352 [ulysses-you] comment
fd08362c [ulysses-you] desc
37b6cc83 [ulysses-you] seek
78fb6391 [ulysses-you] rebase
d6b838ac [ulysses-you] style
0440ec29 [ulysses-you] fix
b07a81ce [ulysses-you] Implement BatchesResource GET /batches/${batchId}/log
Authored-by: ulysses-you <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../apache/kyuubi/operation/log/OperationLog.scala | 49 ++++++++++++++++--
.../operation/log/SeekableBufferedReader.scala | 8 +--
.../kyuubi/operation/log/OperationLogSuite.scala | 30 ++++++++++-
.../engine/spark/SparkBatchProcessBuilder.scala | 2 +-
.../kyuubi/operation/BatchJobSubmission.scala | 14 ++++-
.../kyuubi/server/api/v1/BatchesResource.scala | 59 +++++++++++++++++-----
.../kyuubi/session/KyuubiSessionManager.scala | 8 +++
.../server/api/v1/BatchesResourceSuite.scala | 34 ++++++++++++-
8 files changed, 178 insertions(+), 26 deletions(-)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 63cc0ded8..84ee117f3 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -21,8 +21,9 @@ import java.io.{BufferedReader, IOException}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
-import java.util.{ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, List => JList}
+import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet,
TStringColumn}
@@ -86,11 +87,15 @@ class OperationLog(path: Path) {
private lazy val writer = Files.newBufferedWriter(path,
StandardCharsets.UTF_8)
private lazy val reader = Files.newBufferedReader(path,
StandardCharsets.UTF_8)
+ private lazy val extraPaths: ListBuffer[Path] = ListBuffer()
private lazy val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
+ private var lastSeekReadPos = 0
+ private var seekableReader: SeekableBufferedReader = _
def addExtraLog(path: Path): Unit = synchronized {
try {
extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8)
+ extraPaths += path
} catch {
case _: IOException =>
}
@@ -130,6 +135,13 @@ class OperationLog(path: Path) {
}
}
+ private def toRowSet(logs: JList[String]): TRowSet = {
+ val tColumn = TColumn.stringVal(new TStringColumn(logs,
ByteBuffer.allocate(0)))
+ val tRow = new TRowSet(0, new JArrayList[TRow](logs.size()))
+ tRow.addToColumns(tColumn)
+ tRow
+ }
+
/**
* Read to log file line by line
*
@@ -144,10 +156,30 @@ class OperationLog(path: Path) {
logs.addAll(extraLogs)
}
- val tColumn = TColumn.stringVal(new TStringColumn(logs,
ByteBuffer.allocate(0)))
- val tRow = new TRowSet(0, new JArrayList[TRow](logs.size()))
- tRow.addToColumns(tColumn)
- tRow
+ toRowSet(logs)
+ }
+
+ def read(from: Int, size: Int): TRowSet = synchronized {
+ var pos = from
+ if (pos < 0) {
+ // just fetch forward
+ pos = lastSeekReadPos
+ }
+ if (seekableReader == null) {
+ seekableReader = new SeekableBufferedReader(Seq(path) ++ extraPaths)
+ } else {
+ // if from < last pos, we should reload the reader
+ // otherwise, we can reuse the existed reader for better performance
+ if (pos < lastSeekReadPos) {
+ seekableReader.close()
+ seekableReader = new SeekableBufferedReader(Seq(path) ++ extraPaths)
+ }
+ }
+
+ val it = seekableReader.readLine(pos, size)
+ val res = it.toList.asJava
+ lastSeekReadPos = pos + res.size()
+ toRowSet(res)
}
def close(): Unit = synchronized {
@@ -160,6 +192,13 @@ class OperationLog(path: Path) {
writer.close()
}
+ if (seekableReader != null) {
+ lastSeekReadPos = 0
+ trySafely {
+ seekableReader.close()
+ }
+ }
+
trySafely {
Files.delete(path)
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
index 4b1f76328..7c22b94d6 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/SeekableBufferedReader.scala
@@ -35,7 +35,6 @@ class SeekableBufferedReader(paths: Seq[Path]) extends
Closeable {
}
private var linePos = 0L
- private var numLines = 0L
private var readerIndex = 0
private val numReaders = bufferedReaders.length
private var currentReader = bufferedReaders.head
@@ -48,6 +47,9 @@ class SeekableBufferedReader(paths: Seq[Path]) extends
Closeable {
currentReader = bufferedReaders(readerIndex)
currentValue = currentReader.readLine()
}
+ if (currentValue != null) {
+ linePos += 1
+ }
}
/**
@@ -58,14 +60,14 @@ class SeekableBufferedReader(paths: Seq[Path]) extends
Closeable {
if (from < 0) throw new IOException("Negative seek offset")
new Iterator[String] {
+ private var numLines = 0L
override def hasNext: Boolean = {
if (numLines >= limit) {
false
} else {
nextLine()
- while (linePos < from && currentValue != null) {
+ while (linePos <= from && currentValue != null) {
nextLine()
- linePos += 1
}
numLines += 1
currentValue != null
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index b3b4b5fe6..7523e005b 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -18,11 +18,12 @@
package org.apache.kyuubi.operation.log
import java.io.File
+import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet}
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
@@ -211,4 +212,31 @@ class OperationLogSuite extends KyuubiFunSuite {
operationLog.close()
tempDir.toFile.delete()
}
+
+ test("test seek reader") {
+ val file = Utils.createTempDir().resolve("f")
+ val writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)
+ try {
+ 0.until(10).foreach(x => writer.write(s"$x\n"))
+ writer.flush()
+ writer.close()
+
+ def compareResult(rows: TRowSet, expected: Seq[String]): Unit = {
+ val res = rows.getColumns.get(0).getStringVal.getValues.asScala
+ assert(res.size == expected.size)
+ res.zip(expected).foreach { case (l, r) =>
+ assert(l == r)
+ }
+ }
+
+ val log = new OperationLog(file)
+ compareResult(log.read(-1, 1), Seq("0"))
+ compareResult(log.read(-1, 1), Seq("1"))
+ compareResult(log.read(0, 1), Seq("0"))
+ compareResult(log.read(0, 2), Seq("0", "1"))
+ compareResult(log.read(5, 10), Seq("5", "6", "7", "8", "9"))
+ } finally {
+ Utils.deleteDirectoryRecursively(file.toFile)
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 5a5c23392..49313ed3e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -28,7 +28,7 @@ class SparkBatchProcessBuilder(
override val conf: KyuubiConf,
batchId: String,
batchRequest: BatchRequest,
- override val extraEngineLog: Option[OperationLog] = None)
+ override val extraEngineLog: Option[OperationLog])
extends SparkProcessBuilder(proxyUser, conf, extraEngineLog) {
import SparkProcessBuilder._
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index daf4c7dba..42ff2e0b8 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift._
-import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse,
ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
@@ -42,7 +42,7 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl,
batchRequest: BatchReq
override def shouldRunAsync: Boolean = true
- private lazy val _operationLog = OperationLog.createOperationLog(session,
getHandle)
+ private val _operationLog = OperationLog.createOperationLog(session,
getHandle)
private val applicationManager =
session.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager
@@ -133,6 +133,16 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl,
batchRequest: BatchReq
}
}
+ def getOperationLogRowSet(
+ order: FetchOrientation,
+ from: Int,
+ size: Int): TRowSet = {
+ val operationLog = getOperationLog
+ operationLog.map(_.read(from, size)).getOrElse {
+ throw KyuubiSQLException(s"Batch ID: $batchId, failed to generate
operation log")
+ }
+ }
+
override val getResultSetSchema: TTableSchema = {
val schema = new TTableSchema()
Seq("key", "value").zipWithIndex.foreach { case (colName, position) =>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 6f5208fb0..35834a977 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.server.api.v1
import javax.ws.rs._
import javax.ws.rs.core.{MediaType, Response}
+import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import io.swagger.v3.oas.annotations.media.{Content, Schema}
@@ -28,11 +29,12 @@ import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.Logging
+import org.apache.kyuubi.operation.FetchOrientation
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource.REST_BATCH_PROTOCOL
import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
+import org.apache.kyuubi.session.{KyuubiBatchSessionImpl,
KyuubiSessionManager, SessionHandle}
@Tag(name = "Batch")
@Produces(Array(MediaType.APPLICATION_JSON))
@@ -40,6 +42,20 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
private def sessionManager =
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+ private def buildBatch(sessionHandle: SessionHandle): Batch = {
+ buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
+ }
+
+ private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
+ val batchOp = session.batchJobSubmissionOp
+ Batch(
+ batchOp.batchId,
+ batchOp.batchType,
+ batchOp.currentApplicationState.getOrElse(Map.empty),
+ fe.connectionUrl,
+ batchOp.getStatus.state.toString)
+ }
+
@ApiResponse(
responseCode = "200",
content = Array(new Content(
@@ -58,8 +74,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
ipAddress,
Option(request.conf).getOrElse(Map()),
request)
- val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
- buildBatch(session)
+ buildBatch(sessionHandle)
}
@ApiResponse(
@@ -73,8 +88,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
def batchInfo(@PathParam("batchId") batchId: String): Batch = {
try {
val sessionHandle = sessionManager.getBatchSessionHandle(batchId,
REST_BATCH_PROTOCOL)
- val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
- buildBatch(session)
+ buildBatch(sessionHandle)
} catch {
case NonFatal(e) =>
error(s"Invalid batchId: $batchId", e)
@@ -102,14 +116,33 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
GetBatchesResponse(from, batches.size, batches)
}
- private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
- val batchOp = session.batchJobSubmissionOp
- Batch(
- batchOp.batchId,
- batchOp.batchType,
- batchOp.currentApplicationState.getOrElse(Map.empty),
- fe.connectionUrl,
- batchOp.getStatus.state.toString)
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON,
+ schema = new Schema(implementation = classOf[OperationLog]))),
+ description = "get the log lines from this batch")
+ @GET
+ @Path("{batchId}/log")
+ def getBatchLog(
+ @PathParam("batchId") batchId: String,
+ @QueryParam("from") @DefaultValue("-1") from: Int,
+ @QueryParam("size") size: Int): OperationLog = {
+ try {
+ val submissionOpt = sessionManager.getBatchSessionImpl(batchId,
REST_BATCH_PROTOCOL)
+ .batchJobSubmissionOp
+ val rowSet = submissionOpt.getOperationLogRowSet(
+ FetchOrientation.FETCH_NEXT,
+ from,
+ size)
+ val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
+ OperationLog(logRowSet, logRowSet.size)
+ } catch {
+ case NonFatal(e) =>
+ val errorMsg = s"Error getting operation log for batchId: $batchId"
+ error(errorMsg, e)
+ throw new NotFoundException(errorMsg)
+ }
}
@ApiResponse(
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 1a15831e2..9e215168e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -155,6 +155,14 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
SessionHandle(HandleIdentifier(UUID.fromString(batchId),
STATIC_BATCH_SECRET_UUID), protocol)
}
+ def getBatchSessionImpl(batchId: String, protocol: TProtocolVersion):
KyuubiBatchSessionImpl = {
+ getSession(getBatchSessionHandle(batchId,
protocol)).asInstanceOf[KyuubiBatchSessionImpl]
+ }
+
+ def getBatchSessionImpl(sessionHandle: SessionHandle):
KyuubiBatchSessionImpl = {
+ getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+ }
+
def getBatchSessionList(batchType: String, from: Int, size: Int):
Seq[Session] = {
val sessions =
if (batchType == null) {
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index a6d7189da..7f396a2e0 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -23,6 +23,9 @@ import java.util.UUID
import javax.ws.rs.client.Entity
import javax.ws.rs.core.MediaType
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.DurationInt
+
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
@@ -35,11 +38,12 @@ import org.apache.kyuubi.session.KyuubiSessionManager
class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
test("open batch session") {
val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+ val appName = "spark-batch-submission"
val requestObj = BatchRequest(
"spark",
sparkProcessBuilder.mainResource.get,
sparkProcessBuilder.mainClass,
- "spark-batch-submission",
+ appName,
Map(
"spark.master" -> "local",
s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
@@ -75,6 +79,34 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
.get()
assert(404 == getBatchResponse.getStatus)
+ // get batch log
+ var logResponse = webTarget.path(s"api/v1/batches/${batch.id}/log")
+ .queryParam("from", "0")
+ .queryParam("size", "1")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .get()
+ var log = logResponse.readEntity(classOf[OperationLog])
+ val head = log.logRowSet.head
+ assert(log.rowCount == 1)
+
+ val logs = new ArrayBuffer[String]
+ logs.append(head)
+ eventually(timeout(10.seconds), interval(1.seconds)) {
+ logResponse = webTarget.path(s"api/v1/batches/${batch.id}/log")
+ .queryParam("from", "0")
+ .queryParam("size", "100")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .get()
+ log = logResponse.readEntity(classOf[OperationLog])
+ if (log.rowCount > 0) {
+ log.logRowSet.foreach(logs.append(_))
+ }
+
+ // check both kyuubi log and engine log
+ assert(logs.exists(_.contains("/bin/spark-submit")) && logs.exists(
+ _.contains(s"spark.SparkContext: Submitted application: $appName")))
+ }
+
// invalid user name
val encodeAuthorization = new
String(Base64.getEncoder.encode(batch.id.getBytes()), "UTF-8")
var deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.id}")