This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 84fb0b442d HDDS-9347. Fix Ozone FS listStatus() cache-table
inconsistencies. (#5399)
84fb0b442d is described below
commit 84fb0b442dcbb8e3e242ac5b43dc769cc087657a
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Fri Oct 20 13:59:11 2023 +0530
HDDS-9347. Fix Ozone FS listStatus() cache-table inconsistencies. (#5399)
Co-authored-by: Chung En Lee <[email protected]>
---
.../hadoop/fs/ozone/TestRootedOzoneFileSystem.java | 132 ++++++++++++---------
.../fs/ozone/TestRootedOzoneFileSystemWithFSO.java | 10 +-
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 61 +++++++---
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 3 +
.../hadoop/ozone/om/TrashOzoneFileSystem.java | 3 +
.../ozone/om/ratis/OzoneManagerStateMachine.java | 5 +
...OzoneManagerProtocolServerSideTranslatorPB.java | 17 ++-
7 files changed, 157 insertions(+), 74 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index fa38239f69..07c9680aa4 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -88,6 +88,7 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.security.PrivilegedExceptionAction;
@@ -118,6 +119,7 @@ import static
org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
import static org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec.RS;
import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_LISTING_PAGE_SIZE;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_OFS_SHARED_TMP_DIR;
@@ -153,28 +155,31 @@ public class TestRootedOzoneFileSystem {
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(
- new Object[]{true, true, true},
- new Object[]{true, true, false},
- new Object[]{true, false, false},
- new Object[]{false, true, false},
- new Object[]{false, false, false}
+ new Object[]{true, true, true, false},
+ new Object[]{true, true, false, false},
+ new Object[]{true, false, false, false},
+ new Object[]{false, true, false, false},
+ new Object[]{false, false, false, false},
+ new Object[]{true, true, false, true},
+ new Object[]{false, false, false, true}
);
}
public TestRootedOzoneFileSystem(boolean setDefaultFs,
- boolean enableOMRatis, boolean isAclEnabled) {
+ boolean enableOMRatis, boolean isAclEnabled, boolean noFlush) {
// Ignored. Actual init done in initParam().
// This empty constructor is still required to avoid argument exception.
}
@Parameterized.BeforeParam
- public static void initParam(boolean setDefaultFs,
- boolean enableOMRatis, boolean isAclEnabled)
+ public static void initParam(boolean setDefaultFs, boolean enableOMRatis,
+ boolean isAclEnabled, boolean noFlush)
throws IOException, InterruptedException, TimeoutException {
// Initialize the cluster before EACH set of parameters
enabledFileSystemPaths = setDefaultFs;
omRatisEnabled = enableOMRatis;
enableAcl = isAclEnabled;
+ useOnlyCache = noFlush;
initClusterAndEnv();
}
@@ -221,6 +226,8 @@ public class TestRootedOzoneFileSystem {
private static boolean isBucketFSOptimized = false;
private static boolean enableAcl;
+ private static boolean useOnlyCache;
+
private static OzoneConfiguration conf;
private static MiniOzoneCluster cluster = null;
private static FileSystem fs;
@@ -294,6 +301,18 @@ public class TestRootedOzoneFileSystem {
userOfs = UGI_USER1.doAs(
(PrivilegedExceptionAction<RootedOzoneFileSystem>)()
-> (RootedOzoneFileSystem) FileSystem.get(conf));
+
+ if (useOnlyCache) {
+ if (omRatisEnabled) {
+ cluster.getOzoneManager().getOmRatisServer().getOmStateMachine()
+ .getOzoneManagerDoubleBuffer().stopDaemon();
+ } else {
+ cluster.getOzoneManager().getOmServerProtocol()
+ .getOzoneManagerDoubleBuffer().stopDaemon();
+ cluster.getOzoneManager().getOmServerProtocol()
+ .setShouldFlushCache(false);
+ }
+ }
}
protected OMMetrics getOMMetrics() {
@@ -451,8 +470,9 @@ public class TestRootedOzoneFileSystem {
ContractTestUtils.touch(fs, file4);
fileStatuses = ofs.listStatus(parent);
Assert.assertEquals(
- "FileStatus did not return all children of the directory",
- 3, fileStatuses.length);
+ "FileStatus did not return all children of" +
+ " the directory : Got " + Arrays.toString(
+ fileStatuses), 3, fileStatuses.length);
// Cleanup
fs.delete(parent, true);
@@ -568,59 +588,55 @@ public class TestRootedOzoneFileSystem {
*/
@Test
public void testListStatusIteratorOnPageSize() throws Exception {
- int[] pageSize = {
- 1, LISTING_PAGE_SIZE, LISTING_PAGE_SIZE + 1,
- LISTING_PAGE_SIZE - 1, LISTING_PAGE_SIZE + LISTING_PAGE_SIZE / 2,
- LISTING_PAGE_SIZE + LISTING_PAGE_SIZE
+ final int pageSize = 32;
+ int[] dirCounts = {
+ 1,
+ pageSize - 1,
+ pageSize,
+ pageSize + 1,
+ pageSize + pageSize / 2,
+ pageSize + pageSize
};
- for (int numDir : pageSize) {
- int range = numDir / LISTING_PAGE_SIZE;
- switch (range) {
- case 0:
- listStatusIterator(numDir);
- break;
- case 1:
- listStatusIterator(numDir);
- break;
- case 2:
- listStatusIterator(numDir);
- break;
- default:
- listStatusIterator(numDir);
- }
- }
- }
-
- private void listStatusIterator(int numDirs) throws IOException {
+ OzoneConfiguration config = new OzoneConfiguration(conf);
+ config.setInt(OZONE_FS_LISTING_PAGE_SIZE, pageSize);
+ URI uri = FileSystem.getDefaultUri(config);
+ config.setBoolean(
+ String.format("fs.%s.impl.disable.cache", uri.getScheme()), true);
+ FileSystem subject = FileSystem.get(uri, config);
Path root = new Path("/" + volumeName + "/" + bucketName);
- Set<String> paths = new TreeSet<>();
+ Path dir = new Path(root, "listStatusIterator");
try {
- for (int i = 0; i < numDirs; i++) {
- Path p = new Path(root, String.valueOf(i));
- fs.mkdirs(p);
- paths.add(p.getName());
+ Set<String> paths = new TreeSet<>();
+ for (int dirCount : dirCounts) {
+ listStatusIterator(subject, dir, paths, dirCount);
}
+ } finally {
+ subject.delete(dir, true);
+ }
+ }
- RemoteIterator<FileStatus> iterator = ofs.listStatusIterator(root);
- int iCount = 0;
- if (iterator != null) {
- while (iterator.hasNext()) {
- FileStatus fileStatus = iterator.next();
- iCount++;
- Assert.assertTrue(paths.contains(fileStatus.getPath().getName()));
- }
- }
- Assert.assertEquals(
- "Total directories listed do not match the existing directories",
- numDirs, iCount);
+ private static void listStatusIterator(FileSystem subject,
+ Path dir, Set<String> paths, int total) throws IOException {
+ for (int i = paths.size(); i < total; i++) {
+ Path p = new Path(dir, String.valueOf(i));
+ subject.mkdirs(p);
+ paths.add(p.getName());
+ }
- } finally {
- // Cleanup
- for (int i = 0; i < numDirs; i++) {
- Path p = new Path(root, String.valueOf(i));
- fs.delete(p, true);
+ RemoteIterator<FileStatus> iterator = subject.listStatusIterator(dir);
+ int iCount = 0;
+ if (iterator != null) {
+ while (iterator.hasNext()) {
+ FileStatus fileStatus = iterator.next();
+ iCount++;
+ String filename = fileStatus.getPath().getName();
+ assertTrue(filename + " not found", paths.contains(filename));
}
}
+
+ assertEquals(
+ "Total directories listed do not match the existing directories",
+ total, iCount);
}
/**
@@ -2429,6 +2445,9 @@ public class TestRootedOzoneFileSystem {
@Test
public void testSnapshotRead() throws Exception {
+ if (useOnlyCache) {
+ return;
+ }
// Init data
OzoneBucket bucket1 =
TestDataUtil.createVolumeAndBucket(client, bucketLayout);
@@ -2475,6 +2494,9 @@ public class TestRootedOzoneFileSystem {
@Test
public void testSnapshotDiff() throws Exception {
+ if (useOnlyCache) {
+ return;
+ }
OzoneBucket bucket1 =
TestDataUtil.createVolumeAndBucket(client, bucketLayout);
Path volumePath1 = new Path(OZONE_URI_DELIMITER, bucket1.getVolumeName());
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
index d1599d8689..b4005169f8 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithFSO.java
@@ -56,14 +56,16 @@ public class TestRootedOzoneFileSystemWithFSO
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(
- new Object[]{true, true, false},
- new Object[]{true, false, false}
+ new Object[]{true, true, false, false},
+ new Object[]{true, false, false, false},
+ new Object[]{true, true, false, true},
+ new Object[]{true, false, false, true}
);
}
public TestRootedOzoneFileSystemWithFSO(boolean setDefaultFs,
- boolean enableOMRatis, boolean enableAcl) {
- super(setDefaultFs, enableOMRatis, enableAcl);
+ boolean enableOMRatis, boolean isAclEnabled, boolean noFlush) {
+ super(setDefaultFs, enableOMRatis, isAclEnabled, noFlush);
}
@BeforeClass
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index a230882112..1e62c5c57c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -1441,7 +1441,13 @@ public class KeyManagerImpl implements KeyManager {
private void listStatusFindKeyInTableCache(
Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> cacheIter,
String keyArgs, String startCacheKey, boolean recursive,
- TreeMap<String, OzoneFileStatus> cacheKeyMap) {
+ TreeMap<String, OzoneFileStatus> cacheKeyMap) throws IOException {
+
+ Map<String, OmKeyInfo> remainingKeys = new HashMap<>();
+ // extract the /volume/buck/ prefix from the startCacheKey
+ int volBuckEndIndex = StringUtils.ordinalIndexOf(
+ startCacheKey, OZONE_URI_DELIMITER, 3);
+ String volumeBuckPrefix = startCacheKey.substring(0, volBuckEndIndex + 1);
while (cacheIter.hasNext()) {
Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>> entry =
@@ -1452,14 +1458,14 @@ public class KeyManagerImpl implements KeyManager {
}
OmKeyInfo cacheOmKeyInfo = entry.getValue().getCacheValue();
// cacheOmKeyInfo is null if an entry is deleted in cache
- if (cacheOmKeyInfo != null
- && cacheKey.startsWith(startCacheKey)
- && cacheKey.compareTo(startCacheKey) >= 0) {
+ if (cacheOmKeyInfo != null && cacheKey.startsWith(
+ keyArgs) && cacheKey.compareTo(startCacheKey) >= 0) {
if (!recursive) {
String remainingKey = StringUtils.stripEnd(cacheKey.substring(
- startCacheKey.length()), OZONE_URI_DELIMITER);
+ keyArgs.length()), OZONE_URI_DELIMITER);
// For non-recursive, the remaining part of key can't have '/'
if (remainingKey.contains(OZONE_URI_DELIMITER)) {
+ remainingKeys.put(cacheKey, cacheOmKeyInfo);
continue;
}
}
@@ -1474,6 +1480,31 @@ public class KeyManagerImpl implements KeyManager {
cacheKeyMap.put(cacheKey, null);
}
}
+
+ // Suppose fsPaths is disabled: creating a key like a/b/c does not
+ // create intermediate keys in the keyTable, so the only entry in the
+ // keyTable is {a/b/c}. The loop above skips that entry, because its
+ // remainingKey {b/c} contains a slash. Such skipped keys are tracked
+ // in remainingKeys; for each one we find the immediate child and
+ // check whether it is already present in the map. If not, we create
+ // a fake dir entry and add it. This is similar to the logic in
+ // findKeyInDbWithIterator.
+ if (!recursive) {
+ for (Map.Entry<String, OmKeyInfo> entry : remainingKeys.entrySet()) {
+ String remainingKey = entry.getKey();
+ String immediateChild =
+ OzoneFSUtils.getImmediateChild(remainingKey, keyArgs);
+ if (!cacheKeyMap.containsKey(immediateChild)) {
+ // immediateChild contains the volume/bucket prefix; remove it.
+ String immediateChildKeyName =
+ immediateChild.replaceAll(volumeBuckPrefix, "");
+ OmKeyInfo fakeDirEntry =
+ createDirectoryKey(entry.getValue(), immediateChildKeyName);
+ cacheKeyMap.put(immediateChild,
+ new OzoneFileStatus(fakeDirEntry, scmBlockSize, true));
+ }
+ }
+ }
}
/**
@@ -1521,14 +1552,15 @@ public class KeyManagerImpl implements KeyManager {
String startKey, long numEntries, String clientAddress,
boolean allowPartialPrefixes) throws IOException {
Preconditions.checkNotNull(args, "Key args can not be null");
- String volName = args.getVolumeName();
- String buckName = args.getBucketName();
+ String volumeName = args.getVolumeName();
+ String bucketName = args.getBucketName();
+ String keyName = args.getKeyName();
List<OzoneFileStatus> fileStatusList = new ArrayList<>();
if (numEntries <= 0) {
return fileStatusList;
}
- if (isBucketFSOptimized(volName, buckName)) {
+ if (isBucketFSOptimized(volumeName, bucketName)) {
Preconditions.checkArgument(!recursive);
OzoneListStatusHelper statusHelper =
new OzoneListStatusHelper(metadataManager, scmBlockSize,
@@ -1540,9 +1572,6 @@ public class KeyManagerImpl implements KeyManager {
return buildFinalStatusList(statuses, args, clientAddress);
}
- String volumeName = args.getVolumeName();
- String bucketName = args.getBucketName();
- String keyName = args.getKeyName();
// A map sorted by OmKey to combine results from TableCache and DB.
TreeMap<String, OzoneFileStatus> cacheKeyMap = new TreeMap<>();
@@ -1564,8 +1593,8 @@ public class KeyManagerImpl implements KeyManager {
metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
bucketName);
try {
- keyTable = metadataManager
- .getKeyTable(getBucketLayout(metadataManager, volName, buckName));
+ keyTable = metadataManager.getKeyTable(
+ getBucketLayout(metadataManager, volumeName, bucketName));
iterator = getIteratorForKeyInTableCache(recursive, startKey,
volumeName, bucketName, cacheKeyMap, keyArgs, keyTable);
} finally {
@@ -1687,7 +1716,11 @@ public class KeyManagerImpl implements KeyManager {
if (!entryKeyName.equals(immediateChild)) {
OmKeyInfo fakeDirEntry = createDirectoryKey(
omKeyInfo, immediateChild);
- cacheKeyMap.put(entryInDb,
+ String fakeDirKey = ozoneManager.getMetadataManager()
+ .getOzoneKey(fakeDirEntry.getVolumeName(),
+ fakeDirEntry.getBucketName(),
+ fakeDirEntry.getKeyName());
+ cacheKeyMap.put(fakeDirKey,
new OzoneFileStatus(fakeDirEntry,
scmBlockSize, true));
} else {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index e6b027427d..5616048ffd 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -1486,6 +1486,9 @@ public class OmMetadataManagerImpl implements
OMMetadataManager,
Map.Entry<CacheKey<String>, CacheValue<OmVolumeArgs>> entry =
cacheIterator.next();
omVolumeArgs = entry.getValue().getCacheValue();
+ if (omVolumeArgs == null) {
+ continue;
+ }
volumeName = omVolumeArgs.getVolume();
if (!prefixIsEmpty && !volumeName.startsWith(prefix)) {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
index 115a8c5f8e..bc650339db 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
@@ -308,6 +308,9 @@ public class TrashOzoneFileSystem extends FileSystem {
Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry =
bucketIterator.next();
OmBucketInfo omBucketInfo = entry.getValue().getCacheValue();
+ if (omBucketInfo == null) {
+ continue;
+ }
Path volumePath = new Path(OZONE_URI_DELIMITER,
omBucketInfo.getVolumeName());
Path bucketPath = new Path(volumePath, omBucketInfo.getBucketName());
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 0551e6dfc7..9aefa41f46 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -753,4 +753,9 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
public void awaitDoubleBufferFlush() throws InterruptedException {
ozoneManagerDoubleBuffer.awaitFlush();
}
+
+ @VisibleForTesting
+ public OzoneManagerDoubleBuffer getOzoneManagerDoubleBuffer() {
+ return ozoneManagerDoubleBuffer;
+ }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index c8e9b679cf..d26cce0c13 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -77,6 +77,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
ProtocolMessageEnum> dispatcher;
private final RequestValidations requestValidations;
+ // Always true in production; only tests set it to false.
+ private boolean shouldFlushCache = true;
+
/**
* Constructs an instance of the server handler.
*
@@ -284,7 +287,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
return createErrorResponse(request, ex);
}
try {
- omClientResponse.getFlushFuture().get();
+ if (shouldFlushCache) {
+ omClientResponse.getFlushFuture().get();
+ }
if (LOG.isTraceEnabled()) {
LOG.trace("Future for {} is completed", request);
}
@@ -340,4 +345,14 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
public void awaitDoubleBufferFlush() throws InterruptedException {
ozoneManagerDoubleBuffer.awaitFlush();
}
+
+ @VisibleForTesting
+ public OzoneManagerDoubleBuffer getOzoneManagerDoubleBuffer() {
+ return ozoneManagerDoubleBuffer;
+ }
+
+ @VisibleForTesting
+ public void setShouldFlushCache(boolean shouldFlushCache) {
+ this.shouldFlushCache = shouldFlushCache;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]