dsmiley commented on a change in pull request #108:
URL: https://github.com/apache/solr/pull/108#discussion_r624374512



##########
File path: 
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
##########
@@ -56,10 +61,22 @@
   private final Object lock = new Object();
   private volatile boolean isOpen;
 
-  public BlobDirectory(Directory delegate, String blobDirPath, BlobPusher 
blobPusher) {
+  public BlobDirectory(Directory delegate, String blobDirPath, 
BlobStoreConnection blobStoreConnection) throws IOException {
     super(delegate);
     this.blobDirPath = blobDirPath;
-    this.blobPusher = blobPusher;
+    this.blobStoreConnection = blobStoreConnection;
+    pullMissingFilesFromRepo();
+  }
+
+  private void pullMissingFilesFromRepo() throws IOException {
+    Set<String> localFileNames = new HashSet<>(Arrays.asList(in.listAll()));
+    blobStoreConnection.pull(blobDirPath, this::openOutput, blobFile -> {
+      //TODO: We could also check the size and checksum.

Review comment:
       Yeah, but this is quite adequate since the repo is dedicated to this 
replica so there should be no other replicas using the same repo for this 
directory, thus no risk of same files with different content.

##########
File path: 
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
##########
@@ -56,10 +61,22 @@
   private final Object lock = new Object();
   private volatile boolean isOpen;
 
-  public BlobDirectory(Directory delegate, String blobDirPath, BlobPusher 
blobPusher) {
+  public BlobDirectory(Directory delegate, String blobDirPath, 
BlobStoreConnection blobStoreConnection) throws IOException {
     super(delegate);
     this.blobDirPath = blobDirPath;
-    this.blobPusher = blobPusher;
+    this.blobStoreConnection = blobStoreConnection;
+    pullMissingFilesFromRepo();
+  }
+
+  private void pullMissingFilesFromRepo() throws IOException {
+    Set<String> localFileNames = new HashSet<>(Arrays.asList(in.listAll()));
+    blobStoreConnection.pull(blobDirPath, this::openOutput, blobFile -> {
+      //TODO: We could also check the size and checksum.
+      return !localFileNames.remove(blobFile.fileName());
+    });
+    for (String localFileName : localFileNames) {

Review comment:
       If there is something to delete, let's log that this is true and maybe 
even some names?  I _think_ in practice there won't be, so INFO level at least.

##########
File path: 
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
##########
@@ -56,10 +61,22 @@
   private final Object lock = new Object();
   private volatile boolean isOpen;
 
-  public BlobDirectory(Directory delegate, String blobDirPath, BlobPusher 
blobPusher) {
+  public BlobDirectory(Directory delegate, String blobDirPath, 
BlobStoreConnection blobStoreConnection) throws IOException {
     super(delegate);
     this.blobDirPath = blobDirPath;
-    this.blobPusher = blobPusher;
+    this.blobStoreConnection = blobStoreConnection;
+    pullMissingFilesFromRepo();
+  }
+
+  private void pullMissingFilesFromRepo() throws IOException {
+    Set<String> localFileNames = new HashSet<>(Arrays.asList(in.listAll()));
+    blobStoreConnection.pull(blobDirPath, this::openOutput, blobFile -> {

Review comment:
       If locally we're empty and we pull down files, then we should have an 
INFO log message saying that.
   On the other hand if there's no transfer or deletions to be done, we should 
not log anything.

##########
File path: 
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStoreConnection.java
##########
@@ -152,11 +180,66 @@ private void createDirectories(URI blobDirUri, String 
blobDirPath) throws IOExce
     }
   }
 
+  private void copyStream(IndexInput input, OutputStream output) throws 
IOException {
+    byte[] buffer = streamBuffers.get();
+    long remaining = input.length();
+    while (remaining > 0) {
+      int length = (int) Math.min(buffer.length, remaining);
+      input.readBytes(buffer, 0, length, false);
+      output.write(buffer, 0, length);
+      remaining -= length;
+    }
+  }
+
   private void deleteFiles(String blobDirPath, Collection<String> fileNames) 
throws IOException {
     URI blobDirUri = repository.resolve(repositoryLocation, blobDirPath);
     repository.delete(blobDirUri, fileNames, true);
   }
 
+  /**
+   * Lists the files in a specific directory of the repository and select them 
with the provided filter.
+   */
+  private List<BlobFile> list(URI blobDirUri, BlobFileFilter fileFilter) 
throws IOException {
+    String[] fileNames = repository.listAll(blobDirUri);
+    List<BlobFile> blobFiles = new ArrayList<>(fileNames.length);
+    for (String fileName : fileNames) {
+      BlobFile blobFile = new BlobFile(fileName, -1, -1);
+      if (fileFilter.accept(blobFile)) {
+        blobFiles.add(blobFile);
+      }
+    }
+    return blobFiles;
+  }
+
+  private List<Callable<Void>> pullFiles(

Review comment:
       IMO inlining to its only caller would be easier to follow than split up. 
 After all, its caller is "pull" and this is about pulling so it's highly 
germane to stay as the substance of the "pull" method.

##########
File path: 
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStoreConnection.java
##########
@@ -166,4 +249,12 @@ public void close() {
       Thread.currentThread().interrupt();
     }
   }
+
+  /**
+   * Filters {@link BlobFile}s.
+   */
+  public interface BlobFileFilter {

Review comment:
       I'm not sure we need a BlobFileFilter... maybe just a 
`Predicate<String>` for the file name.  WDYT?  Oh right... throws IOException.  
Maybe we don't need that?

##########
File path: 
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStoreConnection.java
##########
@@ -26,21 +29,30 @@
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
- * Pushes a set of files to Blob, and works with listings.
+ * Connection to a {@link BackupRepository} to store persistently directories 
files.
+ * Provides support to:
+ * <ul>
+ *   <li>List and pull repository files.</li>
+ *   <li>Push local files to the repository.</li>
+ * </ul>
  */
-public class BlobPusher implements Closeable {
+public class BlobStoreConnection implements Closeable {

Review comment:
       Not a bad name but I'd prefer something better as well.  Since it works 
with the BackupRepository, the "Repository" word in there is important.  The 
"Connection" part I like the least; this isn't low level connection oriented.  
Maybe simply *BlobRepository*?  After all, it's close to the repository end (vs 
the local dir).  It's Blob related.

##########
File path: 
solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryTest.java
##########
@@ -190,4 +211,23 @@ private void checkFileContent(String name, String 
expectedContent) throws IOExce
             assertEquals(expectedContent, input.readString());
         }
     }
+
+    private static void deleteRecursively(File file) throws IOException {

Review comment:
       I suggest simply calling com.google.common.io.MoreFiles#deleteRecursively

##########
File path: 
solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStoreConnection.java
##########
@@ -152,11 +180,66 @@ private void createDirectories(URI blobDirUri, String 
blobDirPath) throws IOExce
     }
   }
 
+  private void copyStream(IndexInput input, OutputStream output) throws 
IOException {
+    byte[] buffer = streamBuffers.get();
+    long remaining = input.length();
+    while (remaining > 0) {
+      int length = (int) Math.min(buffer.length, remaining);
+      input.readBytes(buffer, 0, length, false);
+      output.write(buffer, 0, length);
+      remaining -= length;
+    }
+  }
+
   private void deleteFiles(String blobDirPath, Collection<String> fileNames) 
throws IOException {
     URI blobDirUri = repository.resolve(repositoryLocation, blobDirPath);
     repository.delete(blobDirUri, fileNames, true);
   }
 
+  /**
+   * Lists the files in a specific directory of the repository and select them 
with the provided filter.
+   */
+  private List<BlobFile> list(URI blobDirUri, BlobFileFilter fileFilter) 
throws IOException {
+    String[] fileNames = repository.listAll(blobDirUri);
+    List<BlobFile> blobFiles = new ArrayList<>(fileNames.length);
+    for (String fileName : fileNames) {
+      BlobFile blobFile = new BlobFile(fileName, -1, -1);
+      if (fileFilter.accept(blobFile)) {
+        blobFiles.add(blobFile);
+      }
+    }
+    return blobFiles;
+  }
+
+  private List<Callable<Void>> pullFiles(
+      URI blobDirUri,
+      Collection<BlobFile> blobFiles,
+      IOUtils.IOFunction<BlobFile, IndexOutput> outputSupplier) {
+    return blobFiles.stream()
+        .map(
+            (blobFile) ->
+                (Callable<Void>)
+                    () -> {
+                      try (IndexInput in = repository.openInput(blobDirUri, 
blobFile.fileName(), IOContext.READ);
+                           IndexOutput out = outputSupplier.apply(blobFile)) {
+                        copyStream(in, out);
+                      }
+                      return null;
+                    })
+        .collect(Collectors.toList());
+  }
+
+  private void copyStream(IndexInput input, IndexOutput output) throws 
IOException {

Review comment:
       Deja vu... you have the same method above?

##########
File path: 
solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryTest.java
##########
@@ -164,6 +165,26 @@ public void testRename() throws IOException {
         checkDirListing(testFileName1Renamed, testFileName2, 
testFileName3Renamed);
     }
 
+    @Test
+    public void testSyncFromRepository() throws IOException {
+        // Given some files in the directory.
+        testWriteRead();
+
+        // When
+        // - The local files are wiped (e.g. host crash).
+        deleteRecursively(new File(directoryPath));

Review comment:
       In general I recommend embracing `Path` and leaving `File` to historical 
Java.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to