This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git

The following commit(s) were added to refs/heads/master by this push:
     new cae9d97  [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT] Support GetFunctions, GetSchemas, GetTables, GetColumns in Livy thrift server
cae9d97 is described below

commit cae9d97185bf371912dcd863dff5babfd9cb704a
Author: yihengwang <yihengw...@tencent.com>
AuthorDate: Fri Aug 16 10:32:11 2019 +0800

    [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT] Support GetFunctions, GetSchemas, GetTables, GetColumns in Livy thrift server

    ## What changes were proposed in this pull request?

    In this patch, we add the implementations of GetSchemas, GetFunctions, GetTables, and GetColumns in Livy Thrift server.

    https://issues.apache.org/jira/browse/LIVY-622
    https://issues.apache.org/jira/browse/LIVY-623
    https://issues.apache.org/jira/browse/LIVY-624
    https://issues.apache.org/jira/browse/LIVY-625

    ## How was this patch tested?

    Add new unit tests and integration test. Run them with existing tests.

    Author: yihengwang <yihengw...@tencent.com>

    Closes #194 from yiheng/fix_575.
---
 .../apache/livy/thriftserver/LivyCLIService.scala  |  16 +--
 .../livy/thriftserver/LivyOperationManager.scala   |  63 ++++++++
 .../livy/thriftserver/cli/ThriftCLIService.scala   |  17 ++-
 .../operation/GetColumnsOperation.scala            | 102 +++++++++++++
 .../operation/GetFunctionsOperation.scala          |  94 ++++++++++++
 .../operation/GetSchemasOperation.scala            |  63 ++++++++
 .../operation/GetTablesOperation.scala             |  73 ++++++++++
 .../thriftserver/operation/MetadataOperation.scala |   6 +
 .../operation/SparkCatalogOperation.scala          | 119 ++++++++++++++++
 .../livy/thriftserver/ThriftServerSuites.scala     | 158 ++++++++++++++++++++-
 .../livy/thriftserver/session/CatalogJobState.java |  28 ++++
 .../session/CleanupCatalogResultJob.java           |  37 +++++
 .../livy/thriftserver/session/ColumnBuffer.java    |  36 +++++
 .../session/FetchCatalogResultJob.java             |  51 +++++++
 .../livy/thriftserver/session/GetColumnsJob.java   |  93 ++++++++++++
 .../livy/thriftserver/session/GetFunctionsJob.java |  67 +++++++++
 .../livy/thriftserver/session/GetSchemasJob.java   |  47 ++++++
 .../livy/thriftserver/session/GetTablesJob.java    |  92 ++++++++++++
 .../livy/thriftserver/session/SparkCatalogJob.java |  50 +++++++
 .../livy/thriftserver/session/SparkUtils.java      | 113 +++++++++++++++
 .../thriftserver/session/ThriftSessionState.java   |  32 +++++
 .../thriftserver/session/ThriftSessionTest.java    |  53 ++++++-
 22 files changed, 1395 insertions(+), 15 deletions(-)

diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala index 725bdc8..3c84b4a 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala @@ -215,8 +215,8 @@ class LivyCLIService(server: LivyThriftServer) sessionHandle: SessionHandle, catalogName: String, schemaName: String): OperationHandle = { - // TODO - throw new HiveSQLException("Operation GET_SCHEMAS is not yet supported") + sessionManager.operationManager.getSchemas( + sessionHandle, catalogName, schemaName) } @throws[HiveSQLException] @@ -226,8 +226,8 @@ class LivyCLIService(server: LivyThriftServer) schemaName: String, tableName: String, tableTypes: util.List[String]): OperationHandle = { - // TODO - throw new HiveSQLException("Operation GET_TABLES is not yet supported") + sessionManager.operationManager.getTables( + sessionHandle, catalogName, schemaName, tableName, tableTypes) }
@throws[HiveSQLException] @@ -243,8 +243,8 @@ class LivyCLIService(server: LivyThriftServer) schemaName: String, tableName: String, columnName: String): OperationHandle = { - // TODO - throw new HiveSQLException("Operation GET_COLUMNS is not yet supported") + sessionManager.operationManager.getColumns( + sessionHandle, catalogName, schemaName, tableName, columnName) } @throws[HiveSQLException] @@ -253,8 +253,8 @@ class LivyCLIService(server: LivyThriftServer) catalogName: String, schemaName: String, functionName: String): OperationHandle = { - // TODO - throw new HiveSQLException("Operation GET_FUNCTIONS is not yet supported") + sessionManager.operationManager.getFunctions( + sessionHandle, catalogName, schemaName, functionName) } @throws[HiveSQLException] diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala index e6d48ff..2454185 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala @@ -187,6 +187,69 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage }) } + @throws[HiveSQLException] + def getTables( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + tableName: String, + tableTypes: util.List[String]): OperationHandle = { + executeOperation(sessionHandle, { + val op = new GetTablesOperation( + sessionHandle, + catalogName, + schemaName, + tableName, + tableTypes, + livyThriftSessionManager) + addOperation(op, sessionHandle) + op + }) + } + + @throws[HiveSQLException] + def getFunctions( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + functionName: String): OperationHandle = { + executeOperation(sessionHandle, { + val op = new GetFunctionsOperation(sessionHandle, catalogName, schemaName, functionName, + livyThriftSessionManager) + addOperation(op, sessionHandle) + op + }) + } + + @throws[HiveSQLException] + def getSchemas( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String): OperationHandle = { + executeOperation(sessionHandle, { + val op = new GetSchemasOperation(sessionHandle, catalogName, schemaName, + livyThriftSessionManager) + addOperation(op, sessionHandle) + op + }) + } + + @throws[HiveSQLException] + def getColumns( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + tableName: String, + columnName: String): OperationHandle = { + executeOperation(sessionHandle, { + val op = new GetColumnsOperation(sessionHandle, catalogName, schemaName, tableName, + columnName, livyThriftSessionManager) + addOperation(op, sessionHandle) + op + }) + } + + /** * Cancel the running operation unless it is already in a terminal state */ diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala index 4a3276f..da108ab 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala @@ -427,8 +427,8 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = { val resp = new TGetSchemasResp try { - val opHandle = 
cliService.getSchemas( - new SessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName) + val opHandle = cliService.getSchemas(createSessionHandle(req.getSessionHandle), + req.getCatalogName, req.getSchemaName) resp.setOperationHandle(opHandle.toTOperationHandle) resp.setStatus(ThriftCLIService.OK_STATUS) } catch { @@ -444,7 +444,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: val resp = new TGetTablesResp try { val opHandle = cliService.getTables( - new SessionHandle(req.getSessionHandle), + createSessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName, req.getTableName, @@ -479,7 +479,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: val resp = new TGetColumnsResp try { val opHandle = cliService.getColumns( - new SessionHandle(req.getSessionHandle), + createSessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName, req.getTableName, @@ -499,7 +499,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: val resp = new TGetFunctionsResp try { val opHandle = cliService.getFunctions( - new SessionHandle(req.getSessionHandle), + createSessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName, req.getFunctionName) @@ -728,6 +728,13 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: s"Failed to validate proxy privilege of $realUser for $proxyUser", "08S01", e) } } + + private def createSessionHandle(tHandle: TSessionHandle): SessionHandle = { + val protocolVersion = cliService.getSessionManager + .getSessionInfo(new SessionHandle(tHandle)) + .protocolVersion + new SessionHandle(tHandle, protocolVersion) + } } object ThriftCLIService { diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala new file mode 100644 index 0000000..c9c106c --- /dev/null +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.thriftserver.operation + +import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle} + +import org.apache.livy.Logging +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema} +import org.apache.livy.thriftserver.LivyThriftSessionManager +import org.apache.livy.thriftserver.session.{GetColumnsJob, GetFunctionsJob} + +class GetColumnsOperation( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + tableName: String, + columnName: String, + sessionManager: LivyThriftSessionManager) + extends SparkCatalogOperation( + sessionHandle, OperationType.GET_COLUMNS, sessionManager) with Logging { + + @throws(classOf[HiveSQLException]) + override protected def runInternal(): Unit = { + setState(OperationState.RUNNING) + try { + rscClient.submit(new GetColumnsJob( + convertSchemaPattern(schemaName), + convertIdentifierPattern(tableName, datanucleusFormat = true), + Option(columnName).map { convertIdentifierPattern(_, datanucleusFormat = false) }.orNull, + sessionId, + jobId + )).get() + + setState(OperationState.FINISHED) + } catch { + case e: Throwable => + error("Remote job meet an exception: ", e) + setState(OperationState.ERROR) + throw new HiveSQLException(e) + } + } + + @throws(classOf[HiveSQLException]) + override def getResultSetSchema: Schema = { + assertState(Seq(OperationState.FINISHED)) + GetColumnsOperation.SCHEMA + } +} + +object GetColumnsOperation { + val SCHEMA = Schema( + Field("TABLE_CAT", BasicDataType("string"), "Catalog name. NULL if not applicable."), + Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."), + Field("TABLE_NAME", BasicDataType("string"), "Table name."), + Field("COLUMN_NAME", BasicDataType("string"), "Column name"), + Field("DATA_TYPE", BasicDataType("integer"), "SQL type from java.sql.Types"), + Field("TYPE_NAME", BasicDataType("string"), + "Data source dependent type name, for a UDT the type name is fully qualified"), + Field("COLUMN_SIZE", BasicDataType("integer"), "Column size. For char or date types this is " + + "the maximum number of characters, for numeric or decimal types this is precision."), + Field("BUFFER_LENGTH", BasicDataType("byte"), "Unused"), + Field("DECIMAL_DIGITS", BasicDataType("integer"), "The number of fractional digits"), + Field("NUM_PREC_RADIX", BasicDataType("integer"), "Radix (typically either 10 or 2)"), + Field("NULLABLE", BasicDataType("integer"), "Is NULL allowed"), + Field("REMARKS", BasicDataType("string"), "Comment describing column (may be null)"), + Field("COLUMN_DEF", BasicDataType("string"), "Default value (may be null)"), + Field("SQL_DATA_TYPE", BasicDataType("integer"), "Unused"), + Field("SQL_DATETIME_SUB", BasicDataType("integer"), "Unused"), + Field("CHAR_OCTET_LENGTH", BasicDataType("integer"), "For char types the maximum number of " + + "bytes in the column"), + Field("ORDINAL_POSITION", BasicDataType("integer"), "Index of column in table (starting at 1)"), + Field("IS_NULLABLE", BasicDataType("string"), "\"NO\" means column definitely does not " + + "allow NULL values; \"YES\" means the column might allow NULL values. 
An empty string " + + "means nobody knows."), + Field("SCOPE_CATALOG", BasicDataType("string"), "Catalog of table that is the scope of a " + + "reference attribute (null if DATA_TYPE isn't REF)"), + Field("SCOPE_SCHEMA", BasicDataType("string"), "Schema of table that is the scope of a " + + "reference attribute (null if the DATA_TYPE isn't REF)"), + Field("SCOPE_TABLE", BasicDataType("string"), "Table name that this the scope of a " + + "reference attribure (null if the DATA_TYPE isn't REF)"), + Field("SOURCE_DATA_TYPE", BasicDataType("short"), "Source type of a distinct type or " + + "user-generated Ref type, SQL type from java.sql.Types (null if DATA_TYPE isn't " + + "DISTINCT or user-generated REF)"), + Field("IS_AUTO_INCREMENT", BasicDataType("string"), "Indicates whether this column is " + + "auto incremented.") + ) +} diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala new file mode 100644 index 0000000..0e43f16 --- /dev/null +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.thriftserver.operation + +import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle} + +import org.apache.livy.Logging +import org.apache.livy.thriftserver.session.GetFunctionsJob +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema} +import org.apache.livy.thriftserver.LivyThriftSessionManager + +class GetFunctionsOperation( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + functionName: String, + sessionManager: LivyThriftSessionManager) + extends SparkCatalogOperation( + sessionHandle, OperationType.GET_FUNCTIONS, sessionManager) with Logging { + + @throws(classOf[HiveSQLException]) + override protected def runInternal(): Unit = { + setState(OperationState.RUNNING) + try { + rscClient.submit(new GetFunctionsJob( + convertSchemaPattern(schemaName), + convertFunctionName(functionName), + sessionId, + jobId + )).get() + + setState(OperationState.FINISHED) + } catch { + case e: Throwable => + error("Remote job meet an exception: ", e) + setState(OperationState.ERROR) + throw new HiveSQLException(e) + } + } + + @throws(classOf[HiveSQLException]) + override def getResultSetSchema: Schema = { + assertState(Seq(OperationState.FINISHED)) + GetFunctionsOperation.SCHEMA + } + + private def convertFunctionName(name: String): String = { + if (name == null) { + ".*" + } else { + var escape = false + name.flatMap { + case c if escape => + if (c != '\\') escape = false + c.toString + case '\\' => + escape = true + "" + case '%' => ".*" + case '_' => "." + case c => Character.toLowerCase(c).toString + } + } + } +} + +object GetFunctionsOperation { + val SCHEMA = Schema( + Field("FUNCTION_CAT", BasicDataType("string"), "Function catalog (may be null)"), + Field("FUNCTION_SCHEM", BasicDataType("string"), "Function schema (may be null)"), + Field("FUNCTION_NAME", BasicDataType("string"), + "Function name. This is the name used to invoke the function"), + Field("REMARKS", BasicDataType("string"), "Explanatory comment on the function"), + Field("FUNCTION_TYPE", BasicDataType("integer"), + "Kind of function."), + Field("SPECIFIC_NAME", BasicDataType("string"), + "The name which uniquely identifies this function within its schema") + ) +} diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala new file mode 100644 index 0000000..6bd0a17 --- /dev/null +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.thriftserver.operation + +import org.apache.hive.service.cli._ + +import org.apache.livy.Logging +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema} +import org.apache.livy.thriftserver.LivyThriftSessionManager +import org.apache.livy.thriftserver.session.{GetSchemasJob, GetTablesJob} + +class GetSchemasOperation( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + sessionManager: LivyThriftSessionManager) + extends SparkCatalogOperation( + sessionHandle, OperationType.GET_SCHEMAS, sessionManager) with Logging { + + @throws(classOf[HiveSQLException]) + override protected def runInternal(): Unit = { + setState(OperationState.RUNNING) + try { + rscClient.submit(new GetSchemasJob( + convertSchemaPattern(schemaName), sessionId, jobId + )).get() + setState(OperationState.FINISHED) + } catch { + case e: Throwable => + error("Remote job meet an exception: ", e) + setState(OperationState.ERROR) + throw new HiveSQLException(e) + } + } + + @throws(classOf[HiveSQLException]) + override def getResultSetSchema: Schema = { + assertState(Seq(OperationState.FINISHED)) + GetSchemasOperation.SCHEMA + } +} + +object GetSchemasOperation { + val SCHEMA = Schema( + Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."), + Field("TABLE_CATALOG", BasicDataType("string"), "Catalog name.") + ) +} diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala new file mode 100644 index 0000000..4a939b3 --- /dev/null +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.thriftserver.operation + +import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle} + +import org.apache.livy.Logging +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema} +import org.apache.livy.thriftserver.LivyThriftSessionManager +import org.apache.livy.thriftserver.session.GetTablesJob + +class GetTablesOperation( + sessionHandle: SessionHandle, + catalogName: String, + schemaName: String, + tableName: String, + tableTypes: java.util.List[String], + sessionManager: LivyThriftSessionManager) + extends SparkCatalogOperation( + sessionHandle, OperationType.GET_TABLES, sessionManager) with Logging { + + @throws(classOf[HiveSQLException]) + override protected def runInternal(): Unit = { + setState(OperationState.RUNNING) + try { + rscClient.submit(new GetTablesJob( + convertSchemaPattern(schemaName), + convertIdentifierPattern(tableName, datanucleusFormat = true), + tableTypes, + sessionId, + jobId + )).get() + + setState(OperationState.FINISHED) + } catch { + case e: Throwable => + error("Remote job meet an exception: ", e) + setState(OperationState.ERROR) + throw new HiveSQLException(e) + } + } + + @throws(classOf[HiveSQLException]) + override def getResultSetSchema: Schema = { + assertState(Seq(OperationState.FINISHED)) + GetTablesOperation.SCHEMA + } +} + +object GetTablesOperation { + val SCHEMA = Schema( + Field("TABLE_CAT", BasicDataType("string"), "Catalog name. NULL if not applicable."), + Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."), + Field("TABLE_NAME", BasicDataType("string"), "Table name."), + Field("TABLE_TYPE", BasicDataType("string"), "The table type, e.g. \"TABLE\", \"VIEW\", etc."), + Field("REMARKS", BasicDataType("string"), "Comments about the table.") + ) +} diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala index 4db3929..4689f26 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala @@ -21,6 +21,12 @@ import org.apache.hive.service.cli.{FetchOrientation, HiveSQLException, Operatio import org.apache.livy.thriftserver.serde.ThriftResultSet +/** + * MetadataOperation is the base class for operations which do not perform any call on Spark side + * + * @param sessionHandle + * @param opType + */ abstract class MetadataOperation(sessionHandle: SessionHandle, opType: OperationType) extends Operation(sessionHandle, opType) { setHasResultSet(true) diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala new file mode 100644 index 0000000..9ed31f7 --- /dev/null +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.thriftserver.operation + +import org.apache.commons.lang.StringUtils +import org.apache.hive.service.cli._ + +import org.apache.livy.thriftserver.LivyThriftSessionManager +import org.apache.livy.thriftserver.serde.ThriftResultSet +import org.apache.livy.thriftserver.session.{CleanupCatalogResultJob, FetchCatalogResultJob} + +/** + * SparkCatalogOperation is the base class for operations which need to fetch catalog information + * from spark session. + */ +abstract class SparkCatalogOperation( + sessionHandle: SessionHandle, + opType: OperationType, + sessionManager: LivyThriftSessionManager) + extends Operation(sessionHandle, opType) { + + // The initialization need to be lazy in order not to block when the instance is created + protected lazy val rscClient = { + // This call is blocking, we are waiting for the session to be ready. + sessionManager.getLivySession(sessionHandle).client.get + } + + protected lazy val jobId = { + this.opHandle.getHandleIdentifier.getPublicId.toString + "-" + + this.opHandle.getHandleIdentifier.getSecretId.toString + } + + protected lazy val sessionId = { + sessionHandle.getSessionId.toString + } + + @throws[HiveSQLException] + override def close(): Unit = { + val cleaned = rscClient.submit(new CleanupCatalogResultJob(sessionId, jobId)).get() + if (!cleaned) { + warn(s"Fail to cleanup fetch catalog job (session = ${sessionId}), " + + "this message can be ignored if the job failed.") + } + setState(OperationState.CLOSED) + } + + @throws[HiveSQLException] + override def cancel(stateAfterCancel: OperationState): Unit = { + setState(OperationState.CANCELED) + // Spark fetch schema is not a really spark job. It only run on driver and cannot be cancelled + throw new UnsupportedOperationException("SparkCatalogOperation.cancel()") + } + + /** + * Convert wildchars and escape sequence from JDBC format to datanucleous/regex + * + * This is ported from Spark Hive Thrift MetaOperation. + */ + protected def convertIdentifierPattern(pattern: String, datanucleusFormat: Boolean): String = { + if (pattern == null) { + convertPattern("%", datanucleusFormat = true) + } else { + convertPattern(pattern, datanucleusFormat) + } + } + + /** + * Convert wildchars and escape sequence of schema pattern from JDBC format to datanucleous/regex + * The schema pattern treats empty string also as wildchar. + * + * This is ported from Spark Hive Thrift MetaOperation. 
+ */ + protected def convertSchemaPattern(pattern: String): String = { + if (StringUtils.isEmpty(pattern)) { + convertPattern("%", datanucleusFormat = true) + } else { + convertPattern(pattern, datanucleusFormat = true) + } + } + + private def convertPattern(pattern: String, datanucleusFormat: Boolean): String = { + val wStr = if (datanucleusFormat) "*" else ".*" + pattern + .replaceAll("([^\\\\])%", "$1" + wStr) + .replaceAll("\\\\%", "%") + .replaceAll("^%", wStr) + .replaceAll("([^\\\\])_", "$1.") + .replaceAll("\\\\_", "_") + .replaceAll("^_", ".") + } + + override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): ThriftResultSet = { + validateFetchOrientation(orientation) + assertState(Seq(OperationState.FINISHED)) + setHasResultSet(true) + val maxRows = maxRowsL.toInt + val results = rscClient.submit(new FetchCatalogResultJob(sessionId, jobId, maxRows)).get() + + val rowSet = ThriftResultSet.apply(getResultSetSchema, protocolVersion) + import scala.collection.JavaConverters._ + results.asScala.foreach { r => rowSet.addRow(r.asInstanceOf[Array[Any]]) } + return rowSet + } +} diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala index a3d9e88..6411881 100644 --- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala +++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala @@ -17,7 +17,7 @@ package org.apache.livy.thriftserver -import java.sql.{Date, SQLException, Statement} +import java.sql.{Connection, Date, SQLException, Statement} import org.apache.livy.LivyConf @@ -69,6 +69,114 @@ trait CommonThriftTests { } assert(!resultSetComplex.next()) } + + def getSchemasTest(connection: Connection): Unit = { + val metadata = connection.getMetaData + val schemaResultSet = metadata.getSchemas() + assert(schemaResultSet.getMetaData.getColumnCount == 2) + assert(schemaResultSet.getMetaData.getColumnName(1) == "TABLE_SCHEM") + assert(schemaResultSet.getMetaData.getColumnName(2) == "TABLE_CATALOG") + schemaResultSet.next() + assert(schemaResultSet.getString(1) == "default") + assert(!schemaResultSet.next()) + } + + def getFunctionsTest(connection: Connection): Unit = { + val metadata = connection.getMetaData + + val functionResultSet = metadata.getFunctions("", "default", "unix_timestamp") + assert(functionResultSet.getMetaData.getColumnCount == 6) + assert(functionResultSet.getMetaData.getColumnName(1) == "FUNCTION_CAT") + assert(functionResultSet.getMetaData.getColumnName(2) == "FUNCTION_SCHEM") + assert(functionResultSet.getMetaData.getColumnName(3) == "FUNCTION_NAME") + assert(functionResultSet.getMetaData.getColumnName(4) == "REMARKS") + assert(functionResultSet.getMetaData.getColumnName(5) == "FUNCTION_TYPE") + assert(functionResultSet.getMetaData.getColumnName(6) == "SPECIFIC_NAME") + functionResultSet.next() + assert(functionResultSet.getString(3) == "unix_timestamp") + assert(functionResultSet.getString(6) == + "org.apache.spark.sql.catalyst.expressions.UnixTimestamp") + assert(!functionResultSet.next()) + } + + def getTablesTest(connection: Connection): Unit = { + val statement = connection.createStatement() + statement.execute("CREATE TABLE test_get_tables (id integer, desc string) USING json") + statement.close() + + val metadata = connection.getMetaData + val tablesResultSet = metadata.getTables("", "default", "*", Array("TABLE")) + 
assert(tablesResultSet.getMetaData.getColumnCount == 5) + assert(tablesResultSet.getMetaData.getColumnName(1) == "TABLE_CAT") + assert(tablesResultSet.getMetaData.getColumnName(2) == "TABLE_SCHEM") + assert(tablesResultSet.getMetaData.getColumnName(3) == "TABLE_NAME") + assert(tablesResultSet.getMetaData.getColumnName(4) == "TABLE_TYPE") + assert(tablesResultSet.getMetaData.getColumnName(5) == "REMARKS") + + tablesResultSet.next() + assert(tablesResultSet.getString(3) == "test_get_tables") + assert(tablesResultSet.getString(4) == "TABLE") + assert(!tablesResultSet.next()) + } + + def getColumnsTest(connection: Connection): Unit = { + val metadata = connection.getMetaData + val statement = connection.createStatement() + statement.execute("CREATE TABLE test_get_columns (id integer, desc string) USING json") + statement.close() + + val columnsResultSet = metadata.getColumns("", "default", "test_get_columns", ".*") + assert(columnsResultSet.getMetaData.getColumnCount == 23) + columnsResultSet.next() + assert(columnsResultSet.getString(1) == "") + assert(columnsResultSet.getString(2) == "default") + assert(columnsResultSet.getString(3) == "test_get_columns") + assert(columnsResultSet.getString(4) == "id") + assert(columnsResultSet.getInt(5) == 4) + assert(columnsResultSet.getString(6) == "integer") + assert(columnsResultSet.getInt(7) == 10) + assert(columnsResultSet.getString(8) == null) + assert(columnsResultSet.getInt(9) == 0) + assert(columnsResultSet.getInt(10) == 10) + assert(columnsResultSet.getInt(11) == 1) + assert(columnsResultSet.getString(12) == "") + assert(columnsResultSet.getString(13) == null) + assert(columnsResultSet.getString(14) == null) + assert(columnsResultSet.getString(15) == null) + assert(columnsResultSet.getString(15) == null) + assert(columnsResultSet.getInt(17) == 0) + assert(columnsResultSet.getString(18) == "YES") + assert(columnsResultSet.getString(19) == null) + assert(columnsResultSet.getString(20) == null) + assert(columnsResultSet.getString(21) == null) + assert(columnsResultSet.getString(22) == null) + assert(columnsResultSet.getString(23) == "NO") + columnsResultSet.next() + assert(columnsResultSet.getString(1) == "") + assert(columnsResultSet.getString(2) == "default") + assert(columnsResultSet.getString(3) == "test_get_columns") + assert(columnsResultSet.getString(4) == "desc") + assert(columnsResultSet.getInt(5) == 12) + assert(columnsResultSet.getString(6) == "string") + assert(columnsResultSet.getInt(7) == Integer.MAX_VALUE) + assert(columnsResultSet.getString(8) == null) + assert(columnsResultSet.getString(9) == null) + assert(columnsResultSet.getString(10) == null) + assert(columnsResultSet.getInt(11) == 1) + assert(columnsResultSet.getString(12) == "") + assert(columnsResultSet.getString(13) == null) + assert(columnsResultSet.getString(14) == null) + assert(columnsResultSet.getString(15) == null) + assert(columnsResultSet.getString(16) == null) + assert(columnsResultSet.getInt(17) == 1) + assert(columnsResultSet.getString(18) == "YES") + assert(columnsResultSet.getString(19) == null) + assert(columnsResultSet.getString(20) == null) + assert(columnsResultSet.getString(21) == null) + assert(columnsResultSet.getString(22) == null) + assert(columnsResultSet.getString(23) == "NO") + assert(!columnsResultSet.next()) + } } class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests { @@ -163,6 +271,30 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTest assert(caught.getMessage.contains("Table or view not 
found: `global_temp`.`invalid_table`")) } } + + test("fetch schemas") { + withJdbcConnection { connection => + getSchemasTest(connection) + } + } + + test("fetch functions") { + withJdbcConnection { connection => + getFunctionsTest(connection) + } + } + + test("fetch tables") { + withJdbcConnection { connection => + getTablesTest(connection) + } + } + + test("fetch column") { + withJdbcConnection { connection => + getColumnsTest(connection) + } + } } class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests { @@ -175,4 +307,28 @@ class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests dataTypesTest(statement, supportMap) } } + + test("fetch schemas") { + withJdbcConnection { connection => + getSchemasTest(connection) + } + } + + test("fetch functions") { + withJdbcConnection { connection => + getFunctionsTest(connection) + } + } + + test("fetch tables") { + withJdbcConnection { connection => + getTablesTest(connection) + } + } + + test("fetch column") { + withJdbcConnection { connection => + getColumnsTest(connection) + } + } } diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CatalogJobState.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CatalogJobState.java new file mode 100644 index 0000000..5571574 --- /dev/null +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CatalogJobState.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.thriftserver.session; + +import java.util.Iterator; + +public class CatalogJobState { + final Iterator<Object[]> iter; + + public CatalogJobState(Iterator<Object[]> iter) { + this.iter = iter; + } +} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java new file mode 100644 index 0000000..b9444ca --- /dev/null +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.thriftserver.session; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class CleanupCatalogResultJob implements Job<Boolean> { + private final String sessionId; + private final String jobId; + + public CleanupCatalogResultJob(String sessionId, String jobId) { + this.sessionId = sessionId; + this.jobId = jobId; + } + + @Override + public Boolean call(JobContext jc) throws Exception { + ThriftSessionState session = ThriftSessionState.get(jc, sessionId); + return session.cleanupCatalogJob(jobId); + } +} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java index 4408586..d4ec747 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java @@ -204,6 +204,42 @@ public class ColumnBuffer { return nulls != null ? BitSet.valueOf(nulls) : new BitSet(); } + public ColumnBuffer extractSubset(int start, int end) { + ColumnBuffer subset = new ColumnBuffer(type); + subset.currentSize = end - start; + subset.ensureCapacity(); + switch (type) { + case BOOLEAN: + System.arraycopy(bools, start, subset.bools, 0, end - start); + break; + case BYTE: + System.arraycopy(bytes, start, subset.bytes, 0, end - start); + break; + case SHORT: + System.arraycopy(shorts, start, subset.shorts, 0, end - start); + break; + case INTEGER: + System.arraycopy(ints, start, subset.ints, 0, end - start); + break; + case LONG: + System.arraycopy(longs, start, subset.longs, 0, end - start); + break; + case FLOAT: + System.arraycopy(floats, start, subset.floats, 0, end - start); + break; + case DOUBLE: + System.arraycopy(doubles, start, subset.doubles, 0, end - start); + break; + case BINARY: + System.arraycopy(buffers, start, subset.buffers, 0, end - start); + break; + case STRING: + System.arraycopy(strings, start, subset.strings, 0, end - start); + break; + } + return subset; + } + private boolean isNull(int index) { if (nulls == null) { return false; diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java new file mode 100644 index 0000000..9654b02 --- /dev/null +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.thriftserver.session; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class FetchCatalogResultJob implements Job<List<Object[]>> { + private final String sessionId; + private final String jobId; + private final int maxRows; + + public FetchCatalogResultJob(String sessionId, String jobId, int maxRows) { + this.sessionId = sessionId; + this.jobId = jobId; + this.maxRows = maxRows; + } + + @Override + public List<Object[]> call(JobContext jc) throws Exception { + ThriftSessionState session = ThriftSessionState.get(jc, sessionId); + Iterator<Object[]> iterator = session.findCatalogJob(jobId).iter; + + List<Object[]> result = new ArrayList<>(); + int n = 0; + while (iterator.hasNext() && n < maxRows) { + result.add(iterator.next()); + n += 1; + } + return result; + } +} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java new file mode 100644 index 0000000..bc2bd73 --- /dev/null +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.thriftserver.session; + +import java.util.ArrayList; +import java.util.List; + +import static scala.collection.JavaConversions.seqAsJavaList; + +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.catalyst.catalog.SessionCatalog; +import org.apache.spark.sql.types.StructField; + +public class GetColumnsJob extends SparkCatalogJob { + private final String databasePattern; + private final String tablePattern; + private final String columnPattern; + + public GetColumnsJob( + String databasePattern, + String tablePattern, + String columnPattern, + String sessionId, + String jobId) { + super(sessionId, jobId); + this.databasePattern = databasePattern; + this.tablePattern = tablePattern; + this.columnPattern = columnPattern; + } + + @Override + protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) { + List<Object[]> columnList = new ArrayList<Object[]>(); + List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern)); + + for (String db : databases) { + List<TableIdentifier> tableIdentifiers = + seqAsJavaList(catalog.listTables(db, tablePattern)); + for (TableIdentifier tableIdentifier : tableIdentifiers) { + CatalogTable table = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier); + List<StructField> fields = seqAsJavaList(table.schema()); + int position = 0; + for (StructField field : fields) { + if (columnPattern == null || field.name().matches(columnPattern)) { + columnList.add(new Object[] { + DEFAULT_HIVE_CATALOG, + table.database(), + table.identifier().table(), + field.name(), + SparkUtils.toJavaSQLType(field.dataType()), // datatype + field.dataType().typeName(), + SparkUtils.getColumnSize(field.dataType()), //columnsize, + null, // BUFFER_LENGTH, unused, + SparkUtils.getDecimalDigits(field.dataType()), + SparkUtils.getNumPrecRadix(field.dataType()), + field.nullable() ? 1 : 0, + field.getComment().isDefined() ? field.getComment().get() : "", + null, // COLUMN_DEF + null, // SQL_DATA_TYPE + null, // SQL_DATETIME_SUB + null, // CHAR_OCTET_LENGTH + position, + field.nullable() ? "YES" : "NO", // IS_NULLABLE + null, // SCOPE_CATALOG + null, // SCOPE_SCHEMA + null, // SCOPE_TABLE + null, // SOURCE_DATA_TYPE + "NO" // IS_AUTO_INCREMENT + }); + position += 1; + } + } + } + } + return columnList; + } +} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java new file mode 100644 index 0000000..297fb80 --- /dev/null +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.thriftserver.session; + +import java.util.ArrayList; +import java.util.List; + +import scala.Tuple2; +import static scala.collection.JavaConversions.seqAsJavaList; + +import org.apache.spark.sql.catalyst.FunctionIdentifier; +import org.apache.spark.sql.catalyst.catalog.SessionCatalog; +import org.apache.spark.sql.catalyst.expressions.ExpressionInfo; + +public class GetFunctionsJob extends SparkCatalogJob { + private final String databasePattern; + private final String functionName; + + public GetFunctionsJob( + String databasePattern, + String functionName, + String sessionId, + String jobId) { + super(sessionId, jobId); + this.databasePattern = databasePattern; + this.functionName = functionName; + } + + @Override + protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) { + List<Object[]> funcList = new ArrayList<Object[]>(); + + List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern)); + for (String db : databases) { + List<Tuple2<FunctionIdentifier, String>> identifiersTypes = + seqAsJavaList(catalog.listFunctions(db, functionName)); + for (Tuple2<FunctionIdentifier, String> identifierType : identifiersTypes) { + FunctionIdentifier function = identifierType._1; + ExpressionInfo info = catalog.lookupFunctionInfo(function); + funcList.add(new Object[] { + null, + function.database().isDefined() ? function.database().get() : null, + function.funcName(), + info.getUsage() + info.getExtended(), + null, + info.getClassName() + }); + } + } + return funcList; + } +} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java new file mode 100644 index 0000000..451bea2 --- /dev/null +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.thriftserver.session; + +import java.util.ArrayList; +import java.util.List; + +import static scala.collection.JavaConversions.seqAsJavaList; + +import org.apache.spark.sql.catalyst.catalog.SessionCatalog; + +public class GetSchemasJob extends SparkCatalogJob { + private final String schemaName; + + public GetSchemasJob(String schemaName, String sessionId, String jobId) { + super(sessionId, jobId); + this.schemaName = schemaName; + } + + @Override + protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) { + List<String> databases = seqAsJavaList(catalog.listDatabases(schemaName)); + List<Object[]> schemas = new ArrayList<>(); + for (String db : databases) { + schemas.add(new Object[] { + db, + DEFAULT_HIVE_CATALOG, + }); + } + return schemas; + } +} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java new file mode 100644 index 0000000..a071aef --- /dev/null +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.thriftserver.session; + +import java.util.ArrayList; +import java.util.List; + +import static scala.collection.JavaConversions.seqAsJavaList; + +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.catalyst.catalog.CatalogTableType; +import org.apache.spark.sql.catalyst.catalog.SessionCatalog; + +public class GetTablesJob extends SparkCatalogJob { + private final String databasePattern; + private final String tablePattern; + private final List<String> tableTypes = new ArrayList<String>(); + + public GetTablesJob( + String databasePattern, + String tablePattern, + List<String> tableTypes, + String sessionId, + String jobId) { + super(sessionId, jobId); + this.databasePattern = databasePattern; + this.tablePattern = tablePattern; + if (tableTypes != null) { + for (String type : tableTypes) { + this.tableTypes.add(type.toUpperCase()); + } + } + } + + @Override + protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) { + List<Object[]> tableList = new ArrayList<Object[]>(); + List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern)); + for (String db : databases) { + List<TableIdentifier> tableIdentifiers = + seqAsJavaList(catalog.listTables(db, tablePattern)); + for (TableIdentifier tableIdentifier : tableIdentifiers) { + CatalogTable table = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier); + String type = convertTableType(table.tableType().name()); + if (tableTypes.isEmpty() || tableTypes.contains(type)) { + tableList.add( + new Object[] { + DEFAULT_HIVE_CATALOG, + table.database(), + table.identifier().table(), + type, + table.comment().isDefined() ? table.comment().get() : "" + }); + } + } + } + return tableList; + } + + private String convertTableType(String originalType) { + if (originalType.equals(CatalogTableType.MANAGED().name())) { + return ClassicTableTypes.TABLE.name(); + } else if (originalType.equals(CatalogTableType.EXTERNAL().name())) { + return ClassicTableTypes.TABLE.name(); + } else if (originalType.equals(CatalogTableType.VIEW().name())) { + return ClassicTableTypes.VIEW.name(); + } else { + throw new IllegalArgumentException("Invalid spark table type " + originalType); + } + } + + private enum ClassicTableTypes { + TABLE, + VIEW, + } +} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java new file mode 100644 index 0000000..e86721e --- /dev/null +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.thriftserver.session; + +import java.util.List; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.SessionCatalog; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public abstract class SparkCatalogJob implements Job<Void> { + protected static final String DEFAULT_HIVE_CATALOG = ""; + + private final String sessionId; + private final String jobId; + + public SparkCatalogJob(String sessionId, String jobId) { + this.sessionId = sessionId; + this.jobId = jobId; + } + + protected abstract List<Object[]> fetchCatalogObjects(SessionCatalog catalog); + + @Override + public Void call(JobContext jc) throws Exception { + SessionCatalog catalog = ((SparkSession)jc.sparkSession()).sessionState().catalog(); + List<Object[]> objects = fetchCatalogObjects(catalog); + + ThriftSessionState session = ThriftSessionState.get(jc, sessionId); + session.registerCatalogJob(jobId, objects.iterator()); + return null; + } +} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java index 99eab4d..fac79ad 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java @@ -17,6 +17,8 @@ package org.apache.livy.thriftserver.session; +import java.sql.Types; + import org.apache.spark.sql.types.*; /** @@ -57,4 +59,115 @@ final class SparkUtils { return types; } + /** + * This method is ported from Spark Hive Thrift server Type class + * @param type + * @return + */ + public static int toJavaSQLType(org.apache.spark.sql.types.DataType type) { + if (type instanceof NullType) { + return Types.NULL; + } else if (type instanceof BooleanType) { + return Types.BOOLEAN; + } else if (type instanceof ByteType) { + return Types.TINYINT; + } else if (type instanceof ShortType) { + return Types.SMALLINT; + } else if (type instanceof IntegerType) { + return Types.INTEGER; + } else if (type instanceof LongType) { + return Types.BIGINT; + } else if (type instanceof FloatType) { + return Types.FLOAT; + } else if (type instanceof DoubleType) { + return Types.DOUBLE; + } else if (type instanceof StringType) { + return Types.VARCHAR; + } else if (type instanceof DecimalType) { + return Types.DECIMAL; + } else if (type instanceof DateType) { + return Types.DATE; + } else if (type instanceof TimestampType) { + return Types.TIMESTAMP; + } else if (type instanceof BinaryType) { + return Types.BINARY; + } else if (type instanceof ArrayType) { + return Types.ARRAY; + } else if (type instanceof MapType) { + return Types.JAVA_OBJECT; + } else if (type instanceof StructType) { + return Types.STRUCT; + } else { + return Types.OTHER; + } + } + + /** + * This method is ported from Spark hive Thrift server TypeDescriptor + * @param type + * @return + */ + public static Integer getColumnSize(org.apache.spark.sql.types.DataType type) { + if (type instanceof ByteType) { + return 3; + } else if (type instanceof ShortType) { + return 5; + } else if (type instanceof IntegerType) { + return 10; + } else if (type instanceof LongType) { + return 19; + } else if (type instanceof FloatType) { + return 7; + } else if (type instanceof DoubleType) { + return 15; + } else if (type instanceof DecimalType) { + return ((DecimalType)type).precision(); + } else if (type instanceof StringType || type instanceof BinaryType || 
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
index 99eab4d..fac79ad 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.livy.thriftserver.session;
 
+import java.sql.Types;
+
 import org.apache.spark.sql.types.*;
 
 /**
@@ -57,4 +59,115 @@ final class SparkUtils {
     return types;
   }
 
+  /**
+   * This method is ported from Spark Hive Thrift server Type class
+   * @param type
+   * @return
+   */
+  public static int toJavaSQLType(org.apache.spark.sql.types.DataType type) {
+    if (type instanceof NullType) {
+      return Types.NULL;
+    } else if (type instanceof BooleanType) {
+      return Types.BOOLEAN;
+    } else if (type instanceof ByteType) {
+      return Types.TINYINT;
+    } else if (type instanceof ShortType) {
+      return Types.SMALLINT;
+    } else if (type instanceof IntegerType) {
+      return Types.INTEGER;
+    } else if (type instanceof LongType) {
+      return Types.BIGINT;
+    } else if (type instanceof FloatType) {
+      return Types.FLOAT;
+    } else if (type instanceof DoubleType) {
+      return Types.DOUBLE;
+    } else if (type instanceof StringType) {
+      return Types.VARCHAR;
+    } else if (type instanceof DecimalType) {
+      return Types.DECIMAL;
+    } else if (type instanceof DateType) {
+      return Types.DATE;
+    } else if (type instanceof TimestampType) {
+      return Types.TIMESTAMP;
+    } else if (type instanceof BinaryType) {
+      return Types.BINARY;
+    } else if (type instanceof ArrayType) {
+      return Types.ARRAY;
+    } else if (type instanceof MapType) {
+      return Types.JAVA_OBJECT;
+    } else if (type instanceof StructType) {
+      return Types.STRUCT;
+    } else {
+      return Types.OTHER;
+    }
+  }
+
+  /**
+   * This method is ported from Spark Hive Thrift server TypeDescriptor
+   * @param type
+   * @return
+   */
+  public static Integer getColumnSize(org.apache.spark.sql.types.DataType type) {
+    if (type instanceof ByteType) {
+      return 3;
+    } else if (type instanceof ShortType) {
+      return 5;
+    } else if (type instanceof IntegerType) {
+      return 10;
+    } else if (type instanceof LongType) {
+      return 19;
+    } else if (type instanceof FloatType) {
+      return 7;
+    } else if (type instanceof DoubleType) {
+      return 15;
+    } else if (type instanceof DecimalType) {
+      return ((DecimalType)type).precision();
+    } else if (type instanceof StringType || type instanceof BinaryType || type instanceof MapType
+        || type instanceof ArrayType || type instanceof StructType) {
+      return Integer.MAX_VALUE;
+    } else if (type instanceof DateType) {
+      return 10;
+    } else if (type instanceof TimestampType) {
+      return 29;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * This method is ported from Spark Hive Thrift server TypeDescriptor
+   * @param type
+   * @return
+   */
+  public static Integer getDecimalDigits(org.apache.spark.sql.types.DataType type) {
+    if (type instanceof BooleanType || type instanceof ByteType || type instanceof ShortType
+        || type instanceof IntegerType || type instanceof LongType) {
+      return 0;
+    } else if (type instanceof FloatType) {
+      return 7;
+    } else if (type instanceof DoubleType) {
+      return 15;
+    } else if (type instanceof DecimalType) {
+      return ((DecimalType)type).scale();
+    } else if (type instanceof TimestampType) {
+      return 9;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * This method is ported from Spark Hive Thrift server Type class
+   * @param type
+   * @return
+   */
+  public static Integer getNumPrecRadix(org.apache.spark.sql.types.DataType type) {
+    if (type instanceof ByteType || type instanceof ShortType || type instanceof IntegerType
+        || type instanceof LongType || type instanceof FloatType || type instanceof DoubleType
+        || type instanceof DecimalType) {
+      return 10;
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
index 1d71259..5378270 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
@@ -77,6 +77,7 @@
     this.sessionId = sessionId;
     this.statements = new ConcurrentHashMap<>();
     this.spark = ctx.<SparkSession>sparkSession().newSession();
+    this.catalogJobStates = new ConcurrentHashMap<>();
   }
 
   SparkSession spark() {
@@ -126,4 +127,35 @@
     }
   }
+
+  private final ConcurrentMap<String, CatalogJobState> catalogJobStates;
+
+  void registerCatalogJob(String jobId, Iterator<Object[]> results) {
+    checkNotNull(jobId, "No catalog job ID.");
+    CatalogJobState state = new CatalogJobState(results);
+    if (catalogJobStates.putIfAbsent(jobId, state) != null) {
+      throw new IllegalStateException(
+        String.format("Catalog job %s already registered.", jobId));
+    }
+  }
+
+  CatalogJobState findCatalogJob(String jobId) {
+    checkNotNull(jobId, "No catalog job ID.");
+    CatalogJobState state = catalogJobStates.get(jobId);
+    if (state == null) {
+      throw catalogJobNotFound(jobId);
+    }
+    return state;
+  }
+
+  boolean cleanupCatalogJob(String jobId) {
+    checkNotNull(jobId, "No catalog job ID.");
+    return catalogJobStates.remove(jobId) != null;
+  }
+
+  private NoSuchElementException catalogJobNotFound(String jobId) {
+    return new NoSuchElementException(
+      String.format("Catalog job %s not found in session %s.", jobId, sessionId));
+  }
+
 
 }
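With the session-side bookkeeping above in place, these operations surface through the usual HiveServer2-compatible metadata calls, so a JDBC client never touches the job plumbing directly. A minimal sketch of how a client might exercise the new code paths end to end (not part of this patch; the URL, port, and credentials are placeholders, and the Hive JDBC driver is assumed to be on the classpath):

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class LivyMetadataClientExample {
      public static void main(String[] args) throws Exception {
        // Placeholder connection string; point it at your Livy Thrift server.
        String url = "jdbc:hive2://localhost:10090/default";
        try (Connection conn = DriverManager.getConnection(url, "user", "")) {
          DatabaseMetaData meta = conn.getMetaData();

          // Backed by GetSchemas.
          try (ResultSet rs = meta.getSchemas(null, "%")) {
            while (rs.next()) {
              System.out.println("schema: " + rs.getString("TABLE_SCHEM"));
            }
          }

          // Backed by GetTables.
          try (ResultSet rs = meta.getTables(null, "default", "%", new String[] { "TABLE" })) {
            while (rs.next()) {
              System.out.println("table: " + rs.getString("TABLE_NAME"));
            }
          }

          // Backed by GetColumns.
          try (ResultSet rs = meta.getColumns(null, "default", "test", "%")) {
            while (rs.next()) {
              System.out.println("column: " + rs.getString("COLUMN_NAME")
                  + " (java.sql.Types " + rs.getInt("DATA_TYPE") + ")");
            }
          }

          // Backed by GetFunctions.
          try (ResultSet rs = meta.getFunctions(null, "default", "unix_timestamp")) {
            while (rs.next()) {
              System.out.println("function: " + rs.getString("FUNCTION_NAME"));
            }
          }
        }
      }
    }

The DATA_TYPE integers in the GetColumns result come from java.sql.Types, which is the mapping the new SparkUtils.toJavaSQLType above produces on the server side.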
diff --git a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
index 4dd30a2..addc630 100644
--- a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
+++ b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
@@ -19,6 +19,8 @@ package org.apache.livy.thriftserver.session;
 
 import java.net.URI;
 import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -124,9 +126,58 @@ public class ThriftSessionTest {
     assertTrue(cols[0].getNulls().get(0));
 
     assertTrue(waitFor(new CleanupStatementJob(s1, st4)));
 
-    // Tear down the session.
     waitFor(new UnregisterSessionJob(s1));
+
+    String s3 = nextSession();
+    waitFor(new RegisterSessionJob(s3));
+    String getSchemaJobId = "test_get_schema_job";
+    waitFor(new GetSchemasJob("default", s3, getSchemaJobId));
+    List<Object[]> schemas = waitFor(
+        new FetchCatalogResultJob(s3, getSchemaJobId, Integer.MAX_VALUE));
+    assertEquals(1, schemas.size());
+    assertEquals("default", schemas.get(0)[0]);
+    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getSchemaJobId)));
+
+    String getTablesJobId = "test_get_tables_job";
+    List<String> testTableTypes = new ArrayList<>();
+    testTableTypes.add("Table");
+    waitFor(new GetTablesJob("default", "*",
+        testTableTypes, s3, getTablesJobId));
+    List<Object[]> tables = waitFor(
+        new FetchCatalogResultJob(s3, getTablesJobId, Integer.MAX_VALUE));
+    assertEquals(1, tables.size());
+    assertEquals("default", tables.get(0)[1]);
+    assertEquals("test", tables.get(0)[2]);
+    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getTablesJobId)));
+
+    String getColumnsJobId = "test_get_columns_job";
+    waitFor(new GetColumnsJob("default", "test", ".*", s3, getColumnsJobId));
+    List<Object[]> columns = waitFor(
+        new FetchCatalogResultJob(s3, getColumnsJobId, Integer.MAX_VALUE));
+    assertEquals(2, columns.size());
+    assertEquals("default", columns.get(0)[1]);
+    assertEquals("test", columns.get(0)[2]);
+    assertEquals("id", columns.get(0)[3]);
+    assertEquals("integer", columns.get(0)[5]);
+    assertEquals("default", columns.get(1)[1]);
+    assertEquals("test", columns.get(1)[2]);
+    assertEquals("desc", columns.get(1)[3]);
+    assertEquals("string", columns.get(1)[5]);
+    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getColumnsJobId)));
+
+    String getFunctionsJobId = "test_get_functions_job";
+    waitFor(new GetFunctionsJob("default", "unix_timestamp", s3, getFunctionsJobId));
+    List<Object[]> functions = waitFor(
+        new FetchCatalogResultJob(s3, getFunctionsJobId, Integer.MAX_VALUE));
+    assertEquals(1, functions.size());
+    assertNull(functions.get(0)[1]);
+    assertEquals("unix_timestamp", functions.get(0)[2]);
+    assertEquals("org.apache.spark.sql.catalyst.expressions.UnixTimestamp", functions.get(0)[5]);
+    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getFunctionsJobId)));
+
+    // Tear down the session.
+    waitFor(new UnregisterSessionJob(s3));
   }
 
   private String nextSession() {