dsmiley commented on a change in pull request #108:
URL: https://github.com/apache/solr/pull/108#discussion_r624374512
##########
File path:
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
##########
@@ -56,10 +61,22 @@
private final Object lock = new Object();
private volatile boolean isOpen;
- public BlobDirectory(Directory delegate, String blobDirPath, BlobPusher
blobPusher) {
+ public BlobDirectory(Directory delegate, String blobDirPath,
BlobStoreConnection blobStoreConnection) throws IOException {
super(delegate);
this.blobDirPath = blobDirPath;
- this.blobPusher = blobPusher;
+ this.blobStoreConnection = blobStoreConnection;
+ pullMissingFilesFromRepo();
+ }
+
+ private void pullMissingFilesFromRepo() throws IOException {
+ Set<String> localFileNames = new HashSet<>(Arrays.asList(in.listAll()));
+ blobStoreConnection.pull(blobDirPath, this::openOutput, blobFile -> {
+ //TODO: We could also check the size and checksum.
Review comment:
Yeah, but this is quite adequate since the repo is dedicated to this
replica so there should be no other replicas using the same repo for this
directory, thus no risk of same files with different content.
##########
File path:
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
##########
@@ -56,10 +61,22 @@
private final Object lock = new Object();
private volatile boolean isOpen;
- public BlobDirectory(Directory delegate, String blobDirPath, BlobPusher
blobPusher) {
+ public BlobDirectory(Directory delegate, String blobDirPath,
BlobStoreConnection blobStoreConnection) throws IOException {
super(delegate);
this.blobDirPath = blobDirPath;
- this.blobPusher = blobPusher;
+ this.blobStoreConnection = blobStoreConnection;
+ pullMissingFilesFromRepo();
+ }
+
+ private void pullMissingFilesFromRepo() throws IOException {
+ Set<String> localFileNames = new HashSet<>(Arrays.asList(in.listAll()));
+ blobStoreConnection.pull(blobDirPath, this::openOutput, blobFile -> {
+ //TODO: We could also check the size and checksum.
+ return !localFileNames.remove(blobFile.fileName());
+ });
+ for (String localFileName : localFileNames) {
Review comment:
If there is something to delete, let's log that this is true and maybe
even some names? I _think_ in practice there won't be, so INFO level at least.
##########
File path:
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
##########
@@ -56,10 +61,22 @@
private final Object lock = new Object();
private volatile boolean isOpen;
- public BlobDirectory(Directory delegate, String blobDirPath, BlobPusher
blobPusher) {
+ public BlobDirectory(Directory delegate, String blobDirPath,
BlobStoreConnection blobStoreConnection) throws IOException {
super(delegate);
this.blobDirPath = blobDirPath;
- this.blobPusher = blobPusher;
+ this.blobStoreConnection = blobStoreConnection;
+ pullMissingFilesFromRepo();
+ }
+
+ private void pullMissingFilesFromRepo() throws IOException {
+ Set<String> localFileNames = new HashSet<>(Arrays.asList(in.listAll()));
+ blobStoreConnection.pull(blobDirPath, this::openOutput, blobFile -> {
Review comment:
If locally we're empty and we pull down files, then we should have an
INFO log message saying that.
On the other hand if there's no transfer or deletions to be done, we should
not log anything.
##########
File path:
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStoreConnection.java
##########
@@ -152,11 +180,66 @@ private void createDirectories(URI blobDirUri, String
blobDirPath) throws IOExce
}
}
+ private void copyStream(IndexInput input, OutputStream output) throws
IOException {
+ byte[] buffer = streamBuffers.get();
+ long remaining = input.length();
+ while (remaining > 0) {
+ int length = (int) Math.min(buffer.length, remaining);
+ input.readBytes(buffer, 0, length, false);
+ output.write(buffer, 0, length);
+ remaining -= length;
+ }
+ }
+
private void deleteFiles(String blobDirPath, Collection<String> fileNames)
throws IOException {
URI blobDirUri = repository.resolve(repositoryLocation, blobDirPath);
repository.delete(blobDirUri, fileNames, true);
}
+ /**
+ * Lists the files in a specific directory of the repository and select them
with the provided filter.
+ */
+ private List<BlobFile> list(URI blobDirUri, BlobFileFilter fileFilter)
throws IOException {
+ String[] fileNames = repository.listAll(blobDirUri);
+ List<BlobFile> blobFiles = new ArrayList<>(fileNames.length);
+ for (String fileName : fileNames) {
+ BlobFile blobFile = new BlobFile(fileName, -1, -1);
+ if (fileFilter.accept(blobFile)) {
+ blobFiles.add(blobFile);
+ }
+ }
+ return blobFiles;
+ }
+
+ private List<Callable<Void>> pullFiles(
Review comment:
IMO inlining to its only caller would be easier to follow than split up.
After all, its caller is "pull" and this is about pulling so it's highly
germane to stay as the substance of the "pull" method.
##########
File path:
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStoreConnection.java
##########
@@ -166,4 +249,12 @@ public void close() {
Thread.currentThread().interrupt();
}
}
+
+ /**
+ * Filters {@link BlobFile}s.
+ */
+ public interface BlobFileFilter {
Review comment:
I'm not sure we need a BlobFilePredicate... maybe just a
`Predicate<String>` for the file name. WDYT? Oh right... throws IOException.
Maybe we don't need that?
##########
File path:
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStoreConnection.java
##########
@@ -26,21 +29,30 @@
import java.io.Closeable;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
- * Pushes a set of files to Blob, and works with listings.
+ * Connection to a {@link BackupRepository} to store persistently directories
files.
+ * Provides support to:
+ * <ul>
+ * <li>List and pull repository files.</li>
+ * <li>Push local files to the repository.</li>
+ * </ul>
*/
-public class BlobPusher implements Closeable {
+public class BlobStoreConnection implements Closeable {
Review comment:
Not a bad name but I'd prefer something better as well. Since it works
with the BackupRepository, the "Repository" word in there is important. The
"Connection" part I like the least; this isn't low level connection oriented.
Maybe simply *BlobRepository*? After all, it's close to the repository end (vs
the local dir). It's Blob related.
##########
File path:
solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryTest.java
##########
@@ -190,4 +211,23 @@ private void checkFileContent(String name, String
expectedContent) throws IOExce
assertEquals(expectedContent, input.readString());
}
}
+
+ private static void deleteRecursively(File file) throws IOException {
Review comment:
I suggest simply calling com.google.common.io.MoreFiles#deleteRecursively
##########
File path:
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStoreConnection.java
##########
@@ -152,11 +180,66 @@ private void createDirectories(URI blobDirUri, String
blobDirPath) throws IOExce
}
}
+ private void copyStream(IndexInput input, OutputStream output) throws
IOException {
+ byte[] buffer = streamBuffers.get();
+ long remaining = input.length();
+ while (remaining > 0) {
+ int length = (int) Math.min(buffer.length, remaining);
+ input.readBytes(buffer, 0, length, false);
+ output.write(buffer, 0, length);
+ remaining -= length;
+ }
+ }
+
private void deleteFiles(String blobDirPath, Collection<String> fileNames)
throws IOException {
URI blobDirUri = repository.resolve(repositoryLocation, blobDirPath);
repository.delete(blobDirUri, fileNames, true);
}
+ /**
+ * Lists the files in a specific directory of the repository and select them
with the provided filter.
+ */
+ private List<BlobFile> list(URI blobDirUri, BlobFileFilter fileFilter)
throws IOException {
+ String[] fileNames = repository.listAll(blobDirUri);
+ List<BlobFile> blobFiles = new ArrayList<>(fileNames.length);
+ for (String fileName : fileNames) {
+ BlobFile blobFile = new BlobFile(fileName, -1, -1);
+ if (fileFilter.accept(blobFile)) {
+ blobFiles.add(blobFile);
+ }
+ }
+ return blobFiles;
+ }
+
+ private List<Callable<Void>> pullFiles(
+ URI blobDirUri,
+ Collection<BlobFile> blobFiles,
+ IOUtils.IOFunction<BlobFile, IndexOutput> outputSupplier) {
+ return blobFiles.stream()
+ .map(
+ (blobFile) ->
+ (Callable<Void>)
+ () -> {
+ try (IndexInput in = repository.openInput(blobDirUri,
blobFile.fileName(), IOContext.READ);
+ IndexOutput out = outputSupplier.apply(blobFile)) {
+ copyStream(in, out);
+ }
+ return null;
+ })
+ .collect(Collectors.toList());
+ }
+
+ private void copyStream(IndexInput input, IndexOutput output) throws
IOException {
Review comment:
Deja vu... you have the same method above?
##########
File path:
solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryTest.java
##########
@@ -164,6 +165,26 @@ public void testRename() throws IOException {
checkDirListing(testFileName1Renamed, testFileName2,
testFileName3Renamed);
}
+ @Test
+ public void testSyncFromRepository() throws IOException {
+ // Given some files in the directory.
+ testWriteRead();
+
+ // When
+ // - The local files are wiped (e.g. host crash).
+ deleteRecursively(new File(directoryPath));
Review comment:
In general I recommend embracing `Path` and leaving `File` to historical
Java.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]