This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f647cf  Unify move method in PinotFS (#3834)
8f647cf is described below

commit 8f647cf423d177b612865b9206ecd490c35e731b
Author: Jialiang Li <[email protected]>
AuthorDate: Wed Feb 27 09:56:10 2019 -0800

    Unify move method in PinotFS (#3834)
    
    * Unify move method in PinotFS
---
 .../org/apache/pinot/filesystem/AzurePinotFS.java  |  6 +--
 .../apache/pinot/controller/ControllerConf.java    |  4 +-
 .../helix/core/SegmentDeletionManager.java         | 16 +++---
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  5 +-
 .../org/apache/pinot/filesystem/LocalPinotFS.java  | 17 ++----
 .../java/org/apache/pinot/filesystem/PinotFS.java  | 62 +++++++++++++++++-----
 .../apache/pinot/filesystem/LocalPinotFSTest.java  | 13 ++++-
 .../pinot/filesystem/PinotFSFactoryTest.java       |  2 +-
 .../org/apache/pinot/filesystem/HadoopPinotFS.java |  5 +-
 9 files changed, 82 insertions(+), 48 deletions(-)

diff --git 
a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
 
b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
index 766b2d4..0f07f67 100644
--- 
a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
+++ 
b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
@@ -98,12 +98,8 @@ public class AzurePinotFS extends PinotFS {
   }
 
   @Override
-  public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+  protected boolean doMove(URI srcUri, URI dstUri)
       throws IOException {
-    if (exists(dstUri) && !overwrite) {
-      return false;
-    }
-    //rename the file
     return _adlStoreClient.rename(srcUri.getPath(), dstUri.getPath());
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 53a66bd..036add8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -170,8 +170,8 @@ public class ControllerConf extends PropertiesConfiguration 
{
 
   public static URI constructSegmentLocation(String baseDataDir, String 
tableName, String segmentName) {
     try {
-      return new URI(StringUtil.join(File.separator, baseDataDir, tableName, 
URLEncoder.encode(segmentName, "UTF-8")));
-    } catch (UnsupportedEncodingException | URISyntaxException e) {
+      return getUriFromPath(StringUtil.join(File.separator, baseDataDir, 
tableName, URLEncoder.encode(segmentName, "UTF-8")));
+    } catch (UnsupportedEncodingException e) {
       LOGGER
           .error("Could not construct segment location with baseDataDir {}, 
tableName {}, segmentName {}", baseDataDir,
               tableName, segmentName);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index adb6cae..19ad746 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -188,12 +188,16 @@ public class SegmentDeletionManager {
       try {
         if (pinotFS.exists(fileToMoveURI)) {
           // Overwrites the file if it already exists in the target directory.
-          pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true);
-          // Updates last modified.
-          // Touch is needed here so that removeAgedDeletedSegments() works 
correctly.
-          pinotFS.touch(deletedSegmentDestURI);
-          LOGGER.info("Moved segment {} from {} to {}", segmentId, 
fileToMoveURI.toString(),
-              deletedSegmentDestURI.toString());
+          if (pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true)) {
+            // Updates last modified.
+            // Touch is needed here so that removeAgedDeletedSegments() works 
correctly.
+            pinotFS.touch(deletedSegmentDestURI);
+            LOGGER.info("Moved segment {} from {} to {}", segmentId, 
fileToMoveURI.toString(),
+                deletedSegmentDestURI.toString());
+          } else {
+            LOGGER.warn("Failed to move segment {} from {} to {}", segmentId, 
fileToMoveURI.toString(),
+                deletedSegmentDestURI.toString());
+          }
         } else {
           if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
             LOGGER.warn("Not found local segment file for segment {}" + 
fileToMoveURI.toString());
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 6b32400..e814abe 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -385,7 +385,10 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     try {
-      pinotFS.move(segmentFileURI, uriToMoveTo, true);
+      if (!pinotFS.move(segmentFileURI, uriToMoveTo, true)) {
+        LOGGER.error("Could not move {} to {}", segmentLocation, segmentName);
+        return false;
+      }
     } catch (Exception e) {
       LOGGER.error("Could not move {} to {}", segmentLocation, segmentName, e);
       return false;
diff --git 
a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java 
b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
index 423702b..7161031 100644
--- 
a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
+++ 
b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
@@ -75,24 +75,15 @@ public class LocalPinotFS extends PinotFS {
   }
 
   @Override
-  public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+  protected boolean doMove(URI srcUri, URI dstUri)
       throws IOException {
     File srcFile = new File(decodeURI(srcUri.getRawPath()));
     File dstFile = new File(decodeURI(dstUri.getRawPath()));
-    if (dstFile.exists()) {
-      if (overwrite) {
-        FileUtils.deleteQuietly(dstFile);
-      } else {
-        // dst file exists, returning
-        return false;
-      }
+    if (srcFile.isDirectory()) {
+      FileUtils.moveDirectory(srcFile, dstFile);
     } else {
-      // ensure the dst path exists
-      FileUtils.forceMkdir(dstFile.getParentFile());
+      FileUtils.moveFile(srcFile, dstFile);
     }
-
-    Files.move(srcFile.toPath(), dstFile.toPath());
-
     return true;
   }
 
diff --git 
a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java 
b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
index 0ed9d56..8032014 100644
--- a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
+++ b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
@@ -22,14 +22,25 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.file.Paths;
 import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * The PinotFS is intended to be a thin wrapper on top of different 
filesystems. This interface is intended for internal
- * Pinot use only. This class will be implemented for each pluggable storage 
type.
+ * PinotFS is a restricted FS API that exposes functionality that is required 
for Pinot to use
+ * different FS implementations. The restrictions are in place due to 2 
driving factors:
+ * 1. Prevent unexpected performance hit when a broader API is implemented - 
especially, we would like
+ *    to reduce calls to remote filesystems that might be needed for a broader 
API,
+ *    but not necessarily required by Pinot (see the documentation for move() 
method below).
+ * 2. Provide an interface that is simple to be implemented across different 
FS types.
+ *    The contract that developers have to adhere to will be simpler.
+ * Please read the method level docs carefully to note the exceptions while 
using the APIs.
  */
 public abstract class PinotFS implements Closeable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotFS.class);
+
   /**
    * Initializes the configurations specific to that filesystem. For instance, 
any security related parameters can be
    * initialized here and will not be logged.
@@ -58,31 +69,54 @@ public abstract class PinotFS implements Closeable {
   /**
    * Moves the file or directory from the src to dst. Does not keep the 
original file. If the dst has parent directories
    * that haven't been created, this method will create all the necessary 
parent directories.
-   * If both src and dst are files, dst will be overwritten.
-   * If src is a file and dst is a directory, src file will get moved under 
dst directory.
-   * If both src and dst are directories, src directory will get moved under 
dst directory.
-   * If src is a directory and dst is a file, operation will fail.
+   * Note: In Pinot we recommend the full paths of both src and dst be 
specified.
    * For example, if a file /a/b/c is moved to a file /x/y/z, in the case of 
overwrite, the directory /a/b still exists,
    * but will not contain the file 'c'. Instead, /x/y/z will contain the 
contents of 'c'.
-   * If a file /a is moved to a directory /x/y, all the original files under 
/x/y will be kept.
+   * If src is a directory /a/b which contains two files /a/b/c and /a/b/d, 
and the dst is /x/y, the result would be
+   * that the directory /a/b under /a gets removed and dst directory contains 
two files, which are /x/y/c and /x/y/d.
+   * If a src directory /a/b needs to be moved under another directory 
/x/y, please specify the dst as /x/y/b.
    * @param srcUri URI of the original file
    * @param dstUri URI of the final file location
    * @param overwrite true if we want to overwrite the dstURI, false otherwise
    * @return true if move is successful
    * @throws IOException on IO failure
    */
-  public abstract boolean move(URI srcUri, URI dstUri, boolean overwrite)
+  public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+      throws IOException {
+    if (!exists(srcUri)) {
+      LOGGER.warn("Source {} does not exist", srcUri);
+      return false;
+    }
+    if (exists(dstUri)) {
+      if (overwrite) {
+        delete(dstUri, true);
+      } else {
+        // dst file exists, returning
+        LOGGER.warn("Cannot move {} to {}. Destination exists and overwrite 
flag set to false.", srcUri, dstUri);
+        return false;
+      }
+    } else {
+      // ensures the parent path of dst exists.
+      URI parentUri = Paths.get(dstUri).getParent().toUri();
+      mkdir(parentUri);
+    }
+    return doMove(srcUri, dstUri);
+  }
+
+  /**
+   * Does the actual behavior of move in each FS.
+   */
+  protected abstract boolean doMove(URI srcUri, URI dstUri)
       throws IOException;
 
   /**
    * Copies the file or directory from the src to dst. The original file is 
retained. If the dst has parent directories
    * that haven't been created, this method will create all the necessary 
parent directories.
-   * If both src and dst are files, dst will be overwritten.
-   * If src is a file and dst is a directory, src file will get copied under 
dst directory.
-   * If both src and dst are directories, src directory will get copied under 
dst directory.
-   * If src is a directory and dst is a file, operation will fail.
-   * For example, if a file /x/y/z is copied to /a/b/c, /x/y/z will be 
retained and /x/y/z will also be present as /a/b/c;
-   * if a file /a is copied to a directory /x/y, all the original files under 
/x/y will be kept.
+   * Note: In Pinot we recommend the full paths of both src and dst be 
specified.
+   * For example, if a file /a/b/c is copied to a file /x/y/z, the directory 
/a/b still exists containing the file 'c'.
+   * The dst file /x/y/z will contain the contents of 'c'.
+   * If a directory /a/b is copied to another directory /x/y, the directory 
/x/y will contain the content of /a/b.
+   * If a directory /a/b is copied under the directory /x/y, the dst needs to 
be specified as /x/y/b.
    * @param srcUri URI of the original file
    * @param dstUri URI of the final file location
    * @return true if copy is successful
diff --git 
a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
 
b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
index 0c0957c..5f1c595 100644
--- 
a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
+++ 
b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
@@ -141,7 +141,7 @@ public class LocalPinotFSTest {
     Assert.assertEquals(_newTmpDir.listFiles().length, files);
     Assert.assertFalse(dstFile.exists());
 
-    // Check that a moving a file a non-existent destination folder will work
+    // Check that moving a file to a non-existent destination folder will 
work
     FileUtils.deleteQuietly(_nonExistentTmpFolder);
     Assert.assertFalse(_nonExistentTmpFolder.exists());
     File srcFile = new File(_absoluteTmpDirPath, "srcFile");
@@ -153,7 +153,7 @@ public class LocalPinotFSTest {
     Assert.assertFalse(srcFile.exists());
     Assert.assertTrue(dstFile.exists());
 
-    //Check that moving a folder to a non-existent destination folder works
+    // Check that moving a folder to a non-existent destination folder works
     FileUtils.deleteQuietly(_nonExistentTmpFolder);
     Assert.assertFalse(_nonExistentTmpFolder.exists());
     srcFile = new File(_absoluteTmpDirPath, "srcFile");
@@ -210,6 +210,15 @@ public class LocalPinotFSTest {
     localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
     Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), 
true).length, 1);
 
+    // Copying directory with files under another directory.
+    File firstTempDirUnderSecondTempDir = new File(secondTempDir, 
firstTempDir.getName());
+    localPinotFS.copy(firstTempDir.toURI(), 
firstTempDirUnderSecondTempDir.toURI());
+    
Assert.assertTrue(localPinotFS.exists(firstTempDirUnderSecondTempDir.toURI()));
+    // There are two files/directories under secondTempDir.
+    Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), 
false).length, 2);
+    // The file under src directory also got copied under dst directory.
+    
Assert.assertEquals(localPinotFS.listFiles(firstTempDirUnderSecondTempDir.toURI(),
 true).length, 1);
+
     // len of dir = exception
     try {
       localPinotFS.length(firstTempDir.toURI());
diff --git 
a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
 
b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
index 2da5766..803bcb8 100644
--- 
a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
+++ 
b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
@@ -85,7 +85,7 @@ public class PinotFSFactoryTest {
     }
 
     @Override
-    public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+    public boolean doMove(URI srcUri, URI dstUri)
         throws IOException {
       return true;
     }
diff --git 
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
 
b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
index 1620b5a..8f6b13a 100644
--- 
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
+++ 
b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
@@ -91,11 +91,8 @@ public class HadoopPinotFS extends PinotFS {
   }
 
   @Override
-  public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+  protected boolean doMove(URI srcUri, URI dstUri)
       throws IOException {
-    if (exists(dstUri) && !overwrite) {
-      return false;
-    }
     return _hadoopFS.rename(new Path(srcUri), new Path(dstUri));
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to