This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 512cb2f [SPARK-31387] Handle unknown operation/session ID in
HiveThriftServer2Listener
512cb2f is described below
commit 512cb2f0246a0d020f0ba726b4596555b15797c6
Author: Ali Smesseim <[email protected]>
AuthorDate: Tue May 12 09:14:34 2020 -0700
[SPARK-31387] Handle unknown operation/session ID in
HiveThriftServer2Listener
### What changes were proposed in this pull request?
The update methods in HiveThriftServer2Listener now check whether the session
or operation ID they are given actually exists in `sessionList` or
`executionList`, respectively. This prevents a NullPointerException when the
session or operation ID is unknown; instead, a warning is written to the log.
Also, in HiveSessionImpl.close(), we now catch any exception thrown by
`operationManager.closeOperation` and log it as a warning, so that a failure to
close one operation does not prevent the remaining operations from being closed.
### Why are the changes needed?
The listener's update methods would throw an exception if the operation or
session ID is unknown. In Spark 2, where the listener is called directly, this
interferes with the caller's control flow. In Spark 3, the exception is caught by
the ListenerBus but results in an uninformative NullPointerException.
In HiveSessionImpl.close(), if an exception is thrown while closing an
operation, none of the remaining operations are closed.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit tests
Closes #28155 from alismess-db/hive-thriftserver-listener-update-safer.
Authored-by: Ali Smesseim <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit 6994c64efd5770a8fd33220cbcaddc1d96fed886)
Signed-off-by: gatorsmile <[email protected]>
---
.../ui/HiveThriftServer2Listener.scala | 120 ++++++++++++---------
.../hive/thriftserver/HiveSessionImplSuite.scala | 73 +++++++++++++
.../ui/HiveThriftServer2ListenerSuite.scala | 16 +++
.../hive/service/cli/session/HiveSessionImpl.java | 6 +-
.../hive/service/cli/session/HiveSessionImpl.java | 6 +-
5 files changed, 170 insertions(+), 51 deletions(-)
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 6d0a506..20a8f2c 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hive.service.server.HiveServer2
import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
import org.apache.spark.scheduler._
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
@@ -38,7 +39,7 @@ private[thriftserver] class HiveThriftServer2Listener(
kvstore: ElementTrackingStore,
sparkConf: SparkConf,
server: Option[HiveServer2],
- live: Boolean = true) extends SparkListener {
+ live: Boolean = true) extends SparkListener with Logging {
private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
private val executionList = new ConcurrentHashMap[String,
LiveExecutionData]()
@@ -131,60 +132,81 @@ private[thriftserver] class HiveThriftServer2Listener(
updateLiveStore(session)
}
- private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit
= {
- val session = sessionList.get(e.sessionId)
- session.finishTimestamp = e.finishTime
- updateStoreWithTriggerEnabled(session)
- sessionList.remove(e.sessionId)
- }
+ private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit
=
+ Option(sessionList.get(e.sessionId)) match {
+ case None => logWarning(s"onSessionClosed called with unknown session
id: ${e.sessionId}")
+ case Some(sessionData) =>
+ val session = sessionData
+ session.finishTimestamp = e.finishTime
+ updateStoreWithTriggerEnabled(session)
+ sessionList.remove(e.sessionId)
+ }
- private def onOperationStart(e: SparkListenerThriftServerOperationStart):
Unit = {
- val info = getOrCreateExecution(
- e.id,
- e.statement,
- e.sessionId,
- e.startTime,
- e.userName)
-
- info.state = ExecutionState.STARTED
- executionList.put(e.id, info)
- sessionList.get(e.sessionId).totalExecution += 1
- executionList.get(e.id).groupId = e.groupId
- updateLiveStore(executionList.get(e.id))
- updateLiveStore(sessionList.get(e.sessionId))
- }
+ private def onOperationStart(e: SparkListenerThriftServerOperationStart):
Unit =
+ Option(sessionList.get(e.sessionId)) match {
+ case None => logWarning(s"onOperationStart called with unknown session
id: ${e.sessionId}")
+ case Some(sessionData) =>
+ val info = getOrCreateExecution(
+ e.id,
+ e.statement,
+ e.sessionId,
+ e.startTime,
+ e.userName)
+
+ info.state = ExecutionState.STARTED
+ executionList.put(e.id, info)
+ sessionData.totalExecution += 1
+ executionList.get(e.id).groupId = e.groupId
+ updateLiveStore(executionList.get(e.id))
+ updateLiveStore(sessionData)
+ }
- private def onOperationParsed(e: SparkListenerThriftServerOperationParsed):
Unit = {
- executionList.get(e.id).executePlan = e.executionPlan
- executionList.get(e.id).state = ExecutionState.COMPILED
- updateLiveStore(executionList.get(e.id))
- }
+ private def onOperationParsed(e: SparkListenerThriftServerOperationParsed):
Unit =
+ Option(executionList.get(e.id)) match {
+ case None => logWarning(s"onOperationParsed called with unknown
operation id: ${e.id}")
+ case Some(executionData) =>
+ executionData.executePlan = e.executionPlan
+ executionData.state = ExecutionState.COMPILED
+ updateLiveStore(executionData)
+ }
- private def onOperationCanceled(e:
SparkListenerThriftServerOperationCanceled): Unit = {
- executionList.get(e.id).finishTimestamp = e.finishTime
- executionList.get(e.id).state = ExecutionState.CANCELED
- updateLiveStore(executionList.get(e.id))
- }
+ private def onOperationCanceled(e:
SparkListenerThriftServerOperationCanceled): Unit =
+ Option(executionList.get(e.id)) match {
+ case None => logWarning(s"onOperationCanceled called with unknown
operation id: ${e.id}")
+ case Some(executionData) =>
+ executionData.finishTimestamp = e.finishTime
+ executionData.state = ExecutionState.CANCELED
+ updateLiveStore(executionData)
+ }
- private def onOperationError(e: SparkListenerThriftServerOperationError):
Unit = {
- executionList.get(e.id).finishTimestamp = e.finishTime
- executionList.get(e.id).detail = e.errorMsg
- executionList.get(e.id).state = ExecutionState.FAILED
- updateLiveStore(executionList.get(e.id))
- }
+ private def onOperationError(e: SparkListenerThriftServerOperationError):
Unit =
+ Option(executionList.get(e.id)) match {
+ case None => logWarning(s"onOperationError called with unknown operation
id: ${e.id}")
+ case Some(executionData) =>
+ executionData.finishTimestamp = e.finishTime
+ executionData.detail = e.errorMsg
+ executionData.state = ExecutionState.FAILED
+ updateLiveStore(executionData)
+ }
- private def onOperationFinished(e:
SparkListenerThriftServerOperationFinish): Unit = {
- executionList.get(e.id).finishTimestamp = e.finishTime
- executionList.get(e.id).state = ExecutionState.FINISHED
- updateLiveStore(executionList.get(e.id))
- }
+ private def onOperationFinished(e:
SparkListenerThriftServerOperationFinish): Unit =
+ Option(executionList.get(e.id)) match {
+ case None => logWarning(s"onOperationFinished called with unknown
operation id: ${e.id}")
+ case Some(executionData) =>
+ executionData.finishTimestamp = e.finishTime
+ executionData.state = ExecutionState.FINISHED
+ updateLiveStore(executionData)
+ }
- private def onOperationClosed(e: SparkListenerThriftServerOperationClosed):
Unit = {
- executionList.get(e.id).closeTimestamp = e.closeTime
- executionList.get(e.id).state = ExecutionState.CLOSED
- updateStoreWithTriggerEnabled(executionList.get(e.id))
- executionList.remove(e.id)
- }
+ private def onOperationClosed(e: SparkListenerThriftServerOperationClosed):
Unit =
+ Option(executionList.get(e.id)) match {
+ case None => logWarning(s"onOperationClosed called with unknown
operation id: ${e.id}")
+ case Some(executionData) =>
+ executionData.closeTimestamp = e.closeTime
+ executionData.state = ExecutionState.CLOSED
+ updateStoreWithTriggerEnabled(executionData)
+ executionList.remove(e.id)
+ }
// Update both live and history stores. Trigger is enabled by default, hence
// it will cleanup the entity which exceeds the threshold.
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
new file mode 100644
index 0000000..05d540d
--- /dev/null
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.thriftserver
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hive.service.cli.OperationHandle
+import org.apache.hive.service.cli.operation.{GetCatalogsOperation,
OperationManager}
+import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager}
+import org.mockito.Mockito.{mock, verify, when}
+import org.mockito.invocation.InvocationOnMock
+
+import org.apache.spark.SparkFunSuite
+
+class HiveSessionImplSuite extends SparkFunSuite {
+ private var session: HiveSessionImpl = _
+ private var operationManager: OperationManager = _
+
+ override def beforeAll() {
+ super.beforeAll()
+
+ session = new HiveSessionImpl(
+ ThriftserverShimUtils.testedProtocolVersions.head,
+ "",
+ "",
+ new HiveConf(),
+ ""
+ )
+ val sessionManager = mock(classOf[SessionManager])
+ session.setSessionManager(sessionManager)
+ operationManager = mock(classOf[OperationManager])
+ session.setOperationManager(operationManager)
+ when(operationManager.newGetCatalogsOperation(session)).thenAnswer(
+ (_: InvocationOnMock) => {
+ val operation = mock(classOf[GetCatalogsOperation])
+ when(operation.getHandle).thenReturn(mock(classOf[OperationHandle]))
+ operation
+ }
+ )
+
+ session.open(Map.empty[String, String].asJava)
+ }
+
+ test("SPARK-31387 - session.close() closes all sessions regardless of thrown
exceptions") {
+ val operationHandle1 = session.getCatalogs
+ val operationHandle2 = session.getCatalogs
+
+ when(operationManager.closeOperation(operationHandle1))
+ .thenThrow(classOf[NullPointerException])
+ when(operationManager.closeOperation(operationHandle2))
+ .thenThrow(classOf[NullPointerException])
+
+ session.close()
+
+ verify(operationManager).closeOperation(operationHandle1)
+ verify(operationManager).closeOperation(operationHandle2)
+ }
+}
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
index 075032f..ea2523d 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -140,6 +140,22 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite
with BeforeAndAfter {
assert(listener.noLiveData())
}
+ test("SPARK-31387 - listener update methods should not throw exception with
unknown input") {
+ val (statusStore: HiveThriftServer2AppStatusStore,
+ listener: HiveThriftServer2Listener) = createAppStatusStore(true)
+ val unknownSession = "unknown_session"
+ val unknownOperation = "unknown_operation"
+
listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0))
+ listener.onOtherEvent(SparkListenerThriftServerOperationStart("id",
unknownSession,
+ "stmt", "groupId", 0))
+
listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation,
"query"))
+
listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation,
0))
+
listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation,
+ "msg", "trace", 0))
+
listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation,
0))
+
listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation,
0))
+ }
+
private def createProperties: Properties = {
val properties = new Properties()
properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId")
diff --git
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 745f385..3e2c3de 100644
---
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -636,7 +636,11 @@ public class HiveSessionImpl implements HiveSession {
acquire(true);
// Iterate through the opHandles and close their operations
for (OperationHandle opHandle : opHandleSet) {
- operationManager.closeOperation(opHandle);
+ try {
+ operationManager.closeOperation(opHandle);
+ } catch (Exception e) {
+ LOG.warn("Exception is thrown closing operation " + opHandle, e);
+ }
}
opHandleSet.clear();
// Cleanup session log directory.
diff --git
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 14e9c47..5cdae00 100644
---
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -650,7 +650,11 @@ public class HiveSessionImpl implements HiveSession {
acquire(true);
// Iterate through the opHandles and close their operations
for (OperationHandle opHandle : opHandleSet) {
- operationManager.closeOperation(opHandle);
+ try {
+ operationManager.closeOperation(opHandle);
+ } catch (Exception e) {
+ LOG.warn("Exception is thrown closing operation " + opHandle, e);
+ }
}
opHandleSet.clear();
// Cleanup session log directory.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]