Repository: spark
Updated Branches:
  refs/heads/master 278984d5a -> 3c96937c7


[SPARK-24948][SHS] Delegate check access permissions to the file system

## What changes were proposed in this pull request?

In `SparkHadoopUtil.checkAccessPermission`, we consider only basic 
permissions in order to check whether a user can access a file or not. This is 
not a complete check, as it ignores ACLs and other policies a file system may 
apply internally. As a result, it can wrongly report that a user 
cannot access a file (even though they actually can).

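The PR proposes to delegate the check of whether a file is accessible to the 
file system itself, in order to return the right result. A caching layer (a 
blacklist of log files that failed with an `AccessControlException`, whose 
entries expire during log cleaning) is added for performance reasons.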

## How was this patch tested?

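Modified unit tests (UTs): removed `SparkHadoopUtilSuite` and added a blacklist test to `FsHistoryProviderSuite`.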

Author: Marco Gaido <marcogaid...@gmail.com>

Closes #21895 from mgaido91/SPARK-24948.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c96937c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c96937c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c96937c

Branch: refs/heads/master
Commit: 3c96937c7b1d7a010b630f4b98fd22dafc37808b
Parents: 278984d
Author: Marco Gaido <marcogaid...@gmail.com>
Authored: Mon Aug 6 14:29:05 2018 -0700
Committer: Mridul Muralidharan <mri...@gmail.com>
Committed: Mon Aug 6 14:29:05 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 23 -----
 .../deploy/history/FsHistoryProvider.scala      | 67 ++++++++++----
 .../spark/deploy/SparkHadoopUtilSuite.scala     | 97 --------------------
 .../deploy/history/FsHistoryProviderSuite.scala | 42 ++++++++-
 4 files changed, 89 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3c96937c/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 8353e64..70a8c65 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -31,7 +31,6 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -367,28 +366,6 @@ class SparkHadoopUtil extends Logging {
     buffer.toString
   }
 
-  private[spark] def checkAccessPermission(status: FileStatus, mode: 
FsAction): Boolean = {
-    val perm = status.getPermission
-    val ugi = UserGroupInformation.getCurrentUser
-
-    if (ugi.getShortUserName == status.getOwner) {
-      if (perm.getUserAction.implies(mode)) {
-        return true
-      }
-    } else if (ugi.getGroupNames.contains(status.getGroup)) {
-      if (perm.getGroupAction.implies(mode)) {
-        return true
-      }
-    } else if (perm.getOtherAction.implies(mode)) {
-      return true
-    }
-
-    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
-      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
-      s"${if (status.isDirectory) "d" else "-"}$perm")
-    false
-  }
-
   def serialize(creds: Credentials): Array[Byte] = {
     val byteStream = new ByteArrayOutputStream
     val dataStream = new DataOutputStream(byteStream)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c96937c/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bf1eeb0..44d2390 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -21,11 +21,12 @@ import java.io.{File, FileNotFoundException, IOException}
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
-import java.util.concurrent.{ExecutorService, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, 
TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.ExecutionException
 import scala.io.Source
 import scala.util.Try
 import scala.xml.Node
@@ -33,8 +34,7 @@ import scala.xml.Node
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.MoreExecutors
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
@@ -114,7 +114,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
 
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-  private val fs = new Path(logDir).getFileSystem(hadoopConf)
+  // Visible for testing
+  private[history] val fs: FileSystem = new 
Path(logDir).getFileSystem(hadoopConf)
 
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent 
issues about fs
@@ -161,6 +162,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  // Visible for testing
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided 
`expireTimeInSeconds`.
+   */
+  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    blacklist.asScala.retain((_, creationTime) => creationTime >= 
expiredThreshold)
+  }
+
   private val activeUIs = new mutable.HashMap[(String, Option[String]), 
LoadedAppUI]()
 
   /**
@@ -418,7 +438,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
             // reading a garbage file is safe, but we would log an error which 
can be scary to
             // the end-user.
             !entry.getPath().getName().startsWith(".") &&
-            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+            !isBlacklisted(entry.getPath)
         }
         .filter { entry =>
           try {
@@ -461,32 +481,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
         logDebug(s"New/updated attempts found: ${updated.size} 
${updated.map(_.getPath)}")
       }
 
-      val tasks = updated.map { entry =>
+      val tasks = updated.flatMap { entry =>
         try {
-          replayExecutor.submit(new Runnable {
+          val task: Future[Unit] = replayExecutor.submit(new Runnable {
             override def run(): Unit = mergeApplicationListing(entry, 
newLastScanTime, true)
-          })
+          }, Unit)
+          Some(task -> entry.getPath)
         } catch {
           // let the iteration over the updated entries break, since an 
exception on
           // replayExecutor.submit (..) indicates the ExecutorService is unable
           // to take any more submissions at this time
           case e: Exception =>
             logError(s"Exception while submitting event log for replay", e)
-            null
+            None
         }
-      }.filter(_ != null)
+      }
 
       pendingReplayTasksCount.addAndGet(tasks.size)
 
       // Wait for all tasks to finish. This makes sure that checkForLogs
       // is not scheduled again while some tasks are already running in
       // the replayExecutor.
-      tasks.foreach { task =>
+      tasks.foreach { case (task, path) =>
         try {
           task.get()
         } catch {
           case e: InterruptedException =>
             throw e
+          case e: ExecutionException if 
e.getCause.isInstanceOf[AccessControlException] =>
+            // We don't have read permissions on the log file
+            logWarning(s"Unable to read log $path", e.getCause)
+            blacklist(path)
           case e: Exception =>
             logError("Exception while merging application listings", e)
         } finally {
@@ -779,6 +804,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
   }
 
   /**
@@ -938,13 +965,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
   }
 
   private def deleteLog(log: Path): Unit = {
-    try {
-      fs.delete(log, true)
-    } catch {
-      case _: AccessControlException =>
-        logInfo(s"No permission to delete $log, ignoring.")
-      case ioe: IOException =>
-        logError(s"IOException in cleaning $log", ioe)
+    if (isBlacklisted(log)) {
+      logDebug(s"Skipping deleting $log as we don't have permissions on it.")
+    } else {
+      try {
+        fs.delete(log, true)
+      } catch {
+        case _: AccessControlException =>
+          logInfo(s"No permission to delete $log, ignoring.")
+        case ioe: IOException =>
+          logError(s"IOException in cleaning $log", ioe)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c96937c/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
deleted file mode 100644
index ab24a76..0000000
--- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy
-
-import java.security.PrivilegedExceptionAction
-
-import scala.util.Random
-
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-import org.apache.hadoop.security.UserGroupInformation
-import org.scalatest.Matchers
-
-import org.apache.spark.SparkFunSuite
-
-class SparkHadoopUtilSuite extends SparkFunSuite with Matchers {
-  test("check file permission") {
-    import FsAction._
-    val testUser = s"user-${Random.nextInt(100)}"
-    val testGroups = Array(s"group-${Random.nextInt(100)}")
-    val testUgi = UserGroupInformation.createUserForTesting(testUser, 
testGroups)
-
-    testUgi.doAs(new PrivilegedExceptionAction[Void] {
-      override def run(): Void = {
-        val sparkHadoopUtil = new SparkHadoopUtil
-
-        // If file is owned by user and user has access permission
-        var status = fileStatus(testUser, testGroups.head, READ_WRITE, 
READ_WRITE, NONE)
-        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
-        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
-
-        // If file is owned by user but user has no access permission
-        status = fileStatus(testUser, testGroups.head, NONE, READ_WRITE, NONE)
-        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
-        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
-
-        val otherUser = s"test-${Random.nextInt(100)}"
-        val otherGroup = s"test-${Random.nextInt(100)}"
-
-        // If file is owned by user's group and user's group has access 
permission
-        status = fileStatus(otherUser, testGroups.head, NONE, READ_WRITE, NONE)
-        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
-        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
-
-        // If file is owned by user's group but user's group has no access 
permission
-        status = fileStatus(otherUser, testGroups.head, READ_WRITE, NONE, NONE)
-        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
-        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
-
-        // If file is owned by other user and this user has access permission
-        status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, 
READ_WRITE)
-        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
-        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
-
-        // If file is owned by other user but this user has no access 
permission
-        status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, 
NONE)
-        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
-        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
-
-        null
-      }
-    })
-  }
-
-  private def fileStatus(
-      owner: String,
-      group: String,
-      userAction: FsAction,
-      groupAction: FsAction,
-      otherAction: FsAction): FileStatus = {
-    new FileStatus(0L,
-      false,
-      0,
-      0L,
-      0L,
-      0L,
-      new FsPermission(userAction, groupAction, otherAction),
-      owner,
-      group,
-      null)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3c96937c/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 77b2394..b4eba75 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -29,9 +29,11 @@ import scala.language.postfixOps
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify}
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, argThat}
+import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -818,6 +820,42 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
     }
   }
 
+  test("SPARK-24948: blacklist files we don't have read permission on") {
+    val clock = new ManualClock(1533132471)
+    val provider = new FsHistoryProvider(createTestConf(), clock)
+    val accessDenied = newLogFile("accessDenied", None, inProgress = false)
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, 
"test", None))
+    val accessGranted = newLogFile("accessGranted", None, inProgress = false)
+    writeFile(accessGranted, true, None,
+      SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 
1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    val mockedFs = spy(provider.fs)
+    doThrow(new AccessControlException("Cannot read accessDenied 
file")).when(mockedFs).open(
+      argThat(new ArgumentMatcher[Path]() {
+        override def matches(path: Any): Boolean = {
+          path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
+        }
+      }))
+    val mockedProvider = spy(provider)
+    when(mockedProvider.fs).thenReturn(mockedFs)
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, 
"test", None),
+      SparkListenerApplicationEnd(5L))
+    // Doing 2 times in order to check the blacklist filter too
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    val accessDeniedPath = new Path(accessDenied.getPath)
+    assert(mockedProvider.isBlacklisted(accessDeniedPath))
+    clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+    mockedProvider.cleanLogs()
+    assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform 
checks on the updated
    * app list. Example:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to