This is an automated email from the ASF dual-hosted git repository. dbtsai pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new b477194 [SPARK-28157][CORE][2.4] Make SHS clear KVStore `LogInfo`s for the blacklisted entries b477194 is described below commit b47719449ba8a6e454f3cce8aefb947d68ff2095 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Thu Jun 27 16:57:44 2019 +0000 [SPARK-28157][CORE][2.4] Make SHS clear KVStore `LogInfo`s for the blacklisted entries ## What changes were proposed in this pull request? In Spark 2.4.0/2.3.2/2.2.3, [SPARK-24948](https://issues.apache.org/jira/browse/SPARK-24948) delegated access-permission checks to the file system and maintained a blacklist of all event log files that failed once during reading. The blacklisted log files are released back after `CLEAN_INTERVAL_S` seconds. However, released files whose sizes don't change are ignored forever due to the `info.fileSize < entry.getLen()` condition (previously [here](https://github.com/apache/spark/commit/3c96937c7b1d7a010b630f4b98fd22dafc37808b#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR454) and now at [shouldReloadLog](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L571)), which always returns `false` when the size is the same as the existing entry's. This PR aims to remove the existing entry from `KVStore` when it goes to the blacklist. ## How was this patch tested? Pass the Jenkins with the updated test case. Closes #24975 from dongjoon-hyun/SPARK-28157-2.4. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: DB Tsai <d_t...@apple.com> --- .../apache/spark/deploy/history/FsHistoryProvider.scala | 3 +++ .../spark/deploy/history/FsHistoryProviderSuite.scala | 14 +++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 38ed5e0..f736066 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -532,6 +532,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // We don't have read permissions on the log file logWarning(s"Unable to read log $path", e.getCause) blacklist(path) + // SPARK-28157 We should remove this blacklisted entry from the KVStore + // to handle permission-only changes with the same file sizes later. + listing.delete(classOf[LogInfo], path.toString) case e: Exception => logError("Exception while merging application listings", e) } finally { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 98ffd72..6aad00b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -869,11 +869,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc writeFile(accessGranted, true, None, SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None), SparkListenerApplicationEnd(5L)) + var isReadable = false val mockedFs = spy(provider.fs) doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open( argThat(new ArgumentMatcher[Path]() { override def matches(path: Any): 
Boolean = { - path.asInstanceOf[Path].getName.toLowerCase == "accessdenied" + path.asInstanceOf[Path].getName.toLowerCase == "accessdenied" && !isReadable } })) val mockedProvider = spy(provider) @@ -881,9 +882,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc updateAndCheck(mockedProvider) { list => list.size should be(1) } - writeFile(accessDenied, true, None, - SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None), - SparkListenerApplicationEnd(5L)) // Doing 2 times in order to check the blacklist filter too updateAndCheck(mockedProvider) { list => list.size should be(1) @@ -891,8 +889,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val accessDeniedPath = new Path(accessDenied.getPath) assert(mockedProvider.isBlacklisted(accessDeniedPath)) clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d + isReadable = true mockedProvider.cleanLogs() - assert(!mockedProvider.isBlacklisted(accessDeniedPath)) + updateAndCheck(mockedProvider) { list => + assert(!mockedProvider.isBlacklisted(accessDeniedPath)) + assert(list.exists(_.name == "accessDenied")) + assert(list.exists(_.name == "accessGranted")) + list.size should be(2) + } } test("check in-progress event logs absolute length") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org