This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch pinotFS-open
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/pinotFS-open by this push:
     new 603b5ce  Adding pinotFS.open(URI) method to pinotFS
603b5ce is described below

commit 603b5ce68cc3a71776cd6d8705b0b08777ac819a
Author: kishoreg <[email protected]>
AuthorDate: Sat Nov 30 16:07:50 2019 -0800

    Adding pinotFS.open(URI) method to pinotFS
---
 .../main/java/org/apache/pinot/filesystem/AzurePinotFS.java |  6 ++++++
 .../main/java/org/apache/pinot/filesystem/LocalPinotFS.java |  9 +++++++++
 .../org/apache/pinot/filesystem/PinotFSFactoryTest.java     |  7 +++++++
 .../java/org/apache/pinot/filesystem/HadoopPinotFS.java     |  8 ++++++++
 .../main/java/org/apache/pinot/spi/filesystem/PinotFS.java  | 13 +++++++++++++
 5 files changed, 43 insertions(+)

diff --git 
a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
 
b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
index 9b4d7be..2a2a51c 100644
--- 
a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
+++ 
b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
@@ -250,4 +250,10 @@ public class AzurePinotFS extends PinotFS {
     }
     return true;
   }
+
+  @Override
+  public InputStream open(URI uri)
+      throws IOException {
+    return _adlStoreClient.getReadStream(uri.getPath());
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java 
b/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
index ba0a025..f2fd595 100644
--- a/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
+++ b/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pinot.filesystem;
 
+import java.io.BufferedInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -144,6 +147,12 @@ public class LocalPinotFS extends PinotFS {
     return file.setLastModified(System.currentTimeMillis());
   }
 
+  @Override
+  public InputStream open(URI uri)
+      throws IOException {
+    return new BufferedInputStream(new FileInputStream(toFile(uri)));
+  }
+
   private static File toFile(URI uri) {
     // NOTE: Do not use new File(uri) because scheme might not exist and it 
does not decode '+' to ' '
     //       Do not use uri.getPath() because it does not decode '+' to ' '
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
index 6e83d1b..6cad9b2 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.filesystem;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.net.URI;
 import org.apache.commons.configuration.Configuration;
@@ -140,5 +141,11 @@ public class PinotFSFactoryTest {
         throws IOException {
       return true;
     }
+
+    @Override
+    public InputStream open(URI uri)
+        throws IOException {
+      return null;
+    }
   }
 }
diff --git 
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
 
b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
index e5d7637..1339b6a 100644
--- 
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
+++ 
b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
@@ -21,6 +21,7 @@ package org.apache.pinot.filesystem;
 import com.google.common.base.Strings;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
@@ -230,6 +231,13 @@ public class HadoopPinotFS extends PinotFS {
     return true;
   }
 
+  @Override
+  public InputStream open(URI uri)
+      throws IOException {
+    Path path = new Path(uri);
+    return _hadoopFS.open(path);
+  }
+
   private void authenticate(org.apache.hadoop.conf.Configuration hadoopConf,
       org.apache.commons.configuration.Configuration configs) {
     String principal = configs.getString(PRINCIPAL);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
index 2485ab6..0d2f9d5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.filesystem;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.nio.file.Paths;
 import org.apache.commons.configuration.Configuration;
@@ -207,6 +208,18 @@ public abstract class PinotFS implements Closeable {
       throws IOException;
 
   /**
+   * Opens a file in the underlying filesystem and returns an InputStream to 
read it.
+   * Note that the caller can invoke close on this inputstream.
+   * Some implementations can choose to copy the original file to local temp 
file and return the inputstream.
+   * In this case, the implementation it should delete the temp file when 
close is invoked.
+   * @param uri location of the file to open
+   * @return a new InputStream
+   * @throws IOException on any IO error - missing file, not a file etc
+   */
+  public abstract InputStream open(URI uri)
+      throws IOException;
+
+  /**
    * For certain filesystems, we may need to close the filesystem and do 
relevant operations to prevent leaks.
    * By default, this method does nothing.
    * @throws IOException on IO failure


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to