This is an automated email from the ASF dual-hosted git repository.
ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new f615f272 [LIVY-977][SERVER][CONF] Livy can not be started if HDFS is
still in safe mode (#440)
f615f272 is described below
commit f615f272e9130d02170024832ea308516b907195
Author: RajshekharMuchandi <[email protected]>
AuthorDate: Tue Mar 12 20:57:17 2024 +0530
[LIVY-977][SERVER][CONF] Livy can not be started if HDFS is still in safe
mode (#440)
## What changes were proposed in this pull request?
HDFS safe mode is checked when a Livy session is created. If safe mode is ON,
the check is repeated at a configurable interval, and an IllegalStateException
is thrown once the configurable maximum number of retry attempts is reached.
If safe mode is OFF, Livy is able to create the session.
https://issues.apache.org/jira/browse/LIVY-977
## How was this patch tested?
Added unit tests to validate the code changes. Also tested manually on a CDP
cluster by creating Livy sessions with the HDFS safe mode check ON/OFF.
---
conf/livy.conf.template | 5 +++
.../src/main/scala/org/apache/livy/LivyConf.scala | 6 +++
.../server/recovery/FileSystemStateStore.scala | 45 ++++++++++++++++++++++
.../server/recovery/FileSystemStateStoreSpec.scala | 35 ++++++++++++++++-
4 files changed, 90 insertions(+), 1 deletion(-)
diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 7566971c..e99251d0 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -195,3 +195,8 @@
# Enable to allow custom classpath by proxy user in cluster mode
# The below configuration parameter is disabled by default.
# livy.server.session.allow-custom-classpath = true
+
+# Interval, in seconds, between checks of whether the HDFS filesystem is in safe mode
+# livy.server.hdfs.safe-mode.interval = 5
+# Maximum number of retry attempts while the HDFS filesystem is in safe mode
+# livy.server.hdfs.safe-mode.max.retry.attempts = 12
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala
b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 31b68725..720aa4e1 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -252,6 +252,12 @@ object LivyConf {
// how often to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_INTERVAL =
Entry("livy.server.yarn.app-leakage.check-interval", "60s")
+ // value specifies interval to check safe mode in hdfs filesystem
+ val HDFS_SAFE_MODE_INTERVAL_IN_SECONDS =
Entry("livy.server.hdfs.safe-mode.interval", 5)
+
+ // value specifies max attempts to retry when safe mode is ON in hdfs
filesystem
+ val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS =
Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)
+
// Whether session timeout should be checked, by default it will be checked,
which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
diff --git
a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
index 826a2fbd..6fee7f0e 100644
---
a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
+++
b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
@@ -20,6 +20,7 @@ package org.apache.livy.server.recovery
import java.io.{FileNotFoundException, IOException}
import java.net.URI
import java.util
+import java.util.concurrent.TimeUnit
import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -28,6 +29,8 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.Utils.usingResource
@@ -42,6 +45,8 @@ class FileSystemStateStore(
this(livyConf, None)
}
+ private val fs = FileSystem.newInstance(livyConf.hadoopConf)
+
private val fsUri = {
val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
require(fsPath != null && !fsPath.isEmpty,
@@ -57,6 +62,8 @@ class FileSystemStateStore(
// Only Livy user should have access to state files.
fileContext.setUMask(new FsPermission("077"))
+ startSafeModeCheck()
+
// Create state store dir if it doesn't exist.
val stateStorePath = absPath(".")
try {
@@ -134,4 +141,42 @@ class FileSystemStateStore(
}
private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
+
+ /**
+  * Reports whether the file system held in `fs` is an HDFS instance that is
+  * currently in safe mode.
+  *
+  * Safe mode is only queried when `fs` is a `DistributedFileSystem`; any other
+  * file system yields `false`. Note that `DistributedFileSystem` is a
+  * `@LimitedPrivate` class, which for all practical reasons makes it more
+  * public than not.
+  *
+  * @return the result of `isFsInSafeMode(dfs)` when `fs` is a
+  *         `DistributedFileSystem`, `false` otherwise.
+  */
+ def isFsInSafeMode(): Boolean = {
+   fs match {
+     case hdfs: DistributedFileSystem => isFsInSafeMode(hdfs)
+     case _ => false
+   }
+ }
+
+ /**
+  * Asks the given HDFS instance for its safe mode state using the
+  * `SAFEMODE_GET` action.
+  *
+  * @param dfs the distributed file system to query.
+  * @return whatever `dfs.setSafeMode` reports for `SAFEMODE_GET`.
+  */
+ def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
+   // true to check only for Active NNs status
+   val activeNameNodesOnly = true
+   dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET, activeNameNodesOnly)
+ }
+
+ /**
+  * Waits for HDFS to leave safe mode before the state store is used.
+  *
+  * Probes `isFsInSafeMode()` once, then re-probes after each sleep of
+  * `LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS` seconds, for at most
+  * `LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS` retries. Polling stops as soon
+  * as a probe reports that safe mode is OFF.
+  *
+  * @throws IllegalStateException if HDFS is still in safe mode after the last
+  *                               retry.
+  */
+ def startSafeModeCheck(): Unit = {
+   // Cannot probe anything while the FS is in safe mode,
+   // so wait between probes for a configurable number of seconds.
+   val safeModeInterval = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS)
+   val safeModeMaxRetryAttempts = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS)
+   val sleepMillis = TimeUnit.SECONDS.toMillis(safeModeInterval)
+
+   // A plain loop is used instead of a guarded `for`: a guard only filters the
+   // range, so it would keep probing the file system for every remaining index
+   // even after safe mode has been left.
+   var retriesLeft = safeModeMaxRetryAttempts
+   var inSafeMode = isFsInSafeMode()
+   while (inSafeMode && retriesLeft > 0) {
+     info("HDFS is still in safe mode. Waiting...")
+     Thread.sleep(sleepMillis)
+     retriesLeft -= 1
+     inSafeMode = isFsInSafeMode()
+   }
+
+   // Still in safe mode after exhausting every retry: give up.
+   if (inSafeMode) {
+     throw new IllegalStateException("Reached max retry attempts for safe mode check " +
+       "in hdfs file system")
+   }
+ }
+
}
diff --git
a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
index 082a80ab..1ee1a2fe 100644
---
a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
+++
b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
@@ -23,10 +23,11 @@ import java.util
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.hdfs.DistributedFileSystem
import org.hamcrest.Description
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, anyInt, argThat, eq => equal}
-import org.mockito.Mockito.{atLeastOnce, verify, when}
+import org.mockito.Mockito.{atLeastOnce, spy, verify, when}
import org.mockito.internal.matchers.Equals
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@@ -53,6 +54,14 @@ class FileSystemStateStoreSpec extends FunSpec with
LivyBaseUnitTestSuite {
conf
}
+ /**
+  * Builds a LivyConf for the safe mode tests: the state store URL points at
+  * "file://tmp/" and both the safe mode check interval and the max retry
+  * attempts are set to 2.
+  */
+ def makeConfWithTwoSeconds(): LivyConf = {
+   val two = Integer.valueOf(2)
+   val safeModeConf = new LivyConf()
+   safeModeConf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/")
+   safeModeConf.set(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS, two)
+   safeModeConf.set(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS, two)
+   safeModeConf
+ }
+
def mockFileContext(rootDirPermission: String): FileContext = {
val fileContext = mock[FileContext]
val rootDirStatus = mock[FileStatus]
@@ -188,5 +197,29 @@ class FileSystemStateStoreSpec extends FunSpec with
LivyBaseUnitTestSuite {
verify(fileContext).delete(pathEq("/key"), equal(false))
}
+
+ // Calls both isFsInSafeMode overloads: the no-arg form on the store itself and
+ // the explicit form with a mocked DistributedFileSystem.
+ // NOTE(review): setSafeMode is never stubbed on the mock, so the assertion
+ // presumably relies on Mockito's default `false` return. Despite the test
+ // name, safe mode is never turned ON here and nothing waits - confirm intent.
+ it("set safe mode ON and wait") {
+ val fileContext = mockFileContext("700")
+ val provider = spy(new FileSystemStateStore(makeConf(),
Some(fileContext)))
+ val dfs = mock[DistributedFileSystem]
+ // The result of the no-arg call is discarded; only the overload taking the
+ // mocked file system is asserted on.
+ provider.isFsInSafeMode()
+ assert(!provider.isFsInSafeMode(dfs))
+ }
+
+ // Uses SafeModeTestProvider, whose isFsInSafeMode() returns its `inSafeMode`
+ // flag, together with the interval/retry values from makeConfWithTwoSeconds()
+ // to check that startSafeModeCheck() gives up with an IllegalStateException
+ // when safe mode never clears.
+ it("provider throws IllegalStateException when reaches 'N' " +
+ "max attempts to access HDFS file system") {
+ val provider = new SafeModeTestProvider(makeConfWithTwoSeconds(),
+ Some(mockFileContext("700")))
+ // Force the stubbed safe mode state to ON before running the check explicitly.
+ provider.inSafeMode = true
+ intercept[IllegalStateException](provider.startSafeModeCheck())
+ }
}
+
+ // Test double for FileSystemStateStore whose safe mode answer comes from the
+ // mutable `inSafeMode` flag instead of from a file system.
+ private class SafeModeTestProvider(conf: LivyConf, context:
Option[FileContext])
+ extends FileSystemStateStore(conf, context) {
+ // NOTE: this initializer runs only after the FileSystemStateStore constructor
+ // has finished, so the startSafeModeCheck() call made during construction
+ // reads the field's default value (false) and does not wait. Do not move this
+ // into a constructor parameter without re-checking that behaviour.
+ var inSafeMode = true
+
+ // Reports the flag above rather than querying any file system.
+ override def isFsInSafeMode(): Boolean = inSafeMode
+ }
+
}