This is an automated email from the ASF dual-hosted git repository. chengpan pushed a commit to branch KYUUBI-4806-1 in repository https://gitbox.apache.org/repos/asf/kyuubi.git
commit ffd0367b38169f98306e2b8f1d76d7fd51bb49b3 Author: Paul Lin <[email protected]> AuthorDate: Mon Aug 14 17:54:38 2023 +0800 Refactor getNextRowSetInternal to support fetch streaming data --- .../engine/chat/operation/ChatOperation.scala | 8 +++++-- .../engine/flink/operation/FlinkOperation.scala | 11 ++++++--- .../engine/hive/operation/HiveOperation.scala | 17 ++++++++++---- .../hive/operation/HiveOperationManager.scala | 4 ++-- .../engine/jdbc/operation/JdbcOperation.scala | 10 +++++--- .../engine/spark/operation/SparkOperation.scala | 10 +++++--- .../engine/trino/operation/ExecuteStatement.scala | 10 +++++--- .../engine/trino/operation/TrinoOperation.scala | 10 +++++--- .../kyuubi/operation/AbstractOperation.scala | 11 +++++---- .../apache/kyuubi/operation/FetchIterator.scala | 2 +- .../org/apache/kyuubi/operation/Operation.scala | 4 ++-- .../apache/kyuubi/operation/OperationManager.scala | 9 +++++--- .../kyuubi/service/AbstractBackendService.scala | 2 +- .../org/apache/kyuubi/service/BackendService.scala | 2 +- .../apache/kyuubi/service/TFrontendService.scala | 9 +++----- .../apache/kyuubi/session/AbstractSession.scala | 2 +- .../scala/org/apache/kyuubi/session/Session.scala | 4 ++-- .../apache/kyuubi/operation/NoopOperation.scala | 10 +++++--- .../kyuubi/operation/NoopOperationManager.scala | 8 ++++--- .../kyuubi/jdbc/hive/KyuubiQueryResultSet.java | 27 ++++++++++++++-------- .../kyuubi/operation/ExecutedCommandExec.scala | 11 ++++++--- .../operation/KyuubiApplicationOperation.scala | 11 ++++++--- .../apache/kyuubi/operation/KyuubiOperation.scala | 9 ++++++-- .../kyuubi/operation/KyuubiOperationManager.scala | 10 ++++---- .../kyuubi/server/BackendServiceMetric.scala | 7 +++--- .../kyuubi/server/api/v1/OperationsResource.scala | 6 +++-- .../kyuubi/server/mysql/MySQLCommandHandler.scala | 3 ++- .../org/apache/kyuubi/server/trino/api/Query.scala | 2 +- .../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 2 +- .../server/trino/api/TrinoContextSuite.scala | 4 ++-- 30 files changed, 152 insertions(+), 83 deletions(-) diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala index 3ae21aa9f..735410a20 100644 --- a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala @@ -31,7 +31,9 @@ abstract class ChatOperation(session: Session) extends AbstractOperation(session protected lazy val conf: KyuubiConf = session.sessionManager.getConf - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) @@ -47,7 +49,9 @@ abstract class ChatOperation(session: Session) extends AbstractOperation(session val taken = iter.take(rowSetSize) val resultRowSet = RowSet.toTRowSet(taken.toSeq, 1, getProtocolVersion) resultRowSet.setStartRowOffset(iter.getPosition) - resultRowSet + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(resultRowSet) + resp } override def cancel(): Unit = { diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala index ef80034d3..da9210fa0 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter import org.apache.flink.configuration.Configuration import org.apache.flink.table.gateway.service.context.SessionContext import org.apache.flink.table.gateway.service.operation.OperationExecutor -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet, TTableSchema} +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TTableSchema} import org.apache.kyuubi.{KyuubiSQLException, Utils} import org.apache.kyuubi.engine.flink.result.ResultSet @@ -91,7 +91,9 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio resp } - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) @@ -112,7 +114,10 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio zoneId, getProtocolVersion) resultRowSet.setStartRowOffset(resultSet.getData.getPosition) - resultRowSet + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(resultRowSet) + resp.setHasMoreRows(resultSet.getData.hasNext) + resp } override def shouldRunAsync: Boolean = false diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala index c7166356d..acf49b1f5 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperation.scala @@ -21,7 +21,7 @@ import java.util.concurrent.Future import org.apache.hive.service.cli.operation.{Operation, OperationManager} import org.apache.hive.service.cli.session.{HiveSession, SessionManager => HiveSessionManager} -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet} +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp} import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY @@ -95,22 +95,29 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session resp } - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { val tOrder = FetchOrientation.toTFetchOrientation(order) val hiveOrder = org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder) val rowSet = internalHiveOperation.getNextRowSet(hiveOrder, rowSetSize) - rowSet.toTRowSet + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(rowSet.toTRowSet) + resp } - def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = { + def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int): TFetchResultsResp = { val tOrder = FetchOrientation.toTFetchOrientation(order) val hiveOrder = org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder) val handle = internalHiveOperation.getHandle - delegatedOperationManager.getOperationLogRowSet( + val rowSet = delegatedOperationManager.getOperationLogRowSet( handle, hiveOrder, rowSetSize, hive.getHiveConf).toTRowSet + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(rowSet) + resp } override def isTimedOut: Boolean = internalHiveOperation.isTimedOut(System.currentTimeMillis) diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala index 0762a2938..4e41e742e 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.hive.operation import java.util.List import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hive.service.rpc.thrift.TRowSet +import org.apache.hive.service.rpc.thrift.TFetchResultsResp import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.hive.session.HiveSessionImpl @@ -154,7 +154,7 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") { override def getOperationLogRowSet( opHandle: OperationHandle, order: FetchOrientation, - maxRows: Int): TRowSet = { + maxRows: Int): TFetchResultsResp = { val operation = getOperation(opHandle).asInstanceOf[HiveOperation] operation.getOperationLogRowSet(order, maxRows) } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala index 1c4e0c57f..ee105b270 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala @@ -16,7 +16,7 @@ */ package org.apache.kyuubi.engine.jdbc.operation -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet} +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TRowSet} import org.apache.kyuubi.{KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf @@ -36,7 +36,9 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session protected lazy val dialect: JdbcDialect = JdbcDialects.get(conf) - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) @@ -51,7 +53,9 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session val taken = iter.take(rowSetSize) val resultRowSet = toTRowSet(taken) resultRowSet.setStartRowOffset(iter.getPosition) - resultRowSet + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(resultRowSet) + resp } override def cancel(): Unit = { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 5062a4904..eeb2883ab 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation import java.io.IOException import java.time.ZoneId -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TRowSet} +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TRowSet} import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener} import org.apache.spark.kyuubi.SparkUtilsHelper.redact import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -233,7 +233,9 @@ abstract class SparkOperation(session: Session) resp } - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { var resultRowSet: TRowSet = null try { withLocalProperties { @@ -264,7 +266,9 @@ abstract class SparkOperation(session: Session) } } catch onError(cancel = true) - resultRowSet + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(resultRowSet) + resp } override def shouldRunAsync: Boolean = false diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala index 405c93b0b..c932fa6ca 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.trino.operation import java.util.concurrent.RejectedExecutionException -import org.apache.hive.service.rpc.thrift.TRowSet +import org.apache.hive.service.rpc.thrift.TFetchResultsResp import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.engine.trino.TrinoStatement @@ -82,7 +82,9 @@ class ExecuteStatement( } } - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) @@ -97,7 +99,9 @@ class ExecuteStatement( val taken = iter.take(rowSetSize) val resultRowSet = RowSet.toTRowSet(taken.toList, schema, getProtocolVersion) resultRowSet.setStartRowOffset(iter.getPosition) - resultRowSet + val fetchResultsResp = new TFetchResultsResp(OK_STATUS) + fetchResultsResp.setResults(resultRowSet) + fetchResultsResp } private def executeStatement(trinoStatement: TrinoStatement): Unit = { diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala index 20153b84f..412ed7749 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala @@ -21,7 +21,7 @@ import java.io.IOException import io.trino.client.Column import io.trino.client.StatementClient -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet} +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp} import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.Utils @@ -54,7 +54,9 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio resp } - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) @@ -66,7 +68,9 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio val taken = iter.take(rowSetSize) val resultRowSet = RowSet.toTRowSet(taken.toList, schema, getProtocolVersion) resultRowSet.setStartRowOffset(iter.getPosition) - resultRowSet + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(resultRowSet) + resp } override protected def beforeRun(): Unit = { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 94f53111d..0a185b942 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantLock import scala.collection.JavaConverters._ import org.apache.commons.lang3.StringUtils -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode} +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, TStatusCode} import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT @@ -182,11 +182,12 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin override def getResultSetMetadata: TGetResultSetMetadataResp - def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet + def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TFetchResultsResp - override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = withLockRequired { - getNextRowSetInternal(order, rowSetSize) - } + override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TFetchResultsResp = + withLockRequired { + getNextRowSetInternal(order, rowSetSize) + } /** * convert SQL 'like' pattern to a Java regular expression. diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala index fdada1174..ada155887 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/FetchIterator.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.operation /** * Borrowed from Apache Spark, see SPARK-33655 */ -sealed trait FetchIterator[A] extends Iterator[A] { +trait FetchIterator[A] extends Iterator[A] { /** * Begin a fetch block, forward from the current position. diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala index 6f496c9b8..c20a16f61 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.operation import java.util.concurrent.Future -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet} +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp} import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.operation.log.OperationLog @@ -32,7 +32,7 @@ trait Operation { def close(): Unit def getResultSetMetadata: TGetResultSetMetadataResp - def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet + def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TFetchResultsResp def getSession: Session def getHandle: OperationHandle diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala index 3093bcfbe..76b6669da 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala @@ -137,18 +137,21 @@ abstract class OperationManager(name: String) extends AbstractService(name) { final def getOperationNextRowSet( opHandle: OperationHandle, order: FetchOrientation, - maxRows: Int): TRowSet = { + maxRows: Int): TFetchResultsResp = { getOperation(opHandle).getNextRowSet(order, maxRows) } def getOperationLogRowSet( opHandle: OperationHandle, order: FetchOrientation, - maxRows: Int): TRowSet = { + maxRows: Int): TFetchResultsResp = { val operationLog = getOperation(opHandle).getOperationLog - operationLog.map(_.read(order, maxRows)).getOrElse { + val rowSet = operationLog.map(_.read(order, maxRows)).getOrElse { throw KyuubiSQLException(s"$opHandle failed to generate operation log") } + val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS)) + resp.setResults(rowSet) + resp } final def removeExpiredOperations(handles: Seq[OperationHandle]): Seq[Operation] = synchronized { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala index 171e04901..443b35354 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala @@ -201,7 +201,7 @@ abstract class AbstractBackendService(name: String) operationHandle: OperationHandle, orientation: FetchOrientation, maxRows: Int, - fetchLog: Boolean): TRowSet = { + fetchLog: Boolean): TFetchResultsResp = { maxRowsLimit.foreach(limit => if (maxRows > limit) { throw new IllegalArgumentException(s"Max rows for fetching results " + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala index 968a94197..85df9024c 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala @@ -101,7 +101,7 @@ trait BackendService { operationHandle: OperationHandle, orientation: FetchOrientation, maxRows: Int, - fetchLog: Boolean): TRowSet + fetchLog: Boolean): TFetchResultsResp def sessionManager: SessionManager } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala index aa012ab56..7cc23779f 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala @@ -520,23 +520,20 @@ abstract class TFrontendService(name: String) override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = { debug(req.toString) - val resp = new TFetchResultsResp try { val operationHandle = OperationHandle(req.getOperationHandle) val orientation = FetchOrientation.getFetchOrientation(req.getOrientation) // 1 means fetching log val fetchLog = req.getFetchType == 1 val maxRows = req.getMaxRows.toInt - val rowSet = be.fetchResults(operationHandle, orientation, maxRows, fetchLog) - resp.setResults(rowSet) - resp.setHasMoreRows(false) - resp.setStatus(OK_STATUS) + be.fetchResults(operationHandle, orientation, maxRows, fetchLog) } catch { case e: Exception => error("Error fetching results: ", e) + val resp = new TFetchResultsResp resp.setStatus(KyuubiSQLException.toTStatus(e)) + resp } - resp } protected def notSupportTokenErrorStatus = { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala index 1a8c51ccd..a9e33f5a0 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala @@ -233,7 +233,7 @@ abstract class AbstractSession( operationHandle: OperationHandle, orientation: FetchOrientation, maxRows: Int, - fetchLog: Boolean): TRowSet = { + fetchLog: Boolean): TFetchResultsResp = { if (fetchLog) { sessionManager.operationManager.getOperationLogRowSet(operationHandle, orientation, maxRows) } else { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala index bc9f9a8f6..2cdac9f3a 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.session -import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion, TRowSet} +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetInfoType, TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion} import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.operation.OperationHandle @@ -91,7 +91,7 @@ trait Session { operationHandle: OperationHandle, orientation: FetchOrientation, maxRows: Int, - fetchLog: Boolean): TRowSet + fetchLog: Boolean): TFetchResultsResp def closeExpiredOperations(): Unit } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala index 4093d1fca..266962ed2 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import scala.collection.JavaConverters._ -import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId} +import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TFetchResultsResp, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId} import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation @@ -76,11 +76,15 @@ class NoopOperation(session: Session, shouldFail: Boolean = false) resp } - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { val col = TColumn.stringVal(new TStringColumn(Seq(opType).asJava, ByteBuffer.allocate(0))) val tRowSet = ThriftUtils.newEmptyRowSet tRowSet.addToColumns(col) - tRowSet + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(tRowSet) + resp } override def shouldRunAsync: Boolean = false diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala index 455e5d4d2..dbd5db9d8 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.operation import java.nio.ByteBuffer import java.util -import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn} +import org.apache.hive.service.rpc.thrift.{TColumn, TFetchResultsResp, TRow, TRowSet, TStatus, TStatusCode, TStringColumn} import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.session.Session @@ -136,13 +136,15 @@ class NoopOperationManager extends OperationManager("noop") { override def getOperationLogRowSet( opHandle: OperationHandle, order: FetchOrientation, - maxRows: Int): TRowSet = { + maxRows: Int): TFetchResultsResp = { val logs = new util.ArrayList[String]() logs.add("test") val tColumn = TColumn.stringVal(new TStringColumn(logs, ByteBuffer.allocate(0))) val tRow = new TRowSet(0, new util.ArrayList[TRow](logs.size())) tRow.addToColumns(tColumn) - tRow + val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS)) + resp.setResults(tRow) + resp } override def getQueryId(operation: Operation): String = { diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java index 82ea74a01..242ec7720 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java @@ -26,6 +26,7 @@ import org.apache.hive.service.rpc.thrift.*; import org.apache.kyuubi.jdbc.hive.cli.RowSet; import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory; import org.apache.kyuubi.jdbc.hive.common.HiveDecimal; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ public class KyuubiQueryResultSet extends KyuubiBaseResultSet { private boolean emptyResultSet = false; private boolean isScrollable = false; private boolean fetchFirst = false; + private boolean hasMoreToFetch = false; private final TProtocolVersion protocol; @@ -317,25 +319,20 @@ public class KyuubiQueryResultSet extends KyuubiBaseResultSet { try { TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT; if (fetchFirst) { - // If we are asked to start from begining, clear the current fetched resultset + // If we are asked to start from beginning, clear the current fetched resultset orientation = TFetchOrientation.FETCH_FIRST; fetchedRows = null; fetchedRowsItr = null; fetchFirst = false; } if (fetchedRows == null || !fetchedRowsItr.hasNext()) { - TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, orientation, fetchSize); - TFetchResultsResp fetchResp; - fetchResp = client.FetchResults(fetchReq); - Utils.verifySuccessWithInfo(fetchResp.getStatus()); - - TRowSet results = fetchResp.getResults(); - fetchedRows = RowSetFactory.create(results, protocol); - fetchedRowsItr = fetchedRows.iterator(); + fetchResult(orientation); } if (fetchedRowsItr.hasNext()) { row = fetchedRowsItr.next(); + } else if (hasMoreToFetch) { + fetchResult(orientation); } else { return false; } @@ -350,6 +347,18 @@ public class KyuubiQueryResultSet extends KyuubiBaseResultSet { return true; } + private void fetchResult(TFetchOrientation orientation) throws SQLException, TException { + TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, orientation, fetchSize); + TFetchResultsResp fetchResp; + fetchResp = client.FetchResults(fetchReq); + Utils.verifySuccessWithInfo(fetchResp.getStatus()); + hasMoreToFetch = fetchResp.isSetHasMoreRows() && fetchResp.isHasMoreRows(); + + TRowSet results = fetchResp.getResults(); + fetchedRows = RowSetFactory.create(results, protocol); + fetchedRowsItr = fetchedRows.iterator(); + } + @Override public ResultSetMetaData getMetaData() throws SQLException { if (isClosed) { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala index 93379da4b..b1645f924 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecutedCommandExec.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.operation -import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet} +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp} import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.operation.log.OperationLog @@ -67,11 +67,16 @@ class ExecutedCommandExec( if (!shouldRunAsync) getBackgroundHandle.get() } - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) - command.getNextRowSet(order, rowSetSize, getProtocolVersion) + val rowSet = command.getNextRowSet(order, rowSetSize, getProtocolVersion) + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(rowSet) + resp } override def getResultSetMetadata: TGetResultSetMetadataResp = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala index 54cfdfe93..b35dbee44 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala @@ -22,7 +22,7 @@ import java.util.{ArrayList => JArrayList} import scala.collection.JavaConverters._ -import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId} +import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TFetchResultsResp, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId} import org.apache.kyuubi.engine.ApplicationInfo import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation @@ -54,8 +54,11 @@ abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperat resp } - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { - applicationInfoMap.map { state => + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { + val resp = new TFetchResultsResp(OK_STATUS) + val rowSet = applicationInfoMap.map { state => val tRow = new TRowSet(0, new JArrayList[TRow](state.size)) Seq(state.keys, state.values.map(Option(_).getOrElse(""))).map(_.toSeq.asJava).foreach { col => @@ -64,5 +67,7 @@ abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperat } tRow }.getOrElse(ThriftUtils.EMPTY_ROW_SET) + resp.setResults(rowSet) + resp } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index e357b7912..6339e65bf 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -179,11 +179,16 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi } } - override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = { + override def getNextRowSetInternal( + order: FetchOrientation, + rowSetSize: Int): TFetchResultsResp = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) - client.fetchResults(_remoteOpHandle, order, rowSetSize, fetchLog = false) + val rowset = client.fetchResults(_remoteOpHandle, order, rowSetSize, fetchLog = false) + val resp = new TFetchResultsResp(OK_STATUS) + resp.setResults(rowset) + resp } override def shouldRunAsync: Boolean = false diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala index 3078401b0..f54a21cf1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.operation import java.util.concurrent.TimeUnit -import org.apache.hive.service.rpc.thrift.TRowSet +import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TStatus, TStatusCode} import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf @@ -214,11 +214,11 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam override def getOperationLogRowSet( opHandle: OperationHandle, order: FetchOrientation, - maxRows: Int): TRowSet = { - + maxRows: Int): TFetchResultsResp = { + val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS)) val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation] val operationLog = operation.getOperationLog - operationLog match { + val rowSet = operationLog match { case Some(log) => log.read(order, maxRows) case None => val remoteHandle = operation.remoteOpHandle() @@ -229,6 +229,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam ThriftUtils.EMPTY_ROW_SET } } + resp.setResults(rowSet) + resp } override def start(): Unit = synchronized { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala index 3a92a5ad0..9da4b78c0 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala @@ -183,9 +183,10 @@ trait BackendServiceMetric extends BackendService { operationHandle: OperationHandle, orientation: FetchOrientation, maxRows: Int, - fetchLog: Boolean): TRowSet = { + fetchLog: Boolean): TFetchResultsResp = { MetricsSystem.timerTracing(MetricsConstants.BS_FETCH_RESULTS) { - val rowSet = super.fetchResults(operationHandle, orientation, maxRows, fetchLog) + val fetchResultsResp = super.fetchResults(operationHandle, orientation, maxRows, fetchLog) + val rowSet = fetchResultsResp.getResults // TODO: the statistics are wrong when we enabled the arrow. val rowsSize = if (rowSet.getColumnsSize > 0) { @@ -217,7 +218,7 @@ trait BackendServiceMetric extends BackendService { operation.increaseFetchResultsCount(rowsSize) } - rowSet + fetchResultsResp } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala index ae5feea52..fdde5bbc5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala @@ -145,10 +145,11 @@ private[v1] class OperationsResource extends ApiRequestContext with Logging { if (fetchOrientation != "FETCH_NEXT" && fetchOrientation != "FETCH_FIRST") { throw new BadRequestException(s"$fetchOrientation in operation log is not supported") } - val rowSet = fe.be.sessionManager.operationManager.getOperationLogRowSet( + val fetchResultsResp = fe.be.sessionManager.operationManager.getOperationLogRowSet( OperationHandle(operationHandleStr), FetchOrientation.withName(fetchOrientation), maxRows) + val rowSet = fetchResultsResp.getResults val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala new OperationLog(logRowSet.asJava, logRowSet.size) } catch { @@ -175,11 +176,12 @@ private[v1] class OperationsResource extends ApiRequestContext with Logging { @QueryParam("fetchorientation") @DefaultValue("FETCH_NEXT") fetchOrientation: String): ResultRowSet = { try { - val rowSet = fe.be.fetchResults( + val fetchResultsResp = fe.be.fetchResults( OperationHandle(operationHandleStr), FetchOrientation.withName(fetchOrientation), maxRows, fetchLog = false) + val rowSet = fetchResultsResp.getResults val rows = rowSet.getRows.asScala.map(i => { new Row(i.getColVals.asScala.map(i => { new Field( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala index 2f574d904..5f7a07f58 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala @@ -196,11 +196,12 @@ class MySQLCommandHandler( .getOrElse(KyuubiSQLException(s"Error operator state ${opStatus.state}")) } val resultSetMetadata = be.getResultSetMetadata(opHandle) - val rowSet = be.fetchResults( + val fetchResultResp = be.fetchResults( opHandle, FetchOrientation.FETCH_NEXT, Int.MaxValue, fetchLog = false) + val rowSet = fetchResultResp.getResults MySQLQueryResult(resultSetMetadata.getSchema, rowSet) } catch { case rethrow: Exception => diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala index 4e768b04a..dc9de4ae2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/trino/api/Query.scala @@ -69,7 +69,7 @@ case class Query( queryId.operationHandle, defaultFetchOrientation, defaultMaxRows, - false) + false).getResults TrinoContext.createQueryResults( queryId.getQueryId, nextUri, diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala index 68d95dc80..1826760db 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala @@ -151,7 +151,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD } val resultColumns = batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 10) - .getColumns.asScala + .getResults.getColumns.asScala val keys = resultColumns.head.getStringVal.getValues.asScala val values = resultColumns.apply(1).getStringVal.getValues.asScala diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala index 87c8eda96..6c5a01e46 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/trino/api/TrinoContextSuite.scala @@ -84,7 +84,7 @@ class TrinoContextSuite extends KyuubiFunSuite with RestFrontendTestHelper { checkOpState(opHandleStr, FINISHED) val metadataResp = fe.be.getResultSetMetadata(opHandle) - val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false) + val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false).getResults val status = fe.be.getOperationStatus(opHandle, Some(0)) val uri = new URI("sfdsfsdfdsf") @@ -111,7 +111,7 @@ class TrinoContextSuite extends KyuubiFunSuite with RestFrontendTestHelper { checkOpState(opHandleStr, FINISHED) val metadataResp = fe.be.getResultSetMetadata(opHandle) - val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false) + val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false).getResults val status = fe.be.getOperationStatus(opHandle, Some(0)) val uri = new URI("sfdsfsdfdsf")
