This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c4fe76f079 [HUDI-7336] Introduce new HoodieStorage abstraction 
(#10567)
4c4fe76f079 is described below

commit 4c4fe76f079540159245207cac1e7a5ab10a476c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun Jan 28 21:27:16 2024 -0800

    [HUDI-7336] Introduce new HoodieStorage abstraction (#10567)
    
    This commit introduces `HoodieStorage` abstraction and Hudi's counterpart 
classes for Hadoop File System classes (`org.apache.hadoop.fs.`[`FileSystem`, 
`Path`, `PathFilter`, `FileStatus`]) to decouple Hudi's implementation from 
Hadoop classes, so it's much easier to plugin different file system 
implementation.
---
 hudi-hadoop-common/pom.xml                         |   8 +
 .../hudi/storage/hadoop/HoodieHadoopStorage.java   | 201 ++++++++++++
 .../hadoop/storage/TestHoodieHadoopStorage.java    |  53 +++
 .../java/org/apache/hudi/ApiMaturityLevel.java     |   0
 .../main/java/org/apache/hudi/PublicAPIClass.java  |   0
 .../main/java/org/apache/hudi/PublicAPIMethod.java |   0
 .../main/java/org/apache/hudi/io/util/IOUtils.java |  16 +
 .../org/apache/hudi/storage/HoodieFileStatus.java  | 120 +++++++
 .../org/apache/hudi/storage/HoodieLocation.java    | 262 +++++++++++++++
 .../apache/hudi/storage/HoodieLocationFilter.java  |  42 +++
 .../org/apache/hudi/storage/HoodieStorage.java     | 355 +++++++++++++++++++++
 .../hudi/io/storage/TestHoodieFileStatus.java      | 102 ++++++
 .../apache/hudi/io/storage/TestHoodieLocation.java | 192 +++++++++++
 .../hudi/io/storage/TestHoodieLocationFilter.java  |  73 +++++
 .../hudi/io/storage/TestHoodieStorageBase.java     | 353 ++++++++++++++++++++
 15 files changed, 1777 insertions(+)

diff --git a/hudi-hadoop-common/pom.xml b/hudi-hadoop-common/pom.xml
index d6f4d3442fb..2ae9c610703 100644
--- a/hudi-hadoop-common/pom.xml
+++ b/hudi-hadoop-common/pom.xml
@@ -98,5 +98,13 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-io</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
new file mode 100644
index 00000000000..b863e97cba1
--- /dev/null
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage.hadoop;
+
+import org.apache.hudi.storage.HoodieFileStatus;
+import org.apache.hudi.storage.HoodieLocation;
+import org.apache.hudi.storage.HoodieLocationFilter;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link HoodieStorage} using Hadoop's {@link FileSystem}
+ */
+public class HoodieHadoopStorage extends HoodieStorage {
+  private final FileSystem fs;
+
+  public HoodieHadoopStorage(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @Override
+  public String getScheme() {
+    return fs.getScheme();
+  }
+
+  @Override
+  public OutputStream create(HoodieLocation location, boolean overwrite) 
throws IOException {
+    return fs.create(convertHoodieLocationToPath(location), overwrite);
+  }
+
+  @Override
+  public InputStream open(HoodieLocation location) throws IOException {
+    return fs.open(convertHoodieLocationToPath(location));
+  }
+
+  @Override
+  public OutputStream append(HoodieLocation location) throws IOException {
+    return fs.append(convertHoodieLocationToPath(location));
+  }
+
+  @Override
+  public boolean exists(HoodieLocation location) throws IOException {
+    return fs.exists(convertHoodieLocationToPath(location));
+  }
+
+  @Override
+  public HoodieFileStatus getFileStatus(HoodieLocation location) throws 
IOException {
+    return 
convertToHoodieFileStatus(fs.getFileStatus(convertHoodieLocationToPath(location)));
+  }
+
+  @Override
+  public boolean createDirectory(HoodieLocation location) throws IOException {
+    return fs.mkdirs(convertHoodieLocationToPath(location));
+  }
+
+  @Override
+  public List<HoodieFileStatus> listDirectEntries(HoodieLocation location) 
throws IOException {
+    return Arrays.stream(fs.listStatus(convertHoodieLocationToPath(location)))
+        .map(this::convertToHoodieFileStatus)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<HoodieFileStatus> listFiles(HoodieLocation location) throws 
IOException {
+    List<HoodieFileStatus> result = new ArrayList<>();
+    RemoteIterator<LocatedFileStatus> iterator = 
fs.listFiles(convertHoodieLocationToPath(location), true);
+    while (iterator.hasNext()) {
+      result.add(convertToHoodieFileStatus(iterator.next()));
+    }
+    return result;
+  }
+
+  @Override
+  public List<HoodieFileStatus> listDirectEntries(List<HoodieLocation> 
locationList) throws IOException {
+    return Arrays.stream(fs.listStatus(locationList.stream()
+            .map(this::convertHoodieLocationToPath)
+            .toArray(Path[]::new)))
+        .map(this::convertToHoodieFileStatus)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<HoodieFileStatus> listDirectEntries(HoodieLocation location,
+                                                  HoodieLocationFilter filter)
+      throws IOException {
+    return Arrays.stream(fs.listStatus(
+            convertHoodieLocationToPath(location), path ->
+                filter.accept(convertPathToHoodieLocation(path))))
+        .map(this::convertToHoodieFileStatus)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<HoodieFileStatus> globEntries(HoodieLocation locationPattern)
+      throws IOException {
+    return 
Arrays.stream(fs.globStatus(convertHoodieLocationToPath(locationPattern)))
+        .map(this::convertToHoodieFileStatus)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<HoodieFileStatus> globEntries(HoodieLocation locationPattern, 
HoodieLocationFilter filter)
+      throws IOException {
+    return 
Arrays.stream(fs.globStatus(convertHoodieLocationToPath(locationPattern), path 
->
+            filter.accept(convertPathToHoodieLocation(path))))
+        .map(this::convertToHoodieFileStatus)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean rename(HoodieLocation oldLocation, HoodieLocation 
newLocation) throws IOException {
+    return fs.rename(convertHoodieLocationToPath(oldLocation), 
convertHoodieLocationToPath(newLocation));
+  }
+
+  @Override
+  public boolean deleteDirectory(HoodieLocation location) throws IOException {
+    return fs.delete(convertHoodieLocationToPath(location), true);
+  }
+
+  @Override
+  public boolean deleteFile(HoodieLocation location) throws IOException {
+    return fs.delete(convertHoodieLocationToPath(location), false);
+  }
+
+  @Override
+  public HoodieLocation makeQualified(HoodieLocation location) {
+    return convertPathToHoodieLocation(
+        fs.makeQualified(convertHoodieLocationToPath(location)));
+  }
+
+  @Override
+  public Object getFileSystem() {
+    return fs;
+  }
+
+  @Override
+  public Object getConf() {
+    return fs.getConf();
+  }
+
+  @Override
+  public OutputStream create(HoodieLocation location) throws IOException {
+    return fs.create(convertHoodieLocationToPath(location));
+  }
+
+  @Override
+  public boolean createNewFile(HoodieLocation location) throws IOException {
+    return fs.createNewFile(convertHoodieLocationToPath(location));
+  }
+
+  private Path convertHoodieLocationToPath(HoodieLocation loc) {
+    return new Path(loc.toUri());
+  }
+
+  private HoodieLocation convertPathToHoodieLocation(Path path) {
+    return new HoodieLocation(path.toUri());
+  }
+
+  private HoodieFileStatus convertToHoodieFileStatus(FileStatus fileStatus) {
+    return new HoodieFileStatus(
+        convertPathToHoodieLocation(fileStatus.getPath()),
+        fileStatus.getLen(),
+        fileStatus.isDirectory(),
+        fileStatus.getModificationTime());
+  }
+
+  @Override
+  public void close() throws IOException {
+    fs.close();
+  }
+}
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
new file mode 100644
index 00000000000..3eaf4135032
--- /dev/null
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.storage;
+
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.TestHoodieStorageBase;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Tests {@link HoodieHadoopStorage}.
+ */
+public class TestHoodieHadoopStorage extends TestHoodieStorageBase {
+  private static final String CONF_KEY = "hudi.testing.key";
+  private static final String CONF_VALUE = "value";
+
+  @Override
+  protected HoodieStorage getHoodieStorage(Object fs, Object conf) {
+    return new HoodieHadoopStorage((FileSystem) fs);
+  }
+
+  @Override
+  protected Object getFileSystem(Object conf) {
+    return HadoopFSUtils.getFs(getTempDir(), (Configuration) conf, true);
+  }
+
+  @Override
+  protected Object getConf() {
+    Configuration conf = new Configuration();
+    conf.set(CONF_KEY, CONF_VALUE);
+    return conf;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/ApiMaturityLevel.java 
b/hudi-io/src/main/java/org/apache/hudi/ApiMaturityLevel.java
similarity index 100%
rename from hudi-common/src/main/java/org/apache/hudi/ApiMaturityLevel.java
rename to hudi-io/src/main/java/org/apache/hudi/ApiMaturityLevel.java
diff --git a/hudi-common/src/main/java/org/apache/hudi/PublicAPIClass.java 
b/hudi-io/src/main/java/org/apache/hudi/PublicAPIClass.java
similarity index 100%
rename from hudi-common/src/main/java/org/apache/hudi/PublicAPIClass.java
rename to hudi-io/src/main/java/org/apache/hudi/PublicAPIClass.java
diff --git a/hudi-common/src/main/java/org/apache/hudi/PublicAPIMethod.java 
b/hudi-io/src/main/java/org/apache/hudi/PublicAPIMethod.java
similarity index 100%
rename from hudi-common/src/main/java/org/apache/hudi/PublicAPIMethod.java
rename to hudi-io/src/main/java/org/apache/hudi/PublicAPIMethod.java
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/util/IOUtils.java 
b/hudi-io/src/main/java/org/apache/hudi/io/util/IOUtils.java
index 5eeb21011cf..96cc6df95cc 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/util/IOUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/util/IOUtils.java
@@ -19,8 +19,10 @@
 
 package org.apache.hudi.io.util;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 /**
  * Util methods on I/O.
@@ -249,4 +251,18 @@ public class IOUtils {
     }
     return totalBytesRead;
   }
+
+  public static byte[] readAsByteArray(InputStream input, int outputSize) 
throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream(outputSize);
+    copy(input, bos);
+    return bos.toByteArray();
+  }
+
+  public static void copy(InputStream inputStream, OutputStream outputStream) 
throws IOException {
+    byte[] buffer = new byte[1024];
+    int len;
+    while ((len = inputStream.read(buffer)) != -1) {
+      outputStream.write(buffer, 0, len);
+    }
+  }
 }
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieFileStatus.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieFileStatus.java
new file mode 100644
index 00000000000..6f033c5bc95
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieFileStatus.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+import java.io.Serializable;
+
+/**
+ * Represents the information of a directory or a file.
+ * The APIs are mainly based on {@code org.apache.hadoop.fs.FileStatus} class
+ * with simplification based on what Hudi needs.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public class HoodieFileStatus implements Serializable {
+  private final HoodieLocation location;
+  private final long length;
+  private final boolean isDirectory;
+  private final long modificationTime;
+
+  public HoodieFileStatus(HoodieLocation location,
+                          long length,
+                          boolean isDirectory,
+                          long modificationTime) {
+    this.location = location;
+    this.length = length;
+    this.isDirectory = isDirectory;
+    this.modificationTime = modificationTime;
+  }
+
+  /**
+   * @return the location.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public HoodieLocation getLocation() {
+    return location;
+  }
+
+  /**
+   * @return the length of a file in bytes.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public long getLength() {
+    return length;
+  }
+
+  /**
+   * @return whether this is a file.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public boolean isFile() {
+    return !isDirectory;
+  }
+
+  /**
+   * @return whether this is a directory.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public boolean isDirectory() {
+    return isDirectory;
+  }
+
+  /**
+   * @return the modification of a file.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public long getModificationTime() {
+    return modificationTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HoodieFileStatus that = (HoodieFileStatus) o;
+    // PLEASE NOTE that here we follow the same contract hadoop's FileStatus 
provides,
+    // i.e., the equality is purely based on the location.
+    return getLocation().equals(that.getLocation());
+  }
+
+  @Override
+  public int hashCode() {
+    // PLEASE NOTE that here we follow the same contract hadoop's FileStatus 
provides,
+    // i.e., the hash code is purely based on the location.
+    return getLocation().hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "HoodieFileStatus{"
+        + "location=" + location
+        + ", length=" + length
+        + ", isDirectory=" + isDirectory
+        + ", modificationTime=" + modificationTime
+        + '}';
+  }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocation.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocation.java
new file mode 100644
index 00000000000..3b3a05dc9b4
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocation.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Names a file or directory on storage.
+ * Location strings use slash (`/`) as the directory separator.
+ * The APIs are mainly based on {@code org.apache.hadoop.fs.Path} class.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public class HoodieLocation implements Comparable<HoodieLocation>, 
Serializable {
+  public static final char SEPARATOR_CHAR = '/';
+  public static final char COLON_CHAR = ':';
+  public static final String SEPARATOR = "" + SEPARATOR_CHAR;
+  private final URI uri;
+  private transient volatile HoodieLocation cachedParent;
+  private transient volatile String cachedName;
+  private transient volatile String uriString;
+
+  public HoodieLocation(URI uri) {
+    this.uri = uri.normalize();
+  }
+
+  public HoodieLocation(String path) {
+    try {
+      // This part of parsing is compatible with hadoop's Path
+      // and required for properly handling encoded path with URI
+      String scheme = null;
+      String authority = null;
+
+      int start = 0;
+
+      // Parse URI scheme, if any
+      int colon = path.indexOf(COLON_CHAR);
+      int slash = path.indexOf(SEPARATOR_CHAR);
+      if (colon != -1
+          && ((slash == -1) || (colon < slash))) {
+        scheme = path.substring(0, colon);
+        start = colon + 1;
+      }
+
+      // Parse URI authority, if any
+      if (path.startsWith("//", start)
+          && (path.length() - start > 2)) {
+        int nextSlash = path.indexOf(SEPARATOR_CHAR, start + 2);
+        int authEnd = nextSlash > 0 ? nextSlash : path.length();
+        authority = path.substring(start + 2, authEnd);
+        start = authEnd;
+      }
+
+      // URI path is the rest of the string -- query & fragment not supported
+      String uriPath = path.substring(start);
+
+      this.uri = new URI(scheme, authority, normalize(uriPath, true), null, 
null).normalize();
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  public HoodieLocation(String parent, String child) {
+    this(new HoodieLocation(parent), child);
+  }
+
+  public HoodieLocation(HoodieLocation parent, String child) {
+    URI parentUri = parent.toUri();
+    String normalizedChild = normalize(child, false);
+
+    if (normalizedChild.isEmpty()) {
+      this.uri = parentUri;
+      return;
+    }
+
+    if (!child.contains(SEPARATOR)) {
+      this.cachedParent = parent;
+    }
+    String parentPathWithSeparator = parentUri.getPath();
+    if (!parentPathWithSeparator.endsWith(SEPARATOR)) {
+      parentPathWithSeparator = parentPathWithSeparator + SEPARATOR;
+    }
+    try {
+      URI resolvedUri = new URI(
+          parentUri.getScheme(),
+          parentUri.getAuthority(),
+          parentPathWithSeparator,
+          null,
+          parentUri.getFragment()).resolve(normalizedChild);
+      this.uri = new URI(
+          parentUri.getScheme(),
+          parentUri.getAuthority(),
+          resolvedUri.getPath(),
+          null,
+          resolvedUri.getFragment()).normalize();
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public boolean isAbsolute() {
+    return uri.getPath().startsWith(SEPARATOR);
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public HoodieLocation getParent() {
+    // This value could be overwritten concurrently and that's okay, since
+    // {@code HoodieLocation} is immutable
+    if (cachedParent == null) {
+      String path = uri.getPath();
+      int lastSlash = path.lastIndexOf(SEPARATOR_CHAR);
+      if (path.isEmpty() || path.equals(SEPARATOR)) {
+        throw new IllegalStateException("Cannot get parent location of a root 
location");
+      }
+      String parentPath = lastSlash == -1
+          ? "" : path.substring(0, lastSlash == 0 ? 1 : lastSlash);
+      try {
+        cachedParent = new HoodieLocation(new URI(
+            uri.getScheme(), uri.getAuthority(), parentPath, null, 
uri.getFragment()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }
+    return cachedParent;
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public String getName() {
+    // This value could be overwritten concurrently and that's okay, since
+    // {@code HoodieLocation} is immutable
+    if (cachedName == null) {
+      String path = uri.getPath();
+      int slash = path.lastIndexOf(SEPARATOR);
+      cachedName = path.substring(slash + 1);
+    }
+    return cachedName;
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public HoodieLocation getLocationWithoutSchemeAndAuthority() {
+    try {
+      return new HoodieLocation(
+          new URI(null, null, uri.getPath(), uri.getQuery(), 
uri.getFragment()));
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public int depth() {
+    String path = uri.getPath();
+    int depth = 0;
+    int slash = path.length() == 1 && path.charAt(0) == SEPARATOR_CHAR ? -1 : 
0;
+    while (slash != -1) {
+      depth++;
+      slash = path.indexOf(SEPARATOR_CHAR, slash + 1);
+    }
+    return depth;
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public URI toUri() {
+    return uri;
+  }
+
+  @Override
+  public String toString() {
+    // This value could be overwritten concurrently and that's okay, since
+    // {@code HoodieLocation} is immutable
+    if (uriString == null) {
+      // We can't use uri.toString(), which escapes everything, because we want
+      // illegal characters unescaped in the string, for glob processing, etc.
+      StringBuilder buffer = new StringBuilder();
+      if (uri.getScheme() != null) {
+        buffer.append(uri.getScheme())
+            .append(":");
+      }
+      if (uri.getAuthority() != null) {
+        buffer.append("//")
+            .append(uri.getAuthority());
+      }
+      if (uri.getPath() != null) {
+        String path = uri.getPath();
+        buffer.append(path);
+      }
+      if (uri.getFragment() != null) {
+        buffer.append("#").append(uri.getFragment());
+      }
+      uriString = buffer.toString();
+    }
+    return uriString;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof HoodieLocation)) {
+      return false;
+    }
+    return this.uri.equals(((HoodieLocation) o).toUri());
+  }
+
+  @Override
+  public int hashCode() {
+    return uri.hashCode();
+  }
+
+  @Override
+  public int compareTo(HoodieLocation o) {
+    return this.uri.compareTo(o.uri);
+  }
+
+  /**
+   * Normalizes the path by removing the trailing slashes (`/`).
+   * When {@code keepSingleSlash} is {@code true}, `/` as the path is not 
changed;
+   * otherwise ({@code false}), `/` becomes empty String after normalization.
+   *
+   * @param path            {@link String} path to normalize.
+   * @param keepSingleSlash whether to keep `/` as the path.
+   * @return normalized path.
+   */
+  private static String normalize(String path, boolean keepSingleSlash) {
+    int indexOfLastSlash = path.length() - 1;
+    while (indexOfLastSlash >= 0) {
+      if (path.charAt(indexOfLastSlash) != SEPARATOR_CHAR) {
+        break;
+      }
+      indexOfLastSlash--;
+    }
+    indexOfLastSlash++;
+    if (indexOfLastSlash == path.length()) {
+      return path;
+    }
+    if (keepSingleSlash && indexOfLastSlash == 0) {
+      // All slashes and we want to keep one slash
+      return SEPARATOR;
+    }
+    return path.substring(0, indexOfLastSlash);
+  }
+}
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocationFilter.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocationFilter.java
new file mode 100644
index 00000000000..d33686c030c
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocationFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+import java.io.Serializable;
+
+/**
+ * Filter for {@link HoodieLocation}
+ * The APIs are mainly based on {@code org.apache.hadoop.fs.PathFilter} class.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieLocationFilter extends Serializable {
+  /**
+   * Tests whether the specified location should be included in a location 
list.
+   *
+   * @param location the location to be tested.
+   * @return {@code true} if and only if <code>location</code> should be 
included.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  boolean accept(HoodieLocation location);
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
new file mode 100644
index 00000000000..eea2c3ff692
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides I/O APIs on files and directories on storage.
+ * The APIs are mainly based on {@code org.apache.hadoop.fs.FileSystem} class.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public abstract class HoodieStorage implements Closeable {
+  public static final Logger LOG = 
LoggerFactory.getLogger(HoodieStorage.class);
+  public static final String TMP_PATH_POSTFIX = ".tmp";
+
+  /**
+   * @return the scheme of the storage.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract String getScheme();
+
+  /**
+   * Creates an OutputStream at the indicated location.
+   *
+   * @param location  the file to create.
+   * @param overwrite if a file with this name already exists, then if {@code 
true},
+   *                  the file will be overwritten, and if {@code false} an 
exception will be thrown.
+   * @return the OutputStream to write to.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract OutputStream create(HoodieLocation location, boolean 
overwrite) throws IOException;
+
+  /**
+   * Opens an InputStream at the indicated location.
+   *
+   * @param location the file to open.
+   * @return the InputStream to read from.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract InputStream open(HoodieLocation location) throws IOException;
+
+  /**
+   * Appends to an existing file (optional operation).
+   *
+   * @param location the file to append.
+   * @return the OutputStream to write to.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract OutputStream append(HoodieLocation location) throws 
IOException;
+
+  /**
+   * Checks if a location exists.
+   *
+   * @param location location to check.
+   * @return {@code true} if the location exists.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract boolean exists(HoodieLocation location) throws IOException;
+
+  /**
+   * Returns a file status object that represents the location.
+   *
+   * @param location location to check.
+   * @return a {@link HoodieFileStatus} object.
+   * @throws FileNotFoundException when the path does not exist.
+   * @throws IOException           IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract HoodieFileStatus getFileStatus(HoodieLocation location) 
throws IOException;
+
+  /**
+   * Creates the directory and non-existent parent directories.
+   *
+   * @param location location to create.
+   * @return {@code true} if the directory was created.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract boolean createDirectory(HoodieLocation location) throws 
IOException;
+
+  /**
+   * Lists the statuses of the direct files/directories in the given location 
if the path is a directory.
+   *
+   * @param location given location.
+   * @return the statuses of the files/directories in the given location.
+   * @throws FileNotFoundException when the location does not exist.
+   * @throws IOException           IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract List<HoodieFileStatus> listDirectEntries(HoodieLocation 
location) throws IOException;
+
+  /**
+   * Lists the statuses of all files under the give location recursively.
+   *
+   * @param location given location.
+   * @return the statuses of the files under the given location.
+   * @throws FileNotFoundException when the location does not exist.
+   * @throws IOException           IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract List<HoodieFileStatus> listFiles(HoodieLocation location) 
throws IOException;
+
+  /**
+   * Lists the statuses of the direct files/directories in the given location
+   * and filters the results, if the path is a directory.
+   *
+   * @param location given location.
+   * @param filter   filter to apply.
+   * @return the statuses of the files/directories in the given location.
+   * @throws FileNotFoundException when the location does not exist.
+   * @throws IOException           IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract List<HoodieFileStatus> listDirectEntries(HoodieLocation 
location,
+                                                           
HoodieLocationFilter filter) throws IOException;
+
+  /**
+   * Returns all the files that match the locationPattern and are not checksum 
files,
+   * and filters the results.
+   *
+   * @param locationPattern given pattern.
+   * @param filter          filter to apply.
+   * @return the statuses of the files.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract List<HoodieFileStatus> globEntries(HoodieLocation 
locationPattern,
+                                                     HoodieLocationFilter 
filter) throws IOException;
+
+  /**
+   * Renames the location from old to new.
+   *
+   * @param oldLocation source location.
+   * @param newLocation destination location.
+   * @return {@true} if rename is successful.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract boolean rename(HoodieLocation oldLocation,
+                                 HoodieLocation newLocation) throws 
IOException;
+
+  /**
+   * Deletes a directory at location.
+   *
+   * @param location directory to delete.
+   * @return {@code true} if successful.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract boolean deleteDirectory(HoodieLocation location) throws 
IOException;
+
+  /**
+   * Deletes a file at location.
+   *
+   * @param location file to delete.
+   * @return {@code true} if successful.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract boolean deleteFile(HoodieLocation location) throws 
IOException;
+
+  /**
+   * Qualifies a path to one which uses this storage and, if relative, made 
absolute.
+   *
+   * @param location to qualify.
+   * @return Qualified location.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract HoodieLocation makeQualified(HoodieLocation location);
+
+  /**
+   * @return the underlying file system instance if exists.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract Object getFileSystem();
+
+  /**
+   * @return the underlying configuration instance if exists.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract Object getConf();
+
+  /**
+   * Creates a new file with overwrite set to false. This ensures files are 
created
+   * only once and never rewritten, also, here we take care if the content is 
not
+   * empty, will first write the content to a temp file if 
{needCreateTempFile} is
+   * true, and then rename it back after the content is written.
+   *
+   * @param location file Path.
+   * @param content  content to be stored.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public final void createImmutableFileInPath(HoodieLocation location,
+                                              Option<byte[]> content) throws 
IOException {
+    OutputStream fsout = null;
+    HoodieLocation tmpLocation = null;
+
+    boolean needTempFile = needCreateTempFile();
+
+    try {
+      if (!content.isPresent()) {
+        fsout = create(location, false);
+      }
+
+      if (content.isPresent() && needTempFile) {
+        HoodieLocation parent = location.getParent();
+        tmpLocation = new HoodieLocation(parent, location.getName() + 
TMP_PATH_POSTFIX);
+        fsout = create(tmpLocation, false);
+        fsout.write(content.get());
+      }
+
+      if (content.isPresent() && !needTempFile) {
+        fsout = create(location, false);
+        fsout.write(content.get());
+      }
+    } catch (IOException e) {
+      String errorMsg = "Failed to create file " + (tmpLocation != null ? 
tmpLocation : location);
+      throw new HoodieIOException(errorMsg, e);
+    } finally {
+      try {
+        if (null != fsout) {
+          fsout.close();
+        }
+      } catch (IOException e) {
+        String errorMsg = "Failed to close file " + (needTempFile ? 
tmpLocation : location);
+        throw new HoodieIOException(errorMsg, e);
+      }
+
+      boolean renameSuccess = false;
+      try {
+        if (null != tmpLocation) {
+          renameSuccess = rename(tmpLocation, location);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException(
+            "Failed to rename " + tmpLocation + " to the target " + location,
+            e);
+      } finally {
+        if (!renameSuccess && null != tmpLocation) {
+          try {
+            deleteFile(tmpLocation);
+            LOG.warn("Fail to rename " + tmpLocation + " to " + location
+                + ", target file exists: " + exists(location));
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to delete tmp file " + 
tmpLocation, e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * @return whether a temporary file needs to be created for immutability.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public final boolean needCreateTempFile() {
+    return StorageSchemes.HDFS.getScheme().equals(getScheme());
+  }
+
+  /**
+   * Create an OutputStream at the indicated location.
+   * The file is overwritten by default.
+   *
+   * @param location the file to create.
+   * @return the OutputStream to write to.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public OutputStream create(HoodieLocation location) throws IOException {
+    return create(location, true);
+  }
+
+  /**
+   * Creates an empty new file at the indicated location.
+   *
+   * @param location the file to create.
+   * @return {@code true} if successfully created; {@code false} if already 
exists.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public boolean createNewFile(HoodieLocation location) throws IOException {
+    if (exists(location)) {
+      return false;
+    } else {
+      create(location, false).close();
+      return true;
+    }
+  }
+
+  /**
+   * Lists the statuses of the direct files/directories in the given list of 
locations,
+   * if the locations are directory.
+   *
+   * @param locationList given location list.
+   * @return the statuses of the files/directories in the given locations.
+   * @throws FileNotFoundException when the location does not exist.
+   * @throws IOException           IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public List<HoodieFileStatus> listDirectEntries(List<HoodieLocation> 
locationList) throws IOException {
+    List<HoodieFileStatus> result = new ArrayList<>();
+    for (HoodieLocation location : locationList) {
+      result.addAll(listDirectEntries(location));
+    }
+    return result;
+  }
+
+  /**
+   * Returns all the files that match the locationPattern and are not checksum 
files.
+   *
+   * @param locationPattern given pattern.
+   * @return the statuses of the files.
+   * @throws IOException IO error.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public List<HoodieFileStatus> globEntries(HoodieLocation locationPattern) 
throws IOException {
+    return globEntries(locationPattern, e -> true);
+  }
+}
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieFileStatus.java 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieFileStatus.java
new file mode 100644
index 00000000000..903fc4b4e3a
--- /dev/null
+++ b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieFileStatus.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.storage.HoodieFileStatus;
+import org.apache.hudi.storage.HoodieLocation;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Tests {@link HoodieFileStatus}
+ */
+public class TestHoodieFileStatus {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestHoodieFileStatus.class);
+  private static final long LENGTH = 100;
+  private static final long MODIFICATION_TIME = System.currentTimeMillis();
+  private static final String PATH1 = "/abc/xyz1";
+  private static final String PATH2 = "/abc/xyz2";
+  private static final HoodieLocation LOCATION1 = new HoodieLocation(PATH1);
+  private static final HoodieLocation LOCATION2 = new HoodieLocation(PATH2);
+
+  @Test
+  public void testConstructor() {
+    HoodieFileStatus fileStatus = new HoodieFileStatus(LOCATION1, LENGTH, 
false, MODIFICATION_TIME);
+    validateAccessors(fileStatus, PATH1, LENGTH, false, MODIFICATION_TIME);
+    fileStatus = new HoodieFileStatus(LOCATION2, -1, true, MODIFICATION_TIME + 
2L);
+    validateAccessors(fileStatus, PATH2, -1, true, MODIFICATION_TIME + 2L);
+  }
+
+  @Test
+  public void testSerializability() throws IOException, ClassNotFoundException 
{
+    HoodieFileStatus fileStatus = new HoodieFileStatus(LOCATION1, LENGTH, 
false, MODIFICATION_TIME);
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(fileStatus);
+      try (ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
+           ObjectInputStream ois = new ObjectInputStream(bais)) {
+        HoodieFileStatus deserialized = (HoodieFileStatus) ois.readObject();
+        validateAccessors(deserialized, PATH1, LENGTH, false, 
MODIFICATION_TIME);
+      }
+    }
+  }
+
+  @Test
+  public void testEquals() {
+    HoodieFileStatus fileStatus1 = new HoodieFileStatus(
+        new HoodieLocation(PATH1), LENGTH, false, MODIFICATION_TIME);
+    HoodieFileStatus fileStatus2 = new HoodieFileStatus(
+        new HoodieLocation(PATH1), LENGTH + 2, false, MODIFICATION_TIME + 2L);
+    assertEquals(fileStatus1, fileStatus2);
+  }
+
+  @Test
+  public void testNotEquals() {
+    HoodieFileStatus fileStatus1 = new HoodieFileStatus(
+        LOCATION1, LENGTH, false, MODIFICATION_TIME);
+    HoodieFileStatus fileStatus2 = new HoodieFileStatus(
+        LOCATION2, LENGTH, false, MODIFICATION_TIME + 2L);
+    assertFalse(fileStatus1.equals(fileStatus2));
+    assertFalse(fileStatus2.equals(fileStatus1));
+  }
+
+  private void validateAccessors(HoodieFileStatus fileStatus,
+                                 String location,
+                                 long length,
+                                 boolean isDirectory,
+                                 long modificationTime) {
+    assertEquals(new HoodieLocation(location), fileStatus.getLocation());
+    assertEquals(length, fileStatus.getLength());
+    assertEquals(isDirectory, fileStatus.isDirectory());
+    assertEquals(!isDirectory, fileStatus.isFile());
+    assertEquals(modificationTime, fileStatus.getModificationTime());
+  }
+}
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocation.java 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocation.java
new file mode 100644
index 00000000000..4c765d2cc3f
--- /dev/null
+++ b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocation.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.storage.HoodieLocation;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link HoodieLocation}
+ */
+public class TestHoodieLocation {
+  @Test
+  public void testToString() {
+    Arrays.stream(
+            new String[] {
+                "/",
+                "/foo",
+                "/foo/bar",
+                "foo",
+                "foo/bar",
+                "/foo/bar#boo",
+                "foo/bar#boo",
+                "file:/a/b/c",
+                "s3://a/b/c"})
+        .forEach(this::toStringTest);
+  }
+
+  @Test
+  public void testNormalize() throws URISyntaxException {
+    assertEquals("", new HoodieLocation(".").toString());
+    assertEquals("..", new HoodieLocation("..").toString());
+    assertEquals("/", new HoodieLocation("/").toString());
+    assertEquals("/", new HoodieLocation("//").toString());
+    assertEquals("/", new HoodieLocation("///").toString());
+    assertEquals("//foo/", new HoodieLocation("//foo/").toString());
+    assertEquals("//foo/", new HoodieLocation("//foo//").toString());
+    assertEquals("//foo/bar", new HoodieLocation("//foo//bar").toString());
+    assertEquals("/foo", new HoodieLocation("/foo/").toString());
+    assertEquals("/foo", new HoodieLocation("/foo/").toString());
+    assertEquals("foo", new HoodieLocation("foo/").toString());
+    assertEquals("foo", new HoodieLocation("foo//").toString());
+    assertEquals("foo/bar", new HoodieLocation("foo//bar").toString());
+    assertEquals("file:/a/b/c", new 
HoodieLocation("file:///a/b/c").toString());
+    assertEquals("s3://a/b/c/d/e", new HoodieLocation("s3://a/b/c", 
"d/e").toString());
+    assertEquals("s3://a/b/c/d/e", new HoodieLocation("s3://a/b/c/", 
"d/e").toString());
+    assertEquals("s3://a/b/c/d/e", new HoodieLocation("s3://a/b/c/", 
"d/e/").toString());
+    assertEquals("s3://a/b/c", new HoodieLocation("s3://a/b/c/", 
"/").toString());
+    assertEquals("s3://a/b/c", new HoodieLocation("s3://a/b/c/", 
"").toString());
+    assertEquals("s3://a/b/c/d/e", new HoodieLocation(new 
HoodieLocation("s3://a/b/c"), "d/e").toString());
+    assertEquals("s3://a/b/c/d/e", new HoodieLocation(new 
HoodieLocation("s3://a/b/c/"), "d/e").toString());
+    assertEquals("s3://a/b/c/d/e", new HoodieLocation(new 
HoodieLocation("s3://a/b/c/"), "d/e/").toString());
+    assertEquals("s3://a/b/c", new HoodieLocation(new 
HoodieLocation("s3://a/b/c/"), "/").toString());
+    assertEquals("s3://a/b/c", new HoodieLocation(new 
HoodieLocation("s3://a/b/c/"), "").toString());
+    assertEquals("hdfs://foo/foo2/bar/baz/", new HoodieLocation(new 
URI("hdfs://foo//foo2///bar/baz///")).toString());
+  }
+
+  @Test
+  public void testIsAbsolute() {
+    assertTrue(new HoodieLocation("/").isAbsolute());
+    assertTrue(new HoodieLocation("/foo").isAbsolute());
+    assertFalse(new HoodieLocation("foo").isAbsolute());
+    assertFalse(new HoodieLocation("foo/bar").isAbsolute());
+    assertFalse(new HoodieLocation(".").isAbsolute());
+  }
+
+  @Test
+  public void testGetParent() {
+    assertEquals(new HoodieLocation("/foo"), new 
HoodieLocation("/foo/bar").getParent());
+    assertEquals(new HoodieLocation("foo"), new 
HoodieLocation("foo/bar").getParent());
+    assertEquals(new HoodieLocation("/"), new 
HoodieLocation("/foo").getParent());
+    assertEquals(new HoodieLocation("/foo/bar/x"), new 
HoodieLocation("/foo/bar", "x/y").getParent());
+    assertEquals(new HoodieLocation("/foo/bar"), new 
HoodieLocation("/foo/bar/", "y").getParent());
+    assertEquals(new HoodieLocation("/foo"), new HoodieLocation("/foo/bar/", 
"/").getParent());
+    assertThrows(IllegalStateException.class, () -> new 
HoodieLocation("/").getParent());
+  }
+
+  @Test
+  public void testURI() throws URISyntaxException {
+    URI uri = new URI("file:///bar#baz");
+    HoodieLocation location = new HoodieLocation(uri);
+    assertEquals(uri, new URI(location.toString()));
+    assertEquals("foo://bar/baz#boo", new HoodieLocation("foo://bar/", 
"/baz#boo").toString());
+    assertEquals("foo://bar/baz/fud#boo",
+        new HoodieLocation(new HoodieLocation(new URI("foo://bar/baz#bud")), 
"fud#boo").toString());
+    assertEquals("foo://bar/fud#boo",
+        new HoodieLocation(new HoodieLocation(new URI("foo://bar/baz#bud")), 
"/fud#boo").toString());
+  }
+
+  @Test
+  public void testPathToUriConversion() throws URISyntaxException {
+    assertEquals(new URI(null, null, "/foo?bar", null, null),
+        new HoodieLocation("/foo?bar").toUri());
+    assertEquals(new URI(null, null, "/foo\"bar", null, null),
+        new HoodieLocation("/foo\"bar").toUri());
+    assertEquals(new URI(null, null, "/foo bar", null, null),
+        new HoodieLocation("/foo bar").toUri());
+    assertEquals("/foo?bar", new 
HoodieLocation("http://localhost/foo?bar";).toUri().getPath());
+    assertEquals("/foo", new URI("http://localhost/foo?bar";).getPath());
+    assertEquals((new URI("/foo;bar")).getPath(), new 
HoodieLocation("/foo;bar").toUri().getPath());
+    assertEquals(new URI("/foo;bar"), new HoodieLocation("/foo;bar").toUri());
+    assertEquals(new URI("/foo+bar"), new HoodieLocation("/foo+bar").toUri());
+    assertEquals(new URI("/foo-bar"), new HoodieLocation("/foo-bar").toUri());
+    assertEquals(new URI("/foo=bar"), new HoodieLocation("/foo=bar").toUri());
+    assertEquals(new URI("/foo,bar"), new HoodieLocation("/foo,bar").toUri());
+  }
+
+  @Test
+  public void testGetName() {
+    assertEquals("", new HoodieLocation("/").getName());
+    assertEquals("foo", new HoodieLocation("foo").getName());
+    assertEquals("foo", new HoodieLocation("/foo").getName());
+    assertEquals("foo", new HoodieLocation("/foo/").getName());
+    assertEquals("bar", new HoodieLocation("/foo/bar").getName());
+    assertEquals("bar", new HoodieLocation("hdfs://host/foo/bar").getName());
+    assertEquals("bar", new HoodieLocation("hdfs://host", 
"foo/bar").getName());
+    assertEquals("bar", new HoodieLocation("hdfs://host/foo/", 
"bar").getName());
+  }
+
+  @Test
+  public void testGetLocationWithoutSchemeAndAuthority() {
+    assertEquals(
+        new HoodieLocation("/foo/bar/boo"),
+        new 
HoodieLocation("/foo/bar/boo").getLocationWithoutSchemeAndAuthority());
+    assertEquals(
+        new HoodieLocation("/foo/bar/boo"),
+        new 
HoodieLocation("file:///foo/bar/boo").getLocationWithoutSchemeAndAuthority());
+    assertEquals(
+        new HoodieLocation("/bar/boo"),
+        new 
HoodieLocation("s3://foo/bar/boo").getLocationWithoutSchemeAndAuthority());
+  }
+
+  @Test
+  public void testDepth() throws URISyntaxException {
+    assertEquals(0, new HoodieLocation("/").depth());
+    assertEquals(0, new HoodieLocation("///").depth());
+    assertEquals(0, new HoodieLocation("//foo/").depth());
+    assertEquals(1, new HoodieLocation("//foo//bar").depth());
+    assertEquals(5, new HoodieLocation("/a/b/c/d/e").depth());
+    assertEquals(4, new HoodieLocation("s3://a/b/c", "d/e").depth());
+    assertEquals(2, new HoodieLocation("s3://a/b/c/", "").depth());
+    assertEquals(4, new HoodieLocation(new HoodieLocation("s3://a/b/c"), 
"d/e").depth());
+  }
+
+  @Test
+  public void testEquals() {
+    assertEquals(new HoodieLocation("/foo"), new HoodieLocation("/foo"));
+    assertEquals(new HoodieLocation("/foo"), new HoodieLocation("/foo/"));
+    assertEquals(new HoodieLocation("/foo/bar"), new 
HoodieLocation("/foo//bar/"));
+    assertNotEquals(new HoodieLocation("/"), new HoodieLocation("/foo"));
+  }
+
+  @Test
+  public void testCachedResults() {
+    HoodieLocation location = new HoodieLocation("s3://x/y/z/");
+    assertSame(location.getParent(), location.getParent());
+    assertSame(location.getName(), location.getName());
+    assertSame(location.toString(), location.toString());
+  }
+
+  private void toStringTest(String pathString) {
+    assertEquals(pathString, new HoodieLocation(pathString).toString());
+  }
+}
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocationFilter.java
 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocationFilter.java
new file mode 100644
index 00000000000..2d66cc23f87
--- /dev/null
+++ 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocationFilter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.storage.HoodieLocation;
+import org.apache.hudi.storage.HoodieLocationFilter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HoodieLocationFilter}
+ */
+public class TestHoodieLocationFilter {
+  @Test
+  public void testFilter() {
+    HoodieLocation location1 = new HoodieLocation("/x/y/1");
+    HoodieLocation location2 = new HoodieLocation("/x/y/2");
+    HoodieLocation location3 = new HoodieLocation("/x/z/1");
+    HoodieLocation location4 = new HoodieLocation("/x/z/2");
+
+    List<HoodieLocation> locationList = Arrays.stream(
+        new HoodieLocation[] {location1, location2, location3, location4}
+    ).collect(Collectors.toList());
+
+    List<HoodieLocation> expected = Arrays.stream(
+        new HoodieLocation[] {location1, location2}
+    ).collect(Collectors.toList());
+
+    assertEquals(expected.stream().sorted().collect(Collectors.toList()),
+        locationList.stream()
+            .filter(e -> new HoodieLocationFilter() {
+              @Override
+              public boolean accept(HoodieLocation location) {
+                return location.getParent().equals(new HoodieLocation("/x/y"));
+              }
+            }.accept(e))
+            .sorted()
+            .collect(Collectors.toList()));
+    assertEquals(locationList,
+        locationList.stream()
+            .filter(e -> new HoodieLocationFilter() {
+              @Override
+              public boolean accept(HoodieLocation location) {
+                return true;
+              }
+            }.accept(e))
+            .sorted()
+            .collect(Collectors.toList()));
+  }
+}
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
new file mode 100644
index 00000000000..0424d22157d
--- /dev/null
+++ 
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.util.IOUtils;
+import org.apache.hudi.storage.HoodieFileStatus;
+import org.apache.hudi.storage.HoodieLocation;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for testing different implementation of {@link HoodieStorage}.
+ */
+public abstract class TestHoodieStorageBase {
+  @TempDir
+  protected Path tempDir;
+
+  protected static final String[] RELATIVE_FILE_PATHS = new String[] {
+      "w/1.file", "w/2.file", "x/1.file", "x/2.file",
+      "x/y/1.file", "x/y/2.file", "x/z/1.file", "x/z/2.file"
+  };
+  private static final byte[] EMPTY_BYTES = new byte[] {};
+
+  /**
+   * @param fs   file system instance.
+   * @param conf configuration instance.
+   * @return {@link HoodieStorage} instance based on the implementation for 
testing.
+   */
+  protected abstract HoodieStorage getHoodieStorage(Object fs, Object conf);
+
+  /**
+   * @param conf configuration instance.
+   * @return the underlying file system instance used if required.
+   */
+  protected abstract Object getFileSystem(Object conf);
+
+  /**
+   * @return configurations for the storage.
+   */
+  protected abstract Object getConf();
+
+  @AfterEach
+  public void cleanUpTempDir() {
+    HoodieStorage storage = getHoodieStorage();
+    try {
+      for (HoodieFileStatus status : storage.listDirectEntries(new 
HoodieLocation(getTempDir()))) {
+        HoodieLocation location = status.getLocation();
+        if (status.isDirectory()) {
+          storage.deleteDirectory(location);
+        } else {
+          storage.deleteFile(location);
+        }
+      }
+    } catch (IOException e) {
+      // Silently fail
+    }
+  }
+
+  @Test
+  public void testGetScheme() {
+    assertEquals("file", getHoodieStorage().getScheme());
+  }
+
+  @Test
+  public void testCreateWriteAndRead() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+
+    HoodieLocation location = new HoodieLocation(getTempDir(), 
"testCreateAppendAndRead/1.file");
+    assertFalse(storage.exists(location));
+    storage.create(location).close();
+    validateFileStatus(storage, location, EMPTY_BYTES, false);
+
+    byte[] data = new byte[] {2, 42, 49, (byte) 158, (byte) 233, 66, 9};
+
+    // By default, create overwrites the file
+    try (OutputStream stream = storage.create(location)) {
+      stream.write(data);
+      stream.flush();
+    }
+    validateFileStatus(storage, location, data, false);
+
+    assertThrows(IOException.class, () -> storage.create(location, false));
+    validateFileStatus(storage, location, data, false);
+
+    assertThrows(IOException.class, () -> storage.create(location, false));
+    validateFileStatus(storage, location, data, false);
+
+    HoodieLocation location2 = new HoodieLocation(getTempDir(), 
"testCreateAppendAndRead/2.file");
+    assertFalse(storage.exists(location2));
+    assertTrue(storage.createNewFile(location2));
+    validateFileStatus(storage, location2, EMPTY_BYTES, false);
+    assertFalse(storage.createNewFile(location2));
+
+    HoodieLocation location3 = new HoodieLocation(getTempDir(), 
"testCreateAppendAndRead/3.file");
+    assertFalse(storage.exists(location3));
+    storage.createImmutableFileInPath(location3, Option.of(data));
+    validateFileStatus(storage, location3, data, false);
+
+    HoodieLocation location4 = new HoodieLocation(getTempDir(), 
"testCreateAppendAndRead/4");
+    assertFalse(storage.exists(location4));
+    assertTrue(storage.createDirectory(location4));
+    validateFileStatus(storage, location4, EMPTY_BYTES, true);
+    assertTrue(storage.createDirectory(location4));
+  }
+
+  @Test
+  public void testListing() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+    // Full list:
+    // w/1.file
+    // w/2.file
+    // x/1.file
+    // x/2.file
+    // x/y/1.file
+    // x/y/2.file
+    // x/z/1.file
+    // x/z/2.file
+    prepareFilesOnStorage(storage);
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/1.file"), 
0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/2.file"), 
0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/y"), 0, 
true, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/z"), 0, 
true, 0),
+        }).collect(Collectors.toList()),
+        storage.listDirectEntries(new HoodieLocation(getTempDir(), "x")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/1.file"), 
0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/2.file"), 
0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), 
"x/y/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), 
"x/y/2.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), 
"x/z/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), 
"x/z/2.file"), 0, false, 0)
+        }).collect(Collectors.toList()),
+        storage.listFiles(new HoodieLocation(getTempDir(), "x")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/2.file"), 
0, false, 0)
+        }).collect(Collectors.toList()),
+        storage.listDirectEntries(
+            new HoodieLocation(getTempDir(), "x"), e -> 
e.getName().contains("2")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "w/1.file"), 
0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "w/2.file"), 
0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), 
"x/z/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), 
"x/z/2.file"), 0, false, 0)
+        }).collect(Collectors.toList()),
+        storage.listDirectEntries(Arrays.stream(new HoodieLocation[] {
+            new HoodieLocation(getTempDir(), "w"),
+            new HoodieLocation(getTempDir(), "x/z")
+        }).collect(Collectors.toList())));
+
+    assertThrows(FileNotFoundException.class,
+        () -> storage.listDirectEntries(new HoodieLocation(getTempDir(), 
"*")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), 
"x/y/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), 
"x/z/1.file"), 0, false, 0)
+        }).collect(Collectors.toList()),
+        storage.globEntries(new HoodieLocation(getTempDir(), "x/*/1.file")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/1.file"), 
0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/2.file"), 
0, false, 0),
+        }).collect(Collectors.toList()),
+        storage.globEntries(new HoodieLocation(getTempDir(), "x/*.file")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), 
"x/y/1.file"), 0, false, 0),
+        }).collect(Collectors.toList()),
+        storage.globEntries(
+            new HoodieLocation(getTempDir(), "x/*/*.file"),
+            e -> e.getParent().getName().equals("y") && 
e.getName().contains("1")));
+  }
+
+  @Test
+  public void testFileNotFound() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+
+    HoodieLocation fileLocation = new HoodieLocation(getTempDir(), 
"testFileNotFound/1.file");
+    HoodieLocation dirLocation = new HoodieLocation(getTempDir(), 
"testFileNotFound/2");
+    assertFalse(storage.exists(fileLocation));
+    assertThrows(FileNotFoundException.class, () -> 
storage.open(fileLocation));
+    assertThrows(FileNotFoundException.class, () -> 
storage.getFileStatus(fileLocation));
+    assertThrows(FileNotFoundException.class, () -> 
storage.listDirectEntries(fileLocation));
+    assertThrows(FileNotFoundException.class, () -> 
storage.listDirectEntries(dirLocation));
+    assertThrows(FileNotFoundException.class, () -> 
storage.listDirectEntries(dirLocation, e -> true));
+    assertThrows(FileNotFoundException.class, () -> storage.listDirectEntries(
+        Arrays.stream(new HoodieLocation[] 
{dirLocation}).collect(Collectors.toList())));
+  }
+
+  @Test
+  public void testRename() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+
+    HoodieLocation location = new HoodieLocation(getTempDir(), 
"testRename/1.file");
+    assertFalse(storage.exists(location));
+    storage.create(location).close();
+    validateFileStatus(storage, location, EMPTY_BYTES, false);
+
+    HoodieLocation newLocation = new HoodieLocation(getTempDir(), 
"testRename/1_renamed.file");
+    assertTrue(storage.rename(location, newLocation));
+    assertFalse(storage.exists(location));
+    validateFileStatus(storage, newLocation, EMPTY_BYTES, false);
+  }
+
+  @Test
+  public void testDelete() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+
+    HoodieLocation location = new HoodieLocation(getTempDir(), 
"testDelete/1.file");
+    assertFalse(storage.exists(location));
+    storage.create(location).close();
+    assertTrue(storage.exists(location));
+
+    assertTrue(storage.deleteFile(location));
+    assertFalse(storage.exists(location));
+    assertFalse(storage.deleteFile(location));
+
+    HoodieLocation location2 = new HoodieLocation(getTempDir(), 
"testDelete/2");
+    assertFalse(storage.exists(location2));
+    assertTrue(storage.createDirectory(location2));
+    assertTrue(storage.exists(location2));
+
+    assertTrue(storage.deleteDirectory(location2));
+    assertFalse(storage.exists(location2));
+    assertFalse(storage.deleteDirectory(location2));
+  }
+
+  @Test
+  public void testMakeQualified() {
+    HoodieStorage storage = getHoodieStorage();
+    HoodieLocation location = new 
HoodieLocation("/tmp/testMakeQualified/1.file");
+    assertEquals(
+        new HoodieLocation("file:/tmp/testMakeQualified/1.file"),
+        storage.makeQualified(location));
+  }
+
+  @Test
+  public void testGetFileSystem() {
+    Object conf = getConf();
+    Object fs = getFileSystem(conf);
+    HoodieStorage storage = getHoodieStorage(fs, conf);
+    assertSame(fs, storage.getFileSystem());
+  }
+
+  protected String getTempDir() {
+    return "file:" + tempDir.toUri().getPath();
+  }
+
+  /**
+   * Prepares files on storage for testing.
+   *
+   * @storage {@link HoodieStorage} to use.
+   */
+  private void prepareFilesOnStorage(HoodieStorage storage) throws IOException 
{
+    String dir = getTempDir();
+    for (String relativePath : RELATIVE_FILE_PATHS) {
+      storage.create(new HoodieLocation(dir, relativePath)).close();
+    }
+  }
+
+  private HoodieStorage getHoodieStorage() {
+    Object conf = getConf();
+    return getHoodieStorage(getFileSystem(conf), conf);
+  }
+
+  private void validateFileStatus(HoodieStorage storage,
+                                  HoodieLocation location,
+                                  byte[] data,
+                                  boolean isDirectory) throws IOException {
+    assertTrue(storage.exists(location));
+    HoodieFileStatus fileStatus = storage.getFileStatus(location);
+    assertEquals(location, fileStatus.getLocation());
+    assertEquals(isDirectory, fileStatus.isDirectory());
+    assertEquals(!isDirectory, fileStatus.isFile());
+    if (!isDirectory) {
+      assertEquals(data.length, fileStatus.getLength());
+      try (InputStream stream = storage.open(location)) {
+        assertArrayEquals(data, IOUtils.readAsByteArray(stream, data.length));
+      }
+    }
+    assertTrue(fileStatus.getModificationTime() > 0);
+  }
+
+  private void validateHoodieFileStatusList(List<HoodieFileStatus> expected,
+                                            List<HoodieFileStatus> actual) {
+    assertEquals(expected.size(), actual.size());
+    List<HoodieFileStatus> sortedExpected = expected.stream()
+        .sorted(Comparator.comparing(HoodieFileStatus::getLocation))
+        .collect(Collectors.toList());
+    List<HoodieFileStatus> sortedActual = actual.stream()
+        .sorted(Comparator.comparing(HoodieFileStatus::getLocation))
+        .collect(Collectors.toList());
+    for (int i = 0; i < expected.size(); i++) {
+      // We cannot use HoodieFileStatus#equals as that only compares the 
location
+      assertEquals(sortedExpected.get(i).getLocation(), 
sortedActual.get(i).getLocation());
+      assertEquals(sortedExpected.get(i).isDirectory(), 
sortedActual.get(i).isDirectory());
+      assertEquals(sortedExpected.get(i).isFile(), 
sortedActual.get(i).isFile());
+      if (sortedExpected.get(i).isFile()) {
+        assertEquals(sortedExpected.get(i).getLength(), 
sortedActual.get(i).getLength());
+      }
+      assertTrue(sortedActual.get(i).getModificationTime() > 0);
+    }
+  }
+}

Reply via email to