This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch HADOOP-19236-original
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 59b0ea5dfee638888a4c409448b9248a9b00c82f
Author: lijinglun <lijing...@bytedance.com>
AuthorDate: Wed Oct 23 19:58:17 2024 +0800

    Integration of TOS: Add RawFileSystem.
---
 .../org/apache/hadoop/fs/tosfs/RawFSUtils.java     |  59 ++++++++++
 .../org/apache/hadoop/fs/tosfs/RawFileSystem.java  | 114 +++++++++----------
 .../hadoop/fs/tosfs/RawLocatedFileStatus.java      |  30 +++++
 .../org/apache/hadoop/fs/tosfs/TosChecksum.java    |  67 +++++++++++
 .../org/apache/hadoop/fs/tosfs/common/Bytes.java   |  13 +++
 .../org/apache/hadoop/fs/tosfs/conf/ConfKeys.java  |  23 ++++
 .../org/apache/hadoop/fs/tosfs/util/FuseUtils.java |  28 +++++
 .../hadoop/fs/tosfs/util/RemoteIterators.java      | 122 +++++++++++++++++++++
 8 files changed, 391 insertions(+), 65 deletions(-)

diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFSUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFSUtils.java
new file mode 100644
index 00000000000..a80c03dfcc4
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFSUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+public class RawFSUtils {
+  private RawFSUtils() {
+  }
+
+  /**
+   * @return true if the path {@code p} is within the subtree rooted at {@code root}.
+   */
+  public static boolean inSubtree(String root, String p) {
+    return inSubtree(new Path(root), new Path(p));
+  }
+
+  /**
+   * @return true if {@code node} is within the subtree rooted at {@code root}.
+   */
+  public static boolean inSubtree(Path root, Path node) {
+    Preconditions.checkNotNull(root, "Root cannot be null");
+    Preconditions.checkNotNull(node, "Node cannot be null");
+    if (root.isRoot()) {
+      return true;
+    }
+
+    if (Objects.equals(root, node)) {
+      return true;
+    }
+
+    while (!node.isRoot()) {
+      if (Objects.equals(root, node)) {
+        return true;
+      }
+      node = node.getParent();
+    }
+    return false;
+  }
+}
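For reference, inSubtree() above walks node upward through getParent() until it either matches root or reaches the filesystem root. A minimal sketch of the intended semantics (the paths are illustrative):

    // Assumes the RawFSUtils shown in this patch.
    RawFSUtils.inSubtree("/a", "/a/b/c");  // true: /a/b/c sits under /a
    RawFSUtils.inSubtree("/a", "/a");      // true: a path is in its own subtree
    RawFSUtils.inSubtree("/", "/a/b");     // true: the root contains every path
    RawFSUtils.inSubtree("/a/b", "/a");    // false: /a is an ancestor, not a descendant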
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
index ce3b85ab340..5a7a394b5c4 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
@@ -18,32 +18,8 @@
 package org.apache.hadoop.fs.tosfs;
 
-import io.proton.commit.MagicOutputStream;
-import io.proton.common.conf.Conf;
-import io.proton.common.conf.ConfKeys;
-import io.proton.common.object.DirectoryStorage;
-import io.proton.common.object.ObjectInfo;
-import io.proton.common.object.ObjectMultiRangeInputStream;
-import io.proton.common.object.ObjectOutputStream;
-import io.proton.common.object.ObjectRangeInputStream;
-import io.proton.common.object.ObjectStorage;
-import io.proton.common.object.ObjectStorageFactory;
-import io.proton.common.object.ObjectUtils;
-import io.proton.common.object.exceptions.InvalidObjectKeyException;
-import io.proton.common.util.Bytes;
-import io.proton.common.util.Constants;
-import io.proton.common.util.FSUtils;
-import io.proton.common.util.FuseUtils;
-import io.proton.common.util.HadoopUtil;
-import io.proton.common.util.Range;
-import io.proton.common.util.RemoteIterators;
-import io.proton.common.util.ThreadPools;
-import io.proton.fs.ops.DefaultFsOps;
-import io.proton.fs.ops.DirectoryFsOps;
-import io.proton.fs.ops.FsOps;
-import io.proton.shaded.com.google.common.annotations.VisibleForTesting;
-import io.proton.shaded.com.google.common.base.Preconditions;
-import io.proton.shaded.com.google.common.collect.Iterators;
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CreateFlag;
@@ -62,7 +38,31 @@ import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.tosfs.commit.MagicOutputStream;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+import org.apache.hadoop.fs.tosfs.common.ThreadPools;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.ChecksumInfo;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.fs.tosfs.object.DirectoryStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectMultiRangeInputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectOutputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectRangeInputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.object.exceptions.InvalidObjectKeyException;
+import org.apache.hadoop.fs.tosfs.ops.DefaultFsOps;
+import org.apache.hadoop.fs.tosfs.ops.DirectoryFsOps;
+import org.apache.hadoop.fs.tosfs.ops.FsOps;
+import org.apache.hadoop.fs.tosfs.util.FSUtils;
+import org.apache.hadoop.fs.tosfs.util.FuseUtils;
+import org.apache.hadoop.fs.tosfs.util.Range;
+import org.apache.hadoop.fs.tosfs.util.RemoteIterators;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
@@ -79,7 +79,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -99,7 +98,7 @@ public class RawFileSystem extends FileSystem {
   private static final long DFS_BLOCK_SIZE_DEFAULT = 128 << 20;
 
   private String scheme;
-  private Conf protonConf;
+  private Configuration protonConf;
   private String username;
   private Path workingDir;
   private URI uri;
@@ -134,35 +133,24 @@ public class RawFileSystem extends FileSystem {
   @Override
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
     LOG.debug("Opening '{}' for reading.", path);
-    FileStatus status = innerFileStatus(path);
+    RawFileStatus status = innerFileStatus(path);
     if (status.isDirectory()) {
       throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path));
     }
 
     // Parse the range size from the hadoop conf.
     long rangeSize = getConf().getLong(
-        ConfKeys.OBJECT_STREAM_RANGE_SIZE.key(),
-        ConfKeys.OBJECT_STREAM_RANGE_SIZE.defaultValue());
+        ConfKeys.OBJECT_STREAM_RANGE_SIZE,
+        ConfKeys.OBJECT_STREAM_RANGE_SIZE_DEFAULT);
     Preconditions.checkArgument(rangeSize > 0, "Object storage range size must be positive.");
 
-    FSInputStream fsIn = new ObjectMultiRangeInputStream(taskThreadPool, storage, path, status.getLen(), rangeSize);
+    FSInputStream fsIn = new ObjectMultiRangeInputStream(taskThreadPool, storage, path,
+        status.getLen(), rangeSize, status.checksum());
     return new FSDataInputStream(fsIn);
   }
 
-  public FSDataInputStream open(Path path, String expectedChecksum, Range range) throws IOException {
-    LOG.debug("Opening '{}' for reading.", path);
-    RawFileStatus status = innerFileStatus(path);
-    if (status.isDirectory()) {
-      throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path));
-    }
-
-    if (expectedChecksum != null && !Objects.equals(status.checksum(), expectedChecksum)) {
-      throw new ChecksumMismatchException(String.format("The requested file has been staled, " +
-          "request version's checksum is %s " +
-          "while current version's checksum is %s", expectedChecksum, status.checksum()));
-    }
-
-    return new FSDataInputStream(new ObjectRangeInputStream(storage, path, range));
+  public FSDataInputStream open(Path path, byte[] expectedChecksum, Range range) throws IOException {
+    return new FSDataInputStream(new ObjectRangeInputStream(storage, path, range, expectedChecksum));
   }
 
   @Override
@@ -547,7 +535,7 @@ public class RawFileSystem extends FileSystem {
 
     // Root directory always exists
     if (key.isEmpty()) {
-      return new RawFileStatus(0, true, 0, 0, qualifiedPath, username, Constants.EMPTY_CHECKSUM);
+      return new RawFileStatus(0, true, 0, 0, qualifiedPath, username, Constants.MAGIC_CHECKSUM);
    }
 
     try {
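The reworked open() above reads the range size from the Hadoop configuration before building the ObjectMultiRangeInputStream. A minimal sketch of tuning it from client code (the key name comes from ConfKeys in this patch; the 8 MiB value and the tos:// URIs are only illustrations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    // Split object reads into 8 MiB ranges instead of the Long.MAX_VALUE default.
    conf.setLong("proton.objectstorage.stream.range-size", 8L << 20);
    FileSystem fs = new Path("tos://bucket/").getFileSystem(conf);  // bucket is hypothetical
    try (FSDataInputStream in = fs.open(new Path("tos://bucket/key"))) {
      byte[] buf = new byte[4096];
      int n = in.read(buf);
    }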
@@ -577,7 +565,7 @@ public class RawFileSystem extends FileSystem {
   }
 
   @Override
-  public FsServerDefaults getServerDefaults(Path p) throws IOException {
+  public FsServerDefaults getServerDefaults(Path p) {
     Configuration config = getConf();
     // CRC32 is chosen as default as it is available in all
     // releases that support checksum.
@@ -595,8 +583,8 @@ public class RawFileSystem extends FileSystem {
   }
 
   private void stopAllServices() {
-    HadoopUtil.shutdownHadoopExecutors(uploadThreadPool, LOG, 30, TimeUnit.SECONDS);
-    HadoopUtil.shutdownHadoopExecutors(taskThreadPool, LOG, 30, TimeUnit.SECONDS);
+    ThreadPools.shutdown(uploadThreadPool, 30, TimeUnit.SECONDS);
+    ThreadPools.shutdown(taskThreadPool, 30, TimeUnit.SECONDS);
   }
 
   @Override
@@ -605,10 +593,6 @@ public class RawFileSystem extends FileSystem {
     setConf(conf);
     this.scheme = FSUtils.scheme(conf, uri);
 
-    // Merge the deprecated configure keys with the new configure keys and convert hadoop conf to proton conf.
-    FSUtils.withCompatibleKeys(conf, scheme);
-    this.protonConf = Conf.copyOf(conf);
-
     // Username is the current user at the time the FS was instantiated.
     this.username = UserGroupInformation.getCurrentUser().getShortUserName();
     this.workingDir = new Path("/user", username).makeQualified(uri, null);
@@ -619,14 +603,15 @@ public class RawFileSystem extends FileSystem {
       throw new FileNotFoundException(String.format("Bucket: %s not found.", uri.getAuthority()));
     }
 
-    int taskThreadPoolSize = protonConf.get(ConfKeys.TASK_THREAD_POOL_SIZE.format(scheme));
+    int taskThreadPoolSize =
+        protonConf.getInt(ConfKeys.TASK_THREAD_POOL_SIZE, ConfKeys.TASK_THREAD_POOL_SIZE_DEFAULT);
     this.taskThreadPool = ThreadPools.newWorkerPool(TASK_THREAD_POOL_PREFIX, taskThreadPoolSize);
 
-    int uploadThreadPoolSize = protonConf.get(ConfKeys.MULTIPART_THREAD_POOL_SIZE.format(scheme));
+    int uploadThreadPoolSize = protonConf.getInt(ConfKeys.MULTIPART_THREAD_POOL_SIZE,
+        ConfKeys.MULTIPART_THREAD_POOL_SIZE_DEFAULT);
     this.uploadThreadPool = ThreadPools.newWorkerPool(MULTIPART_THREAD_POOL_PREFIX, uploadThreadPoolSize);
 
     if (storage.bucket().isDirectory()) {
-
       fsOps = new DirectoryFsOps((DirectoryStorage) storage, this::objectToFileStatus);
     } else {
       fsOps = new DefaultFsOps(storage, protonConf, taskThreadPool, this::objectToFileStatus);
@@ -651,26 +636,24 @@ public class RawFileSystem extends FileSystem {
     return uploadThreadPool;
   }
 
-  String username() {
-    return username;
-  }
-
   /**
    * @return null if checksum is not supported.
   */
  @Override
  public FileChecksum getFileChecksum(Path f, long length) throws IOException {
    Preconditions.checkArgument(length >= 0);
+
    RawFileStatus fileStatus = innerFileStatus(f);
    if (fileStatus.isDirectory()) {
      // Compatible with HDFS
      throw new FileNotFoundException(String.format("Path is not a file, %s", f));
    }
 
-    if (!protonConf.get(ConfKeys.CHECKSUM_ENABLED.format(scheme))) {
+    if (!protonConf.getBoolean(ConfKeys.CHECKSUM_ENABLED, ConfKeys.CHECKSUM_ENABLED_DEFAULT)) {
      return null;
    }
 
-    return BaseChecksum.create(protonConf, fileStatus, length);
+    ChecksumInfo csInfo = storage.checksumInfo();
+    return new TosChecksum(csInfo.algorithm(), fileStatus.checksum());
  }
 
@@ -708,7 +691,8 @@ public class RawFileSystem extends FileSystem {
 
     String key = ObjectUtils.pathToKey(qualifiedPath);
     Map<String, String> tags = storage.getTags(key);
-    return tags.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, t -> Bytes.toBytes(t.getValue())));
+    return tags.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, t -> Bytes.toBytes(t.getValue())));
   }
 }
@@ -736,7 +720,7 @@ public class RawFileSystem extends FileSystem {
 
   @Override
   public List<String> listXAttrs(Path path) throws IOException {
-    return getXAttrs(path).keySet().stream().collect(Collectors.toList());
+    return Lists.newArrayList(getXAttrs(path).keySet());
   }
 
   @Override
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawLocatedFileStatus.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawLocatedFileStatus.java
new file mode 100644
index 00000000000..562289455d3
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawLocatedFileStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import static org.apache.hadoop.util.Preconditions.checkNotNull;
+
+public class RawLocatedFileStatus extends LocatedFileStatus {
+  public RawLocatedFileStatus(RawFileStatus status, BlockLocation[] locations) {
+    super(checkNotNull(status), locations);
+  }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosChecksum.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosChecksum.java
new file mode 100644
index 00000000000..f0bedc60c4b
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosChecksum.java
@@ -0,0 +1,67 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class TosChecksum extends FileChecksum {
+  private String algorithm;
+  private byte[] checksum;
+
+  public TosChecksum(String algorithm, byte[] checksum) {
+    this.algorithm = algorithm;
+    this.checksum = checksum;
+  }
+
+  @Override
+  public String getAlgorithmName() {
+    return algorithm;
+  }
+
+  @Override
+  public int getLength() {
+    return checksum.length;
+  }
+
+  @Override
+  public byte[] getBytes() {
+    return checksum;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    byte[] algorithmBytes = Bytes.toBytes(algorithm);
+    // Length prefixes must be written with writeInt so that readFields,
+    // which uses readInt, reads the same four bytes back.
+    out.writeInt(algorithmBytes.length);
+    out.write(algorithmBytes);
+    out.writeInt(checksum.length);
+    out.write(checksum);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte[] algorithmBytes = new byte[in.readInt()];
+    in.readFully(algorithmBytes);
+    algorithm = Bytes.toString(algorithmBytes);
+    checksum = new byte[in.readInt()];
+    in.readFully(checksum);
+  }
+}
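TosChecksum serializes as a length-prefixed algorithm string followed by length-prefixed checksum bytes. A minimal round-trip sketch (the algorithm name and payload are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    TosChecksum original = new TosChecksum("CRC32C", Bytes.toBytes("demo-checksum"));
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));  // writeInt length + bytes, twice

    TosChecksum copy = new TosChecksum(null, null);
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    // copy.getAlgorithmName() is now "CRC32C" and copy.getBytes() equals the original payload.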
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java
index c43bbf50359..eb632050a53 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.fs.tosfs.common;
 import org.apache.hadoop.util.Preconditions;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 public class Bytes {
   private Bytes() {
@@ -63,6 +64,10 @@ public class Bytes {
     return b;
   }
 
+  public static byte[] toBytes(String s) {
+    return s.getBytes(StandardCharsets.UTF_8);
+  }
+
   // Decode big-endian binaries into basic Java types.
   public static boolean toBoolean(byte[] b) {
@@ -166,4 +171,12 @@ public class Bytes {
     System.arraycopy(b, off, data, 0, len);
     return data;
   }
+
+  public static String toString(byte[] b) {
+    return new String(b, StandardCharsets.UTF_8);
+  }
+
+  public static String toString(byte[] b, int off, int len) {
+    return new String(b, off, len, StandardCharsets.UTF_8);
+  }
 }
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
index cf331cec999..ad694afad94 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
@@ -117,6 +117,29 @@ public class ConfKeys {
   public static final String OBJECT_STREAM_RANGE_SIZE = "proton.objectstorage.stream.range-size";
   public static final long OBJECT_STREAM_RANGE_SIZE_DEFAULT = Long.MAX_VALUE;
 
+  /**
+   * The size of the thread pool used for running tasks in parallel for the given object fs,
+   * e.g. delete objects, copy files. Example key: fs.tos.task.thread-pool-size.
+   */
+  public static final String TASK_THREAD_POOL_SIZE = "fs.tos.task.thread-pool-size";
+  public static final int TASK_THREAD_POOL_SIZE_DEFAULT =
+      Math.max(2, Runtime.getRuntime().availableProcessors());
+
+  /**
+   * The size of the thread pool used for uploading multipart parts in parallel for the given
+   * object storage, e.g. fs.tos.multipart.thread-pool-size.
+   */
+  public static final String MULTIPART_THREAD_POOL_SIZE = "fs.tos.multipart.thread-pool-size";
+  public static final int MULTIPART_THREAD_POOL_SIZE_DEFAULT =
+      Math.max(2, Runtime.getRuntime().availableProcessors());
+
+  /**
+   * The toggle that indicates whether to enable checksum when getting file status for the given
+   * object, e.g. fs.tos.checksum.enabled.
+   */
+  public static final String CHECKSUM_ENABLED = "fs.tos.checksum.enabled";
+  public static final boolean CHECKSUM_ENABLED_DEFAULT = true;
+
   public static String defaultDir(String basename) {
     String tmpdir = System.getProperty("java.io.tmpdir");
     Preconditions.checkNotNull(tmpdir, "System property 'java.io.tmpdir' cannot be null");
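The three new keys are ordinary Hadoop configuration entries; a short sketch of wiring them up (the values are illustrative, and the defaults above apply when a key is unset):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    conf.setInt("fs.tos.task.thread-pool-size", 16);      // parallel deletes/copies
    conf.setInt("fs.tos.multipart.thread-pool-size", 8);  // parallel multipart uploads
    conf.setBoolean("fs.tos.checksum.enabled", true);     // checksums via getFileChecksum()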
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FuseUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FuseUtils.java
new file mode 100644
index 00000000000..7fae5a23799
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FuseUtils.java
@@ -0,0 +1,28 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+public class FuseUtils {
+  public static final String ENV_TOS_ENABLE_FUSE = "TOS_ENABLE_FUSE";
+
+  private FuseUtils() {
+  }
+
+  public static boolean fuseEnabled() {
+    return ParseUtils.envAsBoolean(ENV_TOS_ENABLE_FUSE, false);
+  }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/RemoteIterators.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/RemoteIterators.java
new file mode 100644
index 00000000000..bf1d428a124
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/RemoteIterators.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class RemoteIterators {
+
+  private RemoteIterators() {
+  }
+
+  /**
+   * Create an iterator from a singleton.
+   *
+   * @param singleton instance
+   * @param <T> type
+   * @return a remote iterator
+   */
+  public static <T> RemoteIterator<T> fromSingleton(@Nullable T singleton) {
+    return new SingletonIterator<>(singleton);
+  }
+
+  /**
+   * Create an iterator from an iterable and a transformation function.
+   *
+   * @param <S> source type
+   * @param <T> result type
+   * @param iterator source
+   * @param mapper transformation
+   * @return a remote iterator
+   */
+  public static <S, T> RemoteIterator<T> fromIterable(Iterable<S> iterator, FunctionRaisingIOE<S, T> mapper) {
+    return new IterableRemoteIterator<>(iterator, mapper);
+  }
+
+  public interface FunctionRaisingIOE<S, T> {
+
+    /**
+     * Apply the function.
+     *
+     * @param s argument 1
+     * @return result
+     * @throws IOException Any IO failure
+     */
+    T apply(S s) throws IOException;
+  }
+
+  private static final class IterableRemoteIterator<S, T> implements RemoteIterator<T> {
+    private final Iterator<S> sourceIterator;
+    private final FunctionRaisingIOE<S, T> mapper;
+
+    private IterableRemoteIterator(Iterable<S> source, FunctionRaisingIOE<S, T> mapper) {
+      this.sourceIterator = source.iterator();
+      this.mapper = mapper;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return sourceIterator.hasNext();
+    }
+
+    @Override
+    public T next() throws IOException {
+      return mapper.apply(sourceIterator.next());
+    }
+  }
+
+  private static final class SingletonIterator<T> implements RemoteIterator<T> {
+    private final T singleton;
+
+    private boolean processed;
+
+    private SingletonIterator(@Nullable T singleton) {
+      this.singleton = singleton;
+      this.processed = singleton == null;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !processed;
+    }
+
+    @Override
+    public T next() {
+      if (hasNext()) {
+        processed = true;
+        return singleton;
+      } else {
+        throw new NoSuchElementException();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("singleton", singleton)
+          .toString();
+    }
+  }
+}
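For reference, a short sketch of how these helpers compose with Hadoop's RemoteIterator (the keys and the mapping are illustrative):

    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.fs.tosfs.util.RemoteIterators;
    import java.io.IOException;
    import java.util.Arrays;

    // FunctionRaisingIOE is a functional interface, so a lambda works; the
    // mapper may throw IOException, matching RemoteIterator.next()'s contract.
    RemoteIterator<String> mapped =
        RemoteIterators.fromIterable(Arrays.asList("a/b", "a/c"), key -> "prefix/" + key);
    while (mapped.hasNext()) {      // may throw IOException
      System.out.println(mapped.next());
    }

    // fromSingleton(null) yields an empty iteration (processed starts out true).
    RemoteIterator<String> single = RemoteIterators.fromSingleton("only-element");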