http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java new file mode 100644 index 0000000..4595ef5 --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.operation; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationType; +import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.session.HiveSession; + +/** + * MetadataOperation. + * + */ +public abstract class MetadataOperation extends Operation { + + protected static final String DEFAULT_HIVE_CATALOG = ""; + protected static TableSchema RESULT_SET_SCHEMA; + private static final char SEARCH_STRING_ESCAPE = '\\'; + + protected MetadataOperation(HiveSession parentSession, OperationType opType) { + super(parentSession, opType, false); + setHasResultSet(true); + } + + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.Operation#close() + */ + @Override + public void close() throws HiveSQLException { + setState(OperationState.CLOSED); + cleanupOperationLog(); + } + + /** + * Convert wildchars and escape sequence from JDBC format to datanucleous/regex + */ + protected String convertIdentifierPattern(final String pattern, boolean datanucleusFormat) { + if (pattern == null) { + return convertPattern("%", true); + } else { + return convertPattern(pattern, datanucleusFormat); + } + } + + /** + * Convert wildchars and escape sequence of schema pattern from JDBC format to datanucleous/regex + * The schema pattern treats empty string also as wildchar + */ + protected String convertSchemaPattern(final String pattern) { + if ((pattern == null) || pattern.isEmpty()) { + return convertPattern("%", true); + } else { + return convertPattern(pattern, true); + } + } + + /** + * Convert a pattern containing JDBC catalog search wildcards into + * Java regex patterns. + * + * @param pattern input which may contain '%' or '_' wildcard characters, or + * these characters escaped using {@link #getSearchStringEscape()}. + * @return replace %/_ with regex search characters, also handle escaped + * characters. + * + * The datanucleus module expects the wildchar as '*'. The columns search on the + * other hand is done locally inside the hive code and that requires the regex wildchar + * format '.*' This is driven by the datanucleusFormat flag. + */ + private String convertPattern(final String pattern, boolean datanucleusFormat) { + String wStr; + if (datanucleusFormat) { + wStr = "*"; + } else { + wStr = ".*"; + } + return pattern + .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr) + .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", "."); + } + + protected boolean isAuthV2Enabled(){ + SessionState ss = SessionState.get(); + return (ss.isAuthorizationModeV2() && + HiveConf.getBoolVar(ss.getConf(), HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)); + } + + protected void authorizeMetaGets(HiveOperationType opType, List<HivePrivilegeObject> inpObjs) + throws HiveSQLException { + authorizeMetaGets(opType, inpObjs, null); + } + + protected void authorizeMetaGets(HiveOperationType opType, List<HivePrivilegeObject> inpObjs, + String cmdString) throws HiveSQLException { + SessionState ss = SessionState.get(); + HiveAuthzContext.Builder ctxBuilder = new HiveAuthzContext.Builder(); + ctxBuilder.setUserIpAddress(ss.getUserIpAddress()); + ctxBuilder.setCommandString(cmdString); + try { + ss.getAuthorizerV2().checkPrivileges(opType, inpObjs, null, + ctxBuilder.build()); + } catch (HiveAuthzPluginException | HiveAccessControlException e) { + throw new HiveSQLException(e.getMessage(), e); + } + } + +}
http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java new file mode 100644 index 0000000..19153b6 --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.cli.operation; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.EnumSet; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.hive.service.cli.FetchOrientation; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; +import org.apache.hive.service.cli.OperationType; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.cli.thrift.TProtocolVersion; + +public abstract class Operation { + protected final HiveSession parentSession; + private OperationState state = OperationState.INITIALIZED; + private final OperationHandle opHandle; + private HiveConf configuration; + public static final Log LOG = LogFactory.getLog(Operation.class.getName()); + public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT; + public static final long DEFAULT_FETCH_MAX_ROWS = 100; + protected boolean hasResultSet; + protected volatile HiveSQLException operationException; + protected final boolean runAsync; + protected volatile Future<?> backgroundHandle; + protected OperationLog operationLog; + protected boolean isOperationLogEnabled; + + private long operationTimeout; + private long lastAccessTime; + + protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET = + EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); + + protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) { + this.parentSession = parentSession; + this.runAsync = runInBackground; + this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); + lastAccessTime = System.currentTimeMillis(); + operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), + HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); + } + + public Future<?> getBackgroundHandle() { + return backgroundHandle; + } + + protected void setBackgroundHandle(Future<?> backgroundHandle) { + this.backgroundHandle = backgroundHandle; + } + + public boolean shouldRunAsync() { + return runAsync; + } + + public void setConfiguration(HiveConf configuration) { + this.configuration = new HiveConf(configuration); + } + + public HiveConf getConfiguration() { + return new HiveConf(configuration); + } + + public HiveSession getParentSession() { + return parentSession; + } + + public OperationHandle getHandle() { + return opHandle; + } + + public TProtocolVersion getProtocolVersion() { + return opHandle.getProtocolVersion(); + } + + public OperationType getType() { + return opHandle.getOperationType(); + } + + public OperationStatus getStatus() { + return new OperationStatus(state, operationException); + } + + public boolean hasResultSet() { + return hasResultSet; + } + + protected void setHasResultSet(boolean hasResultSet) { + this.hasResultSet = hasResultSet; + opHandle.setHasResultSet(hasResultSet); + } + + public OperationLog getOperationLog() { + return operationLog; + } + + protected final OperationState setState(OperationState newState) throws HiveSQLException { + state.validateTransition(newState); + this.state = newState; + this.lastAccessTime = System.currentTimeMillis(); + return this.state; + } + + public boolean isTimedOut(long current) { + if (operationTimeout == 0) { + return false; + } + if (operationTimeout > 0) { + // check only when it's in terminal state + return state.isTerminal() && lastAccessTime + operationTimeout <= current; + } + return lastAccessTime + -operationTimeout <= current; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public long getOperationTimeout() { + return operationTimeout; + } + + public void setOperationTimeout(long operationTimeout) { + this.operationTimeout = operationTimeout; + } + + protected void setOperationException(HiveSQLException operationException) { + this.operationException = operationException; + } + + protected final void assertState(OperationState state) throws HiveSQLException { + if (this.state != state) { + throw new HiveSQLException("Expected state " + state + ", but found " + this.state); + } + this.lastAccessTime = System.currentTimeMillis(); + } + + public boolean isRunning() { + return OperationState.RUNNING.equals(state); + } + + public boolean isFinished() { + return OperationState.FINISHED.equals(state); + } + + public boolean isCanceled() { + return OperationState.CANCELED.equals(state); + } + + public boolean isFailed() { + return OperationState.ERROR.equals(state); + } + + protected void createOperationLog() { + if (parentSession.isOperationLogEnabled()) { + File operationLogFile = new File(parentSession.getOperationLogSessionDir(), + opHandle.getHandleIdentifier().toString()); + isOperationLogEnabled = true; + + // create log file + try { + if (operationLogFile.exists()) { + LOG.warn("The operation log file should not exist, but it is already there: " + + operationLogFile.getAbsolutePath()); + operationLogFile.delete(); + } + if (!operationLogFile.createNewFile()) { + // the log file already exists and cannot be deleted. + // If it can be read/written, keep its contents and use it. + if (!operationLogFile.canRead() || !operationLogFile.canWrite()) { + LOG.warn("The already existed operation log file cannot be recreated, " + + "and it cannot be read or written: " + operationLogFile.getAbsolutePath()); + isOperationLogEnabled = false; + return; + } + } + } catch (Exception e) { + LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e); + isOperationLogEnabled = false; + return; + } + + // create OperationLog object with above log file + try { + operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf()); + } catch (FileNotFoundException e) { + LOG.warn("Unable to instantiate OperationLog object for operation: " + + opHandle, e); + isOperationLogEnabled = false; + return; + } + + // register this operationLog to current thread + OperationLog.setCurrentOperationLog(operationLog); + } + } + + protected void unregisterOperationLog() { + if (isOperationLogEnabled) { + OperationLog.removeCurrentOperationLog(); + } + } + + /** + * Invoked before runInternal(). + * Set up some preconditions, or configurations. + */ + protected void beforeRun() { + createOperationLog(); + } + + /** + * Invoked after runInternal(), even if an exception is thrown in runInternal(). + * Clean up resources, which was set up in beforeRun(). + */ + protected void afterRun() { + unregisterOperationLog(); + } + + /** + * Implemented by subclass of Operation class to execute specific behaviors. + * @throws HiveSQLException + */ + protected abstract void runInternal() throws HiveSQLException; + + public void run() throws HiveSQLException { + beforeRun(); + try { + runInternal(); + } finally { + afterRun(); + } + } + + protected void cleanupOperationLog() { + if (isOperationLogEnabled) { + if (operationLog == null) { + LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] " + + "logging is enabled, but its OperationLog object cannot be found."); + } else { + operationLog.close(); + } + } + } + + // TODO: make this abstract and implement in subclasses. + public void cancel() throws HiveSQLException { + setState(OperationState.CANCELED); + throw new UnsupportedOperationException("SQLOperation.cancel()"); + } + + public abstract void close() throws HiveSQLException; + + public abstract TableSchema getResultSetSchema() throws HiveSQLException; + + public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException; + + public RowSet getNextRowSet() throws HiveSQLException { + return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS); + } + + /** + * Verify if the given fetch orientation is part of the default orientation types. + * @param orientation + * @throws HiveSQLException + */ + protected void validateDefaultFetchOrientation(FetchOrientation orientation) + throws HiveSQLException { + validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET); + } + + /** + * Verify if the given fetch orientation is part of the supported orientation types. + * @param orientation + * @param supportedOrientations + * @throws HiveSQLException + */ + protected void validateFetchOrientation(FetchOrientation orientation, + EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException { + if (!supportedOrientations.contains(orientation)) { + throw new HiveSQLException("The fetch type " + orientation.toString() + + " is not supported for this resultset", "HY106"); + } + } + + protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) { + HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(), + response.getSQLState(), response.getResponseCode()); + if (response.getException() != null) { + ex.initCause(response.getException()); + } + return ex; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java new file mode 100644 index 0000000..92c340a --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.operation; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.hive.service.AbstractService; +import org.apache.hive.service.cli.FetchOrientation; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.RowSetFactory; +import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.session.HiveSession; +import org.apache.log4j.Appender; +import org.apache.log4j.Logger; + +/** + * OperationManager. + * + */ +public class OperationManager extends AbstractService { + private final Log LOG = LogFactory.getLog(OperationManager.class.getName()); + + private final Map<OperationHandle, Operation> handleToOperation = + new HashMap<OperationHandle, Operation>(); + + public OperationManager() { + super(OperationManager.class.getSimpleName()); + } + + @Override + public synchronized void init(HiveConf hiveConf) { + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { + initOperationLogCapture(hiveConf.getVar( + HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL)); + } else { + LOG.debug("Operation level logging is turned off"); + } + super.init(hiveConf); + } + + @Override + public synchronized void start() { + super.start(); + // TODO + } + + @Override + public synchronized void stop() { + // TODO + super.stop(); + } + + private void initOperationLogCapture(String loggingMode) { + // Register another Appender (with the same layout) that talks to us. + Appender ap = new LogDivertAppender(this, OperationLog.getLoggingLevel(loggingMode)); + Logger.getRootLogger().addAppender(ap); + } + + public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, + String statement, Map<String, String> confOverlay, boolean runAsync) + throws HiveSQLException { + ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation + .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync); + addOperation(executeStatementOperation); + return executeStatementOperation; + } + + public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) { + GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession); + addOperation(operation); + return operation; + } + + public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession) { + GetCatalogsOperation operation = new GetCatalogsOperation(parentSession); + addOperation(operation); + return operation; + } + + public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession, + String catalogName, String schemaName) { + GetSchemasOperation operation = new GetSchemasOperation(parentSession, catalogName, schemaName); + addOperation(operation); + return operation; + } + + public MetadataOperation newGetTablesOperation(HiveSession parentSession, + String catalogName, String schemaName, String tableName, + List<String> tableTypes) { + MetadataOperation operation = + new GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes); + addOperation(operation); + return operation; + } + + public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession) { + GetTableTypesOperation operation = new GetTableTypesOperation(parentSession); + addOperation(operation); + return operation; + } + + public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession, + String catalogName, String schemaName, String tableName, String columnName) { + GetColumnsOperation operation = new GetColumnsOperation(parentSession, + catalogName, schemaName, tableName, columnName); + addOperation(operation); + return operation; + } + + public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession, + String catalogName, String schemaName, String functionName) { + GetFunctionsOperation operation = new GetFunctionsOperation(parentSession, + catalogName, schemaName, functionName); + addOperation(operation); + return operation; + } + + public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException { + Operation operation = getOperationInternal(operationHandle); + if (operation == null) { + throw new HiveSQLException("Invalid OperationHandle: " + operationHandle); + } + return operation; + } + + private synchronized Operation getOperationInternal(OperationHandle operationHandle) { + return handleToOperation.get(operationHandle); + } + + private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) { + Operation operation = handleToOperation.get(operationHandle); + if (operation != null && operation.isTimedOut(System.currentTimeMillis())) { + handleToOperation.remove(operationHandle); + return operation; + } + return null; + } + + private synchronized void addOperation(Operation operation) { + handleToOperation.put(operation.getHandle(), operation); + } + + private synchronized Operation removeOperation(OperationHandle opHandle) { + return handleToOperation.remove(opHandle); + } + + public OperationStatus getOperationStatus(OperationHandle opHandle) + throws HiveSQLException { + return getOperation(opHandle).getStatus(); + } + + public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { + Operation operation = getOperation(opHandle); + OperationState opState = operation.getStatus().getState(); + if (opState == OperationState.CANCELED || + opState == OperationState.CLOSED || + opState == OperationState.FINISHED || + opState == OperationState.ERROR || + opState == OperationState.UNKNOWN) { + // Cancel should be a no-op in either cases + LOG.debug(opHandle + ": Operation is already aborted in state - " + opState); + } + else { + LOG.debug(opHandle + ": Attempting to cancel from state - " + opState); + operation.cancel(); + } + } + + public void closeOperation(OperationHandle opHandle) throws HiveSQLException { + Operation operation = removeOperation(opHandle); + if (operation == null) { + throw new HiveSQLException("Operation does not exist!"); + } + operation.close(); + } + + public TableSchema getOperationResultSetSchema(OperationHandle opHandle) + throws HiveSQLException { + return getOperation(opHandle).getResultSetSchema(); + } + + public RowSet getOperationNextRowSet(OperationHandle opHandle) + throws HiveSQLException { + return getOperation(opHandle).getNextRowSet(); + } + + public RowSet getOperationNextRowSet(OperationHandle opHandle, + FetchOrientation orientation, long maxRows) + throws HiveSQLException { + return getOperation(opHandle).getNextRowSet(orientation, maxRows); + } + + public RowSet getOperationLogRowSet(OperationHandle opHandle, + FetchOrientation orientation, long maxRows) + throws HiveSQLException { + // get the OperationLog object from the operation + OperationLog operationLog = getOperation(opHandle).getOperationLog(); + if (operationLog == null) { + throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle); + } + + // read logs + List<String> logs; + try { + logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows); + } catch (SQLException e) { + throw new HiveSQLException(e.getMessage(), e.getCause()); + } + + + // convert logs to RowSet + TableSchema tableSchema = new TableSchema(getLogSchema()); + RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion()); + for (String log : logs) { + rowSet.addRow(new String[] {log}); + } + + return rowSet; + } + + private boolean isFetchFirst(FetchOrientation fetchOrientation) { + //TODO: Since OperationLog is moved to package o.a.h.h.ql.session, + // we may add a Enum there and map FetchOrientation to it. + if (fetchOrientation.equals(FetchOrientation.FETCH_FIRST)) { + return true; + } + return false; + } + + private Schema getLogSchema() { + Schema schema = new Schema(); + FieldSchema fieldSchema = new FieldSchema(); + fieldSchema.setName("operation_log"); + fieldSchema.setType("string"); + schema.addToFieldSchemas(fieldSchema); + return schema; + } + + public OperationLog getOperationLogByThread() { + return OperationLog.getCurrentOperationLog(); + } + + public List<Operation> removeExpiredOperations(OperationHandle[] handles) { + List<Operation> removed = new ArrayList<Operation>(); + for (OperationHandle handle : handles) { + Operation operation = removeTimedOutOperation(handle); + if (operation != null) { + LOG.warn("Operation " + handle + " is timed-out and will be closed"); + removed.add(operation); + } + } + return removed; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java new file mode 100644 index 0000000..33ee16b --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -0,0 +1,473 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.operation; + +import java.io.IOException; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.security.PrivilegedExceptionAction; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.exec.ExplainTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.VariableSubstitution; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.service.cli.FetchOrientation; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.RowSetFactory; +import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.server.ThreadWithGarbageCleanup; + +/** + * SQLOperation. + * + */ +public class SQLOperation extends ExecuteStatementOperation { + + private Driver driver = null; + private CommandProcessorResponse response; + private TableSchema resultSchema = null; + private Schema mResultSchema = null; + private SerDe serde = null; + private boolean fetchStarted = false; + + public SQLOperation(HiveSession parentSession, String statement, Map<String, + String> confOverlay, boolean runInBackground) { + // TODO: call setRemoteUser in ExecuteStatementOperation or higher. + super(parentSession, statement, confOverlay, runInBackground); + } + + /*** + * Compile the query and extract metadata + * @param sqlOperationConf + * @throws HiveSQLException + */ + public void prepare(HiveConf sqlOperationConf) throws HiveSQLException { + setState(OperationState.RUNNING); + + try { + driver = new Driver(sqlOperationConf, getParentSession().getUserName()); + + // set the operation handle information in Driver, so that thrift API users + // can use the operation handle they receive, to lookup query information in + // Yarn ATS + String guid64 = Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier() + .toTHandleIdentifier().getGuid()).trim(); + driver.setOperationId(guid64); + + // In Hive server mode, we are not able to retry in the FetchTask + // case, when calling fetch queries since execute() has returned. + // For now, we disable the test attempts. + driver.setTryCount(Integer.MAX_VALUE); + + String subStatement = new VariableSubstitution().substitute(sqlOperationConf, statement); + response = driver.compileAndRespond(subStatement); + if (0 != response.getResponseCode()) { + throw toSQLException("Error while compiling statement", response); + } + + mResultSchema = driver.getSchema(); + + // hasResultSet should be true only if the query has a FetchTask + // "explain" is an exception for now + if(driver.getPlan().getFetchTask() != null) { + //Schema has to be set + if (mResultSchema == null || !mResultSchema.isSetFieldSchemas()) { + throw new HiveSQLException("Error compiling query: Schema and FieldSchema " + + "should be set when query plan has a FetchTask"); + } + resultSchema = new TableSchema(mResultSchema); + setHasResultSet(true); + } else { + setHasResultSet(false); + } + // Set hasResultSet true if the plan has ExplainTask + // TODO explain should use a FetchTask for reading + for (Task<? extends Serializable> task: driver.getPlan().getRootTasks()) { + if (task.getClass() == ExplainTask.class) { + resultSchema = new TableSchema(mResultSchema); + setHasResultSet(true); + break; + } + } + } catch (HiveSQLException e) { + setState(OperationState.ERROR); + throw e; + } catch (Exception e) { + setState(OperationState.ERROR); + throw new HiveSQLException("Error running query: " + e.toString(), e); + } + } + + private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException { + try { + // In Hive server mode, we are not able to retry in the FetchTask + // case, when calling fetch queries since execute() has returned. + // For now, we disable the test attempts. + driver.setTryCount(Integer.MAX_VALUE); + response = driver.run(); + if (0 != response.getResponseCode()) { + throw toSQLException("Error while processing statement", response); + } + } catch (HiveSQLException e) { + // If the operation was cancelled by another thread, + // Driver#run will return a non-zero response code. + // We will simply return if the operation state is CANCELED, + // otherwise throw an exception + if (getStatus().getState() == OperationState.CANCELED) { + return; + } + else { + setState(OperationState.ERROR); + throw e; + } + } catch (Exception e) { + setState(OperationState.ERROR); + throw new HiveSQLException("Error running query: " + e.toString(), e); + } + setState(OperationState.FINISHED); + } + + @Override + public void runInternal() throws HiveSQLException { + setState(OperationState.PENDING); + final HiveConf opConfig = getConfigForOperation(); + prepare(opConfig); + if (!shouldRunAsync()) { + runQuery(opConfig); + } else { + // We'll pass ThreadLocals in the background thread from the foreground (handler) thread + final SessionState parentSessionState = SessionState.get(); + // ThreadLocal Hive object needs to be set in background thread. + // The metastore client in Hive is associated with right user. + final Hive parentHive = getSessionHive(); + // Current UGI will get used by metastore when metsatore is in embedded mode + // So this needs to get passed to the new background thread + final UserGroupInformation currentUGI = getCurrentUGI(opConfig); + // Runnable impl to call runInternal asynchronously, + // from a different thread + Runnable backgroundOperation = new Runnable() { + @Override + public void run() { + PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws HiveSQLException { + Hive.set(parentHive); + SessionState.setCurrentSessionState(parentSessionState); + // Set current OperationLog in this async thread for keeping on saving query log. + registerCurrentOperationLog(); + try { + runQuery(opConfig); + } catch (HiveSQLException e) { + setOperationException(e); + LOG.error("Error running hive query: ", e); + } finally { + unregisterOperationLog(); + } + return null; + } + }; + + try { + currentUGI.doAs(doAsAction); + } catch (Exception e) { + setOperationException(new HiveSQLException(e)); + LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e); + } + finally { + /** + * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup + * when this thread is garbage collected later. + * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() + */ + if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { + ThreadWithGarbageCleanup currentThread = + (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); + currentThread.cacheThreadLocalRawStore(); + } + } + } + }; + try { + // This submit blocks if no background threads are available to run this operation + Future<?> backgroundHandle = + getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation); + setBackgroundHandle(backgroundHandle); + } catch (RejectedExecutionException rejected) { + setState(OperationState.ERROR); + throw new HiveSQLException("The background threadpool cannot accept" + + " new task for execution, please retry the operation", rejected); + } + } + } + + /** + * Returns the current UGI on the stack + * @param opConfig + * @return UserGroupInformation + * @throws HiveSQLException + */ + private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException { + try { + return Utils.getUGI(); + } catch (Exception e) { + throw new HiveSQLException("Unable to get current user", e); + } + } + + /** + * Returns the ThreadLocal Hive for the current thread + * @return Hive + * @throws HiveSQLException + */ + private Hive getSessionHive() throws HiveSQLException { + try { + return Hive.get(); + } catch (HiveException e) { + throw new HiveSQLException("Failed to get ThreadLocal Hive object", e); + } + } + + private void registerCurrentOperationLog() { + if (isOperationLogEnabled) { + if (operationLog == null) { + LOG.warn("Failed to get current OperationLog object of Operation: " + + getHandle().getHandleIdentifier()); + isOperationLogEnabled = false; + return; + } + OperationLog.setCurrentOperationLog(operationLog); + } + } + + private void cleanup(OperationState state) throws HiveSQLException { + setState(state); + if (shouldRunAsync()) { + Future<?> backgroundHandle = getBackgroundHandle(); + if (backgroundHandle != null) { + backgroundHandle.cancel(true); + } + } + if (driver != null) { + driver.close(); + driver.destroy(); + } + driver = null; + + SessionState ss = SessionState.get(); + if (ss.getTmpOutputFile() != null) { + ss.getTmpOutputFile().delete(); + } + } + + @Override + public void cancel() throws HiveSQLException { + cleanup(OperationState.CANCELED); + } + + @Override + public void close() throws HiveSQLException { + cleanup(OperationState.CLOSED); + cleanupOperationLog(); + } + + @Override + public TableSchema getResultSetSchema() throws HiveSQLException { + assertState(OperationState.FINISHED); + if (resultSchema == null) { + resultSchema = new TableSchema(driver.getSchema()); + } + return resultSchema; + } + + private transient final List<Object> convey = new ArrayList<Object>(); + + @Override + public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + validateDefaultFetchOrientation(orientation); + assertState(OperationState.FINISHED); + + RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion()); + + try { + /* if client is requesting fetch-from-start and its not the first time reading from this operation + * then reset the fetch position to beginning + */ + if (orientation.equals(FetchOrientation.FETCH_FIRST) && fetchStarted) { + driver.resetFetch(); + } + fetchStarted = true; + driver.setMaxRows((int) maxRows); + if (driver.getResults(convey)) { + return decode(convey, rowSet); + } + return rowSet; + } catch (IOException e) { + throw new HiveSQLException(e); + } catch (CommandNeedRetryException e) { + throw new HiveSQLException(e); + } catch (Exception e) { + throw new HiveSQLException(e); + } finally { + convey.clear(); + } + } + + private RowSet decode(List<Object> rows, RowSet rowSet) throws Exception { + if (driver.isFetchingTable()) { + return prepareFromRow(rows, rowSet); + } + return decodeFromString(rows, rowSet); + } + + // already encoded to thrift-able object in ThriftFormatter + private RowSet prepareFromRow(List<Object> rows, RowSet rowSet) throws Exception { + for (Object row : rows) { + rowSet.addRow((Object[]) row); + } + return rowSet; + } + + private RowSet decodeFromString(List<Object> rows, RowSet rowSet) + throws SQLException, SerDeException { + getSerDe(); + StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector(); + List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs(); + + Object[] deserializedFields = new Object[fieldRefs.size()]; + Object rowObj; + ObjectInspector fieldOI; + + int protocol = getProtocolVersion().getValue(); + for (Object rowString : rows) { + try { + rowObj = serde.deserialize(new BytesWritable(((String)rowString).getBytes("UTF-8"))); + } catch (UnsupportedEncodingException e) { + throw new SerDeException(e); + } + for (int i = 0; i < fieldRefs.size(); i++) { + StructField fieldRef = fieldRefs.get(i); + fieldOI = fieldRef.getFieldObjectInspector(); + Object fieldData = soi.getStructFieldData(rowObj, fieldRef); + deserializedFields[i] = SerDeUtils.toThriftPayload(fieldData, fieldOI, protocol); + } + rowSet.addRow(deserializedFields); + } + return rowSet; + } + + private SerDe getSerDe() throws SQLException { + if (serde != null) { + return serde; + } + try { + List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas(); + StringBuilder namesSb = new StringBuilder(); + StringBuilder typesSb = new StringBuilder(); + + if (fieldSchemas != null && !fieldSchemas.isEmpty()) { + for (int pos = 0; pos < fieldSchemas.size(); pos++) { + if (pos != 0) { + namesSb.append(","); + typesSb.append(","); + } + namesSb.append(fieldSchemas.get(pos).getName()); + typesSb.append(fieldSchemas.get(pos).getType()); + } + } + String names = namesSb.toString(); + String types = typesSb.toString(); + + serde = new LazySimpleSerDe(); + Properties props = new Properties(); + if (names.length() > 0) { + LOG.debug("Column names: " + names); + props.setProperty(serdeConstants.LIST_COLUMNS, names); + } + if (types.length() > 0) { + LOG.debug("Column types: " + types); + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types); + } + SerDeUtils.initializeSerDe(serde, new HiveConf(), props, null); + + } catch (Exception ex) { + ex.printStackTrace(); + throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex); + } + return serde; + } + + /** + * If there are query specific settings to overlay, then create a copy of config + * There are two cases we need to clone the session config that's being passed to hive driver + * 1. Async query - + * If the client changes a config setting, that shouldn't reflect in the execution already underway + * 2. confOverlay - + * The query specific settings should only be applied to the query config and not session + * @return new configuration + * @throws HiveSQLException + */ + private HiveConf getConfigForOperation() throws HiveSQLException { + HiveConf sqlOperationConf = getParentSession().getHiveConf(); + if (!getConfOverlay().isEmpty() || shouldRunAsync()) { + // clone the partent session config for this query + sqlOperationConf = new HiveConf(sqlOperationConf); + + // apply overlay query specific settings, if any + for (Map.Entry<String, String> confEntry : getConfOverlay().entrySet()) { + try { + sqlOperationConf.verifyAndSet(confEntry.getKey(), confEntry.getValue()); + } catch (IllegalArgumentException e) { + throw new HiveSQLException("Error applying statement specific settings", e); + } + } + } + return sqlOperationConf; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/TableTypeMapping.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/TableTypeMapping.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/TableTypeMapping.java new file mode 100644 index 0000000..3a8a07f --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/TableTypeMapping.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.operation; + +import java.util.Set; + + +public interface TableTypeMapping { + /** + * Map client's table type name to hive's table type + * @param clientTypeName + * @return + */ + public String mapToHiveType (String clientTypeName); + + /** + * Map hive's table type name to client's table type + * @param clientTypeName + * @return + */ + public String mapToClientType (String hiveTypeName); + + /** + * Get all the table types of this mapping + * @return + */ + public Set<String> getTableTypeNames(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/TableTypeMappingFactory.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/TableTypeMappingFactory.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/TableTypeMappingFactory.java new file mode 100644 index 0000000..d8ac269 --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/TableTypeMappingFactory.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.operation; + +public class TableTypeMappingFactory { + + public enum TableTypeMappings { + HIVE, + CLASSIC + } + private static TableTypeMapping hiveTableTypeMapping = new HiveTableTypeMapping(); + private static TableTypeMapping classicTableTypeMapping = new ClassicTableTypeMapping(); + + public static TableTypeMapping getTableTypeMapping(String mappingType) { + if (TableTypeMappings.CLASSIC.toString().equalsIgnoreCase(mappingType)) { + return classicTableTypeMapping; + } else { + return hiveTableTypeMapping; + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java new file mode 100644 index 0000000..65f9b29 --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.session; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.hive.service.cli.*; + +public interface HiveSession extends HiveSessionBase { + + void open(Map<String, String> sessionConfMap) throws Exception; + + IMetaStoreClient getMetaStoreClient() throws HiveSQLException; + + /** + * getInfo operation handler + * @param getInfoType + * @return + * @throws HiveSQLException + */ + GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException; + + /** + * execute operation handler + * @param statement + * @param confOverlay + * @return + * @throws HiveSQLException + */ + OperationHandle executeStatement(String statement, + Map<String, String> confOverlay) throws HiveSQLException; + + /** + * execute operation handler + * @param statement + * @param confOverlay + * @return + * @throws HiveSQLException + */ + OperationHandle executeStatementAsync(String statement, + Map<String, String> confOverlay) throws HiveSQLException; + + /** + * getTypeInfo operation handler + * @return + * @throws HiveSQLException + */ + OperationHandle getTypeInfo() throws HiveSQLException; + + /** + * getCatalogs operation handler + * @return + * @throws HiveSQLException + */ + OperationHandle getCatalogs() throws HiveSQLException; + + /** + * getSchemas operation handler + * @param catalogName + * @param schemaName + * @return + * @throws HiveSQLException + */ + OperationHandle getSchemas(String catalogName, String schemaName) + throws HiveSQLException; + + /** + * getTables operation handler + * @param catalogName + * @param schemaName + * @param tableName + * @param tableTypes + * @return + * @throws HiveSQLException + */ + OperationHandle getTables(String catalogName, String schemaName, + String tableName, List<String> tableTypes) throws HiveSQLException; + + /** + * getTableTypes operation handler + * @return + * @throws HiveSQLException + */ + OperationHandle getTableTypes() throws HiveSQLException ; + + /** + * getColumns operation handler + * @param catalogName + * @param schemaName + * @param tableName + * @param columnName + * @return + * @throws HiveSQLException + */ + OperationHandle getColumns(String catalogName, String schemaName, + String tableName, String columnName) throws HiveSQLException; + + /** + * getFunctions operation handler + * @param catalogName + * @param schemaName + * @param functionName + * @return + * @throws HiveSQLException + */ + OperationHandle getFunctions(String catalogName, String schemaName, + String functionName) throws HiveSQLException; + + /** + * close the session + * @throws HiveSQLException + */ + void close() throws HiveSQLException; + + void cancelOperation(OperationHandle opHandle) throws HiveSQLException; + + void closeOperation(OperationHandle opHandle) throws HiveSQLException; + + TableSchema getResultSetMetadata(OperationHandle opHandle) + throws HiveSQLException; + + RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException; + + String getDelegationToken(HiveAuthFactory authFactory, String owner, + String renewer) throws HiveSQLException; + + void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr) + throws HiveSQLException; + + void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr) + throws HiveSQLException; + + void closeExpiredOperations(); + + long getNoOperationTime(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionBase.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionBase.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionBase.java new file mode 100644 index 0000000..9b04d67 --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionBase.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.session; + +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.service.cli.SessionHandle; +import org.apache.hive.service.cli.operation.OperationManager; +import org.apache.hive.service.cli.thrift.TProtocolVersion; + +import java.io.File; +import java.util.Map; + +/** + * Methods that don't need to be executed under a doAs + * context are here. Rest of them in HiveSession interface + */ +public interface HiveSessionBase { + + TProtocolVersion getProtocolVersion(); + + /** + * Set the session manager for the session + * @param sessionManager + */ + void setSessionManager(SessionManager sessionManager); + + /** + * Get the session manager for the session + */ + SessionManager getSessionManager(); + + /** + * Set operation manager for the session + * @param operationManager + */ + void setOperationManager(OperationManager operationManager); + + /** + * Check whether operation logging is enabled and session dir is created successfully + */ + boolean isOperationLogEnabled(); + + /** + * Get the session dir, which is the parent dir of operation logs + * @return a file representing the parent directory of operation logs + */ + File getOperationLogSessionDir(); + + /** + * Set the session dir, which is the parent dir of operation logs + * @param operationLogRootDir the parent dir of the session dir + */ + void setOperationLogSessionDir(File operationLogRootDir); + + SessionHandle getSessionHandle(); + + String getUsername(); + + String getPassword(); + + HiveConf getHiveConf(); + + SessionState getSessionState(); + + String getUserName(); + + void setUserName(String userName); + + String getIpAddress(); + + void setIpAddress(String ipAddress); + + long getLastAccessTime(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHook.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHook.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHook.java new file mode 100644 index 0000000..06388cc --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHook.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.session; + +import org.apache.hadoop.hive.ql.hooks.Hook; +import org.apache.hive.service.cli.HiveSQLException; + +/** + * HiveSessionHook. + * HiveServer2 session level Hook interface. The run method is executed + * when session manager starts a new session + * + */ +public interface HiveSessionHook extends Hook { + + /** + * @param sessionHookContext context + * @throws HiveSQLException + */ + public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException; +} http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHookContext.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHookContext.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHookContext.java new file mode 100644 index 0000000..156c814 --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHookContext.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.session; + +import org.apache.hadoop.hive.conf.HiveConf; +/** + * HiveSessionHookContext. + * Interface passed to the HiveServer2 session hook execution. This enables + * the hook implementation to accesss session config, user and session handle + */ +public interface HiveSessionHookContext { + + /** + * Retrieve session conf + * @return + */ + public HiveConf getSessionConf(); + + /** + * The get the username starting the session + * @return + */ + public String getSessionUser(); + + /** + * Retrieve handle for the session + * @return + */ + public String getSessionHandle(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/7feeb82c/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHookContextImpl.java ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHookContextImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHookContextImpl.java new file mode 100644 index 0000000..1ee4ac8 --- /dev/null +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionHookContextImpl.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.session; + +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * + * HiveSessionHookContextImpl. + * Session hook context implementation which is created by session manager + * and passed to hook invocation. + */ +public class HiveSessionHookContextImpl implements HiveSessionHookContext { + + private final HiveSession hiveSession; + + HiveSessionHookContextImpl(HiveSession hiveSession) { + this.hiveSession = hiveSession; + } + + @Override + public HiveConf getSessionConf() { + return hiveSession.getHiveConf(); + } + + + @Override + public String getSessionUser() { + return hiveSession.getUserName(); + } + + @Override + public String getSessionHandle() { + return hiveSession.getSessionHandle().toString(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
