This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4370178eb0b [HUDI-7927] Lazy init secondary view in FS view (#10652)
4370178eb0b is described below
commit 4370178eb0b8d1adad5148a2967f60b921568b27
Author: Tim Brown <[email protected]>
AuthorDate: Tue Jun 25 19:34:59 2024 -0500
[HUDI-7927] Lazy init secondary view in FS view (#10652)
---
.../common/table/view/FileSystemViewManager.java | 50 ++++++++----
.../table/view/PriorityBasedFileSystemView.java | 90 ++++++++++++----------
.../view/TestPriorityBasedFileSystemView.java | 83 +++++++++++++++++++-
3 files changed, 164 insertions(+), 59 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 7b729dacac4..d875168085c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetaserverConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.function.SerializableSupplier;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Functions.Function2;
@@ -260,25 +261,42 @@ public class FileSystemViewManager {
return new FileSystemViewManager(context, config, (metaClient,
viewConfig) -> {
RemoteHoodieTableFileSystemView remoteFileSystemView =
createRemoteFileSystemView(viewConfig, metaClient);
- SyncableFileSystemView secondaryView;
- switch (viewConfig.getSecondaryStorageType()) {
- case MEMORY:
- secondaryView = createInMemoryFileSystemView(viewConfig,
metaClient, metadataCreator);
- break;
- case EMBEDDED_KV_STORE:
- secondaryView = createRocksDBBasedFileSystemView(viewConfig,
metaClient);
- break;
- case SPILLABLE_DISK:
- secondaryView =
createSpillableMapBasedFileSystemView(viewConfig, metaClient, commonConfig);
- break;
- default:
- throw new IllegalArgumentException("Secondary Storage type can
only be in-memory or spillable. Was :"
- + viewConfig.getSecondaryStorageType());
- }
- return new PriorityBasedFileSystemView(remoteFileSystemView,
secondaryView);
+ SerializableSupplier<SyncableFileSystemView> secondaryViewSupplier =
new SecondaryViewSupplier(viewConfig, metaClient, commonConfig,
metadataCreator);
+ return new PriorityBasedFileSystemView(remoteFileSystemView,
secondaryViewSupplier);
});
default:
throw new IllegalArgumentException("Unknown file system view type :" +
config.getStorageType());
}
}
+
+ private static class SecondaryViewSupplier implements
SerializableSupplier<SyncableFileSystemView> {
+ private final FileSystemViewStorageConfig viewConfig;
+ private final HoodieTableMetaClient metaClient;
+ private final HoodieCommonConfig commonConfig;
+ private final SerializableFunctionUnchecked<HoodieTableMetaClient,
HoodieTableMetadata> metadataCreator;
+
+ private SecondaryViewSupplier(FileSystemViewStorageConfig viewConfig,
+ HoodieTableMetaClient metaClient,
HoodieCommonConfig commonConfig,
+
SerializableFunctionUnchecked<HoodieTableMetaClient, HoodieTableMetadata>
metadataCreator) {
+ this.viewConfig = viewConfig;
+ this.metaClient = metaClient;
+ this.commonConfig = commonConfig;
+ this.metadataCreator = metadataCreator;
+ }
+
+ @Override
+ public SyncableFileSystemView get() {
+ switch (viewConfig.getSecondaryStorageType()) {
+ case MEMORY:
+ return createInMemoryFileSystemView(viewConfig, metaClient,
metadataCreator);
+ case EMBEDDED_KV_STORE:
+ return createRocksDBBasedFileSystemView(viewConfig, metaClient);
+ case SPILLABLE_DISK:
+ return createSpillableMapBasedFileSystemView(viewConfig, metaClient,
commonConfig);
+ default:
+ throw new IllegalArgumentException("Secondary Storage type can only
be in-memory or spillable. Was :"
+ + viewConfig.getSecondaryStorageType());
+ }
+ }
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 8cfd6d64713..87fb73893a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.view;
+import org.apache.hudi.common.function.SerializableSupplier;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -51,12 +52,14 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
private static final Logger LOG =
LoggerFactory.getLogger(PriorityBasedFileSystemView.class);
private final SyncableFileSystemView preferredView;
- private final SyncableFileSystemView secondaryView;
+ private final SerializableSupplier<SyncableFileSystemView>
secondaryViewSupplier;
+ private SyncableFileSystemView secondaryView;
private boolean errorOnPreferredView;
- public PriorityBasedFileSystemView(SyncableFileSystemView preferredView,
SyncableFileSystemView secondaryView) {
+ public PriorityBasedFileSystemView(SyncableFileSystemView preferredView,
SerializableSupplier<SyncableFileSystemView> secondaryViewSupplier) {
this.preferredView = preferredView;
- this.secondaryView = secondaryView;
+ this.secondaryViewSupplier = secondaryViewSupplier;
+ this.secondaryView = null; // only initialize secondary view when required
this.errorOnPreferredView = false;
}
@@ -132,39 +135,39 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
@Override
public Stream<HoodieBaseFile> getLatestBaseFiles(String partitionPath) {
- return execute(partitionPath, preferredView::getLatestBaseFiles,
secondaryView::getLatestBaseFiles);
+ return execute(partitionPath, preferredView::getLatestBaseFiles, (path) ->
getSecondaryView().getLatestBaseFiles(path));
}
@Override
public Stream<HoodieBaseFile> getLatestBaseFiles() {
- return execute(preferredView::getLatestBaseFiles,
secondaryView::getLatestBaseFiles);
+ return execute(preferredView::getLatestBaseFiles, () ->
getSecondaryView().getLatestBaseFiles());
}
@Override
public Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String
partitionPath, String maxCommitTime) {
return execute(partitionPath, maxCommitTime,
preferredView::getLatestBaseFilesBeforeOrOn,
- secondaryView::getLatestBaseFilesBeforeOrOn);
+ (path, commitTime) ->
getSecondaryView().getLatestBaseFilesBeforeOrOn(path, commitTime));
}
@Override
public Map<String, Stream<HoodieBaseFile>>
getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
return execute(maxCommitTime,
preferredView::getAllLatestBaseFilesBeforeOrOn,
- secondaryView::getAllLatestBaseFilesBeforeOrOn);
+ (commitTime) ->
getSecondaryView().getAllLatestBaseFilesBeforeOrOn(commitTime));
}
@Override
public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String
fileId) {
- return execute(partitionPath, fileId, preferredView::getLatestBaseFile,
secondaryView::getLatestBaseFile);
+ return execute(partitionPath, fileId, preferredView::getLatestBaseFile,
(path, id) -> getSecondaryView().getLatestBaseFile(path, id));
}
@Override
public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String
instantTime, String fileId) {
- return execute(partitionPath, instantTime, fileId,
preferredView::getBaseFileOn, secondaryView::getBaseFileOn);
+ return execute(partitionPath, instantTime, fileId,
preferredView::getBaseFileOn, (path, instant, id) ->
getSecondaryView().getBaseFileOn(path, instant, id));
}
@Override
public Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String>
commitsToReturn) {
- return execute(commitsToReturn, preferredView::getLatestBaseFilesInRange,
secondaryView::getLatestBaseFilesInRange);
+ return execute(commitsToReturn, preferredView::getLatestBaseFilesInRange,
(commits) -> getSecondaryView().getLatestBaseFilesInRange(commits));
}
@Override
@@ -175,7 +178,7 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
return null;
},
() -> {
- secondaryView.loadAllPartitions();
+ getSecondaryView().loadAllPartitions();
return null;
});
}
@@ -188,151 +191,160 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
return null;
},
() -> {
- secondaryView.loadPartitions(partitionPaths);
+ getSecondaryView().loadPartitions(partitionPaths);
return null;
});
}
@Override
public Stream<HoodieBaseFile> getAllBaseFiles(String partitionPath) {
- return execute(partitionPath, preferredView::getAllBaseFiles,
secondaryView::getAllBaseFiles);
+ return execute(partitionPath, preferredView::getAllBaseFiles, (path) ->
getSecondaryView().getAllBaseFiles(path));
}
@Override
public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
- return execute(partitionPath, preferredView::getLatestFileSlices,
secondaryView::getLatestFileSlices);
+ return execute(partitionPath, preferredView::getLatestFileSlices, (path)
-> getSecondaryView().getLatestFileSlices(path));
}
@Override
public Stream<FileSlice> getLatestFileSlicesIncludingInflight(String
partitionPath) {
- return execute(partitionPath,
preferredView::getLatestFileSlicesIncludingInflight,
secondaryView::getLatestFileSlicesIncludingInflight);
+ return execute(partitionPath,
preferredView::getLatestFileSlicesIncludingInflight, (path) ->
getSecondaryView().getLatestFileSlicesIncludingInflight(path));
}
@Override
public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
- return execute(partitionPath, preferredView::getLatestFileSlicesStateless,
secondaryView::getLatestFileSlicesStateless);
+ return execute(partitionPath, preferredView::getLatestFileSlicesStateless,
(path) -> getSecondaryView().getLatestFileSlicesStateless(path));
}
@Override
public Stream<FileSlice> getLatestUnCompactedFileSlices(String
partitionPath) {
return execute(partitionPath,
preferredView::getLatestUnCompactedFileSlices,
- secondaryView::getLatestUnCompactedFileSlices);
+ (path) -> getSecondaryView().getLatestUnCompactedFileSlices(path));
}
@Override
public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime,
boolean includeFileSlicesInPendingCompaction) {
return execute(partitionPath, maxCommitTime,
includeFileSlicesInPendingCompaction,
- preferredView::getLatestFileSlicesBeforeOrOn,
secondaryView::getLatestFileSlicesBeforeOrOn);
+ preferredView::getLatestFileSlicesBeforeOrOn, (path, commitTime,
includeSlices) -> getSecondaryView().getLatestFileSlicesBeforeOrOn(path,
commitTime, includeSlices));
}
@Override
public Map<String, Stream<FileSlice>>
getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
return execute(maxCommitTime,
preferredView::getAllLatestFileSlicesBeforeOrOn,
- secondaryView::getAllLatestFileSlicesBeforeOrOn);
+ (instantTime) ->
getSecondaryView().getAllLatestFileSlicesBeforeOrOn(instantTime));
}
@Override
public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String
partitionPath, String maxInstantTime) {
return execute(partitionPath, maxInstantTime,
preferredView::getLatestMergedFileSlicesBeforeOrOn,
- secondaryView::getLatestMergedFileSlicesBeforeOrOn);
+ (path, instantTime) ->
getSecondaryView().getLatestMergedFileSlicesBeforeOrOn(path, instantTime));
}
@Override
public Stream<FileSlice> getLatestFileSliceInRange(List<String>
commitsToReturn) {
- return execute(commitsToReturn, preferredView::getLatestFileSliceInRange,
secondaryView::getLatestFileSliceInRange);
+ return execute(commitsToReturn, preferredView::getLatestFileSliceInRange,
(commits) -> getSecondaryView().getLatestFileSliceInRange(commits));
}
@Override
public Stream<FileSlice> getAllFileSlices(String partitionPath) {
- return execute(partitionPath, preferredView::getAllFileSlices,
secondaryView::getAllFileSlices);
+ return execute(partitionPath, preferredView::getAllFileSlices, (path) ->
getSecondaryView().getAllFileSlices(path));
}
@Override
public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
- return execute(partitionPath, preferredView::getAllFileGroups,
secondaryView::getAllFileGroups);
+ return execute(partitionPath, preferredView::getAllFileGroups, (path) ->
getSecondaryView().getAllFileGroups(path));
}
@Override
public Stream<HoodieFileGroup> getAllFileGroupsStateless(String
partitionPath) {
- return execute(partitionPath, preferredView::getAllFileGroupsStateless,
secondaryView::getAllFileGroupsStateless);
+ return execute(partitionPath, preferredView::getAllFileGroupsStateless,
(path) -> getSecondaryView().getAllFileGroupsStateless(path));
}
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String
maxCommitTime, String partitionPath) {
- return execute(maxCommitTime, partitionPath,
preferredView::getReplacedFileGroupsBeforeOrOn,
secondaryView::getReplacedFileGroupsBeforeOrOn);
+ return execute(maxCommitTime, partitionPath,
preferredView::getReplacedFileGroupsBeforeOrOn, (commitTime, path) ->
getSecondaryView().getReplacedFileGroupsBeforeOrOn(commitTime, path));
}
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String
maxCommitTime, String partitionPath) {
- return execute(maxCommitTime, partitionPath,
preferredView::getReplacedFileGroupsBefore,
secondaryView::getReplacedFileGroupsBefore);
+ return execute(maxCommitTime, partitionPath,
preferredView::getReplacedFileGroupsBefore, (commitTime, path) ->
getSecondaryView().getReplacedFileGroupsBefore(commitTime, path));
}
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String
minCommitTime, String partitionPath) {
- return execute(minCommitTime, partitionPath,
preferredView::getReplacedFileGroupsAfterOrOn,
secondaryView::getReplacedFileGroupsAfterOrOn);
+ return execute(minCommitTime, partitionPath,
preferredView::getReplacedFileGroupsAfterOrOn, (commitTime, path) ->
getSecondaryView().getReplacedFileGroupsAfterOrOn(commitTime, path));
}
@Override
public Stream<HoodieFileGroup> getAllReplacedFileGroups(String
partitionPath) {
- return execute(partitionPath, preferredView::getAllReplacedFileGroups,
secondaryView::getAllReplacedFileGroups);
+ return execute(partitionPath, preferredView::getAllReplacedFileGroups,
(path) -> getSecondaryView().getAllReplacedFileGroups(path));
}
@Override
public Stream<Pair<String, CompactionOperation>>
getPendingCompactionOperations() {
- return execute(preferredView::getPendingCompactionOperations,
secondaryView::getPendingCompactionOperations);
+ return execute(preferredView::getPendingCompactionOperations, () ->
getSecondaryView().getPendingCompactionOperations());
}
@Override
public Stream<Pair<String, CompactionOperation>>
getPendingLogCompactionOperations() {
- return execute(preferredView::getPendingLogCompactionOperations,
secondaryView::getPendingLogCompactionOperations);
+ return execute(preferredView::getPendingLogCompactionOperations, () ->
getSecondaryView().getPendingLogCompactionOperations());
}
@Override
public Stream<Pair<HoodieFileGroupId, HoodieInstant>>
getFileGroupsInPendingClustering() {
- return execute(preferredView::getFileGroupsInPendingClustering,
secondaryView::getFileGroupsInPendingClustering);
+ return execute(preferredView::getFileGroupsInPendingClustering, () ->
getSecondaryView().getFileGroupsInPendingClustering());
}
@Override
public void close() {
preferredView.close();
- secondaryView.close();
+ if (secondaryView != null) {
+ secondaryView.close();
+ }
}
@Override
public void reset() {
preferredView.reset();
- secondaryView.reset();
+ if (secondaryView != null) {
+ secondaryView.reset();
+ }
errorOnPreferredView = false;
}
@Override
public Option<HoodieInstant> getLastInstant() {
- return execute(preferredView::getLastInstant,
secondaryView::getLastInstant);
+ return execute(preferredView::getLastInstant, () ->
getSecondaryView().getLastInstant());
}
@Override
public HoodieTimeline getTimeline() {
- return execute(preferredView::getTimeline, secondaryView::getTimeline);
+ return execute(preferredView::getTimeline, () ->
getSecondaryView().getTimeline());
}
@Override
public void sync() {
preferredView.sync();
- secondaryView.sync();
+ if (secondaryView != null) {
+ secondaryView.sync();
+ }
errorOnPreferredView = false;
}
@Override
public Option<FileSlice> getLatestFileSlice(String partitionPath, String
fileId) {
- return execute(partitionPath, fileId, preferredView::getLatestFileSlice,
secondaryView::getLatestFileSlice);
+ return execute(partitionPath, fileId, preferredView::getLatestFileSlice,
(path, fgId) -> getSecondaryView().getLatestFileSlice(path, fgId));
}
- public SyncableFileSystemView getPreferredView() {
+ SyncableFileSystemView getPreferredView() {
return preferredView;
}
- public SyncableFileSystemView getSecondaryView() {
+ synchronized SyncableFileSystemView getSecondaryView() {
+ if (secondaryView == null) {
+ secondaryView = secondaryViewSupplier.get();
+ }
return secondaryView;
}
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
index 3d9a142e055..9ca9d88f874 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.view;
+import org.apache.hudi.common.function.SerializableSupplier;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -72,6 +73,8 @@ public class TestPriorityBasedFileSystemView {
@Mock
private SyncableFileSystemView secondary;
+ @Mock
+ private SerializableSupplier<SyncableFileSystemView> secondaryViewSupplier;
@InjectMocks
private PriorityBasedFileSystemView fsView;
@@ -81,7 +84,7 @@ public class TestPriorityBasedFileSystemView {
@BeforeEach
public void setUp() {
- fsView = new PriorityBasedFileSystemView(primary, secondary);
+ fsView = new PriorityBasedFileSystemView(primary, secondaryViewSupplier);
testBaseFileStream = Stream.of(new HoodieBaseFile("test"));
testFileSliceStream = Stream.of(new FileSlice("2020-01-01", "20:20",
"file0001" +
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension()));
@@ -106,8 +109,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLatestBaseFiles()).thenReturn(testBaseFileStream);
actual = fsView.getLatestBaseFiles();
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestBaseFiles()).thenThrow(new RuntimeException());
when(secondary.getLatestBaseFiles()).thenReturn(testBaseFileStream);
actual = fsView.getLatestBaseFiles();
@@ -136,6 +141,7 @@ public class TestPriorityBasedFileSystemView {
Stream<HoodieBaseFile> expected = testBaseFileStream;
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestBaseFiles()).thenThrow(new RuntimeException(new
HttpResponseException(400, "Bad Request")));
when(secondary.getLatestBaseFiles()).thenReturn(testBaseFileStream);
actual = fsView.getLatestBaseFiles();
@@ -159,8 +165,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLatestBaseFiles(partitionPath)).thenReturn(testBaseFileStream);
actual = fsView.getLatestBaseFiles(partitionPath);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestBaseFiles(partitionPath)).thenThrow(new
RuntimeException());
when(secondary.getLatestBaseFiles(partitionPath)).thenReturn(testBaseFileStream);
actual = fsView.getLatestBaseFiles(partitionPath);
@@ -189,8 +197,10 @@ public class TestPriorityBasedFileSystemView {
.thenReturn(testBaseFileStream);
actual = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, maxCommitTime);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestBaseFilesBeforeOrOn(partitionPath, maxCommitTime))
.thenThrow(new RuntimeException());
when(secondary.getLatestBaseFilesBeforeOrOn(partitionPath, maxCommitTime))
@@ -222,8 +232,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLatestBaseFile(partitionPath,
fileID)).thenReturn(expected);
actual = fsView.getLatestBaseFile(partitionPath, fileID);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestBaseFile(partitionPath, fileID)).thenThrow(new
RuntimeException());
when(secondary.getLatestBaseFile(partitionPath,
fileID)).thenReturn(expected);
actual = fsView.getLatestBaseFile(partitionPath, fileID);
@@ -252,8 +264,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getBaseFileOn(partitionPath, instantTime,
fileID)).thenReturn(expected);
actual = fsView.getBaseFileOn(partitionPath, instantTime, fileID);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getBaseFileOn(partitionPath, instantTime, fileID))
.thenThrow(new RuntimeException());
when(secondary.getBaseFileOn(partitionPath, instantTime,
fileID)).thenReturn(expected);
@@ -282,8 +296,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLatestBaseFilesInRange(commitsToReturn)).thenReturn(testBaseFileStream);
actual = fsView.getLatestBaseFilesInRange(commitsToReturn);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestBaseFilesInRange(commitsToReturn)).thenThrow(new
RuntimeException());
when(secondary.getLatestBaseFilesInRange(commitsToReturn)).thenReturn(testBaseFileStream);
actual = fsView.getLatestBaseFilesInRange(commitsToReturn);
@@ -310,8 +326,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getAllBaseFiles(partitionPath)).thenReturn(testBaseFileStream);
actual = fsView.getAllBaseFiles(partitionPath);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getAllBaseFiles(partitionPath)).thenThrow(new
RuntimeException());
when(secondary.getAllBaseFiles(partitionPath)).thenReturn(testBaseFileStream);
actual = fsView.getAllBaseFiles(partitionPath);
@@ -338,8 +356,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLatestFileSlices(partitionPath)).thenReturn(testFileSliceStream);
actual = fsView.getLatestFileSlices(partitionPath);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestFileSlices(partitionPath)).thenThrow(new
RuntimeException());
when(secondary.getLatestFileSlices(partitionPath)).thenReturn(testFileSliceStream);
actual = fsView.getLatestFileSlices(partitionPath);
@@ -366,8 +386,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLatestFileSlicesIncludingInflight(partitionPath)).thenReturn(testFileSliceStream);
actual = fsView.getLatestFileSlicesIncludingInflight(partitionPath);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestFileSlicesIncludingInflight(partitionPath)).thenThrow(new
RuntimeException());
when(secondary.getLatestFileSlicesIncludingInflight(partitionPath)).thenReturn(testFileSliceStream);
actual = fsView.getLatestFileSlicesIncludingInflight(partitionPath);
@@ -392,8 +414,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLatestUnCompactedFileSlices(partitionPath)).thenReturn(testFileSliceStream);
actual = fsView.getLatestUnCompactedFileSlices(partitionPath);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestUnCompactedFileSlices(partitionPath)).thenThrow(new
RuntimeException());
when(secondary.getLatestUnCompactedFileSlices(partitionPath)).thenReturn(testFileSliceStream);
actual = fsView.getLatestUnCompactedFileSlices(partitionPath);
@@ -422,8 +446,10 @@ public class TestPriorityBasedFileSystemView {
.thenReturn(testFileSliceStream);
actual = fsView.getLatestFileSlicesBeforeOrOn(partitionPath,
maxCommitTime, false);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime,
false))
.thenThrow(new RuntimeException());
when(secondary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime,
false))
@@ -456,8 +482,10 @@ public class TestPriorityBasedFileSystemView {
.thenReturn(testFileSliceStream);
actual = fsView.getLatestMergedFileSlicesBeforeOrOn(partitionPath,
maxInstantTime);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestMergedFileSlicesBeforeOrOn(partitionPath,
maxInstantTime))
.thenThrow(new RuntimeException());
when(secondary.getLatestMergedFileSlicesBeforeOrOn(partitionPath,
maxInstantTime))
@@ -488,8 +516,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLatestFileSliceInRange(commitsToReturn)).thenReturn(testFileSliceStream);
actual = fsView.getLatestFileSliceInRange(commitsToReturn);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestFileSliceInRange(commitsToReturn)).thenThrow(new
RuntimeException());
when(secondary.getLatestFileSliceInRange(commitsToReturn)).thenReturn(testFileSliceStream);
actual = fsView.getLatestFileSliceInRange(commitsToReturn);
@@ -516,8 +546,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getAllFileSlices(partitionPath)).thenReturn(testFileSliceStream);
actual = fsView.getAllFileSlices(partitionPath);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getAllFileSlices(partitionPath)).thenThrow(new
RuntimeException());
when(secondary.getAllFileSlices(partitionPath)).thenReturn(testFileSliceStream);
actual = fsView.getAllFileSlices(partitionPath);
@@ -546,8 +578,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getAllFileGroups(partitionPath)).thenReturn(expected);
actual = fsView.getAllFileGroups(partitionPath);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getAllFileGroups(partitionPath)).thenThrow(new
RuntimeException());
when(secondary.getAllFileGroups(partitionPath)).thenReturn(expected);
actual = fsView.getAllFileGroups(partitionPath);
@@ -575,8 +609,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getPendingCompactionOperations()).thenReturn(expected);
actual = fsView.getPendingCompactionOperations();
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getPendingCompactionOperations()).thenThrow(new
RuntimeException());
when(secondary.getPendingCompactionOperations()).thenReturn(expected);
actual = fsView.getPendingCompactionOperations();
@@ -604,8 +640,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getPendingLogCompactionOperations()).thenReturn(expected);
actual = fsView.getPendingLogCompactionOperations();
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getPendingLogCompactionOperations()).thenThrow(new
RuntimeException());
when(secondary.getPendingLogCompactionOperations()).thenReturn(expected);
actual = fsView.getPendingLogCompactionOperations();
@@ -624,14 +662,33 @@ public class TestPriorityBasedFileSystemView {
}
@Test
- public void testClose() {
+ public void testClose_noSecondaryInitialized() {
+ fsView.close();
+ verify(primary, times(1)).close();
+ verify(secondary, never()).close();
+ }
+
+ @Test
+ public void testClose_withSecondaryInitialized() {
+ // force secondary view to initialize
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
+ fsView.getSecondaryView();
fsView.close();
verify(primary, times(1)).close();
verify(secondary, times(1)).close();
}
@Test
- public void testReset() {
+ public void testReset_noSecondaryInitialized() {
+ fsView.reset();
+ verify(primary, times(1)).reset();
+ verify(secondary, never()).reset();
+ }
+
+ @Test
+ public void testReset_withSecondaryInitialized() {
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
+ fsView.getSecondaryView();
fsView.reset();
verify(primary, times(1)).reset();
verify(secondary, times(1)).reset();
@@ -645,8 +702,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLastInstant()).thenReturn(expected);
actual = fsView.getLastInstant();
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLastInstant()).thenThrow(new RuntimeException());
when(secondary.getLastInstant()).thenReturn(expected);
actual = fsView.getLastInstant();
@@ -672,8 +731,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getTimeline()).thenReturn(expected);
actual = fsView.getTimeline();
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getTimeline()).thenThrow(new RuntimeException());
when(secondary.getTimeline()).thenReturn(expected);
actual = fsView.getTimeline();
@@ -692,7 +753,16 @@ public class TestPriorityBasedFileSystemView {
}
@Test
- public void testSync() {
+ public void testSync_noSecondaryInitialized() {
+ fsView.sync();
+ verify(primary, times(1)).sync();
+ verify(secondary, never()).sync();
+ }
+
+ @Test
+ public void testSync_withSecondaryInitialized() {
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
+ fsView.getSecondaryView();
fsView.sync();
verify(primary, times(1)).sync();
verify(secondary, times(1)).sync();
@@ -708,8 +778,10 @@ public class TestPriorityBasedFileSystemView {
when(primary.getLatestFileSlice(partitionPath,
fileID)).thenReturn(expected);
actual = fsView.getLatestFileSlice(partitionPath, fileID);
assertEquals(expected, actual);
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
when(primary.getLatestFileSlice(partitionPath, fileID)).thenThrow(new
RuntimeException());
when(secondary.getLatestFileSlice(partitionPath,
fileID)).thenReturn(expected);
actual = fsView.getLatestFileSlice(partitionPath, fileID);
@@ -734,8 +806,10 @@ public class TestPriorityBasedFileSystemView {
fsView.loadPartitions(Collections.singletonList(partitionPath));
verify(primary,
times(1)).loadPartitions(Collections.singletonList(partitionPath));
verify(secondary, never()).loadPartitions(any());
+ verify(secondaryViewSupplier, never()).get();
resetMocks();
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
doThrow(new
RuntimeException()).when(primary).loadPartitions(Collections.singletonList(partitionPath));
fsView.loadPartitions(Collections.singletonList(partitionPath));
verify(primary,
times(1)).loadPartitions(Collections.singletonList(partitionPath));
@@ -755,6 +829,7 @@ public class TestPriorityBasedFileSystemView {
@Test
public void testGetSecondaryView() {
+ when(secondaryViewSupplier.get()).thenReturn(secondary);
assertEquals(secondary, fsView.getSecondaryView());
}