This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4c4fe76f079 [HUDI-7336] Introduce new HoodieStorage abstraction
(#10567)
4c4fe76f079 is described below
commit 4c4fe76f079540159245207cac1e7a5ab10a476c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun Jan 28 21:27:16 2024 -0800
[HUDI-7336] Introduce new HoodieStorage abstraction (#10567)
This commit introduces `HoodieStorage` abstraction and Hudi's counterpart
classes for Hadoop File System classes (`org.apache.hadoop.fs.`[`FileSystem`,
`Path`, `PathFilter`, `FileStatus`]) to decouple Hudi's implementation from
Hadoop classes, so it's much easier to plug in different file system
implementations.
---
hudi-hadoop-common/pom.xml | 8 +
.../hudi/storage/hadoop/HoodieHadoopStorage.java | 201 ++++++++++++
.../hadoop/storage/TestHoodieHadoopStorage.java | 53 +++
.../java/org/apache/hudi/ApiMaturityLevel.java | 0
.../main/java/org/apache/hudi/PublicAPIClass.java | 0
.../main/java/org/apache/hudi/PublicAPIMethod.java | 0
.../main/java/org/apache/hudi/io/util/IOUtils.java | 16 +
.../org/apache/hudi/storage/HoodieFileStatus.java | 120 +++++++
.../org/apache/hudi/storage/HoodieLocation.java | 262 +++++++++++++++
.../apache/hudi/storage/HoodieLocationFilter.java | 42 +++
.../org/apache/hudi/storage/HoodieStorage.java | 355 +++++++++++++++++++++
.../hudi/io/storage/TestHoodieFileStatus.java | 102 ++++++
.../apache/hudi/io/storage/TestHoodieLocation.java | 192 +++++++++++
.../hudi/io/storage/TestHoodieLocationFilter.java | 73 +++++
.../hudi/io/storage/TestHoodieStorageBase.java | 353 ++++++++++++++++++++
15 files changed, 1777 insertions(+)
diff --git a/hudi-hadoop-common/pom.xml b/hudi-hadoop-common/pom.xml
index d6f4d3442fb..2ae9c610703 100644
--- a/hudi-hadoop-common/pom.xml
+++ b/hudi-hadoop-common/pom.xml
@@ -98,5 +98,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-io</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
new file mode 100644
index 00000000000..b863e97cba1
--- /dev/null
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage.hadoop;
+
+import org.apache.hudi.storage.HoodieFileStatus;
+import org.apache.hudi.storage.HoodieLocation;
+import org.apache.hudi.storage.HoodieLocationFilter;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link HoodieStorage} using Hadoop's {@link FileSystem}
+ */
+public class HoodieHadoopStorage extends HoodieStorage {
+ private final FileSystem fs;
+
+ public HoodieHadoopStorage(FileSystem fs) {
+ this.fs = fs;
+ }
+
+ @Override
+ public String getScheme() {
+ return fs.getScheme();
+ }
+
+ @Override
+ public OutputStream create(HoodieLocation location, boolean overwrite)
throws IOException {
+ return fs.create(convertHoodieLocationToPath(location), overwrite);
+ }
+
+ @Override
+ public InputStream open(HoodieLocation location) throws IOException {
+ return fs.open(convertHoodieLocationToPath(location));
+ }
+
+ @Override
+ public OutputStream append(HoodieLocation location) throws IOException {
+ return fs.append(convertHoodieLocationToPath(location));
+ }
+
+ @Override
+ public boolean exists(HoodieLocation location) throws IOException {
+ return fs.exists(convertHoodieLocationToPath(location));
+ }
+
+ @Override
+ public HoodieFileStatus getFileStatus(HoodieLocation location) throws
IOException {
+ return
convertToHoodieFileStatus(fs.getFileStatus(convertHoodieLocationToPath(location)));
+ }
+
+ @Override
+ public boolean createDirectory(HoodieLocation location) throws IOException {
+ return fs.mkdirs(convertHoodieLocationToPath(location));
+ }
+
+ @Override
+ public List<HoodieFileStatus> listDirectEntries(HoodieLocation location)
throws IOException {
+ return Arrays.stream(fs.listStatus(convertHoodieLocationToPath(location)))
+ .map(this::convertToHoodieFileStatus)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<HoodieFileStatus> listFiles(HoodieLocation location) throws
IOException {
+ List<HoodieFileStatus> result = new ArrayList<>();
+ RemoteIterator<LocatedFileStatus> iterator =
fs.listFiles(convertHoodieLocationToPath(location), true);
+ while (iterator.hasNext()) {
+ result.add(convertToHoodieFileStatus(iterator.next()));
+ }
+ return result;
+ }
+
+ @Override
+ public List<HoodieFileStatus> listDirectEntries(List<HoodieLocation>
locationList) throws IOException {
+ return Arrays.stream(fs.listStatus(locationList.stream()
+ .map(this::convertHoodieLocationToPath)
+ .toArray(Path[]::new)))
+ .map(this::convertToHoodieFileStatus)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<HoodieFileStatus> listDirectEntries(HoodieLocation location,
+ HoodieLocationFilter filter)
+ throws IOException {
+ return Arrays.stream(fs.listStatus(
+ convertHoodieLocationToPath(location), path ->
+ filter.accept(convertPathToHoodieLocation(path))))
+ .map(this::convertToHoodieFileStatus)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<HoodieFileStatus> globEntries(HoodieLocation locationPattern)
+ throws IOException {
+ return
Arrays.stream(fs.globStatus(convertHoodieLocationToPath(locationPattern)))
+ .map(this::convertToHoodieFileStatus)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<HoodieFileStatus> globEntries(HoodieLocation locationPattern,
HoodieLocationFilter filter)
+ throws IOException {
+ return
Arrays.stream(fs.globStatus(convertHoodieLocationToPath(locationPattern), path
->
+ filter.accept(convertPathToHoodieLocation(path))))
+ .map(this::convertToHoodieFileStatus)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean rename(HoodieLocation oldLocation, HoodieLocation
newLocation) throws IOException {
+ return fs.rename(convertHoodieLocationToPath(oldLocation),
convertHoodieLocationToPath(newLocation));
+ }
+
+ @Override
+ public boolean deleteDirectory(HoodieLocation location) throws IOException {
+ return fs.delete(convertHoodieLocationToPath(location), true);
+ }
+
+ @Override
+ public boolean deleteFile(HoodieLocation location) throws IOException {
+ return fs.delete(convertHoodieLocationToPath(location), false);
+ }
+
+ @Override
+ public HoodieLocation makeQualified(HoodieLocation location) {
+ return convertPathToHoodieLocation(
+ fs.makeQualified(convertHoodieLocationToPath(location)));
+ }
+
+ @Override
+ public Object getFileSystem() {
+ return fs;
+ }
+
+ @Override
+ public Object getConf() {
+ return fs.getConf();
+ }
+
+ @Override
+ public OutputStream create(HoodieLocation location) throws IOException {
+ return fs.create(convertHoodieLocationToPath(location));
+ }
+
+ @Override
+ public boolean createNewFile(HoodieLocation location) throws IOException {
+ return fs.createNewFile(convertHoodieLocationToPath(location));
+ }
+
+ private Path convertHoodieLocationToPath(HoodieLocation loc) {
+ return new Path(loc.toUri());
+ }
+
+ private HoodieLocation convertPathToHoodieLocation(Path path) {
+ return new HoodieLocation(path.toUri());
+ }
+
+ private HoodieFileStatus convertToHoodieFileStatus(FileStatus fileStatus) {
+ return new HoodieFileStatus(
+ convertPathToHoodieLocation(fileStatus.getPath()),
+ fileStatus.getLen(),
+ fileStatus.isDirectory(),
+ fileStatus.getModificationTime());
+ }
+
+ @Override
+ public void close() throws IOException {
+ fs.close();
+ }
+}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
new file mode 100644
index 00000000000..3eaf4135032
--- /dev/null
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.storage;
+
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.TestHoodieStorageBase;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Tests {@link HoodieHadoopStorage}.
+ */
+public class TestHoodieHadoopStorage extends TestHoodieStorageBase {
+ private static final String CONF_KEY = "hudi.testing.key";
+ private static final String CONF_VALUE = "value";
+
+ @Override
+ protected HoodieStorage getHoodieStorage(Object fs, Object conf) {
+ return new HoodieHadoopStorage((FileSystem) fs);
+ }
+
+ @Override
+ protected Object getFileSystem(Object conf) {
+ return HadoopFSUtils.getFs(getTempDir(), (Configuration) conf, true);
+ }
+
+ @Override
+ protected Object getConf() {
+ Configuration conf = new Configuration();
+ conf.set(CONF_KEY, CONF_VALUE);
+ return conf;
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/ApiMaturityLevel.java
b/hudi-io/src/main/java/org/apache/hudi/ApiMaturityLevel.java
similarity index 100%
rename from hudi-common/src/main/java/org/apache/hudi/ApiMaturityLevel.java
rename to hudi-io/src/main/java/org/apache/hudi/ApiMaturityLevel.java
diff --git a/hudi-common/src/main/java/org/apache/hudi/PublicAPIClass.java
b/hudi-io/src/main/java/org/apache/hudi/PublicAPIClass.java
similarity index 100%
rename from hudi-common/src/main/java/org/apache/hudi/PublicAPIClass.java
rename to hudi-io/src/main/java/org/apache/hudi/PublicAPIClass.java
diff --git a/hudi-common/src/main/java/org/apache/hudi/PublicAPIMethod.java
b/hudi-io/src/main/java/org/apache/hudi/PublicAPIMethod.java
similarity index 100%
rename from hudi-common/src/main/java/org/apache/hudi/PublicAPIMethod.java
rename to hudi-io/src/main/java/org/apache/hudi/PublicAPIMethod.java
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/util/IOUtils.java
b/hudi-io/src/main/java/org/apache/hudi/io/util/IOUtils.java
index 5eeb21011cf..96cc6df95cc 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/util/IOUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/util/IOUtils.java
@@ -19,8 +19,10 @@
package org.apache.hudi.io.util;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
/**
* Util methods on I/O.
@@ -249,4 +251,18 @@ public class IOUtils {
}
return totalBytesRead;
}
+
+ public static byte[] readAsByteArray(InputStream input, int outputSize)
throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(outputSize);
+ copy(input, bos);
+ return bos.toByteArray();
+ }
+
+ public static void copy(InputStream inputStream, OutputStream outputStream)
throws IOException {
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = inputStream.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, len);
+ }
+ }
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieFileStatus.java
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieFileStatus.java
new file mode 100644
index 00000000000..6f033c5bc95
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieFileStatus.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+import java.io.Serializable;
+
+/**
+ * Represents the information of a directory or a file.
+ * The APIs are mainly based on {@code org.apache.hadoop.fs.FileStatus} class
+ * with simplification based on what Hudi needs.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public class HoodieFileStatus implements Serializable {
+ private final HoodieLocation location;
+ private final long length;
+ private final boolean isDirectory;
+ private final long modificationTime;
+
+ public HoodieFileStatus(HoodieLocation location,
+ long length,
+ boolean isDirectory,
+ long modificationTime) {
+ this.location = location;
+ this.length = length;
+ this.isDirectory = isDirectory;
+ this.modificationTime = modificationTime;
+ }
+
+ /**
+ * @return the location.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public HoodieLocation getLocation() {
+ return location;
+ }
+
+ /**
+ * @return the length of a file in bytes.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public long getLength() {
+ return length;
+ }
+
+ /**
+ * @return whether this is a file.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public boolean isFile() {
+ return !isDirectory;
+ }
+
+ /**
+ * @return whether this is a directory.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public boolean isDirectory() {
+ return isDirectory;
+ }
+
+ /**
+ * @return the modification of a file.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public long getModificationTime() {
+ return modificationTime;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HoodieFileStatus that = (HoodieFileStatus) o;
+ // PLEASE NOTE that here we follow the same contract hadoop's FileStatus
provides,
+ // i.e., the equality is purely based on the location.
+ return getLocation().equals(that.getLocation());
+ }
+
+ @Override
+ public int hashCode() {
+ // PLEASE NOTE that here we follow the same contract hadoop's FileStatus
provides,
+ // i.e., the hash code is purely based on the location.
+ return getLocation().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "HoodieFileStatus{"
+ + "location=" + location
+ + ", length=" + length
+ + ", isDirectory=" + isDirectory
+ + ", modificationTime=" + modificationTime
+ + '}';
+ }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocation.java
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocation.java
new file mode 100644
index 00000000000..3b3a05dc9b4
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocation.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Names a file or directory on storage.
+ * Location strings use slash (`/`) as the directory separator.
+ * The APIs are mainly based on {@code org.apache.hadoop.fs.Path} class.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public class HoodieLocation implements Comparable<HoodieLocation>,
Serializable {
+ public static final char SEPARATOR_CHAR = '/';
+ public static final char COLON_CHAR = ':';
+ public static final String SEPARATOR = "" + SEPARATOR_CHAR;
+ private final URI uri;
+ private transient volatile HoodieLocation cachedParent;
+ private transient volatile String cachedName;
+ private transient volatile String uriString;
+
+ public HoodieLocation(URI uri) {
+ this.uri = uri.normalize();
+ }
+
+ public HoodieLocation(String path) {
+ try {
+ // This part of parsing is compatible with hadoop's Path
+ // and required for properly handling encoded path with URI
+ String scheme = null;
+ String authority = null;
+
+ int start = 0;
+
+ // Parse URI scheme, if any
+ int colon = path.indexOf(COLON_CHAR);
+ int slash = path.indexOf(SEPARATOR_CHAR);
+ if (colon != -1
+ && ((slash == -1) || (colon < slash))) {
+ scheme = path.substring(0, colon);
+ start = colon + 1;
+ }
+
+ // Parse URI authority, if any
+ if (path.startsWith("//", start)
+ && (path.length() - start > 2)) {
+ int nextSlash = path.indexOf(SEPARATOR_CHAR, start + 2);
+ int authEnd = nextSlash > 0 ? nextSlash : path.length();
+ authority = path.substring(start + 2, authEnd);
+ start = authEnd;
+ }
+
+ // URI path is the rest of the string -- query & fragment not supported
+ String uriPath = path.substring(start);
+
+ this.uri = new URI(scheme, authority, normalize(uriPath, true), null,
null).normalize();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public HoodieLocation(String parent, String child) {
+ this(new HoodieLocation(parent), child);
+ }
+
+ public HoodieLocation(HoodieLocation parent, String child) {
+ URI parentUri = parent.toUri();
+ String normalizedChild = normalize(child, false);
+
+ if (normalizedChild.isEmpty()) {
+ this.uri = parentUri;
+ return;
+ }
+
+ if (!child.contains(SEPARATOR)) {
+ this.cachedParent = parent;
+ }
+ String parentPathWithSeparator = parentUri.getPath();
+ if (!parentPathWithSeparator.endsWith(SEPARATOR)) {
+ parentPathWithSeparator = parentPathWithSeparator + SEPARATOR;
+ }
+ try {
+ URI resolvedUri = new URI(
+ parentUri.getScheme(),
+ parentUri.getAuthority(),
+ parentPathWithSeparator,
+ null,
+ parentUri.getFragment()).resolve(normalizedChild);
+ this.uri = new URI(
+ parentUri.getScheme(),
+ parentUri.getAuthority(),
+ resolvedUri.getPath(),
+ null,
+ resolvedUri.getFragment()).normalize();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public boolean isAbsolute() {
+ return uri.getPath().startsWith(SEPARATOR);
+ }
+
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public HoodieLocation getParent() {
+ // This value could be overwritten concurrently and that's okay, since
+ // {@code HoodieLocation} is immutable
+ if (cachedParent == null) {
+ String path = uri.getPath();
+ int lastSlash = path.lastIndexOf(SEPARATOR_CHAR);
+ if (path.isEmpty() || path.equals(SEPARATOR)) {
+ throw new IllegalStateException("Cannot get parent location of a root
location");
+ }
+ String parentPath = lastSlash == -1
+ ? "" : path.substring(0, lastSlash == 0 ? 1 : lastSlash);
+ try {
+ cachedParent = new HoodieLocation(new URI(
+ uri.getScheme(), uri.getAuthority(), parentPath, null,
uri.getFragment()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ return cachedParent;
+ }
+
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public String getName() {
+ // This value could be overwritten concurrently and that's okay, since
+ // {@code HoodieLocation} is immutable
+ if (cachedName == null) {
+ String path = uri.getPath();
+ int slash = path.lastIndexOf(SEPARATOR);
+ cachedName = path.substring(slash + 1);
+ }
+ return cachedName;
+ }
+
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public HoodieLocation getLocationWithoutSchemeAndAuthority() {
+ try {
+ return new HoodieLocation(
+ new URI(null, null, uri.getPath(), uri.getQuery(),
uri.getFragment()));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public int depth() {
+ String path = uri.getPath();
+ int depth = 0;
+ int slash = path.length() == 1 && path.charAt(0) == SEPARATOR_CHAR ? -1 :
0;
+ while (slash != -1) {
+ depth++;
+ slash = path.indexOf(SEPARATOR_CHAR, slash + 1);
+ }
+ return depth;
+ }
+
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public URI toUri() {
+ return uri;
+ }
+
+ @Override
+ public String toString() {
+ // This value could be overwritten concurrently and that's okay, since
+ // {@code HoodieLocation} is immutable
+ if (uriString == null) {
+ // We can't use uri.toString(), which escapes everything, because we want
+ // illegal characters unescaped in the string, for glob processing, etc.
+ StringBuilder buffer = new StringBuilder();
+ if (uri.getScheme() != null) {
+ buffer.append(uri.getScheme())
+ .append(":");
+ }
+ if (uri.getAuthority() != null) {
+ buffer.append("//")
+ .append(uri.getAuthority());
+ }
+ if (uri.getPath() != null) {
+ String path = uri.getPath();
+ buffer.append(path);
+ }
+ if (uri.getFragment() != null) {
+ buffer.append("#").append(uri.getFragment());
+ }
+ uriString = buffer.toString();
+ }
+ return uriString;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof HoodieLocation)) {
+ return false;
+ }
+ return this.uri.equals(((HoodieLocation) o).toUri());
+ }
+
+ @Override
+ public int hashCode() {
+ return uri.hashCode();
+ }
+
+ @Override
+ public int compareTo(HoodieLocation o) {
+ return this.uri.compareTo(o.uri);
+ }
+
+ /**
+ * Normalizes the path by removing the trailing slashes (`/`).
+ * When {@code keepSingleSlash} is {@code true}, `/` as the path is not
changed;
+ * otherwise ({@code false}), `/` becomes empty String after normalization.
+ *
+ * @param path {@link String} path to normalize.
+ * @param keepSingleSlash whether to keep `/` as the path.
+ * @return normalized path.
+ */
+ private static String normalize(String path, boolean keepSingleSlash) {
+ int indexOfLastSlash = path.length() - 1;
+ while (indexOfLastSlash >= 0) {
+ if (path.charAt(indexOfLastSlash) != SEPARATOR_CHAR) {
+ break;
+ }
+ indexOfLastSlash--;
+ }
+ indexOfLastSlash++;
+ if (indexOfLastSlash == path.length()) {
+ return path;
+ }
+ if (keepSingleSlash && indexOfLastSlash == 0) {
+ // All slashes and we want to keep one slash
+ return SEPARATOR;
+ }
+ return path.substring(0, indexOfLastSlash);
+ }
+}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocationFilter.java
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocationFilter.java
new file mode 100644
index 00000000000..d33686c030c
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieLocationFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+import java.io.Serializable;
+
+/**
+ * Filter for {@link HoodieLocation}
+ * The APIs are mainly based on {@code org.apache.hadoop.fs.PathFilter} class.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieLocationFilter extends Serializable {
+ /**
+ * Tests whether the specified location should be included in a location
list.
+ *
+ * @param location the location to be tested.
+ * @return {@code true} if and only if <code>location</code> should be
included.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ boolean accept(HoodieLocation location);
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
new file mode 100644
index 00000000000..eea2c3ff692
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides I/O APIs on files and directories on storage.
+ * The APIs are mainly based on {@code org.apache.hadoop.fs.FileSystem} class.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public abstract class HoodieStorage implements Closeable {
+ public static final Logger LOG =
LoggerFactory.getLogger(HoodieStorage.class);
+ public static final String TMP_PATH_POSTFIX = ".tmp";
+
+ /**
+ * @return the scheme of the storage.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract String getScheme();
+
+ /**
+ * Creates an OutputStream at the indicated location.
+ *
+ * @param location the file to create.
+ * @param overwrite if a file with this name already exists, then if {@code
true},
+ * the file will be overwritten, and if {@code false} an
exception will be thrown.
+ * @return the OutputStream to write to.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract OutputStream create(HoodieLocation location, boolean
overwrite) throws IOException;
+
+ /**
+ * Opens an InputStream at the indicated location.
+ *
+ * @param location the file to open.
+ * @return the InputStream to read from.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract InputStream open(HoodieLocation location) throws IOException;
+
+ /**
+ * Appends to an existing file (optional operation).
+ *
+ * @param location the file to append.
+ * @return the OutputStream to write to.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract OutputStream append(HoodieLocation location) throws
IOException;
+
+ /**
+ * Checks if a location exists.
+ *
+ * @param location location to check.
+ * @return {@code true} if the location exists.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract boolean exists(HoodieLocation location) throws IOException;
+
+ /**
+ * Returns a file status object that represents the location.
+ *
+ * @param location location to check.
+ * @return a {@link HoodieFileStatus} object.
+ * @throws FileNotFoundException when the path does not exist.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract HoodieFileStatus getFileStatus(HoodieLocation location)
throws IOException;
+
+ /**
+ * Creates the directory and non-existent parent directories.
+ *
+ * @param location location to create.
+ * @return {@code true} if the directory was created.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract boolean createDirectory(HoodieLocation location) throws
IOException;
+
+ /**
+ * Lists the statuses of the direct files/directories in the given location
if the path is a directory.
+ *
+ * @param location given location.
+ * @return the statuses of the files/directories in the given location.
+ * @throws FileNotFoundException when the location does not exist.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract List<HoodieFileStatus> listDirectEntries(HoodieLocation
location) throws IOException;
+
+ /**
+ * Lists the statuses of all files under the give location recursively.
+ *
+ * @param location given location.
+ * @return the statuses of the files under the given location.
+ * @throws FileNotFoundException when the location does not exist.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract List<HoodieFileStatus> listFiles(HoodieLocation location)
throws IOException;
+
+ /**
+ * Lists the statuses of the direct files/directories in the given location
+ * and filters the results, if the path is a directory.
+ *
+ * @param location given location.
+ * @param filter filter to apply.
+ * @return the statuses of the files/directories in the given location.
+ * @throws FileNotFoundException when the location does not exist.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract List<HoodieFileStatus> listDirectEntries(HoodieLocation
location,
+
HoodieLocationFilter filter) throws IOException;
+
+ /**
+ * Returns all the files that match the locationPattern and are not checksum
files,
+ * and filters the results.
+ *
+ * @param locationPattern given pattern.
+ * @param filter filter to apply.
+ * @return the statuses of the files.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract List<HoodieFileStatus> globEntries(HoodieLocation
locationPattern,
+ HoodieLocationFilter
filter) throws IOException;
+
+ /**
+ * Renames the location from old to new.
+ *
+ * @param oldLocation source location.
+ * @param newLocation destination location.
+ * @return {@true} if rename is successful.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract boolean rename(HoodieLocation oldLocation,
+ HoodieLocation newLocation) throws
IOException;
+
+ /**
+ * Deletes a directory at location.
+ *
+ * @param location directory to delete.
+ * @return {@code true} if successful.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract boolean deleteDirectory(HoodieLocation location) throws
IOException;
+
+ /**
+ * Deletes a file at location.
+ *
+ * @param location file to delete.
+ * @return {@code true} if successful.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract boolean deleteFile(HoodieLocation location) throws
IOException;
+
+ /**
+ * Qualifies a path to one which uses this storage and, if relative, made
absolute.
+ *
+ * @param location to qualify.
+ * @return Qualified location.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract HoodieLocation makeQualified(HoodieLocation location);
+
+ /**
+ * @return the underlying file system instance if exists.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract Object getFileSystem();
+
+ /**
+ * @return the underlying configuration instance if exists.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract Object getConf();
+
+ /**
+ * Creates a new file with overwrite set to false. This ensures files are
created
+ * only once and never rewritten, also, here we take care if the content is
not
+ * empty, will first write the content to a temp file if
{needCreateTempFile} is
+ * true, and then rename it back after the content is written.
+ *
+ * @param location file Path.
+ * @param content content to be stored.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public final void createImmutableFileInPath(HoodieLocation location,
+ Option<byte[]> content) throws
IOException {
+ OutputStream fsout = null;
+ HoodieLocation tmpLocation = null;
+
+ boolean needTempFile = needCreateTempFile();
+
+ try {
+ if (!content.isPresent()) {
+ fsout = create(location, false);
+ }
+
+ if (content.isPresent() && needTempFile) {
+ HoodieLocation parent = location.getParent();
+ tmpLocation = new HoodieLocation(parent, location.getName() +
TMP_PATH_POSTFIX);
+ fsout = create(tmpLocation, false);
+ fsout.write(content.get());
+ }
+
+ if (content.isPresent() && !needTempFile) {
+ fsout = create(location, false);
+ fsout.write(content.get());
+ }
+ } catch (IOException e) {
+ String errorMsg = "Failed to create file " + (tmpLocation != null ?
tmpLocation : location);
+ throw new HoodieIOException(errorMsg, e);
+ } finally {
+ try {
+ if (null != fsout) {
+ fsout.close();
+ }
+ } catch (IOException e) {
+ String errorMsg = "Failed to close file " + (needTempFile ?
tmpLocation : location);
+ throw new HoodieIOException(errorMsg, e);
+ }
+
+ boolean renameSuccess = false;
+ try {
+ if (null != tmpLocation) {
+ renameSuccess = rename(tmpLocation, location);
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException(
+ "Failed to rename " + tmpLocation + " to the target " + location,
+ e);
+ } finally {
+ if (!renameSuccess && null != tmpLocation) {
+ try {
+ deleteFile(tmpLocation);
+ LOG.warn("Fail to rename " + tmpLocation + " to " + location
+ + ", target file exists: " + exists(location));
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to delete tmp file " +
tmpLocation, e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @return whether a temporary file needs to be created for immutability.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public final boolean needCreateTempFile() {
+ return StorageSchemes.HDFS.getScheme().equals(getScheme());
+ }
+
+ /**
+ * Create an OutputStream at the indicated location.
+ * The file is overwritten by default.
+ *
+ * @param location the file to create.
+ * @return the OutputStream to write to.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public OutputStream create(HoodieLocation location) throws IOException {
+ return create(location, true);
+ }
+
+ /**
+ * Creates an empty new file at the indicated location.
+ *
+ * @param location the file to create.
+ * @return {@code true} if successfully created; {@code false} if already
exists.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public boolean createNewFile(HoodieLocation location) throws IOException {
+ if (exists(location)) {
+ return false;
+ } else {
+ create(location, false).close();
+ return true;
+ }
+ }
+
+ /**
+ * Lists the statuses of the direct files/directories in the given list of
locations,
+ * if the locations are directory.
+ *
+ * @param locationList given location list.
+ * @return the statuses of the files/directories in the given locations.
+ * @throws FileNotFoundException when the location does not exist.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public List<HoodieFileStatus> listDirectEntries(List<HoodieLocation>
locationList) throws IOException {
+ List<HoodieFileStatus> result = new ArrayList<>();
+ for (HoodieLocation location : locationList) {
+ result.addAll(listDirectEntries(location));
+ }
+ return result;
+ }
+
+ /**
+ * Returns all the files that match the locationPattern and are not checksum
files.
+ *
+ * @param locationPattern given pattern.
+ * @return the statuses of the files.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public List<HoodieFileStatus> globEntries(HoodieLocation locationPattern)
throws IOException {
+ return globEntries(locationPattern, e -> true);
+ }
+}
diff --git
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieFileStatus.java
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieFileStatus.java
new file mode 100644
index 00000000000..903fc4b4e3a
--- /dev/null
+++ b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieFileStatus.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.storage.HoodieFileStatus;
+import org.apache.hudi.storage.HoodieLocation;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Tests {@link HoodieFileStatus}
+ */
+public class TestHoodieFileStatus {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestHoodieFileStatus.class);
+ private static final long LENGTH = 100;
+ private static final long MODIFICATION_TIME = System.currentTimeMillis();
+ private static final String PATH1 = "/abc/xyz1";
+ private static final String PATH2 = "/abc/xyz2";
+ private static final HoodieLocation LOCATION1 = new HoodieLocation(PATH1);
+ private static final HoodieLocation LOCATION2 = new HoodieLocation(PATH2);
+
+ @Test
+ public void testConstructor() {
+ HoodieFileStatus fileStatus = new HoodieFileStatus(LOCATION1, LENGTH,
false, MODIFICATION_TIME);
+ validateAccessors(fileStatus, PATH1, LENGTH, false, MODIFICATION_TIME);
+ fileStatus = new HoodieFileStatus(LOCATION2, -1, true, MODIFICATION_TIME +
2L);
+ validateAccessors(fileStatus, PATH2, -1, true, MODIFICATION_TIME + 2L);
+ }
+
+ @Test
+ public void testSerializability() throws IOException, ClassNotFoundException
{
+ HoodieFileStatus fileStatus = new HoodieFileStatus(LOCATION1, LENGTH,
false, MODIFICATION_TIME);
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(fileStatus);
+ try (ByteArrayInputStream bais = new
ByteArrayInputStream(baos.toByteArray());
+ ObjectInputStream ois = new ObjectInputStream(bais)) {
+ HoodieFileStatus deserialized = (HoodieFileStatus) ois.readObject();
+ validateAccessors(deserialized, PATH1, LENGTH, false,
MODIFICATION_TIME);
+ }
+ }
+ }
+
+ @Test
+ public void testEquals() {
+ HoodieFileStatus fileStatus1 = new HoodieFileStatus(
+ new HoodieLocation(PATH1), LENGTH, false, MODIFICATION_TIME);
+ HoodieFileStatus fileStatus2 = new HoodieFileStatus(
+ new HoodieLocation(PATH1), LENGTH + 2, false, MODIFICATION_TIME + 2L);
+ assertEquals(fileStatus1, fileStatus2);
+ }
+
+ @Test
+ public void testNotEquals() {
+ HoodieFileStatus fileStatus1 = new HoodieFileStatus(
+ LOCATION1, LENGTH, false, MODIFICATION_TIME);
+ HoodieFileStatus fileStatus2 = new HoodieFileStatus(
+ LOCATION2, LENGTH, false, MODIFICATION_TIME + 2L);
+ assertFalse(fileStatus1.equals(fileStatus2));
+ assertFalse(fileStatus2.equals(fileStatus1));
+ }
+
+ private void validateAccessors(HoodieFileStatus fileStatus,
+ String location,
+ long length,
+ boolean isDirectory,
+ long modificationTime) {
+ assertEquals(new HoodieLocation(location), fileStatus.getLocation());
+ assertEquals(length, fileStatus.getLength());
+ assertEquals(isDirectory, fileStatus.isDirectory());
+ assertEquals(!isDirectory, fileStatus.isFile());
+ assertEquals(modificationTime, fileStatus.getModificationTime());
+ }
+}
diff --git
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocation.java
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocation.java
new file mode 100644
index 00000000000..4c765d2cc3f
--- /dev/null
+++ b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocation.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.storage.HoodieLocation;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link HoodieLocation}
+ */
+public class TestHoodieLocation {
+ @Test
+ public void testToString() {
+ Arrays.stream(
+ new String[] {
+ "/",
+ "/foo",
+ "/foo/bar",
+ "foo",
+ "foo/bar",
+ "/foo/bar#boo",
+ "foo/bar#boo",
+ "file:/a/b/c",
+ "s3://a/b/c"})
+ .forEach(this::toStringTest);
+ }
+
+ @Test
+ public void testNormalize() throws URISyntaxException {
+ assertEquals("", new HoodieLocation(".").toString());
+ assertEquals("..", new HoodieLocation("..").toString());
+ assertEquals("/", new HoodieLocation("/").toString());
+ assertEquals("/", new HoodieLocation("//").toString());
+ assertEquals("/", new HoodieLocation("///").toString());
+ assertEquals("//foo/", new HoodieLocation("//foo/").toString());
+ assertEquals("//foo/", new HoodieLocation("//foo//").toString());
+ assertEquals("//foo/bar", new HoodieLocation("//foo//bar").toString());
+ assertEquals("/foo", new HoodieLocation("/foo/").toString());
+ assertEquals("/foo", new HoodieLocation("/foo/").toString());
+ assertEquals("foo", new HoodieLocation("foo/").toString());
+ assertEquals("foo", new HoodieLocation("foo//").toString());
+ assertEquals("foo/bar", new HoodieLocation("foo//bar").toString());
+ assertEquals("file:/a/b/c", new
HoodieLocation("file:///a/b/c").toString());
+ assertEquals("s3://a/b/c/d/e", new HoodieLocation("s3://a/b/c",
"d/e").toString());
+ assertEquals("s3://a/b/c/d/e", new HoodieLocation("s3://a/b/c/",
"d/e").toString());
+ assertEquals("s3://a/b/c/d/e", new HoodieLocation("s3://a/b/c/",
"d/e/").toString());
+ assertEquals("s3://a/b/c", new HoodieLocation("s3://a/b/c/",
"/").toString());
+ assertEquals("s3://a/b/c", new HoodieLocation("s3://a/b/c/",
"").toString());
+ assertEquals("s3://a/b/c/d/e", new HoodieLocation(new
HoodieLocation("s3://a/b/c"), "d/e").toString());
+ assertEquals("s3://a/b/c/d/e", new HoodieLocation(new
HoodieLocation("s3://a/b/c/"), "d/e").toString());
+ assertEquals("s3://a/b/c/d/e", new HoodieLocation(new
HoodieLocation("s3://a/b/c/"), "d/e/").toString());
+ assertEquals("s3://a/b/c", new HoodieLocation(new
HoodieLocation("s3://a/b/c/"), "/").toString());
+ assertEquals("s3://a/b/c", new HoodieLocation(new
HoodieLocation("s3://a/b/c/"), "").toString());
+ assertEquals("hdfs://foo/foo2/bar/baz/", new HoodieLocation(new
URI("hdfs://foo//foo2///bar/baz///")).toString());
+ }
+
+ @Test
+ public void testIsAbsolute() {
+ assertTrue(new HoodieLocation("/").isAbsolute());
+ assertTrue(new HoodieLocation("/foo").isAbsolute());
+ assertFalse(new HoodieLocation("foo").isAbsolute());
+ assertFalse(new HoodieLocation("foo/bar").isAbsolute());
+ assertFalse(new HoodieLocation(".").isAbsolute());
+ }
+
+ @Test
+ public void testGetParent() {
+ assertEquals(new HoodieLocation("/foo"), new
HoodieLocation("/foo/bar").getParent());
+ assertEquals(new HoodieLocation("foo"), new
HoodieLocation("foo/bar").getParent());
+ assertEquals(new HoodieLocation("/"), new
HoodieLocation("/foo").getParent());
+ assertEquals(new HoodieLocation("/foo/bar/x"), new
HoodieLocation("/foo/bar", "x/y").getParent());
+ assertEquals(new HoodieLocation("/foo/bar"), new
HoodieLocation("/foo/bar/", "y").getParent());
+ assertEquals(new HoodieLocation("/foo"), new HoodieLocation("/foo/bar/",
"/").getParent());
+ assertThrows(IllegalStateException.class, () -> new
HoodieLocation("/").getParent());
+ }
+
+ @Test
+ public void testURI() throws URISyntaxException {
+ URI uri = new URI("file:///bar#baz");
+ HoodieLocation location = new HoodieLocation(uri);
+ assertEquals(uri, new URI(location.toString()));
+ assertEquals("foo://bar/baz#boo", new HoodieLocation("foo://bar/",
"/baz#boo").toString());
+ assertEquals("foo://bar/baz/fud#boo",
+ new HoodieLocation(new HoodieLocation(new URI("foo://bar/baz#bud")),
"fud#boo").toString());
+ assertEquals("foo://bar/fud#boo",
+ new HoodieLocation(new HoodieLocation(new URI("foo://bar/baz#bud")),
"/fud#boo").toString());
+ }
+
+ @Test
+ public void testPathToUriConversion() throws URISyntaxException {
+ assertEquals(new URI(null, null, "/foo?bar", null, null),
+ new HoodieLocation("/foo?bar").toUri());
+ assertEquals(new URI(null, null, "/foo\"bar", null, null),
+ new HoodieLocation("/foo\"bar").toUri());
+ assertEquals(new URI(null, null, "/foo bar", null, null),
+ new HoodieLocation("/foo bar").toUri());
+ assertEquals("/foo?bar", new
HoodieLocation("http://localhost/foo?bar").toUri().getPath());
+ assertEquals("/foo", new URI("http://localhost/foo?bar").getPath());
+ assertEquals((new URI("/foo;bar")).getPath(), new
HoodieLocation("/foo;bar").toUri().getPath());
+ assertEquals(new URI("/foo;bar"), new HoodieLocation("/foo;bar").toUri());
+ assertEquals(new URI("/foo+bar"), new HoodieLocation("/foo+bar").toUri());
+ assertEquals(new URI("/foo-bar"), new HoodieLocation("/foo-bar").toUri());
+ assertEquals(new URI("/foo=bar"), new HoodieLocation("/foo=bar").toUri());
+ assertEquals(new URI("/foo,bar"), new HoodieLocation("/foo,bar").toUri());
+ }
+
+ @Test
+ public void testGetName() {
+ assertEquals("", new HoodieLocation("/").getName());
+ assertEquals("foo", new HoodieLocation("foo").getName());
+ assertEquals("foo", new HoodieLocation("/foo").getName());
+ assertEquals("foo", new HoodieLocation("/foo/").getName());
+ assertEquals("bar", new HoodieLocation("/foo/bar").getName());
+ assertEquals("bar", new HoodieLocation("hdfs://host/foo/bar").getName());
+ assertEquals("bar", new HoodieLocation("hdfs://host",
"foo/bar").getName());
+ assertEquals("bar", new HoodieLocation("hdfs://host/foo/",
"bar").getName());
+ }
+
+ @Test
+ public void testGetLocationWithoutSchemeAndAuthority() {
+ assertEquals(
+ new HoodieLocation("/foo/bar/boo"),
+ new
HoodieLocation("/foo/bar/boo").getLocationWithoutSchemeAndAuthority());
+ assertEquals(
+ new HoodieLocation("/foo/bar/boo"),
+ new
HoodieLocation("file:///foo/bar/boo").getLocationWithoutSchemeAndAuthority());
+ assertEquals(
+ new HoodieLocation("/bar/boo"),
+ new
HoodieLocation("s3://foo/bar/boo").getLocationWithoutSchemeAndAuthority());
+ }
+
+ @Test
+ public void testDepth() throws URISyntaxException {
+ assertEquals(0, new HoodieLocation("/").depth());
+ assertEquals(0, new HoodieLocation("///").depth());
+ assertEquals(0, new HoodieLocation("//foo/").depth());
+ assertEquals(1, new HoodieLocation("//foo//bar").depth());
+ assertEquals(5, new HoodieLocation("/a/b/c/d/e").depth());
+ assertEquals(4, new HoodieLocation("s3://a/b/c", "d/e").depth());
+ assertEquals(2, new HoodieLocation("s3://a/b/c/", "").depth());
+ assertEquals(4, new HoodieLocation(new HoodieLocation("s3://a/b/c"),
"d/e").depth());
+ }
+
+ @Test
+ public void testEquals() {
+ assertEquals(new HoodieLocation("/foo"), new HoodieLocation("/foo"));
+ assertEquals(new HoodieLocation("/foo"), new HoodieLocation("/foo/"));
+ assertEquals(new HoodieLocation("/foo/bar"), new
HoodieLocation("/foo//bar/"));
+ assertNotEquals(new HoodieLocation("/"), new HoodieLocation("/foo"));
+ }
+
+ @Test
+ public void testCachedResults() {
+ HoodieLocation location = new HoodieLocation("s3://x/y/z/");
+ assertSame(location.getParent(), location.getParent());
+ assertSame(location.getName(), location.getName());
+ assertSame(location.toString(), location.toString());
+ }
+
+ private void toStringTest(String pathString) {
+ assertEquals(pathString, new HoodieLocation(pathString).toString());
+ }
+}
diff --git
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocationFilter.java
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocationFilter.java
new file mode 100644
index 00000000000..2d66cc23f87
--- /dev/null
+++
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieLocationFilter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.storage.HoodieLocation;
+import org.apache.hudi.storage.HoodieLocationFilter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HoodieLocationFilter}
+ */
+public class TestHoodieLocationFilter {
+ @Test
+ public void testFilter() {
+ HoodieLocation location1 = new HoodieLocation("/x/y/1");
+ HoodieLocation location2 = new HoodieLocation("/x/y/2");
+ HoodieLocation location3 = new HoodieLocation("/x/z/1");
+ HoodieLocation location4 = new HoodieLocation("/x/z/2");
+
+ List<HoodieLocation> locationList = Arrays.stream(
+ new HoodieLocation[] {location1, location2, location3, location4}
+ ).collect(Collectors.toList());
+
+ List<HoodieLocation> expected = Arrays.stream(
+ new HoodieLocation[] {location1, location2}
+ ).collect(Collectors.toList());
+
+ assertEquals(expected.stream().sorted().collect(Collectors.toList()),
+ locationList.stream()
+ .filter(e -> new HoodieLocationFilter() {
+ @Override
+ public boolean accept(HoodieLocation location) {
+ return location.getParent().equals(new HoodieLocation("/x/y"));
+ }
+ }.accept(e))
+ .sorted()
+ .collect(Collectors.toList()));
+ assertEquals(locationList,
+ locationList.stream()
+ .filter(e -> new HoodieLocationFilter() {
+ @Override
+ public boolean accept(HoodieLocation location) {
+ return true;
+ }
+ }.accept(e))
+ .sorted()
+ .collect(Collectors.toList()));
+ }
+}
diff --git
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
new file mode 100644
index 00000000000..0424d22157d
--- /dev/null
+++
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.util.IOUtils;
+import org.apache.hudi.storage.HoodieFileStatus;
+import org.apache.hudi.storage.HoodieLocation;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for testing different implementations of {@link HoodieStorage}.
+ */
+public abstract class TestHoodieStorageBase {
+  @TempDir
+  protected Path tempDir;
+
+  protected static final String[] RELATIVE_FILE_PATHS = new String[] {
+      "w/1.file", "w/2.file", "x/1.file", "x/2.file",
+      "x/y/1.file", "x/y/2.file", "x/z/1.file", "x/z/2.file"
+  };
+  private static final byte[] EMPTY_BYTES = new byte[] {};
+
+  /**
+   * @param fs   file system instance.
+   * @param conf configuration instance.
+   * @return {@link HoodieStorage} instance based on the implementation for testing.
+   */
+  protected abstract HoodieStorage getHoodieStorage(Object fs, Object conf);
+
+  /**
+   * @param conf configuration instance.
+   * @return the underlying file system instance used if required.
+   */
+  protected abstract Object getFileSystem(Object conf);
+
+  /**
+   * @return configurations for the storage.
+   */
+  protected abstract Object getConf();
+
+  @AfterEach
+  public void cleanUpTempDir() {
+    HoodieStorage storage = getHoodieStorage();
+    try {
+      for (HoodieFileStatus status : storage.listDirectEntries(new HoodieLocation(getTempDir()))) {
+        HoodieLocation location = status.getLocation();
+        if (status.isDirectory()) {
+          storage.deleteDirectory(location);
+        } else {
+          storage.deleteFile(location);
+        }
+      }
+    } catch (IOException ignored) {
+      // Best-effort cleanup only: a failure here should not fail the test that just ran
+    }
+  }
+
+  @Test
+  public void testGetScheme() {
+    assertEquals("file", getHoodieStorage().getScheme());
+  }
+
+  @Test
+  public void testCreateWriteAndRead() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+
+    HoodieLocation location = new HoodieLocation(getTempDir(), "testCreateWriteAndRead/1.file");
+    assertFalse(storage.exists(location));
+    storage.create(location).close();
+    validateFileStatus(storage, location, EMPTY_BYTES, false);
+
+    byte[] data = new byte[] {2, 42, 49, (byte) 158, (byte) 233, 66, 9};
+
+    // By default, create overwrites the file
+    try (OutputStream stream = storage.create(location)) {
+      stream.write(data);
+      stream.flush();
+    }
+    validateFileStatus(storage, location, data, false);
+
+    // Creating an existing file with overwrite disabled fails and keeps the existing content
+    assertThrows(IOException.class, () -> storage.create(location, false));
+    validateFileStatus(storage, location, data, false);
+
+    HoodieLocation location2 = new HoodieLocation(getTempDir(), "testCreateWriteAndRead/2.file");
+    assertFalse(storage.exists(location2));
+    assertTrue(storage.createNewFile(location2));
+    validateFileStatus(storage, location2, EMPTY_BYTES, false);
+    assertFalse(storage.createNewFile(location2));
+
+    HoodieLocation location3 = new HoodieLocation(getTempDir(), "testCreateWriteAndRead/3.file");
+    assertFalse(storage.exists(location3));
+    storage.createImmutableFileInPath(location3, Option.of(data));
+    validateFileStatus(storage, location3, data, false);
+
+    HoodieLocation location4 = new HoodieLocation(getTempDir(), "testCreateWriteAndRead/4");
+    assertFalse(storage.exists(location4));
+    assertTrue(storage.createDirectory(location4));
+    validateFileStatus(storage, location4, EMPTY_BYTES, true);
+    // Creating a directory that already exists still returns true
+    assertTrue(storage.createDirectory(location4));
+  }
+
+  @Test
+  public void testListing() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+    // Creates w/{1,2}.file, x/{1,2}.file, x/y/{1,2}.file and x/z/{1,2}.file (RELATIVE_FILE_PATHS)
+    prepareFilesOnStorage(storage);
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/2.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/y"), 0, true, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/z"), 0, true, 0),
+        }).collect(Collectors.toList()),
+        storage.listDirectEntries(new HoodieLocation(getTempDir(), "x")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/2.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/y/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/y/2.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/z/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/z/2.file"), 0, false, 0)
+        }).collect(Collectors.toList()),
+        storage.listFiles(new HoodieLocation(getTempDir(), "x")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/2.file"), 0, false, 0)
+        }).collect(Collectors.toList()),
+        storage.listDirectEntries(
+            new HoodieLocation(getTempDir(), "x"), e -> e.getName().contains("2")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "w/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "w/2.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/z/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/z/2.file"), 0, false, 0)
+        }).collect(Collectors.toList()),
+        storage.listDirectEntries(Arrays.stream(new HoodieLocation[] {
+            new HoodieLocation(getTempDir(), "w"),
+            new HoodieLocation(getTempDir(), "x/z")
+        }).collect(Collectors.toList())));
+
+    assertThrows(FileNotFoundException.class,
+        () -> storage.listDirectEntries(new HoodieLocation(getTempDir(), "*")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/y/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/z/1.file"), 0, false, 0)
+        }).collect(Collectors.toList()),
+        storage.globEntries(new HoodieLocation(getTempDir(), "x/*/1.file")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/1.file"), 0, false, 0),
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/2.file"), 0, false, 0),
+        }).collect(Collectors.toList()),
+        storage.globEntries(new HoodieLocation(getTempDir(), "x/*.file")));
+
+    validateHoodieFileStatusList(
+        Arrays.stream(new HoodieFileStatus[] {
+            new HoodieFileStatus(new HoodieLocation(getTempDir(), "x/y/1.file"), 0, false, 0),
+        }).collect(Collectors.toList()),
+        storage.globEntries(
+            new HoodieLocation(getTempDir(), "x/*/*.file"),
+            e -> e.getParent().getName().equals("y") && e.getName().contains("1")));
+  }
+
+  @Test
+  public void testFileNotFound() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+
+    HoodieLocation fileLocation = new HoodieLocation(getTempDir(), "testFileNotFound/1.file");
+    HoodieLocation dirLocation = new HoodieLocation(getTempDir(), "testFileNotFound/2");
+    assertFalse(storage.exists(fileLocation));
+    assertThrows(FileNotFoundException.class, () -> storage.open(fileLocation));
+    assertThrows(FileNotFoundException.class, () -> storage.getFileStatus(fileLocation));
+    assertThrows(FileNotFoundException.class, () -> storage.listDirectEntries(fileLocation));
+    assertThrows(FileNotFoundException.class, () -> storage.listDirectEntries(dirLocation));
+    assertThrows(FileNotFoundException.class, () -> storage.listDirectEntries(dirLocation, e -> true));
+    assertThrows(FileNotFoundException.class, () -> storage.listDirectEntries(
+        Arrays.stream(new HoodieLocation[] {dirLocation}).collect(Collectors.toList())));
+  }
+
+  @Test
+  public void testRename() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+
+    HoodieLocation location = new HoodieLocation(getTempDir(), "testRename/1.file");
+    assertFalse(storage.exists(location));
+    storage.create(location).close();
+    validateFileStatus(storage, location, EMPTY_BYTES, false);
+
+    HoodieLocation newLocation = new HoodieLocation(getTempDir(), "testRename/1_renamed.file");
+    assertTrue(storage.rename(location, newLocation));
+    assertFalse(storage.exists(location));
+    validateFileStatus(storage, newLocation, EMPTY_BYTES, false);
+  }
+
+  @Test
+  public void testDelete() throws IOException {
+    HoodieStorage storage = getHoodieStorage();
+
+    HoodieLocation location = new HoodieLocation(getTempDir(), "testDelete/1.file");
+    assertFalse(storage.exists(location));
+    storage.create(location).close();
+    assertTrue(storage.exists(location));
+
+    assertTrue(storage.deleteFile(location));
+    assertFalse(storage.exists(location));
+    assertFalse(storage.deleteFile(location));
+
+    HoodieLocation location2 = new HoodieLocation(getTempDir(), "testDelete/2");
+    assertFalse(storage.exists(location2));
+    assertTrue(storage.createDirectory(location2));
+    assertTrue(storage.exists(location2));
+
+    assertTrue(storage.deleteDirectory(location2));
+    assertFalse(storage.exists(location2));
+    assertFalse(storage.deleteDirectory(location2));
+  }
+
+  @Test
+  public void testMakeQualified() {
+    HoodieStorage storage = getHoodieStorage();
+    HoodieLocation location = new HoodieLocation("/tmp/testMakeQualified/1.file");
+    assertEquals(
+        new HoodieLocation("file:/tmp/testMakeQualified/1.file"),
+        storage.makeQualified(location));
+  }
+
+  @Test
+  public void testGetFileSystem() {
+    Object conf = getConf();
+    Object fs = getFileSystem(conf);
+    HoodieStorage storage = getHoodieStorage(fs, conf);
+    assertSame(fs, storage.getFileSystem());
+  }
+
+  /** Returns the per-test temporary directory as a string prefixed with the {@code file:} scheme. */
+  protected String getTempDir() {
+    return "file:" + tempDir.toUri().getPath();
+  }
+
+  /**
+   * Prepares files on storage for testing.
+   *
+   * @param storage {@link HoodieStorage} to use.
+   */
+  private void prepareFilesOnStorage(HoodieStorage storage) throws IOException {
+    String dir = getTempDir();
+    for (String relativePath : RELATIVE_FILE_PATHS) {
+      storage.create(new HoodieLocation(dir, relativePath)).close();
+    }
+  }
+
+  /** Creates a {@link HoodieStorage} from {@link #getConf()} and {@link #getFileSystem(Object)}. */
+  private HoodieStorage getHoodieStorage() {
+    Object conf = getConf();
+    return getHoodieStorage(getFileSystem(conf), conf);
+  }
+
+  /**
+   * Validates that {@code location} exists with the expected type and a positive modification time,
+   * and, for a file, with the expected length and content.
+   */
+  private void validateFileStatus(HoodieStorage storage,
+                                  HoodieLocation location,
+                                  byte[] data,
+                                  boolean isDirectory) throws IOException {
+    assertTrue(storage.exists(location));
+    HoodieFileStatus fileStatus = storage.getFileStatus(location);
+    assertEquals(location, fileStatus.getLocation());
+    assertEquals(isDirectory, fileStatus.isDirectory());
+    assertEquals(!isDirectory, fileStatus.isFile());
+    if (!isDirectory) {
+      assertEquals(data.length, fileStatus.getLength());
+      try (InputStream stream = storage.open(location)) {
+        assertArrayEquals(data, IOUtils.readAsByteArray(stream, data.length));
+      }
+    }
+    assertTrue(fileStatus.getModificationTime() > 0);
+  }
+
+  /**
+   * Validates {@code actual} against {@code expected} regardless of order, without comparing modification times.
+   */
+  private void validateHoodieFileStatusList(List<HoodieFileStatus> expected,
+                                            List<HoodieFileStatus> actual) {
+    assertEquals(expected.size(), actual.size());
+    List<HoodieFileStatus> sortedExpected = expected.stream()
+        .sorted(Comparator.comparing(HoodieFileStatus::getLocation))
+        .collect(Collectors.toList());
+    List<HoodieFileStatus> sortedActual = actual.stream()
+        .sorted(Comparator.comparing(HoodieFileStatus::getLocation))
+        .collect(Collectors.toList());
+    for (int i = 0; i < expected.size(); i++) {
+      // We cannot use HoodieFileStatus#equals as that only compares the location
+      assertEquals(sortedExpected.get(i).getLocation(), sortedActual.get(i).getLocation());
+      assertEquals(sortedExpected.get(i).isDirectory(), sortedActual.get(i).isDirectory());
+      assertEquals(sortedExpected.get(i).isFile(), sortedActual.get(i).isFile());
+      if (sortedExpected.get(i).isFile()) {
+        assertEquals(sortedExpected.get(i).getLength(), sortedActual.get(i).getLength());
+      }
+      assertTrue(sortedActual.get(i).getModificationTime() > 0);
+    }
+  }
+}