This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch HADOOP-19236-original
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ad155eecb4668ca7b748f7da543b25877f540faa
Author: lijinglun <lijing...@bytedance.com>
AuthorDate: Mon Aug 12 19:39:27 2024 +0800

    Integration of TOS: Add ObjectStorage interface.
---
 .../org/apache/hadoop/fs/tosfs/oss/BucketInfo.java |  91 +++++
 .../apache/hadoop/fs/tosfs/oss/ChecksumInfo.java   |  38 +++
 .../apache/hadoop/fs/tosfs/oss/ChecksumType.java   |  61 ++++
 .../org/apache/hadoop/fs/tosfs/oss/Constants.java  |  30 ++
 .../hadoop/fs/tosfs/oss/InputStreamProvider.java   |  35 ++
 .../hadoop/fs/tosfs/oss/MultipartUpload.java       | 103 ++++++
 .../apache/hadoop/fs/tosfs/oss/ObjectContent.java  |  53 +++
 .../org/apache/hadoop/fs/tosfs/oss/ObjectInfo.java | 118 +++++++
 .../apache/hadoop/fs/tosfs/oss/ObjectStorage.java  | 368 +++++++++++++++++++++
 .../java/org/apache/hadoop/fs/tosfs/oss/Part.java  |  79 +++++
 .../fs/tosfs/oss/request/ListObjectsRequest.java   |  85 +++++
 .../fs/tosfs/oss/response/ListObjectsResponse.java |  43 +++
 12 files changed, 1104 insertions(+)

diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/BucketInfo.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/BucketInfo.java
new file mode 100644
index 00000000000..4e716f9c8f1
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/BucketInfo.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * There are two kinds of bucket types: general purpose bucket (common bucket) and directory
+ * bucket. Directory buckets organize data hierarchically into directories, as opposed to the flat
+ * storage structure of general purpose buckets.
+ * <p>
+ * Only a few object storages support directory buckets, e.g. S3, OBS, TOS. Generally, a directory
+ * bucket supports renaming or deleting a dir with constant time complexity O(1), but these object
+ * storages have slight differences in these APIs. E.g. S3 doesn't provide a rename API, and S3
+ * recursively deletes any empty parent directories in the object path when deleting an object in
+ * a directory bucket, whereas TOS won't delete any empty parent directories.
+ * <p>
+ * There are also some general differences between general purpose buckets and directory buckets.
+ * <ul>
+ * <li>A directory bucket treats an object whose key ends with '/' as a directory when creating
+ * the object.</li>
+ * <li>A TOS directory bucket will automatically create missing parent dirs when creating an
+ * object, but a general purpose bucket only creates the one object.</li>
+ * <li>A directory bucket doesn't allow creating any object under a file, but a general purpose
+ * bucket doesn't have this constraint.</li>
+ * <li>If an object 'a/b/' exists in a directory bucket, both head('a/b') and head('a/b/') will
+ * get the object meta, but only head('a/b/') can get the object meta from a general purpose
+ * bucket.</li>
+ * <li>TOS directory buckets provide atomic rename/delete dir abilities.</li>
+ * </ul>
+ */
+public class BucketInfo {
+  private final String name;
+  private final boolean directory;
+
+  public BucketInfo(String name, boolean directory) {
+    this.name = name;
+    this.directory = directory;
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public boolean isDirectory() {
+    return directory;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof BucketInfo)) {
+      return false;
+    }
+
+    BucketInfo that = (BucketInfo) o;
+    return Objects.equals(name, that.name)
+        && directory == that.directory;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, directory);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("name", name)
+        .add("directory", directory)
+        .toString();
+  }
+}
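
(Illustration, not part of the patch: a sketch of how the directory-bucket semantics described in
the BucketInfo Javadoc surface through the ObjectStorage API defined later in this commit; the
"storage" variable stands for any initialized ObjectStorage instance.)

    BucketInfo info = storage.bucket();
    if (info.isDirectory()) {
      // Directory bucket: if the dir object "a/b/" exists, head("a/b") and
      // head("a/b/") both return its meta.
      ObjectInfo meta = storage.head("a/b");
      System.out.println(info.name() + ": " + meta);
    } else {
      // General purpose bucket: only the exact key "a/b/" resolves the dir object.
      ObjectInfo meta = storage.head("a/b/");
      System.out.println(info.name() + ": " + meta);
    }
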
+ */ + +package org.apache.hadoop.fs.tosfs.oss; + +public class ChecksumInfo { + private final String algorithm; + private final ChecksumType checksumType; + + public ChecksumInfo(String algorithm, ChecksumType checksumType) { + this.algorithm = algorithm; + this.checksumType = checksumType; + } + + public String algorithm() { + return algorithm; + } + + public ChecksumType checksumType() { + return checksumType; + } +} + diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ChecksumType.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ChecksumType.java new file mode 100644 index 00000000000..2d2b8fb98e4 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ChecksumType.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.oss; + +public enum ChecksumType { + NULL((byte) 0, 0), + CRC32((byte) 1, 4), + + CRC32C((byte) 2, 4), + CRC64ECMA((byte) 3, 8), + MD5((byte) 4, 128); + + private final byte value; + private final int bytes; + + ChecksumType(byte value, int bytes) { + this.value = value; + this.bytes = bytes; + } + + public byte value() { + return value; + } + + public int bytes() { + return bytes; + } + + public static ChecksumType valueOf(byte value) { + for (ChecksumType t : values()) { + if (t.value == value) { + return t; + } + } + throw new IllegalStateException("Unknown checksum type with value: " + value); + } + + public static int maxBytes() { + int maxBytes = 0; + for (ChecksumType t : values()) { + maxBytes = Math.max(maxBytes, t.bytes()); + } + return maxBytes; + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Constants.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Constants.java new file mode 100644 index 00000000000..fb3439bca3b --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Constants.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Constants.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Constants.java
new file mode 100644
index 00000000000..fb3439bca3b
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Constants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+public class Constants {
+  private Constants() {
+  }
+
+  // The magic checksum indicates that checksums are not supported: it is used as the file
+  // checksum when the file is a dir, or when the filesystem/object storage doesn't implement
+  // the checksum algorithm.
+  public static final byte[] MAGIC_CHECKSUM = new byte[] { 'M' };
+
+  public static final String SLASH = "/";
+}
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/InputStreamProvider.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/InputStreamProvider.java
new file mode 100644
index 00000000000..6b4f0a91e0a
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/InputStreamProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import java.io.InputStream;
+
+/**
+ * Provides the content stream of a request.
+ * <p>
+ * Each call to the {@link #newStream()} method must result in a stream
+ * whose position is at the beginning of the content.
+ * Implementations may return a new stream or the same stream for each call.
+ * If returning a new stream, the implementation must {@code close()} the
+ * previous stream and free any resources it acquired.
+ */
+public interface InputStreamProvider {
+  InputStream newStream();
+}
+
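(Illustration, not part of the patch: a minimal file-backed InputStreamProvider that satisfies the
contract above by returning a fresh stream, positioned at the start of the content, on every call;
the class name is invented for the example.)

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.UncheckedIOException;

    import org.apache.hadoop.fs.tosfs.oss.InputStreamProvider;

    public class FileStreamProvider implements InputStreamProvider {
      private final String path;

      public FileStreamProvider(String path) {
        this.path = path;
      }

      @Override
      public InputStream newStream() {
        try {
          // A new stream per call, so the position is always at the beginning.
          return new FileInputStream(path);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    }
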
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/MultipartUpload.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/MultipartUpload.java
new file mode 100644
index 00000000000..70907da224c
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/MultipartUpload.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+public class MultipartUpload implements Comparable<MultipartUpload> {
+  private final String key;
+  private final String uploadId;
+  private final int minPartSize;
+  private final int maxPartCount;
+
+  public MultipartUpload(String key, String uploadId, int minPartSize, int maxPartCount) {
+    this.key = key;
+    this.uploadId = uploadId;
+    this.minPartSize = minPartSize;
+    this.maxPartCount = maxPartCount;
+  }
+
+  public String key() {
+    return key;
+  }
+
+  public String uploadId() {
+    return uploadId;
+  }
+
+  public int minPartSize() {
+    return minPartSize;
+  }
+
+  public int maxPartCount() {
+    return maxPartCount;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof MultipartUpload)) {
+      return false;
+    }
+
+    MultipartUpload that = (MultipartUpload) o;
+    if (!Objects.equals(key, that.key)) {
+      return false;
+    }
+    if (!Objects.equals(uploadId, that.uploadId)) {
+      return false;
+    }
+    if (minPartSize != that.minPartSize) {
+      return false;
+    }
+    return maxPartCount == that.maxPartCount;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, uploadId, minPartSize, maxPartCount);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("key", key)
+        .add("uploadId", uploadId)
+        .add("minPartSize", minPartSize)
+        .add("maxPartCount", maxPartCount)
+        .toString();
+  }
+
+  @Override
+  public int compareTo(MultipartUpload o) {
+    if (this == o) {
+      return 0;
+    } else if (o == null) {
+      return 1;
+    } else if (this.key.compareTo(o.key) == 0) {
+      return this.uploadId.compareTo(o.uploadId);
+    } else {
+      return this.key.compareTo(o.key);
+    }
+  }
+}
+
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectContent.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectContent.java
new file mode 100644
index 00000000000..453c3af1cd8
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectContent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.fs.tosfs.oss; + +import org.apache.hadoop.fs.tosfs.common.ChecksumMismatchException; +import org.apache.hadoop.fs.tosfs.common.CommonUtils; + +import java.io.InputStream; +import java.util.Arrays; + +public class ObjectContent { + private final byte[] checksum; + private final InputStream stream; + + public ObjectContent(byte[] checksum, InputStream stream) { + this.checksum = checksum; + this.stream = stream; + } + + public InputStream stream() { + return stream; + } + + public InputStream verifiedStream(byte[] expectedChecksum) throws ChecksumMismatchException { + if (!Arrays.equals(expectedChecksum, checksum)) { + CommonUtils.runQuietly(stream::close); + throw new ChecksumMismatchException(expectedChecksum, checksum); + } + + return stream; + } + + public byte[] checksum() { + return checksum; + } +} + diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectInfo.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectInfo.java new file mode 100644 index 00000000000..3ef7e3fbf2f --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectInfo.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.oss; + +import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects; +import org.apache.hadoop.util.StringUtils; + +import java.util.Arrays; +import java.util.Date; +import java.util.Objects; + +import static org.apache.hadoop.util.Preconditions.checkArgument; + +public class ObjectInfo { + private final String key; + private final long size; + private final Date mtime; + private final boolean isDir; + private final byte[] checksum; + + public ObjectInfo(String key, long size, Date mtime, byte[] checksum) { + this(key, size, mtime, checksum, ObjectInfo.isDir(key)); + } + + public ObjectInfo(String key, long size, Date mtime, byte[] checksum, boolean isDir) { + checkArgument(key != null, "Key is null"); + checkArgument(size >= 0, "The size of key(%s) is negative", key); + checkArgument(mtime != null, "The modified time of key(%s) null.", key); + this.key = key; + this.size = size; + this.mtime = mtime; + this.isDir = isDir; + // checksum can be null since some object storage might not support checksum. + this.checksum = checksum == null || isDir ? Constants.MAGIC_CHECKSUM : checksum; + } + + public String key() { + return key; + } + + /** + * The size of directory object is 0; + * + * @return the size of object. + */ + public long size() { + return isDir ? 
+  }
+
+  public Date mtime() {
+    return mtime;
+  }
+
+  /**
+   * @return {@link Constants#MAGIC_CHECKSUM} if the object is a dir or the object storage
+   *         doesn't support the given checksum type.
+   */
+  public byte[] checksum() {
+    return checksum;
+  }
+
+  public boolean isDir() {
+    return isDir;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof ObjectInfo)) {
+      return false;
+    }
+
+    ObjectInfo that = (ObjectInfo) o;
+    return Objects.equals(key, that.key)
+        && size == that.size
+        && Objects.equals(mtime, that.mtime)
+        && Arrays.equals(checksum, that.checksum)
+        && isDir == that.isDir;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, size, mtime, Arrays.hashCode(checksum), isDir);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("key", key)
+        .add("size", size)
+        .add("mtime", mtime)
+        .add("checksum", StringUtils.byteToHexString(checksum))
+        .add("isDir", isDir)
+        .toString();
+  }
+
+  public static boolean isDir(String key) {
+    return key.endsWith(Constants.SLASH);
+  }
+}
+
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectStorage.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectStorage.java
new file mode 100644
index 00000000000..45fa4ed5865
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectStorage.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.common.InvalidObjectKeyException;
+import org.apache.hadoop.fs.tosfs.common.LazyReload;
+import org.apache.hadoop.fs.tosfs.common.NotAppendableException;
+import org.apache.hadoop.fs.tosfs.oss.request.ListObjectsRequest;
+import org.apache.hadoop.fs.tosfs.oss.response.ListObjectsResponse;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public interface ObjectStorage extends Closeable {
+  String EMPTY_DELIMITER = "";
+
+  /**
+   * @return Scheme of the object storage.
+   */
+  String scheme();
+
+  /**
+   * @return the bucket information, or null if the bucket doesn't exist.
+   */
+  BucketInfo bucket();
+
+  /**
+   * Initialize the object storage according to the configuration.
+   *
+   * @param conf to initialize the {@link ObjectStorage}
+   * @param bucket the corresponding bucket name; each object storage instance binds to one
+   *               bucket.
+ */ + void initialize(Configuration conf, String bucket); + + /** + * @return storage conf + */ + Configuration conf(); + + default ObjectContent get(String key) { + return get(key, 0, -1); + } + + /** + * Get the data for the given object specified by key. + * Throw {@link RuntimeException} if object key doesn't exist. + * Throw {@link RuntimeException} if object key is null or empty. + * + * @return {@link InputStream} to read the object content. + */ + ObjectContent get(String key, long offset, long limit); + + default byte[] put(String key, byte[] data) { + return put(key, data, 0, data.length); + } + + default byte[] put(String key, byte[] data, int off, int len) { + return put(key, () -> new ByteArrayInputStream(data, off, len), len); + } + + /** + * Put data read from a reader to an object specified by key. The implementation must ensure to + * close the stream created by stream provider after finishing stream operation. + * Throw {@link RuntimeException} if object key is null or empty. + * + * @param key for the object. + * @param streamProvider the binary input stream provider that create input stream to write. + * @param contentLength the content length, if the actual data is bigger than content length, the + * object can be created, but the object data will be truncated to the given + * content length, if the actual data is smaller than content length, will + * create object failed with unexpect end of IOException. + * @return the checksum of uploaded object + */ + byte[] put(String key, InputStreamProvider streamProvider, long contentLength); + + default byte[] append(String key, byte[] data) { + return append(key, data, 0, data.length); + } + + default byte[] append(String key, byte[] data, int off, int len) { + return append(key, () -> new ByteArrayInputStream(data, off, len), len); + } + + /** + * Append data read from a reader to an object specified by key. If the object exists, data will + * be appended to the tail. Otherwise, the object will be created and data will be written to it. + * Content length could be zero if object exists. If the object doesn't exist and content length + * is zero, a {@link NotAppendableException} will be thrown. + * <p> + * The first one wins if there are concurrent appends. + * <p> + * The implementation must ensure to close the stream created by stream provider after finishing + * stream operation. + * Throw {@link RuntimeException} if object key is null or empty. + * + * @param key for the object. + * @param streamProvider the binary input stream provider that create input stream to write. + * @param contentLength the appended content length. If the actual appended data is bigger than + * content length, the object can be appended but the data to append will be + * truncated to the given content length. If the actual data is smaller than + * content length, append object will fail with unexpect end IOException. + * @return the checksum of appended object. + * @throws NotAppendableException if the object already exists and is not appendable, or the + * object doesn't exist and content length is zero. + */ + byte[] append(String key, InputStreamProvider streamProvider, long contentLength); + + /** + * Delete an object. + * No exception thrown if the object key doesn't exist. + * Throw {@link RuntimeException} if object key is null or empty. + * + * @param key the given object key to be deleted. + */ + void delete(String key); + + /** + * Delete multiple keys. 
+   * If a key doesn't exist, its deletion is treated as successful and the key won't be included
+   * in the response list.
+   *
+   * @param keys the given object keys to be deleted
+   * @return the keys that failed to be deleted
+   */
+  List<String> batchDelete(List<String> keys);
+
+  /**
+   * Delete all objects with the given prefix (including the prefix itself if the corresponding
+   * object exists).
+   *
+   * @param prefix the prefix key.
+   */
+  void deleteAll(String prefix);
+
+  /**
+   * Head returns some information about the object, or null if not found.
+   * Throws {@link RuntimeException} if the object key is null or empty.
+   * There are some differences between directory buckets and general purpose buckets:
+   * <ul>
+   * <li>Assume a file object 'a/b' exists: only head("a/b") will get the meta of object 'a/b',
+   * for both general purpose buckets and directory buckets.</li>
+   * <li>Assume a dir object 'a/b/' exists: for a general purpose bucket, only head("a/b/") will
+   * get the meta of object 'a/b/', but for a directory bucket, both head("a/b") and head("a/b/")
+   * will get the meta of object 'a/b/'.</li>
+   * </ul>
+   *
+   * @param key for the specified object.
+   * @return {@link ObjectInfo}, null if the object does not exist.
+   * @throws InvalidObjectKeyException if the object is located under an existing file in a
+   *                                   directory bucket, which is not allowed.
+   */
+  ObjectInfo head(String key);
+
+  /**
+   * List objects according to the given {@link ListObjectsRequest}.
+   *
+   * @param request {@link ListObjectsRequest}
+   * @return the iterable of {@link ListObjectsResponse} which contains objects and common
+   *         prefixes
+   */
+  Iterable<ListObjectsResponse> list(ListObjectsRequest request);
+
+  /**
+   * List a limited number of objects in the given bucket.
+   *
+   * @param prefix Limits the response to keys that begin with the specified prefix.
+   * @param startAfter StartAfter is where you want the object storage to start listing from;
+   *                   the object storage starts listing after this specified key, which can be
+   *                   any key in the bucket.
+   * @param limit Limit the maximum number of response objects.
+   * @return {@link ObjectInfo} the object list whose keys match the prefix
+   */
+  default Iterable<ObjectInfo> list(String prefix, String startAfter, int limit) {
+    ListObjectsRequest request = ListObjectsRequest.builder()
+        .prefix(prefix)
+        .startAfter(startAfter)
+        .maxKeys(limit)
+        .delimiter(EMPTY_DELIMITER)
+        .build();
+
+    return new LazyReload<>(() -> {
+      Iterator<ListObjectsResponse> iterator = list(request).iterator();
+      return buf -> {
+        if (!iterator.hasNext()) {
+          return true;
+        }
+        buf.addAll(iterator.next().objects());
+
+        return !iterator.hasNext();
+      };
+    });
+  }
+
+  /**
+   * List all objects in the given bucket.
+   *
+   * @param prefix Limits the response to keys that begin with the specified prefix.
+   * @param startAfter StartAfter is where you want the object storage to start listing from;
+   *                   the object storage starts listing after this specified key, which can be
+   *                   any key in the bucket.
+   * @return {@link ObjectInfo} Iterable to iterate over the objects whose keys match the prefix,
+   *         starting after StartAfter
+   */
+  default Iterable<ObjectInfo> listAll(String prefix, String startAfter) {
+    return list(prefix, startAfter, -1);
+  }
+
+  /**
+   * CreateMultipartUpload starts to upload a large object part by part.
+   *
+   * @param key for the specified object.
+   * @return {@link MultipartUpload}.
+   */
+  MultipartUpload createMultipartUpload(String key);
+
+  /**
+   * UploadPart uploads a part of an object. The implementation must close the stream
+   * created by the stream provider after the stream operation finishes.
+   *
+   * @param key for the specified object.
+   * @param uploadId for the multipart upload id.
+   * @param partNum upload part number.
+   * @param streamProvider the stream provider to provide the part stream
+   * @param contentLength the content length. If the actual data is bigger than the content
+   *                      length, the part can still be created but its data will be truncated
+   *                      to the given content length; if the actual data is smaller than the
+   *                      content length, creating the part will fail with an unexpected
+   *                      end-of-stream IOException.
+   */
+  Part uploadPart(String key, String uploadId, int partNum, InputStreamProvider streamProvider,
+      long contentLength);
+
+  /**
+   * Complete the multipart upload with the given object key and upload id.
+   *
+   * @param key for the specified object.
+   * @param uploadId id of the multipart upload.
+   * @param uploadParts parts to upload.
+   * @return the checksum of the uploaded object
+   */
+  byte[] completeUpload(String key, String uploadId, List<Part> uploadParts);
+
+  /**
+   * Abort a multipart upload.
+   *
+   * @param key object key.
+   * @param uploadId multipart upload Id.
+   */
+  void abortMultipartUpload(String key, String uploadId);
+
+  /**
+   * List multipart uploads under a path.
+   *
+   * @param prefix of the uploads to list.
+   * @return Iterable to iterate over multipart uploads.
+   */
+  Iterable<MultipartUpload> listUploads(String prefix);
+
+  /**
+   * Upload a part by copying data from an existing object, with the given multipart upload id.
+   *
+   * @param srcKey source object key
+   * @param dstKey dest object key
+   * @param uploadId id of the multipart upload copy
+   * @param partNum part num of the multipart upload copy
+   * @param copySourceRangeStart copy source range start of the source object
+   * @param copySourceRangeEnd copy source range end of the source object
+   * @return {@link Part}.
+   */
+  Part uploadPartCopy(
+      String srcKey, String dstKey, String uploadId, int partNum, long copySourceRangeStart,
+      long copySourceRangeEnd);
+
+  /**
+   * Copy binary content from one object to another object.
+   *
+   * @param srcKey source object key
+   * @param dstKey dest object key
+   */
+  void copy(String srcKey, String dstKey);
+
+  /**
+   * Atomically rename a source object to a dest object without any data copying.
+   * Overwrites the dest object if it already exists.
+   *
+   * @param srcKey source object key
+   * @param dstKey dest object key
+   * @throws RuntimeException if the rename failed, e.g. srcKey is equal to dstKey or the source
+   *                          object doesn't exist.
+   */
+  void rename(String srcKey, String dstKey);
+
+  /**
+   * Attach tags to the specified object. This method overwrites all existing tags with the new
+   * tags, and removes all existing tags if the new tags are empty. The maximum number of tags
+   * is 10.
+   *
+   * @param key the key of the object.
+   * @param newTags the new tags to put.
+   * @throws RuntimeException if the key doesn't exist.
+   */
+  default void putTags(String key, Map<String, String> newTags) {
+    throw new UnsupportedOperationException(
+        this.getClass().getSimpleName() + " doesn't support putObjectTagging.");
+  }
+
+  /**
+   * Get all attached tags of the object.
+   *
+   * @param key the key of the object.
+   * @return map containing all tags.
+   * @throws RuntimeException if the key doesn't exist.
+   */
+  default Map<String, String> getTags(String key) {
+    throw new UnsupportedOperationException(
+        this.getClass().getSimpleName() + " doesn't support getObjectTagging.");
+  }
+
+  /**
+   * Gets the object status for the given key.
+   * Unlike {@link ObjectStorage#head(String)}, this returns the object info if either the key
+   * itself exists or a prefix equal to the key exists.
+   * <p>
+   * There are three kinds of implementations:
+   * <ul>
+   * <li>Use the headObject API if the object storage supports directory buckets and the
+   * requested bucket is a directory bucket; the object storage returns the object directly if
+   * the file or dir exists, otherwise returns null.</li>
+   * <li>Use the getFileStatus API if the object storage supports it, e.g. TOS. The object
+   * storage returns the object directly if the key or prefix exists, otherwise returns
+   * null.</li>
+   * <li>If the object storage supports neither of the above cases, try headObject(key) first;
+   * if the object doesn't exist and the key doesn't end with '/', then try
+   * headObject(key + "/"); and if that doesn't exist either, use the listObjects API to check
+   * whether the prefix/key exists.</li>
+   * </ul>
+   *
+   * @param key the object key
+   * @return object info if the key or prefix exists, otherwise return null.
+   * @throws InvalidObjectKeyException if the object is located under an existing file in a
+   *                                   directory bucket, which is not allowed.
+   */
+  ObjectInfo objectStatus(String key);
+
+  /**
+   * Get the object storage checksum information, including checksum algorithm name,
+   * checksum type, etc.
+   *
+   * @return checksum information of this storage.
+   */
+  ChecksumInfo checksumInfo();
+}
+
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Part.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Part.java
new file mode 100644
index 00000000000..f393d53d896
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Part.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+public class Part {
+  private int num;
+  private long size;
+  private String eTag;
+
+  // No-arg constructor for json serializer, don't use.
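+  // (Typically a JSON library, e.g. Jackson, instantiates the class reflectively via this
+  // constructor and then populates the fields; the exact serializer is not specified here.)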
+  public Part() {
+  }
+
+  public Part(int num, long size, String eTag) {
+    this.num = num;
+    this.size = size;
+    this.eTag = eTag;
+  }
+
+  public int num() {
+    return num;
+  }
+
+  public long size() {
+    return size;
+  }
+
+  public String eTag() {
+    return eTag;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(num, size, eTag);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof Part)) {
+      return false;
+    }
+    Part that = (Part) o;
+    return num == that.num
+        && size == that.size
+        && Objects.equals(eTag, that.eTag);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("PartNum", num)
+        .add("PartSize", size)
+        .add("ETag", eTag)
+        .toString();
+  }
+}
+
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/request/ListObjectsRequest.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/request/ListObjectsRequest.java
new file mode 100644
index 00000000000..3c9b68d0ede
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/request/ListObjectsRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.fs.tosfs.oss.request; + +public class ListObjectsRequest { + private final String prefix; + private final String startAfter; + private final int maxKeys; + private final String delimiter; + + private ListObjectsRequest(String prefix, String startAfter, int maxKeys, String delimiter) { + this.prefix = prefix; + this.startAfter = startAfter; + this.maxKeys = maxKeys; + this.delimiter = delimiter; + } + + public String prefix() { + return prefix; + } + + public String startAfter() { + return startAfter; + } + + public int maxKeys() { + return maxKeys; + } + + public String delimiter() { + return delimiter; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String prefix; + private String startAfter; + // -1 means list all object keys + private int maxKeys = -1; + private String delimiter; + + public Builder prefix(String prefix) { + this.prefix = prefix; + return this; + } + + public Builder startAfter(String startAfter) { + this.startAfter = startAfter; + return this; + } + + public Builder maxKeys(int maxKeys) { + this.maxKeys = maxKeys; + return this; + } + + public Builder delimiter(String delimiter) { + this.delimiter = delimiter; + return this; + } + + public ListObjectsRequest build() { + return new ListObjectsRequest(prefix, startAfter, maxKeys, delimiter); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/response/ListObjectsResponse.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/response/ListObjectsResponse.java new file mode 100644 index 00000000000..694ccf9c9e4 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/response/ListObjectsResponse.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.oss.response; + +import org.apache.hadoop.fs.tosfs.oss.ObjectInfo; + +import java.util.List; + +public class ListObjectsResponse { + private final List<ObjectInfo> objects; + private final List<String> commonPrefixes; + + public ListObjectsResponse( + List<ObjectInfo> objects, + List<String> commonPrefixes) { + this.objects = objects; + this.commonPrefixes = commonPrefixes; + } + + public List<ObjectInfo> objects() { + return objects; + } + + public List<String> commonPrefixes() { + return commonPrefixes; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org