Repository: tez Updated Branches: refs/heads/master c909f45a7 -> 89d47c325
TEZ-3566. Avoid caching fs isntances in TokenCache after a point. Contributed by Harish Jaiprakash. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/89d47c32 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/89d47c32 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/89d47c32 Branch: refs/heads/master Commit: 89d47c325ac17b61053b1806a93a11f270b980b4 Parents: c909f45 Author: Siddharth Seth <[email protected]> Authored: Mon Jan 9 19:02:03 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Mon Jan 9 19:02:03 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/common/security/TokenCache.java | 14 ++++- .../tez/common/security/TestTokenCache.java | 54 +++++++++++++++++++- 3 files changed, 68 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/89d47c32/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d49507b..6aa66f3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3566. Avoid caching fs isntances in TokenCache after a point. TEZ-3568. Update SecurityUtils configuration to pick user provided configuration. TEZ-3561. Fix wrong tez tarball name in install.md. TEZ-3565. amConfig should check queuename isEmpty. @@ -165,6 +166,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3566. Avoid caching fs isntances in TokenCache after a point. TEZ-3568. Update SecurityUtils configuration to pick user provided configuration. TEZ-3559. TEZ_LIB_URIS doesn't work with schemes different than the defaultFS TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly http://git-wip-us.apache.org/repos/asf/tez/blob/89d47c32/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java b/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java index 8bb23fb..0ce5844 100644 --- a/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java +++ b/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java @@ -76,11 +76,23 @@ public class TokenCache { obtainTokensForFileSystemsInternal(credentials, ps, conf); } + private static final int MAX_FS_OBJECTS = 10; static void obtainTokensForFileSystemsInternal(Credentials credentials, Path[] ps, Configuration conf) throws IOException { Set<FileSystem> fsSet = new HashSet<FileSystem>(); for(Path p: ps) { - fsSet.add(p.getFileSystem(conf)); + FileSystem fs = p.getFileSystem(conf); + if (fsSet.size() == MAX_FS_OBJECTS) { + LOG.warn("No of FileSystem objects exceeds {}, updating tokens for all paths. This can" + + " happen when fs.<scheme>.impl.disable.cache is set to true."); + } + if (fsSet.size() >= MAX_FS_OBJECTS) { + // Too many fs objects are being created, most likely the cache is disabled. Prevent an + // OOM and just directly invoke instead of adding to the set. + obtainTokensForFileSystemsInternal(fs, credentials, conf); + } else { + fsSet.add(fs); + } } for (FileSystem fs : fsSet) { obtainTokensForFileSystemsInternal(fs, credentials, conf); http://git-wip-us.apache.org/repos/asf/tez/blob/89d47c32/tez-api/src/test/java/org/apache/tez/common/security/TestTokenCache.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/common/security/TestTokenCache.java b/tez-api/src/test/java/org/apache/tez/common/security/TestTokenCache.java index 00c6842..6fc6daa 100644 --- a/tez-api/src/test/java/org/apache/tez/common/security/TestTokenCache.java +++ b/tez-api/src/test/java/org/apache/tez/common/security/TestTokenCache.java @@ -19,13 +19,19 @@ package org.apache.tez.common.security; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper.MockFileSystem; +import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -62,7 +68,6 @@ public class TestTokenCache { MockFileSystem fs1 = createFileSystemForServiceName("service1"); MockFileSystem fs2 = createFileSystemForServiceName("service2"); - MockFileSystem fs3 = createFileSystemForServiceName("service3"); // get the tokens for fs1 & fs2 and write out to binary creds file Credentials creds = new Credentials(); @@ -89,6 +94,53 @@ public class TestTokenCache { } } + @Test(timeout=5000) + public void testObtainTokensForFileSystems() throws Exception { + Path[] paths = makePaths(100, "test://dir/file"); + Credentials creds = new Credentials(); + Configuration conf = new Configuration(TestTokenCache.conf); + conf.set("fs.test.impl", TestFileSystem.class.getName()); + + // Cache enabled should be invoked only once + conf.setBoolean("fs.test.impl.disable.cache", false); + TokenCache.obtainTokensForFileSystemsInternal(creds, paths, conf); + verify(TestFileSystem.fs, times(1)).addDelegationTokens(renewer, creds); + + // Cache disabled should be invoked for every path. + conf.setBoolean("fs.test.impl.disable.cache", true); + TokenCache.obtainTokensForFileSystemsInternal(creds, paths, conf); + verify(TestFileSystem.fs, times(paths.length + 1)).addDelegationTokens(renewer, creds); + } + + private Path[] makePaths(int count, String prefix) throws Exception { + Path[] ps = new Path[count]; + for (int i = 0; i < count; ++i) { + ps[i] = new Path(prefix + i); + } + return ps; + } + + public static class TestFileSystem extends FilterFileSystem { + static final FileSystem fs = mock(FileSystem.class); + static { + try { + when(fs.getUri()).thenReturn(new URI("test:///")); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + public TestFileSystem() { + super(fs); + } + + @Override + public Token<?>[] addDelegationTokens(String renewer, Credentials credentials) + throws IOException { + return fs.addDelegationTokens(renewer, credentials); + } + } + private MockFileSystem createFileSystemForServiceName(final String service) throws IOException { MockFileSystem mockFs = new MockFileSystem();
