This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 84fb0b442d HDDS-9347. Fix Ozone FS listStatus() cache-table 
inconsistencies. (#5399)
84fb0b442d is described below

commit 84fb0b442dcbb8e3e242ac5b43dc769cc087657a
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Fri Oct 20 13:59:11 2023 +0530

    HDDS-9347. Fix Ozone FS listStatus() cache-table inconsistencies. (#5399)
    
    Co-authored-by: Chung En Lee <[email protected]>
---
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java | 132 ++++++++++++---------
 .../fs/ozone/TestRootedOzoneFileSystemWithFSO.java |  10 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  61 +++++++---
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   3 +
 .../hadoop/ozone/om/TrashOzoneFileSystem.java      |   3 +
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |   5 +
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  17 ++-
 7 files changed, 157 insertions(+), 74 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index fa38239f69..07c9680aa4 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -88,6 +88,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.security.PrivilegedExceptionAction;
@@ -118,6 +119,7 @@ import static 
org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
 import static org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec.RS;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_LISTING_PAGE_SIZE;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_OFS_SHARED_TMP_DIR;
@@ -153,28 +155,31 @@ public class TestRootedOzoneFileSystem {
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
     return Arrays.asList(
-        new Object[]{true, true, true},
-        new Object[]{true, true, false},
-        new Object[]{true, false, false},
-        new Object[]{false, true, false},
-        new Object[]{false, false, false}
+        new Object[]{true, true, true, false},
+        new Object[]{true, true, false, false},
+        new Object[]{true, false, false, false},
+        new Object[]{false, true, false, false},
+        new Object[]{false, false, false, false},
+        new Object[]{true, true, false, true},
+        new Object[]{false, false, false, true}
     );
   }
 
   public TestRootedOzoneFileSystem(boolean setDefaultFs,
-      boolean enableOMRatis, boolean isAclEnabled) {
+      boolean enableOMRatis, boolean isAclEnabled, boolean noFlush) {
     // Ignored. Actual init done in initParam().
     // This empty constructor is still required to avoid argument exception.
   }
 
   @Parameterized.BeforeParam
-  public static void initParam(boolean setDefaultFs,
-                               boolean enableOMRatis, boolean isAclEnabled)
+  public static void initParam(boolean setDefaultFs, boolean enableOMRatis,
+      boolean isAclEnabled, boolean noFlush)
       throws IOException, InterruptedException, TimeoutException {
     // Initialize the cluster before EACH set of parameters
     enabledFileSystemPaths = setDefaultFs;
     omRatisEnabled = enableOMRatis;
     enableAcl = isAclEnabled;
+    useOnlyCache = noFlush;
     initClusterAndEnv();
   }
 
@@ -221,6 +226,8 @@ public class TestRootedOzoneFileSystem {
   private static boolean isBucketFSOptimized = false;
   private static boolean enableAcl;
 
+  private static boolean useOnlyCache;
+
   private static OzoneConfiguration conf;
   private static MiniOzoneCluster cluster = null;
   private static FileSystem fs;
@@ -294,6 +301,18 @@ public class TestRootedOzoneFileSystem {
     userOfs = UGI_USER1.doAs(
         (PrivilegedExceptionAction<RootedOzoneFileSystem>)()
             -> (RootedOzoneFileSystem) FileSystem.get(conf));
+
+    if (useOnlyCache) {
+      if (omRatisEnabled) {
+        cluster.getOzoneManager().getOmRatisServer().getOmStateMachine()
+            .getOzoneManagerDoubleBuffer().stopDaemon();
+      } else {
+        cluster.getOzoneManager().getOmServerProtocol()
+            .getOzoneManagerDoubleBuffer().stopDaemon();
+        cluster.getOzoneManager().getOmServerProtocol()
+            .setShouldFlushCache(false);
+      }
+    }
   }
 
   protected OMMetrics getOMMetrics() {
@@ -451,8 +470,9 @@ public class TestRootedOzoneFileSystem {
     ContractTestUtils.touch(fs, file4);
     fileStatuses = ofs.listStatus(parent);
     Assert.assertEquals(
-        "FileStatus did not return all children of the directory",
-        3, fileStatuses.length);
+        "FileStatus did not return all children of" +
+            " the directory : Got " + Arrays.toString(
+            fileStatuses), 3, fileStatuses.length);
 
     // Cleanup
     fs.delete(parent, true);
@@ -568,59 +588,55 @@ public class TestRootedOzoneFileSystem {
    */
   @Test
   public void testListStatusIteratorOnPageSize() throws Exception {
-    int[] pageSize = {
-        1, LISTING_PAGE_SIZE, LISTING_PAGE_SIZE + 1,
-        LISTING_PAGE_SIZE - 1, LISTING_PAGE_SIZE + LISTING_PAGE_SIZE / 2,
-        LISTING_PAGE_SIZE + LISTING_PAGE_SIZE
+    final int pageSize = 32;
+    int[] dirCounts = {
+        1,
+        pageSize - 1,
+        pageSize,
+        pageSize + 1,
+        pageSize + pageSize / 2,
+        pageSize + pageSize
     };
-    for (int numDir : pageSize) {
-      int range = numDir / LISTING_PAGE_SIZE;
-      switch (range) {
-      case 0:
-        listStatusIterator(numDir);
-        break;
-      case 1:
-        listStatusIterator(numDir);
-        break;
-      case 2:
-        listStatusIterator(numDir);
-        break;
-      default:
-        listStatusIterator(numDir);
-      }
-    }
-  }
-
-  private void listStatusIterator(int numDirs) throws IOException {
+    OzoneConfiguration config = new OzoneConfiguration(conf);
+    config.setInt(OZONE_FS_LISTING_PAGE_SIZE, pageSize);
+    URI uri = FileSystem.getDefaultUri(config);
+    config.setBoolean(
+        String.format("fs.%s.impl.disable.cache", uri.getScheme()), true);
+    FileSystem subject = FileSystem.get(uri, config);
     Path root = new Path("/" + volumeName + "/" + bucketName);
-    Set<String> paths = new TreeSet<>();
+    Path dir = new Path(root, "listStatusIterator");
     try {
-      for (int i = 0; i < numDirs; i++) {
-        Path p = new Path(root, String.valueOf(i));
-        fs.mkdirs(p);
-        paths.add(p.getName());
+      Set<String> paths = new TreeSet<>();
+      for (int dirCount : dirCounts) {
+        listStatusIterator(subject, dir, paths, dirCount);
       }
+    } finally {
+      subject.delete(dir, true);
+    }
+  }
 
-      RemoteIterator<FileStatus> iterator = ofs.listStatusIterator(root);
-      int iCount = 0;
-      if (iterator != null) {
-        while (iterator.hasNext()) {
-          FileStatus fileStatus = iterator.next();
-          iCount++;
-          Assert.assertTrue(paths.contains(fileStatus.getPath().getName()));
-        }
-      }
-      Assert.assertEquals(
-          "Total directories listed do not match the existing directories",
-          numDirs, iCount);
+  private static void listStatusIterator(FileSystem subject,
+      Path dir, Set<String> paths, int total) throws IOException {
+    for (int i = paths.size(); i < total; i++) {
+      Path p = new Path(dir, String.valueOf(i));
+      subject.mkdirs(p);
+      paths.add(p.getName());
+    }
 
-    } finally {
-      // Cleanup
-      for (int i = 0; i < numDirs; i++) {
-        Path p = new Path(root, String.valueOf(i));
-        fs.delete(p, true);
+    RemoteIterator<FileStatus> iterator = subject.listStatusIterator(dir);
+    int iCount = 0;
+    if (iterator != null) {
+      while (iterator.hasNext()) {
+        FileStatus fileStatus = iterator.next();
+        iCount++;
+        String filename = fileStatus.getPath().getName();
+        assertTrue(filename + " not found", paths.contains(filename));
       }
     }
+
+    assertEquals(
+        "Total directories listed do not match the existing directories",
+        total, iCount);
   }
 
   /**
@@ -2429,6 +2445,9 @@ public class TestRootedOzoneFileSystem {
 
   @Test
   public void testSnapshotRead() throws Exception {
+    if (useOnlyCache) {
+      return;
+    }
     // Init data
     OzoneBucket bucket1 =
         TestDataUtil.createVolumeAndBucket(client, bucketLayout);
@@ -2475,6 +2494,9 @@ public class TestRootedOzoneFileSystem {
 
   @Test
   public void testSnapshotDiff() throws Exception {
+    if (useOnlyCache) {
+      return;
+    }
     OzoneBucket bucket1 =
         TestDataUtil.createVolumeAndBucket(client, bucketLayout);
     Path volumePath1 = new Path(OZONE_URI_DELIMITER, bucket1.getVolumeName());
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
index d1599d8689..b4005169f8 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
@@ -56,14 +56,16 @@ public class TestRootedOzoneFileSystemWithFSO
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
     return Arrays.asList(
-        new Object[]{true, true, false},
-        new Object[]{true, false, false}
+        new Object[]{true, true, false, false},
+        new Object[]{true, false, false, false},
+        new Object[]{true, true, false, true},
+        new Object[]{true, false, false, true}
     );
   }
 
   public TestRootedOzoneFileSystemWithFSO(boolean setDefaultFs,
-      boolean enableOMRatis, boolean enableAcl) {
-    super(setDefaultFs, enableOMRatis, enableAcl);
+      boolean enableOMRatis, boolean isAclEnabled, boolean noFlush) {
+    super(setDefaultFs, enableOMRatis, isAclEnabled, noFlush);
   }
 
   @BeforeClass
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index a230882112..1e62c5c57c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -1441,7 +1441,13 @@ public class KeyManagerImpl implements KeyManager {
   private void listStatusFindKeyInTableCache(
       Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> cacheIter,
       String keyArgs, String startCacheKey, boolean recursive,
-      TreeMap<String, OzoneFileStatus> cacheKeyMap) {
+      TreeMap<String, OzoneFileStatus> cacheKeyMap) throws IOException {
+
+    Map<String, OmKeyInfo> remainingKeys = new HashMap<>();
+    // extract the /volume/buck/ prefix from the startCacheKey
+    int volBuckEndIndex = StringUtils.ordinalIndexOf(
+        startCacheKey, OZONE_URI_DELIMITER, 3);
+    String volumeBuckPrefix = startCacheKey.substring(0, volBuckEndIndex + 1);
 
     while (cacheIter.hasNext()) {
       Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>> entry =
@@ -1452,14 +1458,14 @@ public class KeyManagerImpl implements KeyManager {
       }
       OmKeyInfo cacheOmKeyInfo = entry.getValue().getCacheValue();
       // cacheOmKeyInfo is null if an entry is deleted in cache
-      if (cacheOmKeyInfo != null
-          && cacheKey.startsWith(startCacheKey)
-          && cacheKey.compareTo(startCacheKey) >= 0) {
+      if (cacheOmKeyInfo != null && cacheKey.startsWith(
+          keyArgs) && cacheKey.compareTo(startCacheKey) >= 0) {
         if (!recursive) {
           String remainingKey = StringUtils.stripEnd(cacheKey.substring(
-              startCacheKey.length()), OZONE_URI_DELIMITER);
+              keyArgs.length()), OZONE_URI_DELIMITER);
           // For non-recursive, the remaining part of key can't have '/'
           if (remainingKey.contains(OZONE_URI_DELIMITER)) {
+            remainingKeys.put(cacheKey, cacheOmKeyInfo);
             continue;
           }
         }
@@ -1474,6 +1480,31 @@ public class KeyManagerImpl implements KeyManager {
         cacheKeyMap.put(cacheKey, null);
       }
     }
+
+    // let's say fsPaths is disabled, then creating a key like a/b/c
+    // will not create intermediate keys in the keyTable so only entry
+    // in the keyTable would be {a/b/c}. This would be skipped from getting
+    // added to cacheKeyMap above as remainingKey would be {b/c} and it
+    // contains the slash, In this case we track such keys which are not added
+    // to the map, find the immediate child and check if they are present in
+    // the map. If not create a fake dir and add it. This is similar to the
+    // logic in findKeyInDbWithIterator.
+    if (!recursive) {
+      for (Map.Entry<String, OmKeyInfo> entry : remainingKeys.entrySet()) {
+        String remainingKey = entry.getKey();
+        String immediateChild =
+            OzoneFSUtils.getImmediateChild(remainingKey, keyArgs);
+        if (!cacheKeyMap.containsKey(immediateChild)) {
+          // immediateChild contains volume/bucket prefix remove it.
+          String immediateChildKeyName =
+              immediateChild.replaceAll(volumeBuckPrefix, "");
+          OmKeyInfo fakeDirEntry =
+              createDirectoryKey(entry.getValue(), immediateChildKeyName);
+          cacheKeyMap.put(immediateChild,
+              new OzoneFileStatus(fakeDirEntry, scmBlockSize, true));
+        }
+      }
+    }
   }
 
   /**
@@ -1521,14 +1552,15 @@ public class KeyManagerImpl implements KeyManager {
       String startKey, long numEntries, String clientAddress,
       boolean allowPartialPrefixes) throws IOException {
     Preconditions.checkNotNull(args, "Key args can not be null");
-    String volName = args.getVolumeName();
-    String buckName = args.getBucketName();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
     List<OzoneFileStatus> fileStatusList = new ArrayList<>();
     if (numEntries <= 0) {
       return fileStatusList;
     }
 
-    if (isBucketFSOptimized(volName, buckName)) {
+    if (isBucketFSOptimized(volumeName, bucketName)) {
       Preconditions.checkArgument(!recursive);
       OzoneListStatusHelper statusHelper =
           new OzoneListStatusHelper(metadataManager, scmBlockSize,
@@ -1540,9 +1572,6 @@ public class KeyManagerImpl implements KeyManager {
       return buildFinalStatusList(statuses, args, clientAddress);
     }
 
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
-    String keyName = args.getKeyName();
     // A map sorted by OmKey to combine results from TableCache and DB.
     TreeMap<String, OzoneFileStatus> cacheKeyMap = new TreeMap<>();
 
@@ -1564,8 +1593,8 @@ public class KeyManagerImpl implements KeyManager {
     metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
         bucketName);
     try {
-      keyTable = metadataManager
-          .getKeyTable(getBucketLayout(metadataManager, volName, buckName));
+      keyTable = metadataManager.getKeyTable(
+          getBucketLayout(metadataManager, volumeName, bucketName));
       iterator = getIteratorForKeyInTableCache(recursive, startKey,
           volumeName, bucketName, cacheKeyMap, keyArgs, keyTable);
     } finally {
@@ -1687,7 +1716,11 @@ public class KeyManagerImpl implements KeyManager {
                 if (!entryKeyName.equals(immediateChild)) {
                   OmKeyInfo fakeDirEntry = createDirectoryKey(
                       omKeyInfo, immediateChild);
-                  cacheKeyMap.put(entryInDb,
+                  String fakeDirKey = ozoneManager.getMetadataManager()
+                      .getOzoneKey(fakeDirEntry.getVolumeName(),
+                          fakeDirEntry.getBucketName(),
+                          fakeDirEntry.getKeyName());
+                  cacheKeyMap.put(fakeDirKey,
                       new OzoneFileStatus(fakeDirEntry,
                           scmBlockSize, true));
                 } else {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index e6b027427d..5616048ffd 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -1486,6 +1486,9 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
       Map.Entry<CacheKey<String>, CacheValue<OmVolumeArgs>> entry =
           cacheIterator.next();
       omVolumeArgs = entry.getValue().getCacheValue();
+      if (omVolumeArgs == null) {
+        continue;
+      }
       volumeName = omVolumeArgs.getVolume();
 
       if (!prefixIsEmpty && !volumeName.startsWith(prefix)) {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
index 115a8c5f8e..bc650339db 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
@@ -308,6 +308,9 @@ public class TrashOzoneFileSystem extends FileSystem {
       Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry =
           bucketIterator.next();
       OmBucketInfo omBucketInfo = entry.getValue().getCacheValue();
+      if (omBucketInfo == null) {
+        continue;
+      }
       Path volumePath = new Path(OZONE_URI_DELIMITER,
           omBucketInfo.getVolumeName());
       Path bucketPath = new Path(volumePath, omBucketInfo.getBucketName());
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 0551e6dfc7..9aefa41f46 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -753,4 +753,9 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
   public void awaitDoubleBufferFlush() throws InterruptedException {
     ozoneManagerDoubleBuffer.awaitFlush();
   }
+
+  @VisibleForTesting
+  public OzoneManagerDoubleBuffer getOzoneManagerDoubleBuffer() {
+    return ozoneManagerDoubleBuffer;
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index c8e9b679cf..d26cce0c13 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -77,6 +77,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
       ProtocolMessageEnum> dispatcher;
   private final RequestValidations requestValidations;
 
+  // always true, only used in tests
+  private boolean shouldFlushCache = true;
+
   /**
    * Constructs an instance of the server handler.
    *
@@ -284,7 +287,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
       return createErrorResponse(request, ex);
     }
     try {
-      omClientResponse.getFlushFuture().get();
+      if (shouldFlushCache) {
+        omClientResponse.getFlushFuture().get();
+      }
       if (LOG.isTraceEnabled()) {
         LOG.trace("Future for {} is completed", request);
       }
@@ -340,4 +345,14 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
   public void awaitDoubleBufferFlush() throws InterruptedException {
     ozoneManagerDoubleBuffer.awaitFlush();
   }
+
+  @VisibleForTesting
+  public OzoneManagerDoubleBuffer getOzoneManagerDoubleBuffer() {
+    return ozoneManagerDoubleBuffer;
+  }
+
+  @VisibleForTesting
+  public void setShouldFlushCache(boolean shouldFlushCache) {
+    this.shouldFlushCache = shouldFlushCache;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to