This is an automated email from the ASF dual-hosted git repository. shv pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new 104dd85 HDFS-15567. [SBN Read] HDFS should expose msync() API to allow downstream applications call it explicitly. Contributed by Konstantin V Shvachko. 104dd85 is described below commit 104dd85ad84829758fbe63a1b7300e639e02ee90 Author: Konstantin V Shvachko <s...@apache.org> AuthorDate: Mon Oct 12 17:26:24 2020 -0700 HDFS-15567. [SBN Read] HDFS should expose msync() API to allow downstream applications call it explicitly. Contributed by Konstantin V Shvachko. (cherry picked from commit b3786d6c3cc13b0b92b9f42da1731c4ce35c9ded) --- .../org/apache/hadoop/fs/AbstractFileSystem.java | 13 +++++++ .../java/org/apache/hadoop/fs/FileContext.java | 10 ++++++ .../main/java/org/apache/hadoop/fs/FileSystem.java | 13 +++++++ .../org/apache/hadoop/fs/FilterFileSystem.java | 5 +++ .../main/java/org/apache/hadoop/fs/FilterFs.java | 5 +++ .../java/org/apache/hadoop/fs/HarFileSystem.java | 5 +++ .../src/main/java/org/apache/hadoop/fs/Hdfs.java | 13 ++++++- .../apache/hadoop/hdfs/DistributedFileSystem.java | 11 ++++++ .../org/apache/hadoop/hdfs/MiniDFSCluster.java | 3 +- .../namenode/ha/TestConsistentReadsObserver.java | 41 ++++++++++++++++++++-- 10 files changed, 115 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java index df14ee8..bae1454 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java @@ -845,6 +845,19 @@ public abstract class AbstractFileSystem { UnresolvedLinkException, IOException; /** + * Synchronize client metadata state. + * <p/>In some FileSystem implementations such as HDFS metadata + * synchronization is essential to guarantee consistency of read requests + * particularly in HA setting. + * @throws IOException + * @throws UnsupportedOperationException + */ + public void msync() throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException(getClass().getCanonicalName() + + " does not support method msync"); + } + + /** * The specification of this method matches that of * {@link FileContext#access(Path, FsAction)} * except that an UnresolvedLinkException may be thrown if a symlink is diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index 12b39bb..4f0f2fc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -1189,6 +1189,16 @@ public class FileContext { } /** + * Synchronize client metadata state. + * + * @throws IOException + * @throws UnsupportedOperationException + */ + public void msync() throws IOException, UnsupportedOperationException { + defaultFS.msync(); + } + + /** * Checks if the user can access a path. The mode specifies which access * checks to perform. If the requested permissions are granted, then the * method returns normally. If access is denied, then the method throws an diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index c32f4cd..176362c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -2508,6 +2508,19 @@ public abstract class FileSystem extends Configured implements Closeable { public abstract FileStatus getFileStatus(Path f) throws IOException; /** + * Synchronize client metadata state. + * <p/>In some FileSystem implementations such as HDFS metadata + * synchronization is essential to guarantee consistency of read requests + * particularly in HA setting. + * @throws IOException + * @throws UnsupportedOperationException + */ + public void msync() throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException(getClass().getCanonicalName() + + " does not support method msync"); + } + + /** * Checks if the user can access a path. The mode specifies which access * checks to perform. If the requested permissions are granted, then the * method returns normally. If access is denied, then the method throws an diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 1c38df8..0a346d1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -447,6 +447,11 @@ public class FilterFileSystem extends FileSystem { } @Override + public void msync() throws IOException, UnsupportedOperationException { + fs.msync(); + } + + @Override public void access(Path path, FsAction mode) throws AccessControlException, FileNotFoundException, IOException { fs.access(path, mode); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java index b2a9aab..cccf681 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java @@ -123,6 +123,11 @@ public abstract class FilterFs extends AbstractFileSystem { } @Override + public void msync() throws IOException, UnsupportedOperationException { + myFs.msync(); + } + + @Override public void access(Path path, FsAction mode) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException { checkPath(path); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java index aa58706..807fb6b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java @@ -673,6 +673,11 @@ public class HarFileSystem extends FileSystem { return hstatus; } + @Override + public void msync() throws IOException, UnsupportedOperationException { + fs.msync(); + } + /** * @return null since no checksum algorithm is implemented. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java index 645f1ad..17246cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java @@ -137,7 +137,18 @@ public class Hdfs extends AbstractFileSystem { throw new FileNotFoundException("File does not exist: " + f.toString()); } } - + + /** + * Synchronize client metadata state with Active NameNode. + * <p/>In HA the client synchronizes its state with the Active NameNode + * in order to guarantee subsequent read consistency from Observer Nodes. + * @throws IOException + */ + @Override + public void msync() throws IOException { + dfs.msync(); + } + @Override public FileStatus getFileLinkStatus(Path f) throws IOException, UnresolvedLinkException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 362768d..9babc9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -1536,6 +1536,17 @@ public class DistributedFileSystem extends FileSystem }.resolve(this, absF); } + /** + * Synchronize client metadata state with Active NameNode. + * <p/>In HA the client synchronizes its state with the Active NameNode + * in order to guarantee subsequent read consistency from Observer Nodes. + * @throws IOException + */ + @Override + public void msync() throws IOException { + dfs.msync(); + } + @SuppressWarnings("deprecation") @Override public void createSymlink(final Path target, final Path link, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index b0586d7..d905e95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2656,7 +2656,8 @@ public class MiniDFSCluster implements AutoCloseable { public void rollEditLogAndTail(int nnIndex) throws Exception { getNameNode(nnIndex).getRpcServer().rollEditLog(); for (int i = 2; i < getNumNameNodes(); i++) { - getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits(); + long el = getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits(); + LOG.info("editsLoaded " + el); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index 780f807..f01a511 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.base.Supplier; + +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -31,9 +33,12 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -110,7 +115,8 @@ public class TestConsistentReadsObserver { final int observerIdx = 2; NameNode nn = dfsCluster.getNameNode(observerIdx); int port = nn.getNameNodeAddress().getPort(); - Configuration configuration = dfsCluster.getConfiguration(observerIdx); + Configuration originalConf = dfsCluster.getConfiguration(observerIdx); + Configuration configuration = new Configuration(originalConf); String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + "."; configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY, TestRpcScheduler.class.getName()); @@ -127,6 +133,8 @@ public class TestConsistentReadsObserver { // be triggered and client should retry active NN. dfs.getFileStatus(testPath); assertSentTo(0); + // reset the original call queue + NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf); } @Test @@ -207,7 +215,7 @@ public class TestConsistentReadsObserver { // Therefore, the subsequent getFileStatus call should succeed. if (!autoMsync) { // If not testing auto-msync, perform an explicit one here - dfs2.getClient().msync(); + dfs2.msync(); } else if (autoMsyncPeriodMs > 0) { Thread.sleep(autoMsyncPeriodMs); } @@ -413,6 +421,35 @@ public class TestConsistentReadsObserver { } } + @Test(timeout=10000) + public void testMsyncFileContext() throws Exception { + NameNode nn0 = dfsCluster.getNameNode(0); + NameNode nn2 = dfsCluster.getNameNode(2); + HAServiceStatus st = nn0.getRpcServer().getServiceStatus(); + assertEquals("nn0 is not active", HAServiceState.ACTIVE, st.getState()); + st = nn2.getRpcServer().getServiceStatus(); + assertEquals("nn2 is not observer", HAServiceState.OBSERVER, st.getState()); + + FileContext fc = FileContext.getFileContext(conf); + // initialize observer proxy for FileContext + fc.getFsStatus(testPath); + + Path p = new Path(testPath, "testMsyncFileContext"); + fc.mkdir(p, FsPermission.getDefault(), true); + fc.msync(); + dfsCluster.rollEditLogAndTail(0); + LOG.info("State id active = {}, Stat id observer = {}", + nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(), + nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId()); + try { + // if getFileStatus is taking too long due to server requeueing + // the test will time out + fc.getFileStatus(p); + } catch (FileNotFoundException e) { + fail("File should exist on Observer after msync"); + } + } + private void assertSentTo(int nnIdx) throws IOException { assertTrue("Request was not sent to the expected namenode " + nnIdx, HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx)); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org