This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 49d224e00 [KYUUBI #6335] [REST] Support uploading extra resources in creating batch jobs via REST API
49d224e00 is described below
commit 49d224e0026511dab1250d13089f8bb6ec738abd
Author: Bowen Liang <[email protected]>
AuthorDate: Wed Aug 7 14:24:02 2024 +0800
[KYUUBI #6335] [REST] Support uploading extra resources in creating batch jobs via REST API
# :mag: Description
## Issue References 🔗
## Describe Your Solution 🔧
- support creating batch jobs with extra resource files uploaded alongside the main resource
- allow uploading extra resources when creating batch jobs via the REST API
- support binding the uploaded subresources to configs via custom config keys, e.g. `spark.submit.pyFiles` (see the client-side sketch below)
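
A client-side sketch of the new flow (host, file names, and paths are hypothetical; the calls mirror the `BatchRestApi`/`BatchRequest` changes in this patch):

```scala
import scala.collection.JavaConverters._
import org.apache.kyuubi.client.{BatchRestApi, KyuubiRestClient}
import org.apache.kyuubi.client.api.v1.dto.BatchRequest

// Sketch only: endpoint and auth settings depend on the deployment.
val client = KyuubiRestClient.builder("http://localhost:10099").build()
val batchRestApi = new BatchRestApi(client)

val request = new BatchRequest()
request.setBatchType("PYSPARK")
// Bind uploaded file names (comma-separated) to a config key.
request.setExtraResourcesMap(Map("spark.submit.pyFiles" -> "pymodules.zip").asJava)

// The main resource and each extra resource are sent as multipart file parts;
// the server rewrites the resource path and the bound config values to the
// copies it writes under the per-batch upload folder.
val batch = batchRestApi.createBatch(
  request,
  new java.io.File("/path/to/app.py"),
  List("/path/to/pymodules.zip").asJava)
```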
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
+ new test in `BatchRestApiSuite`: "basic batch rest client with uploading resource and extra resources"
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6335 from bowenliang123/batch-subresource.
Closes #6335
57d43d26d [Bowen Liang] nit
d866a8a17 [Bowen Liang] warn exception
20d4328a1 [Bowen Liang] log exception when exception ignored
58c402334 [Bowen Liang] rename param to ignoreException
80bc21034 [Bowen Liang] cleanup the uploaded resource folder when handling files error
3e7961124 [Bowen Liang] throw exception when file non-existed
09ac48a26 [liangbowen] pyspark extra resources
Lead-authored-by: Bowen Liang <[email protected]>
Co-authored-by: liangbowen <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
---
.../src/main/scala/org/apache/kyuubi/Utils.scala | 21 +++-
.../org/apache/kyuubi/client/BatchRestApi.java | 24 +++-
.../kyuubi/client/api/v1/dto/BatchRequest.java | 20 +++-
.../kyuubi/operation/BatchJobSubmission.scala | 7 +-
.../kyuubi/server/api/v1/BatchesResource.scala | 127 ++++++++++++++++++---
.../apache/kyuubi/session/KyuubiBatchSession.scala | 10 +-
kyuubi-server/src/test/resources/python/app.py | 20 ++++
.../src/test/resources/python/module1/__init__.py | 0
.../src/test/resources/python/module1/module.py | 5 +
.../src/test/resources/python/module2/__init__.py | 0
.../src/test/resources/python/module2/module.py | 6 +
.../server/rest/client/BatchRestApiSuite.scala | 88 +++++++++++++-
12 files changed, 296 insertions(+), 32 deletions(-)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 5944e9f97..326b1601f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -137,12 +137,23 @@ object Utils extends Logging {
/**
* Delete a directory recursively.
*/
- def deleteDirectoryRecursively(f: File): Boolean = {
- if (f.isDirectory) f.listFiles match {
- case files: Array[File] => files.foreach(deleteDirectoryRecursively)
- case _ =>
+ def deleteDirectoryRecursively(f: File, ignoreException: Boolean = true): Unit = {
+ if (f.isDirectory) {
+ val files = f.listFiles
+ if (files != null && files.nonEmpty) {
+ files.foreach(deleteDirectoryRecursively(_, ignoreException))
+ }
+ }
+ try {
+ f.delete()
+ } catch {
+ case e: Exception =>
+ if (ignoreException) {
+ warn(s"Ignoring the exception in deleting file, path: ${f.toPath}", e)
+ } else {
+ throw e
+ }
}
- f.delete()
}
/**
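
For context, a caller-side sketch of the reworked helper (the directory value is hypothetical): by default failures are logged and swallowed, while ignoreException = false propagates them.

    // Best-effort cleanup: exceptions are logged at WARN and ignored.
    Utils.deleteDirectoryRecursively(uploadDir.toFile)
    // Strict cleanup: rethrows the first failure instead of ignoring it.
    Utils.deleteDirectoryRecursively(uploadDir.toFile, ignoreException = false)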
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
index e6f9577b3..681170b87 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
@@ -18,8 +18,9 @@
package org.apache.kyuubi.client;
import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
+import java.nio.file.Paths;
+import java.util.*;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kyuubi.client.api.v1.dto.*;
import org.apache.kyuubi.client.util.JsonUtils;
import org.apache.kyuubi.client.util.VersionUtils;
@@ -46,10 +47,29 @@ public class BatchRestApi {
}
public Batch createBatch(BatchRequest request, File resourceFile) {
+ return createBatch(request, resourceFile, Collections.emptyList());
+ }
+
+ public Batch createBatch(BatchRequest request, File resourceFile, List<String> extraResources) {
setClientVersion(request);
Map<String, MultiPart> multiPartMap = new HashMap<>();
multiPartMap.put("batchRequest", new MultiPart(MultiPart.MultiPartType.JSON, request));
multiPartMap.put("resourceFile", new MultiPart(MultiPart.MultiPartType.FILE, resourceFile));
+ extraResources.stream()
+ .distinct()
+ .filter(StringUtils::isNotBlank)
+ .map(
+ path -> {
+ File file = Paths.get(path).toFile();
+ if (!file.exists()) {
+ throw new RuntimeException("File not existed, path: " + path);
+ }
+ return file;
+ })
+ .forEach(
+ file ->
+ multiPartMap.put(
+ file.getName(), new MultiPart(MultiPart.MultiPartType.FILE, file)));
return this.getClient().post(API_BASE_PATH, multiPartMap, Batch.class, client.getAuthHeader());
}
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
index f45821fc2..ac9850498 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java
@@ -31,6 +31,7 @@ public class BatchRequest {
private String name;
private Map<String, String> conf = Collections.emptyMap();
private List<String> args = Collections.emptyList();
+ private Map<String, String> extraResourcesMap = Collections.emptyMap();
public BatchRequest() {}
@@ -110,6 +111,14 @@ public class BatchRequest {
this.args = args;
}
+ public Map<String, String> getExtraResourcesMap() {
+ return extraResourcesMap;
+ }
+
+ public void setExtraResourcesMap(Map<String, String> extraResourcesMap) {
+ this.extraResourcesMap = extraResourcesMap;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -120,13 +129,20 @@ public class BatchRequest {
&& Objects.equals(getClassName(), that.getClassName())
&& Objects.equals(getName(), that.getName())
&& Objects.equals(getConf(), that.getConf())
- && Objects.equals(getArgs(), that.getArgs());
+ && Objects.equals(getArgs(), that.getArgs())
+ && Objects.equals(getExtraResourcesMap(), that.getExtraResourcesMap());
}
@Override
public int hashCode() {
return Objects.hash(
- getBatchType(), getResource(), getClassName(), getName(), getConf(), getArgs());
+ getBatchType(),
+ getResource(),
+ getClassName(),
+ getName(),
+ getConf(),
+ getArgs(),
+ getExtraResourcesMap());
}
@Override
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 2c90058db..8b2cfef85 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.operation
-import java.nio.file.{Files, Paths}
import java.util.Locale
import java.util.concurrent.TimeUnit
@@ -395,11 +394,7 @@ class BatchJobSubmission(
private def cleanupUploadedResourceIfNeeded(): Unit = {
if (session.isResourceUploaded) {
- try {
- Files.deleteIfExists(Paths.get(resource))
- } catch {
- case e: Throwable => error(s"Error deleting the uploaded resource: $resource", e)
- }
+ Utils.deleteDirectoryRecursively(session.resourceUploadFolderPath.toFile)
}
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index aed806714..182b28b0c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.server.api.v1
import java.io.InputStream
+import java.nio.file.{Path => JPath}
import java.util
import java.util.{Collections, Locale, UUID}
import java.util.concurrent.ConcurrentHashMap
@@ -32,7 +33,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.commons.lang3.StringUtils
-import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDataParam}
+import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDataMultiPart, FormDataParam}
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.client.api.v1.dto._
@@ -190,7 +191,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
def openBatchSessionWithUpload(
@FormDataParam("batchRequest") batchRequest: BatchRequest,
@FormDataParam("resourceFile") resourceFileInputStream: InputStream,
- @FormDataParam("resourceFile") resourceFileMetadata: FormDataContentDisposition): Batch = {
+ @FormDataParam("resourceFile") resourceFileMetadata: FormDataContentDisposition,
+ formDataMultiPart: FormDataMultiPart): Batch = {
require(
fe.getConf.get(BATCH_RESOURCE_UPLOAD_ENABLED),
"Batch resource upload function is disabled.")
@@ -198,12 +200,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
batchRequest != null,
"batchRequest is required and please check the content type" +
" of batchRequest is application/json")
- val tempFile = Utils.writeToTempFile(
- resourceFileInputStream,
- KyuubiApplicationManager.uploadWorkDir,
- resourceFileMetadata.getFileName)
- batchRequest.setResource(tempFile.getPath)
- openBatchSessionInternal(batchRequest, isResourceFromUpload = true)
+ openBatchSessionInternal(
+ batchRequest,
+ isResourceFromUpload = true,
+ resourceFileInputStream = Some(resourceFileInputStream),
+ resourceFileMetadata = Some(resourceFileMetadata),
+ formDataMultiPartOpt = Some(formDataMultiPart))
}
/**
@@ -215,7 +217,10 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
*/
private def openBatchSessionInternal(
request: BatchRequest,
- isResourceFromUpload: Boolean = false): Batch = {
+ isResourceFromUpload: Boolean = false,
+ resourceFileInputStream: Option[InputStream] = None,
+ resourceFileMetadata: Option[FormDataContentDisposition] = None,
+ formDataMultiPartOpt: Option[FormDataMultiPart] = None): Batch = {
require(
supportedBatchType(request.getBatchType),
s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES}")
@@ -243,6 +248,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
markDuplicated(batch)
case None =>
val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString)
+ if (isResourceFromUpload) {
+ handleUploadingFiles(
+ batchId,
+ request,
+ resourceFileInputStream.get,
+ resourceFileMetadata.get.getFileName,
+ formDataMultiPartOpt)
+ }
request.setConf(
(request.getConf.asScala ++ Map(
KYUUBI_BATCH_ID_KEY -> batchId,
@@ -525,22 +538,110 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
}
}
}
+
+ private def handleUploadingFiles(
+ batchId: String,
+ request: BatchRequest,
+ resourceFileInputStream: InputStream,
+ resourceFileName: String,
+ formDataMultiPartOpt: Option[FormDataMultiPart]): Option[JPath] = {
+ val uploadFileFolderPath = batchResourceUploadFolderPath(batchId)
+ try {
+ handleUploadingResourceFile(
+ request,
+ resourceFileInputStream,
+ resourceFileName,
+ uploadFileFolderPath)
+ handleUploadingExtraResourcesFiles(request, formDataMultiPartOpt, uploadFileFolderPath)
+ Some(uploadFileFolderPath)
+ } catch {
+ case e: Exception =>
+ Utils.deleteDirectoryRecursively(uploadFileFolderPath.toFile)
+ throw e
+ }
+ }
+
+ private def handleUploadingResourceFile(
+ request: BatchRequest,
+ inputStream: InputStream,
+ fileName: String,
+ uploadFileFolderPath: JPath): Unit = {
+ try {
+ val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, fileName)
+ request.setResource(tempFile.getPath)
+ } catch {
+ case e: Exception =>
+ throw new RuntimeException(
+ s"Failed handling uploaded resource file $fileName: ${e.getMessage}",
+ e)
+ }
+ }
+
+ private def handleUploadingExtraResourcesFiles(
+ request: BatchRequest,
+ formDataMultiPartOpt: Option[FormDataMultiPart],
+ uploadFileFolderPath: JPath): Unit = {
+ val extraResourceMap = request.getExtraResourcesMap.asScala
+ if (extraResourceMap.nonEmpty) {
+ val fileNameSeparator = ","
+ val formDataMultiPart = formDataMultiPartOpt.get
+ val transformedExtraResourcesMap = extraResourceMap
+ .mapValues(confValue =>
+ confValue.split(fileNameSeparator).filter(StringUtils.isNotBlank(_)))
+ .filter { case (confKey, fileNames) =>
+ fileNames.nonEmpty && StringUtils.isNotBlank(confKey)
+ }.mapValues { fileNames =>
+ fileNames.map(fileName =>
+ Option(formDataMultiPart.getField(fileName))
+ .getOrElse(throw new RuntimeException(s"File part for file $fileName not found")))
+ }.map {
+ case (confKey, fileParts) =>
+ val tempFilePaths = fileParts.map { filePart =>
+ val fileName = filePart.getContentDisposition.getFileName
+ try {
+ Utils.writeToTempFile(
+ filePart.getValueAs(classOf[InputStream]),
+ uploadFileFolderPath,
+ fileName).getPath
+ } catch {
+ case e: Exception =>
+ throw new RuntimeException(
+ s"Failed handling uploaded extra resource file $fileName: ${e.getMessage}",
+ e)
+ }
+ }
+ (confKey, tempFilePaths.mkString(fileNameSeparator))
+ }
+
+ val conf = request.getConf
+ transformedExtraResourcesMap.foreach { case (confKey, tempFilePathStr) =>
+ conf.get(confKey) match {
+ case confValue: String if StringUtils.isNotBlank(confValue) =>
+ conf.put(confKey, List(confValue.trim, tempFilePathStr).mkString(fileNameSeparator))
+ case _ => conf.put(confKey, tempFilePathStr)
+ }
+ }
+ }
+ }
}
object BatchesResource {
- val SUPPORTED_BATCH_TYPES = Seq("SPARK", "PYSPARK")
- val VALID_BATCH_STATES = Seq(
+ private lazy val SUPPORTED_BATCH_TYPES = Set("SPARK", "PYSPARK")
+ private lazy val VALID_BATCH_STATES = Set(
OperationState.PENDING,
OperationState.RUNNING,
OperationState.FINISHED,
OperationState.ERROR,
OperationState.CANCELED).map(_.toString)
- def supportedBatchType(batchType: String): Boolean = {
+ private def supportedBatchType(batchType: String): Boolean = {
Option(batchType).exists(bt =>
SUPPORTED_BATCH_TYPES.contains(bt.toUpperCase(Locale.ROOT)))
}
- def validBatchState(batchState: String): Boolean = {
+ private def validBatchState(batchState: String): Boolean = {
Option(batchState).exists(bt =>
VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT)))
}
+
+ def batchResourceUploadFolderPath(batchId: String): JPath =
+ KyuubiApplicationManager.uploadWorkDir.resolve(s"batch-$batchId")
}
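
To make the config-binding above concrete, a minimal sketch of the merge (paths hypothetical): each `extraResourcesMap` entry maps a config key to comma-separated uploaded file names; the matching multipart file parts are written under batchResourceUploadFolderPath(batchId), and the resulting temp paths are appended to any existing config value.

    // e.g. request.getExtraResourcesMap: "spark.submit.pyFiles" -> "pymodules.zip"
    // and request.getConf already holds "spark.submit.pyFiles" -> "s3://bucket/base.zip"
    val existing = "s3://bucket/base.zip"
    val uploaded = Seq("/path/to/uploadWorkDir/batch-<id>/pymodules.zip")
    val merged = (existing.trim +: uploaded).mkString(",")
    // conf ends up with "s3://bucket/base.zip,/path/to/uploadWorkDir/batch-<id>/pymodules.zip"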
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index f648c39cb..149c7ab01 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.session
+import java.nio.file.Path
+
import scala.collection.JavaConverters._
import org.apache.kyuubi.client.util.BatchUtils._
@@ -26,6 +28,7 @@ import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.api.v1.BatchesResource
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.session.SessionType.SessionType
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@@ -79,6 +82,9 @@ class KyuubiBatchSession(
override val normalizedConf: Map[String, String] =
sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf)
+ private[kyuubi] def resourceUploadFolderPath: Path =
+ BatchesResource.batchResourceUploadFolderPath(batchJobSubmissionOp.batchId)
+
val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(
user,
@@ -101,8 +107,8 @@ class KyuubiBatchSession(
batchName.filterNot(_.trim.isEmpty).orElse(optimizedConf.get(KyuubiConf.SESSION_NAME.key))
// whether the resource file is from uploading
- private[kyuubi] val isResourceUploaded: Boolean =
- conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
+ private[kyuubi] lazy val isResourceUploaded: Boolean =
+ conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, false.toString).toBoolean
private[kyuubi] lazy val batchJobSubmissionOp =
sessionManager.operationManager
.newBatchJobSubmissionOperation(
diff --git a/kyuubi-server/src/test/resources/python/app.py b/kyuubi-server/src/test/resources/python/app.py
new file mode 100644
index 000000000..482a82196
--- /dev/null
+++ b/kyuubi-server/src/test/resources/python/app.py
@@ -0,0 +1,20 @@
+from module1.module import func1
+
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, IntegerType
+
+if __name__ == "__main__":
+ print(f"Started running PySpark app at {func1()}")
+
+ spark = SparkSession.builder.appName("pyspark-sample").getOrCreate()
+ sc = spark.sparkContext
+
+ data = [1, 2, 3, 4, 5]
+ rdd = sc.parallelize(data)
+ transformed_rdd = rdd.map(lambda x: x * 2)
+ collected = transformed_rdd.collect()
+
+ df = spark.createDataFrame(transformed_rdd, IntegerType())
+ df.coalesce(1).write.format("csv").option("header", "false").save("/tmp/" + func1())
+
+ print(f"Result: {collected}")
diff --git a/kyuubi-server/src/test/resources/python/module1/__init__.py b/kyuubi-server/src/test/resources/python/module1/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/kyuubi-server/src/test/resources/python/module1/module.py b/kyuubi-server/src/test/resources/python/module1/module.py
new file mode 100644
index 000000000..aac092925
--- /dev/null
+++ b/kyuubi-server/src/test/resources/python/module1/module.py
@@ -0,0 +1,5 @@
+from module2.module import current_time
+
+
+def func1():
+ return "result_" + current_time()
diff --git a/kyuubi-server/src/test/resources/python/module2/__init__.py b/kyuubi-server/src/test/resources/python/module2/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/kyuubi-server/src/test/resources/python/module2/module.py b/kyuubi-server/src/test/resources/python/module2/module.py
new file mode 100644
index 000000000..ba098a762
--- /dev/null
+++ b/kyuubi-server/src/test/resources/python/module2/module.py
@@ -0,0 +1,6 @@
+from datetime import datetime
+
+
+def current_time():
+ now = datetime.now()
+ return now.strftime("%Y%m%d%H%M%S")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
index 20ec2fc0a..e6ea0d162 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
@@ -16,9 +16,12 @@
*/
package org.apache.kyuubi.server.rest.client
-
-import java.nio.file.Paths
+import java.io.{File, FileOutputStream}
+import java.nio.file.{Files, Path, Paths}
import java.util.Base64
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.JavaConverters._
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
@@ -29,6 +32,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.config.KyuubiReservedKeys
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.session.{KyuubiSession, SessionHandle}
+import org.apache.kyuubi.util.GoldenFileUtils.getCurrentModuleHome
class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
@@ -99,6 +103,86 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
basicKyuubiRestClient.close()
}
+ test("basic batch rest client with uploading resource and extra resources") {
+ def preparePyModulesZip(
+ srcFolderPath: Path,
+ targetZipFileName: String,
+ excludedFileNames: Set[String] = Set.empty[String]): String = {
+
+ def addFolderToZip(zos: ZipOutputStream, folder: File, parentFolder: String = ""): Unit = {
+ if (folder.isDirectory) {
+ folder.listFiles().foreach { file =>
+ val fileName = file.getName
+ if (!(excludedFileNames.contains(fileName) || fileName.startsWith("."))) {
+ if (file.isDirectory) {
+ val folderPath =
+ if (parentFolder.isEmpty) fileName else parentFolder + "/" + fileName
+ addFolderToZip(zos, file, folderPath)
+ } else {
+ val filePath = if (parentFolder.isEmpty) fileName else parentFolder + "/" + fileName
+ zos.putNextEntry(new ZipEntry(filePath))
+ zos.write(Files.readAllBytes(file.toPath))
+ zos.closeEntry()
+ }
+ }
+ }
+ }
+ }
+
+ val zipFilePath = Paths.get(System.getProperty("java.io.tmpdir"), targetZipFileName).toString
+ val fileOutputStream = new FileOutputStream(zipFilePath)
+ val zipOutputStream = new ZipOutputStream(fileOutputStream)
+ try {
+ addFolderToZip(zipOutputStream, srcFolderPath.toFile)
+ } finally {
+ zipOutputStream.close()
+ fileOutputStream.close()
+ }
+ zipFilePath
+ }
+
+ val basicKyuubiRestClient: KyuubiRestClient =
+ KyuubiRestClient.builder(baseUri.toString)
+ .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC)
+ .username(ldapUser)
+ .password(ldapUserPasswd)
+ .socketTimeout(5 * 60 * 1000)
+ .build()
+ val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient)
+
+ val pythonScriptsPath = s"${getCurrentModuleHome(this)}/src/test/resources/python/"
+ val appScriptFileName = "app.py"
+ val appScriptFile = Paths.get(pythonScriptsPath, appScriptFileName).toFile
+ val modulesZipFileName = "pymodules.zip"
+ val modulesZipFile = preparePyModulesZip(
+ srcFolderPath = Paths.get(pythonScriptsPath),
+ targetZipFileName = modulesZipFileName,
+ excludedFileNames = Set(appScriptFileName))
+
+ val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
+ requestObj.setBatchType("PYSPARK")
+ requestObj.setName("pyspark-test")
+ requestObj.setExtraResourcesMap(Map("spark.submit.pyFiles" -> modulesZipFileName).asJava)
+ val extraResources = List(modulesZipFile)
+ val batch: Batch = batchRestApi.createBatch(requestObj, appScriptFile, extraResources.asJava)
+
+ try {
+ assert(batch.getKyuubiInstance === fe.connectionUrl)
+ assert(batch.getBatchType === "PYSPARK")
+ val batchId = batch.getId
+ assert(batchId !== null)
+
+ eventually(timeout(1.minutes), interval(1.seconds)) {
+ val batch = batchRestApi.getBatchById(batchId)
+ assert(batch.getState == "FINISHED")
+ }
+
+ } finally {
+ Files.deleteIfExists(Paths.get(modulesZipFile))
+ basicKyuubiRestClient.close()
+ }
+ }
+
test("basic batch rest client with invalid user") {
val totalConnections =
MetricsSystem.counterValue(MetricsConstants.REST_CONN_TOTAL).getOrElse(0L)