This is an automated email from the ASF dual-hosted git repository.
jinglun pushed a commit to branch HADOOP-19236
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 59b0ea5dfee638888a4c409448b9248a9b00c82f
Author: lijinglun <lijing...@bytedance.com>
AuthorDate: Wed Oct 23 19:58:17 2024 +0800
Integration of TOS: Add RawFileSystem.
---
.../org/apache/hadoop/fs/tosfs/RawFSUtils.java | 59 ++++++++++
.../org/apache/hadoop/fs/tosfs/RawFileSystem.java | 114 +++++++++----------
.../hadoop/fs/tosfs/RawLocatedFileStatus.java | 30 +++++
.../org/apache/hadoop/fs/tosfs/TosChecksum.java | 67 +++++++++++
.../org/apache/hadoop/fs/tosfs/common/Bytes.java | 13 +++
.../org/apache/hadoop/fs/tosfs/conf/ConfKeys.java | 23 ++++
.../org/apache/hadoop/fs/tosfs/util/FuseUtils.java | 28 +++++
.../hadoop/fs/tosfs/util/RemoteIterators.java | 122 +++++++++++++++++++++
8 files changed, 391 insertions(+), 65 deletions(-)
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFSUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFSUtils.java
new file mode 100644
index 00000000000..a80c03dfcc4
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFSUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+public class RawFSUtils {
+ private RawFSUtils() {
+ }
+
+ /**
+ * @return true if the node is included in the subtree rooted at the given root node.
+ */
+ public static boolean inSubtree(String root, String p) {
+ return inSubtree(new Path(root), new Path(p));
+ }
+
+ /**
+ * @return true if the node is included in the subtree rooted at the given root node.
+ */
+ public static boolean inSubtree(Path root, Path node) {
+ Preconditions.checkNotNull(root, "Root cannot be null");
+ Preconditions.checkNotNull(node, "Node cannot be null");
+ if (root.isRoot()) {
+ return true;
+ }
+
+ if (Objects.equals(root, node)) {
+ return true;
+ }
+
+ while (!node.isRoot()) {
+ if (Objects.equals(root, node)) {
+ return true;
+ }
+ node = node.getParent();
+ }
+ return false;
+ }
+}
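A minimal usage sketch for the new RawFSUtils.inSubtree helper; the wrapper class and the concrete paths below are illustrative only, the helper itself is exactly as added above:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.tosfs.RawFSUtils;

    public class RawFSUtilsExample {
      public static void main(String[] args) {
        // /a/b/c lies under the subtree rooted at /a/b, in both the String and Path forms.
        boolean byString = RawFSUtils.inSubtree("/a/b", "/a/b/c");
        boolean byPath = RawFSUtils.inSubtree(new Path("/a/b"), new Path("/a/b/c"));
        // A sibling path is not part of the subtree.
        boolean sibling = RawFSUtils.inSubtree("/a/b", "/a/x");
        System.out.printf("byString=%s, byPath=%s, sibling=%s%n", byString, byPath, sibling);
      }
    }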
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
index ce3b85ab340..5a7a394b5c4 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
@@ -18,32 +18,8 @@
package org.apache.hadoop.fs.tosfs;
-import io.proton.commit.MagicOutputStream;
-import io.proton.common.conf.Conf;
-import io.proton.common.conf.ConfKeys;
-import io.proton.common.object.DirectoryStorage;
-import io.proton.common.object.ObjectInfo;
-import io.proton.common.object.ObjectMultiRangeInputStream;
-import io.proton.common.object.ObjectOutputStream;
-import io.proton.common.object.ObjectRangeInputStream;
-import io.proton.common.object.ObjectStorage;
-import io.proton.common.object.ObjectStorageFactory;
-import io.proton.common.object.ObjectUtils;
-import io.proton.common.object.exceptions.InvalidObjectKeyException;
-import io.proton.common.util.Bytes;
-import io.proton.common.util.Constants;
-import io.proton.common.util.FSUtils;
-import io.proton.common.util.FuseUtils;
-import io.proton.common.util.HadoopUtil;
-import io.proton.common.util.Range;
-import io.proton.common.util.RemoteIterators;
-import io.proton.common.util.ThreadPools;
-import io.proton.fs.ops.DefaultFsOps;
-import io.proton.fs.ops.DirectoryFsOps;
-import io.proton.fs.ops.FsOps;
-import io.proton.shaded.com.google.common.annotations.VisibleForTesting;
-import io.proton.shaded.com.google.common.base.Preconditions;
-import io.proton.shaded.com.google.common.collect.Iterators;
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
@@ -62,7 +38,31 @@ import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.tosfs.commit.MagicOutputStream;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+import org.apache.hadoop.fs.tosfs.common.ThreadPools;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.ChecksumInfo;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.fs.tosfs.object.DirectoryStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectMultiRangeInputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectOutputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectRangeInputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.object.exceptions.InvalidObjectKeyException;
+import org.apache.hadoop.fs.tosfs.ops.DefaultFsOps;
+import org.apache.hadoop.fs.tosfs.ops.DirectoryFsOps;
+import org.apache.hadoop.fs.tosfs.ops.FsOps;
+import org.apache.hadoop.fs.tosfs.util.FSUtils;
+import org.apache.hadoop.fs.tosfs.util.FuseUtils;
+import org.apache.hadoop.fs.tosfs.util.Range;
+import org.apache.hadoop.fs.tosfs.util.RemoteIterators;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@@ -79,7 +79,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -99,7 +98,7 @@ public class RawFileSystem extends FileSystem {
private static final long DFS_BLOCK_SIZE_DEFAULT = 128 << 20;
private String scheme;
- private Conf protonConf;
+ private Configuration protonConf;
private String username;
private Path workingDir;
private URI uri;
@@ -134,35 +133,24 @@ public class RawFileSystem extends FileSystem {
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
LOG.debug("Opening '{}' for reading.", path);
- FileStatus status = innerFileStatus(path);
+ RawFileStatus status = innerFileStatus(path);
if (status.isDirectory()) {
throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path));
}
// Parse the range size from the hadoop conf.
long rangeSize = getConf().getLong(
- ConfKeys.OBJECT_STREAM_RANGE_SIZE.key(),
- ConfKeys.OBJECT_STREAM_RANGE_SIZE.defaultValue());
+ ConfKeys.OBJECT_STREAM_RANGE_SIZE,
+ ConfKeys.OBJECT_STREAM_RANGE_SIZE_DEFAULT);
Preconditions.checkArgument(rangeSize > 0, "Object storage range size must be positive.");
- FSInputStream fsIn = new ObjectMultiRangeInputStream(taskThreadPool, storage, path, status.getLen(), rangeSize);
+ FSInputStream fsIn = new ObjectMultiRangeInputStream(taskThreadPool, storage, path,
+ status.getLen(), rangeSize, status.checksum());
return new FSDataInputStream(fsIn);
}
- public FSDataInputStream open(Path path, String expectedChecksum, Range range) throws IOException {
- LOG.debug("Opening '{}' for reading.", path);
- RawFileStatus status = innerFileStatus(path);
- if (status.isDirectory()) {
- throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path));
- }
-
- if (expectedChecksum != null && !Objects.equals(status.checksum(), expectedChecksum)) {
- throw new ChecksumMismatchException(String.format("The requested file has been staled, " +
- "request version's checksum is %s " +
- "while current version's checksum is %s", expectedChecksum, status.checksum()));
- }
-
- return new FSDataInputStream(new ObjectRangeInputStream(storage, path, range));
+ public FSDataInputStream open(Path path, byte[] expectedChecksum, Range range) throws IOException {
+ return new FSDataInputStream(new ObjectRangeInputStream(storage, path, range, expectedChecksum));
}
@Override
@@ -547,7 +535,7 @@ public class RawFileSystem extends FileSystem {
// Root directory always exists
if (key.isEmpty()) {
- return new RawFileStatus(0, true, 0, 0, qualifiedPath, username, Constants.EMPTY_CHECKSUM);
+ return new RawFileStatus(0, true, 0, 0, qualifiedPath, username, Constants.MAGIC_CHECKSUM);
}
try {
@@ -577,7 +565,7 @@ public class RawFileSystem extends FileSystem {
}
@Override
- public FsServerDefaults getServerDefaults(Path p) throws IOException {
+ public FsServerDefaults getServerDefaults(Path p) {
Configuration config = getConf();
// CRC32 is chosen as default as it is available in all
// releases that support checksum.
@@ -595,8 +583,8 @@ public class RawFileSystem extends FileSystem {
}
private void stopAllServices() {
- HadoopUtil.shutdownHadoopExecutors(uploadThreadPool, LOG, 30, TimeUnit.SECONDS);
- HadoopUtil.shutdownHadoopExecutors(taskThreadPool, LOG, 30, TimeUnit.SECONDS);
+ ThreadPools.shutdown(uploadThreadPool, 30, TimeUnit.SECONDS);
+ ThreadPools.shutdown(taskThreadPool, 30, TimeUnit.SECONDS);
}
@Override
@@ -605,10 +593,6 @@ public class RawFileSystem extends FileSystem {
setConf(conf);
this.scheme = FSUtils.scheme(conf, uri);
- // Merge the deprecated configure keys with the new configure keys and convert hadoop conf to proton conf.
- FSUtils.withCompatibleKeys(conf, scheme);
- this.protonConf = Conf.copyOf(conf);
-
// Username is the current user at the time the FS was instantiated.
this.username = UserGroupInformation.getCurrentUser().getShortUserName();
this.workingDir = new Path("/user", username).makeQualified(uri, null);
@@ -619,14 +603,15 @@ public class RawFileSystem extends FileSystem {
throw new FileNotFoundException(String.format("Bucket: %s not found.", uri.getAuthority()));
}
- int taskThreadPoolSize = protonConf.get(ConfKeys.TASK_THREAD_POOL_SIZE.format(scheme));
+ int taskThreadPoolSize =
+ protonConf.getInt(ConfKeys.TASK_THREAD_POOL_SIZE, ConfKeys.TASK_THREAD_POOL_SIZE_DEFAULT);
this.taskThreadPool = ThreadPools.newWorkerPool(TASK_THREAD_POOL_PREFIX, taskThreadPoolSize);
- int uploadThreadPoolSize = protonConf.get(ConfKeys.MULTIPART_THREAD_POOL_SIZE.format(scheme));
+ int uploadThreadPoolSize = protonConf.getInt(ConfKeys.MULTIPART_THREAD_POOL_SIZE,
+ ConfKeys.MULTIPART_THREAD_POOL_SIZE_DEFAULT);
this.uploadThreadPool = ThreadPools.newWorkerPool(MULTIPART_THREAD_POOL_PREFIX, uploadThreadPoolSize);
if (storage.bucket().isDirectory()) {
-
fsOps = new DirectoryFsOps((DirectoryStorage) storage, this::objectToFileStatus);
} else {
fsOps = new DefaultFsOps(storage, protonConf, taskThreadPool, this::objectToFileStatus);
@@ -651,26 +636,24 @@ public class RawFileSystem extends FileSystem {
return uploadThreadPool;
}
- String username() {
- return username;
- }
-
/**
* @return null if checksum is not supported.
*/
@Override
public FileChecksum getFileChecksum(Path f, long length) throws IOException {
Preconditions.checkArgument(length >= 0);
+
RawFileStatus fileStatus = innerFileStatus(f);
if (fileStatus.isDirectory()) {
// Compatible with HDFS
throw new FileNotFoundException(String.format("Path is not a file, %s", f));
}
- if (!protonConf.get(ConfKeys.CHECKSUM_ENABLED.format(scheme))) {
+ if (!protonConf.getBoolean(ConfKeys.CHECKSUM_ENABLED, ConfKeys.CHECKSUM_ENABLED_DEFAULT)) {
return null;
}
- return BaseChecksum.create(protonConf, fileStatus, length);
+ ChecksumInfo csInfo = storage.checksumInfo();
+ return new TosChecksum(csInfo.algorithm(), fileStatus.checksum());
}
@Override
@@ -708,7 +691,8 @@ public class RawFileSystem extends FileSystem {
String key = ObjectUtils.pathToKey(qualifiedPath);
Map<String, String> tags = storage.getTags(key);
- return tags.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, t -> Bytes.toBytes(t.getValue())));
+ return tags.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, t -> Bytes.toBytes(t.getValue())));
}
}
@@ -736,7 +720,7 @@ public class RawFileSystem extends FileSystem {
@Override
public List<String> listXAttrs(Path path) throws IOException {
- return getXAttrs(path).keySet().stream().collect(Collectors.toList());
+ return Lists.newArrayList(getXAttrs(path).keySet());
}
@Override
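For context, a hedged sketch of how a caller reads through the new stream path once a FileSystem instance is bound to this implementation; the scheme/URI wiring and the sample values are assumptions, only OBJECT_STREAM_RANGE_SIZE comes from this change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.tosfs.conf.ConfKeys;

    public class ReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cap the per-request range size used when open() builds an ObjectMultiRangeInputStream.
        conf.setLong(ConfKeys.OBJECT_STREAM_RANGE_SIZE, 8L << 20);
        Path path = new Path(args[0]);
        try (FileSystem fs = path.getFileSystem(conf);
             FSDataInputStream in = fs.open(path, 4096)) {
          byte[] buf = new byte[4096];
          int n;
          while ((n = in.read(buf)) > 0) {
            // Consume n bytes here.
          }
        }
      }
    }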
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawLocatedFileStatus.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawLocatedFileStatus.java
new file mode 100644
index 00000000000..562289455d3
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawLocatedFileStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import static org.apache.hadoop.util.Preconditions.checkNotNull;
+
+public class RawLocatedFileStatus extends LocatedFileStatus {
+ public RawLocatedFileStatus(RawFileStatus status, BlockLocation[] locations) {
+ super(checkNotNull(status), locations);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosChecksum.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosChecksum.java
new file mode 100644
index 00000000000..f0bedc60c4b
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosChecksum.java
@@ -0,0 +1,67 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class TosChecksum extends FileChecksum {
+ private String algorithm;
+ private byte[] checksum;
+
+ public TosChecksum(String algorithm, byte[] checksum) {
+ this.algorithm = algorithm;
+ this.checksum = checksum;
+ }
+
+ @Override
+ public String getAlgorithmName() {
+ return algorithm;
+ }
+
+ @Override
+ public int getLength() {
+ return checksum.length;
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return checksum;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ byte[] algorithmBytes = Bytes.toBytes(algorithm);
+ out.writeInt(algorithmBytes.length);
+ out.write(algorithmBytes);
+ out.writeInt(checksum.length);
+ out.write(checksum);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ byte[] algorithmBytes = new byte[in.readInt()];
+ in.readFully(algorithmBytes);
+ algorithm = Bytes.toString(algorithmBytes);
+ checksum = new byte[in.readInt()];
+ in.readFully(checksum);
+ }
+}
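A hedged round-trip sketch for the new TosChecksum Writable-style serialization; the algorithm name and checksum bytes are placeholders, and it assumes the length-prefixed-int layout that readFields() expects:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.fs.tosfs.TosChecksum;
    import org.apache.hadoop.fs.tosfs.common.Bytes;

    public class TosChecksumRoundTrip {
      public static void main(String[] args) throws Exception {
        TosChecksum original = new TosChecksum("CRC32C", Bytes.toBytes("0123abcd"));

        // Serialize with write(), then rebuild an instance with readFields().
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        TosChecksum copy = new TosChecksum(null, null);
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(copy.getAlgorithmName() + " / " + copy.getLength() + " bytes");
      }
    }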
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java
index c43bbf50359..eb632050a53 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.fs.tosfs.common;
import org.apache.hadoop.util.Preconditions;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
public class Bytes {
private Bytes() {
@@ -63,6 +64,10 @@ public class Bytes {
return b;
}
+ public static byte[] toBytes(String s) {
+ return s.getBytes(StandardCharsets.UTF_8);
+ }
+
// Decode big-endian binaries into basic Java types.
public static boolean toBoolean(byte[] b) {
@@ -166,4 +171,12 @@ public class Bytes {
System.arraycopy(b, off, data, 0, len);
return data;
}
+
+ public static String toString(byte[] b) {
+ return new String(b, StandardCharsets.UTF_8);
+ }
+
+ public static String toString(byte[] b, int off, int len) {
+ return new String(b, off, len, StandardCharsets.UTF_8);
+ }
}
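The new String helpers are plain UTF-8 encode/decode; a small sketch (values are arbitrary):

    import org.apache.hadoop.fs.tosfs.common.Bytes;

    public class BytesStringExample {
      public static void main(String[] args) {
        byte[] raw = Bytes.toBytes("hello tos");   // UTF-8 encode
        String full = Bytes.toString(raw);          // decode the whole array -> "hello tos"
        String slice = Bytes.toString(raw, 0, 5);   // decode a sub-range -> "hello"
        System.out.println(full + " | " + slice);
      }
    }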
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
index cf331cec999..ad694afad94 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
@@ -117,6 +117,29 @@ public class ConfKeys {
public static final String OBJECT_STREAM_RANGE_SIZE = "proton.objectstorage.stream.range-size";
public static final long OBJECT_STREAM_RANGE_SIZE_DEFAULT = Long.MAX_VALUE;
+ /**
+ * The size of the thread pool used for running tasks in parallel for the given object fs,
+ * e.g. deleting objects, copying files. The key example: fs.tos.task.thread-pool-size.
+ */
+ public static final String TASK_THREAD_POOL_SIZE = "fs.tos.task.thread-pool-size";
+ public static final int TASK_THREAD_POOL_SIZE_DEFAULT =
+ Math.max(2, Runtime.getRuntime().availableProcessors());
+
+ /**
+ * The size of the thread pool used for uploading parts of a multipart upload in parallel
+ * for the given object storage, e.g. fs.tos.multipart.thread-pool-size.
+ */
+ public static final String MULTIPART_THREAD_POOL_SIZE = "fs.tos.multipart.thread-pool-size";
+ public static final int MULTIPART_THREAD_POOL_SIZE_DEFAULT =
+ Math.max(2, Runtime.getRuntime().availableProcessors());
+
+ /**
+ * The toggle indicates whether to enable checksum during getting file status for the given
+ * object, e.g. fs.tos.checksum.enabled.
+ */
+ public static final String CHECKSUM_ENABLED = "fs.tos.checksum.enabled";
+ public static final boolean CHECKSUM_ENABLED_DEFAULT = true;
+
public static String defaultDir(String basename) {
String tmpdir = System.getProperty("java.io.tmpdir");
Preconditions.checkNotNull(tmpdir, "System property 'java.io.tmpdir' cannot be null");
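The new keys are read with the plain Hadoop Configuration getters, as RawFileSystem.initialize() now does; a hedged sketch with illustrative values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.tosfs.conf.ConfKeys;

    public class ConfKeysExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Override the defaults introduced by this change (values here are arbitrary).
        conf.setInt(ConfKeys.TASK_THREAD_POOL_SIZE, 8);
        conf.setInt(ConfKeys.MULTIPART_THREAD_POOL_SIZE, 4);
        conf.setBoolean(ConfKeys.CHECKSUM_ENABLED, false);

        int taskThreads = conf.getInt(ConfKeys.TASK_THREAD_POOL_SIZE,
            ConfKeys.TASK_THREAD_POOL_SIZE_DEFAULT);
        boolean checksumEnabled = conf.getBoolean(ConfKeys.CHECKSUM_ENABLED,
            ConfKeys.CHECKSUM_ENABLED_DEFAULT);
        System.out.println(taskThreads + " task threads, checksum enabled: " + checksumEnabled);
      }
    }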
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FuseUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FuseUtils.java
new file mode 100644
index 00000000000..7fae5a23799
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FuseUtils.java
@@ -0,0 +1,28 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+public class FuseUtils {
+ public static final String ENV_TOS_ENABLE_FUSE = "TOS_ENABLE_FUSE";
+
+ private FuseUtils() {
+ }
+
+ public static boolean fuseEnabled() {
+ return ParseUtils.envAsBoolean(ENV_TOS_ENABLE_FUSE, false);
+ }
+}
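FuseUtils.fuseEnabled() only consults the TOS_ENABLE_FUSE environment variable (through ParseUtils, which is not part of this diff); a trivial, hedged usage sketch:

    import org.apache.hadoop.fs.tosfs.util.FuseUtils;

    public class FuseCheck {
      public static void main(String[] args) {
        // Defaults to false when TOS_ENABLE_FUSE is unset.
        System.out.println("FUSE mode enabled: " + FuseUtils.fuseEnabled());
      }
    }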
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/RemoteIterators.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/RemoteIterators.java
new file mode 100644
index 00000000000..bf1d428a124
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/RemoteIterators.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class RemoteIterators {
+
+ private RemoteIterators() {
+ }
+
+ /**
+ * Create an iterator from a singleton.
+ *
+ * @param singleton instance
+ * @param <T> type
+ * @return a remote iterator
+ */
+ public static <T> RemoteIterator<T> fromSingleton(@Nullable T singleton) {
+ return new SingletonIterator<>(singleton);
+ }
+
+ /**
+ * Create an iterator from an iterable and a transformation function.
+ *
+ * @param <S> source type
+ * @param <T> result type
+ * @param iterator source
+ * @param mapper transformation
+ * @return a remote iterator
+ */
+ public static <S, T> RemoteIterator<T> fromIterable(Iterable<S> iterator, FunctionRaisingIOE<S, T> mapper) {
+ return new IterableRemoteIterator<>(iterator, mapper);
+ }
+
+ public interface FunctionRaisingIOE<S, T> {
+
+ /**
+ * Apply the function.
+ *
+ * @param s argument 1
+ * @return result
+ * @throws IOException Any IO failure
+ */
+ T apply(S s) throws IOException;
+ }
+
+ private static final class IterableRemoteIterator<S, T> implements RemoteIterator<T> {
+ private final Iterator<S> sourceIterator;
+ private final FunctionRaisingIOE<S, T> mapper;
+
+ private IterableRemoteIterator(Iterable<S> source, FunctionRaisingIOE<S, T> mapper) {
+ this.sourceIterator = source.iterator();
+ this.mapper = mapper;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return sourceIterator.hasNext();
+ }
+
+ @Override
+ public T next() throws IOException {
+ return mapper.apply(sourceIterator.next());
+ }
+ }
+
+ private static final class SingletonIterator<T> implements RemoteIterator<T> {
+ private final T singleton;
+
+ private boolean processed;
+
+ private SingletonIterator(@Nullable T singleton) {
+ this.singleton = singleton;
+ this.processed = singleton == null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !processed;
+ }
+
+ @Override
+ public T next() {
+ if (hasNext()) {
+ processed = true;
+ return singleton;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("singleton", singleton)
+ .toString();
+ }
+ }
+}
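A hedged sketch of the two factory methods added here; the sample data is arbitrary:

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.fs.tosfs.util.RemoteIterators;

    public class RemoteIteratorsExample {
      public static void main(String[] args) throws IOException {
        // Wrap a single value; passing null yields an already-exhausted iterator.
        RemoteIterator<String> one = RemoteIterators.fromSingleton("only-element");
        while (one.hasNext()) {
          System.out.println(one.next());
        }

        // Map each source element through a function that may raise IOException.
        RemoteIterator<Integer> lengths =
            RemoteIterators.fromIterable(Arrays.asList("a", "bb", "ccc"), String::length);
        while (lengths.hasNext()) {
          System.out.println(lengths.next());
        }
      }
    }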