This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 512cb2f [SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener 512cb2f is described below commit 512cb2f0246a0d020f0ba726b4596555b15797c6 Author: Ali Smesseim <ali.smess...@databricks.com> AuthorDate: Tue May 12 09:14:34 2020 -0700 [SPARK-31387] Handle unknown operation/session ID in HiveThriftServer2Listener ### What changes were proposed in this pull request? The update methods in HiveThriftServer2Listener now check whether the given session/operation ID actually exists in `sessionList` and `executionList`, respectively. This prevents NullPointerExceptions when the session or operation ID is unknown; instead, a warning is written to the log. Also, in HiveSessionImpl.close(), we catch any exception thrown by `operationManager.closeOperation`, so that if closing one operation throws, the remaining operations are still closed. ### Why are the changes needed? The listener's update methods would throw an exception if the session or operation ID was unknown. In Spark 2, where the listener is called directly, this interferes with the caller's control flow. In Spark 3, the exception is caught by the ListenerBus but surfaces only as an uninformative NullPointerException. In HiveSessionImpl.close(), if an exception was thrown while closing an operation, none of the subsequent operations were closed. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit tests Closes #28155 from alismess-db/hive-thriftserver-listener-update-safer. 
Authored-by: Ali Smesseim <ali.smess...@databricks.com> Signed-off-by: gatorsmile <gatorsm...@gmail.com> (cherry picked from commit 6994c64efd5770a8fd33220cbcaddc1d96fed886) Signed-off-by: gatorsmile <gatorsm...@gmail.com> --- .../ui/HiveThriftServer2Listener.scala | 120 ++++++++++++--------- .../hive/thriftserver/HiveSessionImplSuite.scala | 73 +++++++++++++ .../ui/HiveThriftServer2ListenerSuite.scala | 16 +++ .../hive/service/cli/session/HiveSessionImpl.java | 6 +- .../hive/service/cli/session/HiveSessionImpl.java | 6 +- 5 files changed, 170 insertions(+), 51 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala index 6d0a506..20a8f2c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hive.service.server.HiveServer2 import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD import org.apache.spark.scheduler._ import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState @@ -38,7 +39,7 @@ private[thriftserver] class HiveThriftServer2Listener( kvstore: ElementTrackingStore, sparkConf: SparkConf, server: Option[HiveServer2], - live: Boolean = true) extends SparkListener { + live: Boolean = true) extends SparkListener with Logging { private val sessionList = new ConcurrentHashMap[String, LiveSessionData]() private val executionList = new ConcurrentHashMap[String, LiveExecutionData]() @@ -131,60 +132,81 @@ private[thriftserver] class HiveThriftServer2Listener( updateLiveStore(session) } - 
private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = { - val session = sessionList.get(e.sessionId) - session.finishTimestamp = e.finishTime - updateStoreWithTriggerEnabled(session) - sessionList.remove(e.sessionId) - } + private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = + Option(sessionList.get(e.sessionId)) match { + case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") + case Some(sessionData) => + val session = sessionData + session.finishTimestamp = e.finishTime + updateStoreWithTriggerEnabled(session) + sessionList.remove(e.sessionId) + } - private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = { - val info = getOrCreateExecution( - e.id, - e.statement, - e.sessionId, - e.startTime, - e.userName) - - info.state = ExecutionState.STARTED - executionList.put(e.id, info) - sessionList.get(e.sessionId).totalExecution += 1 - executionList.get(e.id).groupId = e.groupId - updateLiveStore(executionList.get(e.id)) - updateLiveStore(sessionList.get(e.sessionId)) - } + private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = + Option(sessionList.get(e.sessionId)) match { + case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}") + case Some(sessionData) => + val info = getOrCreateExecution( + e.id, + e.statement, + e.sessionId, + e.startTime, + e.userName) + + info.state = ExecutionState.STARTED + executionList.put(e.id, info) + sessionData.totalExecution += 1 + executionList.get(e.id).groupId = e.groupId + updateLiveStore(executionList.get(e.id)) + updateLiveStore(sessionData) + } - private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = { - executionList.get(e.id).executePlan = e.executionPlan - executionList.get(e.id).state = ExecutionState.COMPILED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationParsed(e: 
SparkListenerThriftServerOperationParsed): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.executePlan = e.executionPlan + executionData.state = ExecutionState.COMPILED + updateLiveStore(executionData) + } - private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = { - executionList.get(e.id).finishTimestamp = e.finishTime - executionList.get(e.id).state = ExecutionState.CANCELED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.state = ExecutionState.CANCELED + updateLiveStore(executionData) + } - private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = { - executionList.get(e.id).finishTimestamp = e.finishTime - executionList.get(e.id).detail = e.errorMsg - executionList.get(e.id).state = ExecutionState.FAILED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.detail = e.errorMsg + executionData.state = ExecutionState.FAILED + updateLiveStore(executionData) + } - private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = { - executionList.get(e.id).finishTimestamp = e.finishTime - executionList.get(e.id).state = ExecutionState.FINISHED - updateLiveStore(executionList.get(e.id)) - } + private def onOperationFinished(e: 
SparkListenerThriftServerOperationFinish): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.finishTimestamp = e.finishTime + executionData.state = ExecutionState.FINISHED + updateLiveStore(executionData) + } - private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = { - executionList.get(e.id).closeTimestamp = e.closeTime - executionList.get(e.id).state = ExecutionState.CLOSED - updateStoreWithTriggerEnabled(executionList.get(e.id)) - executionList.remove(e.id) - } + private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = + Option(executionList.get(e.id)) match { + case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}") + case Some(executionData) => + executionData.closeTimestamp = e.closeTime + executionData.state = ExecutionState.CLOSED + updateStoreWithTriggerEnabled(executionData) + executionList.remove(e.id) + } // Update both live and history stores. Trigger is enabled by default, hence // it will cleanup the entity which exceeds the threshold. diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala new file mode 100644 index 0000000..05d540d --- /dev/null +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.thriftserver + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hive.service.cli.OperationHandle +import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager} +import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager} +import org.mockito.Mockito.{mock, verify, when} +import org.mockito.invocation.InvocationOnMock + +import org.apache.spark.SparkFunSuite + +class HiveSessionImplSuite extends SparkFunSuite { + private var session: HiveSessionImpl = _ + private var operationManager: OperationManager = _ + + override def beforeAll() { + super.beforeAll() + + session = new HiveSessionImpl( + ThriftserverShimUtils.testedProtocolVersions.head, + "", + "", + new HiveConf(), + "" + ) + val sessionManager = mock(classOf[SessionManager]) + session.setSessionManager(sessionManager) + operationManager = mock(classOf[OperationManager]) + session.setOperationManager(operationManager) + when(operationManager.newGetCatalogsOperation(session)).thenAnswer( + (_: InvocationOnMock) => { + val operation = mock(classOf[GetCatalogsOperation]) + when(operation.getHandle).thenReturn(mock(classOf[OperationHandle])) + operation + } + ) + + session.open(Map.empty[String, String].asJava) + } + + test("SPARK-31387 - session.close() closes all sessions regardless of thrown exceptions") { + val operationHandle1 = session.getCatalogs + val operationHandle2 = session.getCatalogs + + when(operationManager.closeOperation(operationHandle1)) + 
.thenThrow(classOf[NullPointerException]) + when(operationManager.closeOperation(operationHandle2)) + .thenThrow(classOf[NullPointerException]) + + session.close() + + verify(operationManager).closeOperation(operationHandle1) + verify(operationManager).closeOperation(operationHandle2) + } +} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala index 075032f..ea2523d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala @@ -140,6 +140,22 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(listener.noLiveData()) } + test("SPARK-31387 - listener update methods should not throw exception with unknown input") { + val (statusStore: HiveThriftServer2AppStatusStore, + listener: HiveThriftServer2Listener) = createAppStatusStore(true) + val unknownSession = "unknown_session" + val unknownOperation = "unknown_operation" + listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", unknownSession, + "stmt", "groupId", 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query")) + listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation, + "msg", "trace", 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0)) + listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation, 0)) + } + private def createProperties: Properties = { val properties = new Properties() 
properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId") diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 745f385..3e2c3de 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -636,7 +636,11 @@ public class HiveSessionImpl implements HiveSession { acquire(true); // Iterate through the opHandles and close their operations for (OperationHandle opHandle : opHandleSet) { - operationManager.closeOperation(opHandle); + try { + operationManager.closeOperation(opHandle); + } catch (Exception e) { + LOG.warn("Exception is thrown closing operation " + opHandle, e); + } } opHandleSet.clear(); // Cleanup session log directory. diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 14e9c47..5cdae00 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -650,7 +650,11 @@ public class HiveSessionImpl implements HiveSession { acquire(true); // Iterate through the opHandles and close their operations for (OperationHandle opHandle : opHandleSet) { - operationManager.closeOperation(opHandle); + try { + operationManager.closeOperation(opHandle); + } catch (Exception e) { + LOG.warn("Exception is thrown closing operation " + opHandle, e); + } } opHandleSet.clear(); // Cleanup session log directory. 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org