[ https://issues.apache.org/jira/browse/SPARK-21395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chaozhong Yang updated SPARK-21395:
-----------------------------------
    Description:
In HiveServer2, TFetchResultsReq has a member named `fetchType`. If fetchType equals `1`, the thrift server should return the operation log to the client. However, we found that Spark SQL's thrift server always returns nothing to the client for a TFetchResultsReq with fetchType(1). We checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that the operation log files exist but are zero bytes (empty). Why? Let's take a look at SQLOperation.java in Hive:

{code:java}
@Override
public void runInternal() throws HiveSQLException {
  setState(OperationState.PENDING);
  final HiveConf opConfig = getConfigForOperation();
  prepare(opConfig);
  if (!shouldRunAsync()) {
    runQuery(opConfig);
  } else {
    // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
    final SessionState parentSessionState = SessionState.get();
    // ThreadLocal Hive object needs to be set in background thread.
    // The metastore client in Hive is associated with right user.
    final Hive parentHive = getSessionHive();
    // Current UGI will get used by metastore when metastore is in embedded mode
    // So this needs to get passed to the new background thread
    final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
    // Runnable impl to call runInternal asynchronously,
    // from a different thread
    Runnable backgroundOperation = new Runnable() {
      @Override
      public void run() {
        PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws HiveSQLException {
            Hive.set(parentHive);
            SessionState.setCurrentSessionState(parentSessionState);
            // Set current OperationLog in this async thread for keeping on saving query log.
            registerCurrentOperationLog();
            try {
              runQuery(opConfig);
            } catch (HiveSQLException e) {
              setOperationException(e);
              LOG.error("Error running hive query: ", e);
            } finally {
              unregisterOperationLog();
            }
            return null;
          }
        };
        try {
          currentUGI.doAs(doAsAction);
        } catch (Exception e) {
          setOperationException(new HiveSQLException(e));
          LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
        } finally {
          /**
           * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
           * when this thread is garbage collected later.
           * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
           */
          if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
            ThreadWithGarbageCleanup currentThread =
                (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
            currentThread.cacheThreadLocalRawStore();
          }
        }
      }
    };
    try {
      // This submit blocks if no background threads are available to run this operation
      Future<?> backgroundHandle =
          getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
      setBackgroundHandle(backgroundHandle);
    } catch (RejectedExecutionException rejected) {
      setState(OperationState.ERROR);
      throw new HiveSQLException("The background threadpool cannot accept" +
          " new task for execution, please retry the operation", rejected);
    }
  }
}
{code}

The call to registerCurrentOperationLog() is what lets Hive produce the operation log and return it to the client.

In Spark SQL, however, SparkExecuteStatementOperation does not register the operation log before executing the SQL statement:

{code:scala}
override def runInternal(): Unit = {
  setState(OperationState.PENDING)
  setHasResultSet(true) // avoid no resultset for async run

  if (!runInBackground) {
    execute()
  } else {
    val sparkServiceUGI = Utils.getUGI()

    // Runnable impl to call runInternal asynchronously,
    // from a different thread
    val backgroundOperation = new Runnable() {

      override def run(): Unit = {
        val doAsAction = new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            try {
              execute()
            } catch {
              case e: HiveSQLException =>
                setOperationException(e)
                log.error("Error running hive query: ", e)
            }
          }
        }

        try {
          sparkServiceUGI.doAs(doAsAction)
        } catch {
          case e: Exception =>
            setOperationException(new HiveSQLException(e))
            logError("Error running hive query as user : " +
              sparkServiceUGI.getShortUserName(), e)
        }
      }
    }
    try {
      // This submit blocks if no background threads are available to run this operation
      val backgroundHandle =
        parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
      setBackgroundHandle(backgroundHandle)
    } catch {
      case rejected: RejectedExecutionException =>
        setState(OperationState.ERROR)
        throw new HiveSQLException("The background threadpool cannot accept" +
          " new task for execution, please retry the operation", rejected)
      case NonFatal(e) =>
        logError(s"Error executing query in background", e)
        setState(OperationState.ERROR)
        throw e
    }
  }
}
{code}

LogDivertAppender appends logOutput to the operation log file only if the current thread has a thread-local OperationLog. Because Spark's background thread never registers one, getOperationLogByThread() returns null there and every log event is dropped, which leaves the file empty:

{code:java}
@Override
protected void subAppend(LoggingEvent event) {
  super.subAppend(event);
  // That should've gone into our writer. Notify the LogContext.
  String logOutput = writer.toString();
  writer.reset();

  OperationLog log = operationManager.getOperationLogByThread();
  if (log == null) {
    LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
    return;
  }
  log.writeOperationLog(logOutput);
}
{code}

> Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-21395
>                 URL: https://issues.apache.org/jira/browse/SPARK-21395
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: Chaozhong Yang
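
The thread-local dependency described in this issue can be reproduced outside Hive and Spark with a small standalone sketch. Everything in it is hypothetical and only for illustration: `OperationLogSketch`, its in-memory `OperationLog` stand-in, the `CURRENT` thread-local, and the `append` and `runQuery` helpers are not part of the Hive or Spark APIs. It assumes the appender looks up the log through a plain `ThreadLocal`, as the `subAppend` code in the description suggests.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OperationLogSketch {

    // Hypothetical stand-in for Hive's OperationLog: keeps lines in memory, not in a file.
    static final class OperationLog {
        private final StringBuffer buffer = new StringBuffer();

        void write(String line) {
            buffer.append(line).append('\n');
        }

        String contents() {
            return buffer.toString();
        }
    }

    // Hypothetical stand-in for the per-thread registry behind getOperationLogByThread().
    private static final ThreadLocal<OperationLog> CURRENT = new ThreadLocal<>();

    // Mirrors the shape of subAppend: the event is dropped when this thread has no log.
    static void append(String message) {
        OperationLog log = CURRENT.get();
        if (log == null) {
            return; // dropped, like the "Dropped log event from thread" branch
        }
        log.write(message);
    }

    // Runs a fake query on a background thread and returns what reached the operation log.
    static String runQuery(boolean registerInBackgroundThread) {
        OperationLog opLog = new OperationLog();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> {
                if (registerInBackgroundThread) {
                    CURRENT.set(opLog); // the step Hive performs and Spark skips
                }
                try {
                    append("query started");
                    append("query finished");
                } finally {
                    CURRENT.remove(); // unregister so the pooled thread does not leak the log
                }
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return opLog.contents();
    }

    public static void main(String[] args) {
        System.out.println("without registration: [" + runQuery(false) + "]");
        System.out.println("with registration: [" + runQuery(true) + "]");
    }
}
```

In this sketch `runQuery(false)` returns an empty string, matching the zero-byte files reported above, while `runQuery(true)` returns both log lines. A fix along these lines would register the operation log at the start of the background `run()` and unregister it in a `finally` block, the same way Hive's SQLOperation does.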