This is an automated email from the ASF dual-hosted git repository. jinglun pushed a commit to branch HADOOP-19236 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 12ddf2b16a41f89579c12ea8821250273e7ada49 Author: lijinglun <lijing...@bytedance.com> AuthorDate: Mon Oct 21 14:29:27 2024 +0800 Integration of TOS: Add ObjectMultiRangeInputStream and ObjectRangeInputStream. --- .../org/apache/hadoop/fs/tosfs/conf/ConfKeys.java | 6 +- .../tosfs/object/ObjectMultiRangeInputStream.java | 233 +++++++++++ .../fs/tosfs/object/ObjectRangeInputStream.java | 198 +++++++++ .../tosfs/object/tos/DelegationClientBuilder.java | 4 +- .../org/apache/hadoop/fs/tosfs/util/FSUtils.java | 76 ++++ .../fs/tosfs/object/ObjectStorageTestBase.java | 85 ++++ .../object/TestObjectMultiRangeInputStream.java | 447 +++++++++++++++++++++ .../tosfs/object/TestObjectRangeInputStream.java | 142 +++++++ .../object/tos/TestDelegationClientBuilder.java | 16 +- .../fs/tosfs/object/tos/TestTOSRetryPolicy.java | 2 +- .../apache/hadoop/fs/tosfs/util/TestUtility.java | 6 +- 11 files changed, 1197 insertions(+), 18 deletions(-) diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java index 1ef8e83d63d..cf331cec999 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java @@ -20,21 +20,19 @@ package org.apache.hadoop.fs.tosfs.conf; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; -import java.util.List; - public class ConfKeys { /** * Object storage endpoint to connect to, which should include both region and object domain name. * e.g. 'fs.tos.endpoint'='tos-cn-beijing.volces.com'. */ - public static final ArgumentKey FS_TOS_ENDPOINT = new ArgumentKey("fs.%s.endpoint"); + public static final ArgumentKey FS_OBJECT_STORAGE_ENDPOINT = new ArgumentKey("fs.%s.endpoint"); /** * The region of the object storage, e.g. fs.tos.region. Parsing template "fs.%s.endpoint" to * know the region. */ - public static final ArgumentKey FS_TOS_REGION = new ArgumentKey("fs.%s.region"); + public static final ArgumentKey FS_OBJECT_STORAGE_REGION = new ArgumentKey("fs.%s.region"); /** * The object storage implementation for the defined scheme. For example, we can delegate the diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectMultiRangeInputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectMultiRangeInputStream.java new file mode 100644 index 00000000000..99ed5bf23a7 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectMultiRangeInputStream.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.util.CommonUtils; +import org.apache.hadoop.fs.tosfs.util.FSUtils; +import org.apache.hadoop.fs.tosfs.util.Range; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ObjectMultiRangeInputStream extends FSInputStream { + private final AtomicBoolean closed = new AtomicBoolean(false); + private final ExecutorService threadPool; + private final ObjectStorage storage; + private final String objectKey; + private final long contentLength; + private final long rangeSize; + + private volatile ObjectRangeInputStream stream; + private volatile long nextPos = 0; + private volatile long currPos = 0; + // All range streams should have same checksum. + private final byte[] checksum; + + public ObjectMultiRangeInputStream( + ExecutorService threadPool, + ObjectStorage storage, + Path path, + long contentLength, + long rangeSize, + byte[] checksum) { + this(threadPool, storage, ObjectUtils.pathToKey(path), contentLength, rangeSize, checksum); + } + + public ObjectMultiRangeInputStream( + ExecutorService threadPool, + ObjectStorage storage, + String objectKey, + long contentLength, + long rangeSize, + byte[] checksum) { + this.threadPool = threadPool; + this.storage = storage; + this.objectKey = objectKey; + this.contentLength = contentLength; + this.rangeSize = rangeSize; + this.checksum = checksum; + + Preconditions.checkNotNull(checksum, "Checksum should not be null."); + } + + @Override + public synchronized void seek(long pos) throws IOException { + if (pos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos); + } + + if (contentLength <= 0) { + return; + } + + nextPos = pos; + } + + @Override + public synchronized long getPos() { + return nextPos; + } + + @Override + public synchronized boolean seekToNewSource(long targetPos) throws IOException { + checkNotClosed(); + return false; + } + + @Override + public synchronized int read() throws IOException { + byte[] buf = new byte[1]; + int n = read(buf, 0, buf.length); + if (n < 0) { + return -1; + } else { + return buf[0] & 0xFF; + } + } + + @Override + public synchronized int read(byte[] buffer, int offset, int length) throws IOException { + checkNotClosed(); + FSUtils.checkReadParameters(buffer, offset, length); + if (length == 0) { + return 0; + } + + int total = 0; + while (total < length) { + if (contentLength == 0 || nextPos >= contentLength) { + return total == 0 ? -1 : total; + } + + seekStream(); + int n = stream.read(buffer, offset, length - total); + if (n < 0) { + return total == 0 ? -1 : total; + } + + total += n; + offset += n; + currPos += n; + nextPos += n; + } + + return total; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) throws IOException { + checkNotClosed(); + // Check the arguments, according to the HDFS contract. + if (position < 0) { + throw new EOFException("position is negative"); + } + FSUtils.checkReadParameters(buffer, offset, length); + if (length == 0) { + return 0; + } + + if (contentLength == 0 || position >= contentLength) { + return -1; + } + + long remaining = contentLength - position; + int limit = (remaining >= length) ? length : (int) remaining; + + try (InputStream in = storage.get(objectKey, position, limit).verifiedStream(checksum)) { + return in.read(buffer, offset, limit); + } + } + + private void seekStream() throws IOException { + if (stream != null && stream.include(nextPos)) { + // Seek to a random position which is still located in the current range of stream. + if (nextPos != currPos) { + stream.seek(nextPos); + currPos = nextPos; + } + return; + } + + // Seek to a position which is located in another range of new stream. + currPos = nextPos; + openStream(); + } + + private void openStream() throws IOException { + closeStream(true); + + long off = (nextPos / rangeSize) * rangeSize; + Range range = Range.of(off, Math.min(contentLength - off, rangeSize)); + if (nextPos < range.end()) { + stream = new ObjectRangeInputStream(storage, objectKey, range, checksum); + stream.seek(nextPos); + } + } + + private void closeStream(boolean asyncClose) throws IOException { + if (stream != null) { + if (asyncClose) { + final ObjectRangeInputStream streamToClose = stream; + threadPool.submit(() -> CommonUtils.runQuietly(streamToClose::close)); + } else { + stream.close(); + } + stream = null; + } + } + + private void checkNotClosed() throws IOException { + if (closed.get()) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + } + + @Override + public synchronized void close() throws IOException { + super.close(); + if (closed.compareAndSet(false, true)) { + closeStream(false); + } + } + + // for test + public long nextExpectPos() { + return currPos; + } + + @Override + public synchronized int available() throws IOException { + checkNotClosed(); + return Ints.saturatedCast(contentLength - nextPos); + } + + @VisibleForTesting + ObjectRangeInputStream stream() { + return stream; + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectRangeInputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectRangeInputStream.java new file mode 100644 index 00000000000..d33d0079e4f --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectRangeInputStream.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object; + +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.util.FSUtils; +import org.apache.hadoop.fs.tosfs.util.Range; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.thirdparty.com.google.common.io.ByteStreams; +import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; + +public class ObjectRangeInputStream extends FSInputStream { + private static final Logger LOG = LoggerFactory.getLogger(ObjectRangeInputStream.class); + private static final int MAX_SKIP_SIZE = 1024 * 1024; + + private final ObjectStorage storage; + private final String objectKey; + private final Range range; + private final byte[] checksum; + + private InputStream stream; + private long nextPos; + private long currPos; + private boolean closed = false; + + public ObjectRangeInputStream(ObjectStorage storage, Path path, Range range, byte[] checksum) { + this(storage, ObjectUtils.pathToKey(path), range, checksum); + } + + public ObjectRangeInputStream( + ObjectStorage storage, String objectKey, Range range, byte[] checksum) { + this.storage = storage; + this.objectKey = objectKey; + this.range = range; + this.checksum = checksum; + + this.stream = null; + this.nextPos = range.off(); + this.currPos = nextPos; + + Preconditions.checkNotNull(checksum, "Checksum should not be null."); + } + + @Override + public int read() throws IOException { + byte[] buf = new byte[1]; + int n = read(buf, 0, buf.length); + if (n < 0) { + return -1; + } else { + return buf[0] & 0xFF; + } + } + + @Override + public int read(byte[] buffer, int offset, int length) throws IOException { + checkNotClosed(); + FSUtils.checkReadParameters(buffer, offset, length); + + if (length == 0) { + return 0; + } + + if (!range.include(nextPos)) { + return -1; + } + + seekStream(); + + int toRead = Math.min(length, Ints.saturatedCast(range.end() - nextPos)); + int readLen = stream.read(buffer, offset, toRead); + if (readLen > 0) { + nextPos += readLen; + currPos += readLen; + } + return readLen; + } + + @Override + public void close() throws IOException { + super.close(); + closeStream(); + closed = true; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) throws IOException { + checkNotClosed(); + + FSUtils.checkReadParameters(buffer, offset, length); + if (!range.include(position)) { + return -1; + } + + int toRead = Math.min(length, Ints.saturatedCast(range.end() - position)); + if (toRead == 0) { + return 0; + } + + try (InputStream in = openStream(position, toRead)) { + return in.read(buffer, offset, toRead); + } + } + + @Override + public void seek(long pos) throws IOException { + checkNotClosed(); + Preconditions.checkArgument(range.include(pos), "Position %s must be in range %s", pos, range); + this.nextPos = pos; + } + + @Override + public long getPos() throws IOException { + checkNotClosed(); + return nextPos; + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + checkNotClosed(); + return false; + } + + private void seekStream() throws IOException { + // sequential read + if (stream != null && nextPos == currPos) { + return; + } + + // random read + if (stream != null && nextPos > currPos) { + long skip = nextPos - currPos; + // It is not worth skipping because the skip size is too big, or it can't read any bytes after skip. + if (skip < MAX_SKIP_SIZE) { + try { + ByteStreams.skipFully(stream, skip); + currPos = nextPos; + return; + } catch (IOException ignored) { + LOG.warn("Failed to skip {} bytes in stream, will try to reopen the stream", skip); + } + } + } + + currPos = nextPos; + + closeStream(); + stream = openStream(nextPos, range.end() - nextPos); + } + + private InputStream openStream(long offset, long limit) throws IOException { + return storage.get(objectKey, offset, limit).verifiedStream(checksum); + } + + private void closeStream() throws IOException { + if (stream != null) { + stream.close(); + } + stream = null; + } + + private void checkNotClosed() throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + } + + public boolean include(long pos) { + return range.include(pos); + } + + public Range range() { + return range; + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClientBuilder.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClientBuilder.java index ebf7f738579..ef4f86865ab 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClientBuilder.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClientBuilder.java @@ -44,8 +44,8 @@ import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME; public class DelegationClientBuilder { public static final int DISABLE_TOS_RETRY_VALUE = -1; - private static final String TOS_ENDPOINT_KEY = ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME); - private static final String TOS_REGION_KEY = ConfKeys.FS_TOS_REGION.key(TOS_SCHEME); + private static final String TOS_ENDPOINT_KEY = ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME); + private static final String TOS_REGION_KEY = ConfKeys.FS_OBJECT_STORAGE_REGION.key(TOS_SCHEME); @VisibleForTesting static final Map<String, DelegationClient> CACHE = new ConcurrentHashMap<>(); diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FSUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FSUtils.java new file mode 100644 index 00000000000..5c3400c953c --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FSUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.util.ReflectionUtils; + +import java.io.IOException; +import java.net.URI; + +public class FSUtils { + private static final String OVERFLOW_ERROR_HINT = FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER + + ": request length = %s, with offset = %s, buffer capacity = %s"; + + private FSUtils() { + } + + public static void checkReadParameters(byte[] buffer, int offset, int length) { + Preconditions.checkArgument(buffer != null, "Null buffer"); + Preconditions.checkArgument(offset >= 0 && offset <= buffer.length, + "offset: %s is out of range [%s, %s]", offset, 0, buffer.length); + Preconditions.checkArgument(length >= 0, "length: %s is negative", length); + Preconditions.checkArgument(buffer.length >= offset + length, + OVERFLOW_ERROR_HINT, length, offset, (buffer.length - offset)); + } + + public static URI normalizeURI(URI fsUri, Configuration hadoopConfig) { + final String scheme = fsUri.getScheme(); + final String authority = fsUri.getAuthority(); + + if (scheme == null && authority == null) { + fsUri = FileSystem.getDefaultUri(hadoopConfig); + } else if (scheme != null && authority == null) { + URI defaultUri = FileSystem.getDefaultUri(hadoopConfig); + if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) { + fsUri = defaultUri; + } + } + return fsUri; + } + + public static String scheme(Configuration conf, URI uri) { + if (uri.getScheme() == null || uri.getScheme().isEmpty()) { + return FileSystem.getDefaultUri(conf).getScheme(); + } else { + return uri.getScheme(); + } + } + + /** + * The difference with {@link FileSystem#get(URI, Configuration)} is this approach won't cache the filesystem. + */ + public static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException { + Class<? extends FileSystem> clazz = FileSystem.getFileSystemClass(uri.getScheme(), conf); + return ReflectionUtils.newInstance(clazz, conf); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java new file mode 100644 index 00000000000..a3677a54df1 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.conf.ConfKeys; +import org.apache.hadoop.fs.tosfs.util.CommonUtils; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.apache.hadoop.fs.tosfs.util.UUIDUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class ObjectStorageTestBase { + private static final Logger LOG = LoggerFactory.getLogger(ObjectStorageTestBase.class); + protected Configuration conf; + protected Configuration protonConf; + protected Path testDir; + protected FileSystem fs; + protected String scheme; + protected ObjectStorage storage; + + @Rule + public TemporaryFolder tempDir = new TemporaryFolder(); + + @Before + public void setUp() throws IOException { + LOG.info("The test temporary folder is {}", tempDir.getRoot().getAbsolutePath()); + + String tempDirPath = tempDir.getRoot().getAbsolutePath(); + conf = new Configuration(); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("filestore"), tempDirPath); + conf.set("fs.filestore.impl", LocalFileSystem.class.getName()); + protonConf = new Configuration(conf); + // Set the environment variable for ObjectTestUtils#assertObject + TestUtility.setSystemEnv(FileStore.ENV_FILE_STORAGE_ROOT, tempDirPath); + + testDir = new Path("filestore://" + FileStore.DEFAULT_BUCKET + "/", UUIDUtils.random()); + fs = testDir.getFileSystem(conf); + scheme = testDir.toUri().getScheme(); + storage = ObjectStorageFactory.create(scheme, testDir.toUri().getAuthority(), protonConf); + } + + @After + public void tearDown() throws IOException { + if (storage != null) { + // List all keys with test dir prefix and delete them. + String prefix = ObjectUtils.pathToKey(testDir); + CommonUtils.runQuietly(() -> storage.deleteAll(prefix)); + // List all multipart uploads and abort them. + CommonUtils.runQuietly(() -> { + for (MultipartUpload upload : storage.listUploads(prefix)) { + LOG.info("Abort the multipart upload {}", upload); + storage.abortMultipartUpload(upload.key(), upload.uploadId()); + } + }); + + storage.close(); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectMultiRangeInputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectMultiRangeInputStream.java new file mode 100644 index 00000000000..c8980fac017 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectMultiRangeInputStream.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.common.Bytes; +import org.apache.hadoop.fs.tosfs.common.ThreadPools; +import org.apache.hadoop.fs.tosfs.object.exceptions.ChecksumMismatchException; +import org.apache.hadoop.fs.tosfs.util.Range; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class TestObjectMultiRangeInputStream extends ObjectStorageTestBase { + private static ExecutorService threadPool; + + @BeforeClass + public static void beforeClass() { + threadPool = ThreadPools.newWorkerPool("TestObjectInputStream-pool"); + } + + @AfterClass + public static void afterClass() { + if (!threadPool.isShutdown()) { + threadPool.shutdown(); + } + } + + @Test + public void testSequentialAndRandomRead() throws IOException { + Path outPath = new Path(testDir, "testSequentialAndRandomRead.txt"); + String key = ObjectUtils.pathToKey(outPath); + byte[] rawData = TestUtility.rand(5 << 20); + storage.put(key, rawData); + + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(threadPool, storage, + ObjectUtils.pathToKey(outPath), rawData.length, Long.MAX_VALUE, content.checksum())) { + // sequential read + assertEquals(0, in.getPos()); + assertEquals(0, in.nextExpectPos()); + + byte[] b = new byte[1024]; + int readCnt = in.read(b); + assertEquals(readCnt, b.length); + assertArrayEquals(Arrays.copyOfRange(rawData, 0, 1024), b); + assertEquals(1024, in.getPos()); + assertEquals(1024, in.nextExpectPos()); + + readCnt = in.read(b); + assertEquals(readCnt, b.length); + assertArrayEquals(Arrays.copyOfRange(rawData, 1024, 2048), b); + assertEquals(2048, in.getPos()); + assertEquals(2048, in.nextExpectPos()); + + // random read forward + in.seek(4 << 20); + assertEquals(4 << 20, in.getPos()); + assertEquals(2048, in.nextExpectPos()); + + readCnt = in.read(b); + assertEquals(readCnt, b.length); + assertArrayEquals(Arrays.copyOfRange(rawData, 4 << 20, 1024 + (4 << 20)), b); + assertEquals((4 << 20) + 1024, in.getPos()); + assertEquals((4 << 20) + 1024, in.nextExpectPos()); + + // random read back + in.seek(2 << 20); + assertEquals(2 << 20, in.getPos()); + assertEquals((4 << 20) + 1024, in.nextExpectPos()); + + readCnt = in.read(b); + assertEquals(readCnt, b.length); + assertArrayEquals(Arrays.copyOfRange(rawData, 2 << 20, 1024 + (2 << 20)), b); + assertEquals((2 << 20) + 1024, in.getPos()); + assertEquals((2 << 20) + 1024, in.nextExpectPos()); + } + } + + private InputStream getStream(String key) { + return storage.get(key).stream(); + } + + @Test + public void testReadSingleByte() throws IOException { + int len = 10; + Path outPath = new Path(testDir, "testReadSingleByte.txt"); + byte[] data = TestUtility.rand(len); + String key = ObjectUtils.pathToKey(outPath); + byte[] checksum = storage.put(key, data); + + try (InputStream in = new ObjectMultiRangeInputStream(threadPool, storage, key, + data.length, Long.MAX_VALUE, checksum)) { + for (int i = 0; i < data.length; i++) { + assertTrue(in.read() >= 0); + } + assertEquals(-1, in.read()); + } + } + + @Test + public void testReadStreamButTheFileChangedDuringReading() throws IOException { + int len = 2048; + Path outPath = new Path(testDir, "testReadStreamButTheFileChangedDuringReading.txt"); + byte[] data = TestUtility.rand(len); + String key = ObjectUtils.pathToKey(outPath); + byte[] checksum = storage.put(key, data); + + try (InputStream in = new ObjectMultiRangeInputStream(threadPool, storage, key, data.length, 1024, checksum)) { + byte[] read = new byte[1024]; + int n = in.read(read); + Assert.assertEquals(1024, n); + + storage.put(key, TestUtility.rand(1024)); + assertThrows("The file is staled", ChecksumMismatchException.class, () -> in.read(read)); + } + } + + @Test + public void testRead100M() throws IOException { + testSequentialReadData(100 << 20, 6 << 20); + testSequentialReadData(100 << 20, 5 << 20); + } + + @Test + public void testRead10M() throws IOException { + testSequentialReadData(10 << 20, 4 << 20); + testSequentialReadData(10 << 20, 5 << 20); + } + + @Test + public void testParallelRead10M() throws IOException, ExecutionException, InterruptedException { + testParallelRandomRead(10 << 20, 4 << 20); + testParallelRandomRead(10 << 20, 5 << 20); + } + + @Test + public void testRead100b() throws IOException { + testSequentialReadData(100, 40); + testSequentialReadData(100, 50); + testSequentialReadData(100, 100); + testSequentialReadData(100, 101); + } + + private void testSequentialReadData(int dataSize, int partSize) throws IOException { + Path outPath = new Path(testDir, String.format("%d-%d.txt", dataSize, partSize)); + String key = ObjectUtils.pathToKey(outPath); + byte[] rawData = TestUtility.rand(dataSize); + storage.put(key, rawData); + + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + int batchSize = (dataSize - 1) / partSize + 1; + try (InputStream in = new ObjectMultiRangeInputStream(threadPool, storage, ObjectUtils.pathToKey(outPath), + rawData.length, Long.MAX_VALUE, content.checksum())) { + for (int i = 0; i < batchSize; i++) { + int start = i * partSize; + int end = Math.min(dataSize, start + partSize); + byte[] expectArr = Arrays.copyOfRange(rawData, start, end); + + byte[] b = new byte[end - start]; + int ret = in.read(b, 0, b.length); + + assertEquals(b.length, ret); + assertArrayEquals(String.format("the read bytes mismatched at batch: %d", i), expectArr, b); + } + assertEquals(-1, in.read()); + } + } + + private void testParallelRandomRead(int dataSize, int partSize) + throws IOException, ExecutionException, InterruptedException { + + Path outPath = new Path(testDir, String.format("%d-%d.txt", dataSize, partSize)); + String key = ObjectUtils.pathToKey(outPath); + byte[] rawData = TestUtility.rand(dataSize); + storage.put(key, rawData); + + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + Random random = new Random(); + List<Future<Boolean>> tasks = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + int position = random.nextInt(rawData.length); + tasks.add(threadPool.submit(() -> + testReadDataFromSpecificPosition(rawData, outPath, position, partSize, content.checksum()))); + } + + for (Future<Boolean> task : tasks) { + assertTrue(task.get()); + } + } + + private boolean testReadDataFromSpecificPosition( + final byte[] rawData, + final Path objPath, + final int startPosition, + final int partSize, + byte[] checksum) { + int rawDataSize = rawData.length; + int batchSize = (rawDataSize - startPosition - 1) / partSize + 1; + try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(threadPool, storage, + ObjectUtils.pathToKey(objPath), rawDataSize, Long.MAX_VALUE, checksum)) { + in.seek(startPosition); + + for (int i = 0; i < batchSize; i++) { + int start = startPosition + i * partSize; + int end = Math.min(rawDataSize, start + partSize); + byte[] expectArr = Arrays.copyOfRange(rawData, start, end); + + byte[] b = new byte[end - start]; + int ret = in.read(b, 0, b.length); + + assertEquals(b.length, ret); + assertArrayEquals(String.format("the read bytes mismatched at batch: %d", i), expectArr, b); + } + assertEquals(-1, in.read()); + return true; + } catch (IOException e) { + return false; + } + } + + @Test + public void testParallelReadFromOneInputStream() throws IOException, ExecutionException, InterruptedException { + testParallelReadFromOneInputStreamImpl(10 << 20, 512, 10); + testParallelReadFromOneInputStreamImpl(10 << 20, 64, 100); + testParallelReadFromOneInputStreamImpl(1 << 20, 2 << 20, 5); + } + + public void testParallelReadFromOneInputStreamImpl(int dataSize, int batchSize, int parallel) + throws IOException, ExecutionException, InterruptedException { + + Path outPath = new Path(testDir, + String.format("%d-%d-testParallelReadFromOneInputStreamImpl.txt", dataSize, batchSize)); + String key = ObjectUtils.pathToKey(outPath); + byte[] rawData = TestUtility.rand(dataSize); + storage.put(key, rawData); + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + AtomicInteger sum = new AtomicInteger(0); + CopyOnWriteArrayList<byte[]> readBytes = new CopyOnWriteArrayList(); + List<Future<?>> futures = new ArrayList<>(); + try (ObjectMultiRangeInputStream inputStream = new ObjectMultiRangeInputStream(threadPool, + storage, ObjectUtils.pathToKey(outPath), rawData.length, Long.MAX_VALUE, content.checksum())) { + for (int i = 0; i < parallel; i++) { + futures.add(threadPool.submit(() -> { + byte[] data = new byte[batchSize]; + try { + int count; + while ((count = inputStream.read(data)) != -1) { + sum.getAndAdd(count); + readBytes.add(Arrays.copyOfRange(data, 0, count)); + data = new byte[batchSize]; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + for (Future<?> future : futures) { + future.get(); + + } + assertEquals(rawData.length, sum.get()); + } + + byte[] actualBytes = new byte[rawData.length]; + int offset = 0; + for (byte[] bytes : readBytes) { + System.arraycopy(bytes, 0, actualBytes, offset, bytes.length); + offset += bytes.length; + } + + Arrays.sort(actualBytes); + Arrays.sort(rawData); + assertArrayEquals(rawData, actualBytes); + } + + @Test + public void testPositionalRead() throws IOException { + Path outPath = new Path(testDir, "testPositionalRead.txt"); + String key = ObjectUtils.pathToKey(outPath); + int fileSize = 5 << 20; + byte[] rawData = TestUtility.rand(fileSize); + storage.put(key, rawData); + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + Random rand = ThreadLocalRandom.current(); + + try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(threadPool, storage, + ObjectUtils.pathToKey(outPath), fileSize, Long.MAX_VALUE, content.checksum())) { + for (int i = 0; i < 100; i++) { + int pos = rand.nextInt(fileSize); + int len = rand.nextInt(fileSize); + + int expectSize = Math.min(fileSize - pos, len); + byte[] actual = new byte[expectSize]; + int actualLen = in.read(pos, actual, 0, expectSize); + + assertEquals(expectSize, actualLen); + assertArrayEquals(Bytes.toBytes(rawData, pos, expectSize), actual); + } + } + } + + @Test + public void testReadAcrossRange() throws IOException { + Path outPath = new Path(testDir, "testReadAcrossRange.txt"); + String key = ObjectUtils.pathToKey(outPath); + int fileSize = 1 << 10; + byte[] rawData = TestUtility.rand(fileSize); + storage.put(key, rawData); + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(ThreadPools.defaultWorkerPool(), + storage, key, fileSize, 10, content.checksum())) { + byte[] data = new byte[fileSize / 2]; + for (int i = 0; i < 2; i++) { + assertEquals(data.length, in.read(data)); + assertEquals((i + 1) * data.length, in.getPos()); + assertArrayEquals(Bytes.toBytes(rawData, i * data.length, data.length), data); + } + } + } + + @Test + public void testStorageRange() throws IOException { + Path outPath = new Path(testDir, "testStorageRange.txt"); + String key = ObjectUtils.pathToKey(outPath); + int fileSize = 5 << 20; + byte[] rawData = TestUtility.rand(fileSize); + storage.put(key, rawData); + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + int oneMB = 1 << 20; + long rangeOpenLen = oneMB; + try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(ThreadPools.defaultWorkerPool(), + storage, key, fileSize, rangeOpenLen, content.checksum())) { + assertNull(in.stream()); + + // Init range. + in.read(); + assertEquals(Range.of(0, rangeOpenLen), in.stream().range()); + // Range doesn't change. + in.read(new byte[(int) (rangeOpenLen - 1)], 0, (int) (rangeOpenLen - 1)); + assertEquals(Range.of(0, rangeOpenLen), in.stream().range()); + + // Move to next range. + in.read(); + assertEquals(Range.of(rangeOpenLen, rangeOpenLen), in.stream().range()); + + // Seek and move. + in.seek(rangeOpenLen * 3 + 10); + in.read(); + assertEquals(Range.of(rangeOpenLen * 3, rangeOpenLen), in.stream().range()); + + // Seek small and range doesn't change. + in.seek(in.getPos() + 1); + in.read(); + assertEquals(Range.of(rangeOpenLen * 3, rangeOpenLen), in.stream().range()); + + // Seek big and range changes. + in.seek(rangeOpenLen * 2); + in.read(new byte[(int) (rangeOpenLen - 10)], 0, (int) (rangeOpenLen - 10)); + assertEquals(Range.of(rangeOpenLen * 2, rangeOpenLen), in.stream().range()); + // Old range has 10 bytes left. Seek 10 bytes then read 10 bytes. Old range can't read any bytes, so + // range changes. + assertEquals(rangeOpenLen * 3 - 10, in.getPos()); + in.seek(rangeOpenLen * 3); + in.read(new byte[10], 0, 10); + assertEquals(Range.of(rangeOpenLen * 3, rangeOpenLen), in.stream().range()); + + // Read big buffer. + in.seek(10); + in.read(new byte[oneMB * 3], 0, oneMB * 3); + assertEquals(oneMB * 3 + 10, in.getPos()); + assertEquals(Range.of(3 * rangeOpenLen, rangeOpenLen), in.stream().range()); + } + + try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(threadPool, storage, + ObjectUtils.pathToKey(outPath), fileSize, Long.MAX_VALUE, content.checksum())) { + assertNull(in.stream()); + + // Init range. + in.read(); + assertEquals(Range.of(0, fileSize), in.stream().range()); + + // Range doesn't change. + in.read(new byte[oneMB], 0, oneMB); + assertEquals(Range.of(0, fileSize), in.stream().range()); + + // Seek and move. + long pos = oneMB * 3 + 10; + in.seek(pos); + in.read(); + assertEquals(Range.of(0, fileSize), in.stream().range()); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectRangeInputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectRangeInputStream.java new file mode 100644 index 00000000000..63a81bbd2da --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectRangeInputStream.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.util.Range; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class TestObjectRangeInputStream extends ObjectStorageTestBase { + + @Test + public void testRead() throws IOException { + Path outPath = new Path(testDir, "testRead.txt"); + String key = ObjectUtils.pathToKey(outPath); + byte[] rawData = TestUtility.rand(1 << 10); + storage.put(key, rawData); + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + int position = 100; + int len = 200; + try (ObjectRangeInputStream ri = + new ObjectRangeInputStream(storage, key, Range.of(position, len), content.checksum())) { + // Test read byte. + assertEquals(rawData[position] & 0xff, ri.read()); + + // Test read buffer. + byte[] buffer = new byte[len]; + assertEquals(buffer.length - 1, ri.read(buffer, 0, buffer.length)); + assertArrayEquals( + Arrays.copyOfRange(rawData, position + 1, position + len), + Arrays.copyOfRange(buffer, 0, buffer.length - 1)); + assertEquals(0, ri.available()); + + assertEquals(-1, ri.read()); + assertEquals(-1, ri.read(buffer, 0, buffer.length)); + } + } + + @Test + public void testRangeExceedInnerStream() throws IOException { + Path outPath = new Path(testDir, "testRangeExceedInnerStream.txt"); + String key = ObjectUtils.pathToKey(outPath); + byte[] rawData = TestUtility.rand(10); + storage.put(key, rawData); + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + int position = 10; + int badLen = 10; + try (ObjectRangeInputStream ri = + new ObjectRangeInputStream(storage, key, Range.of(position, badLen), content.checksum())) { + byte[] buffer = new byte[1]; + assertEquals(-1, ri.read()); + assertEquals(-1, ri.read(buffer, 0, buffer.length)); + } + } + + @Test + public void testRangeInclude() throws IOException { + Path outPath = new Path(testDir, "testRangeInclude.txt"); + String key = ObjectUtils.pathToKey(outPath); + byte[] rawData = TestUtility.rand(10); + storage.put(key, rawData); + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + long pos = 100; + long len = 300; + + try (ObjectRangeInputStream in = new ObjectRangeInputStream(storage, key, Range.of(pos, len), content.checksum())) { + assertEquals(Range.of(pos, len), in.range()); + + assertTrue(in.include(pos)); + assertTrue(in.include((pos + len) / 2)); + assertTrue(in.include(pos + len - 1)); + + assertFalse(in.include(pos - 1)); + assertFalse(in.include(pos + len)); + } + } + + @Test + public void testSeek() throws IOException { + Path outPath = new Path(testDir, "testSeek.txt"); + String key = ObjectUtils.pathToKey(outPath); + byte[] rawData = TestUtility.rand(1 << 10); + storage.put(key, rawData); + ObjectContent content = storage.get(key); + assertArrayEquals(rawData, IOUtils.toByteArray(content.stream())); + + long pos = 100; + long len = 300; + + try (ObjectRangeInputStream in = new ObjectRangeInputStream(storage, key, Range.of(pos, len), content.checksum())) { + assertEquals(pos, in.getPos()); + + Exception error = assertThrows("Overflow", IllegalArgumentException.class, () -> in.seek(-1)); + assertTrue(error.getMessage().contains("must be in range Range{offset=100, length=300}")); + error = assertThrows("Overflow", IllegalArgumentException.class, () -> in.seek(99)); + assertTrue(error.getMessage().contains("must be in range Range{offset=100, length=300}")); + error = assertThrows("Overflow", IllegalArgumentException.class, () -> in.seek(401)); + assertTrue(error.getMessage().contains("must be in range Range{offset=100, length=300}")); + error = assertThrows("Overflow", IllegalArgumentException.class, () -> in.seek(1 << 20)); + assertTrue(error.getMessage().contains("must be in range Range{offset=100, length=300}")); + + in.seek(399); + assertTrue(0 <= in.read()); + assertEquals(-1, in.read()); + + in.seek(100); + assertTrue(in.read() >= 0); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestDelegationClientBuilder.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestDelegationClientBuilder.java index e56dabd3751..a5e8c115592 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestDelegationClientBuilder.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestDelegationClientBuilder.java @@ -106,7 +106,7 @@ public class TestDelegationClientBuilder { @Test public void testHeadApiRetry() throws IOException { Configuration conf = new Configuration(); - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false); conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY"); @@ -163,7 +163,7 @@ public class TestDelegationClientBuilder { public void testEnableCrcCheck() throws IOException { String bucket = name.getMethodName(); Configuration conf = new Configuration(); - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true); conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY"); @@ -183,7 +183,7 @@ public class TestDelegationClientBuilder { public void testClientCache() throws IOException { String bucket = name.getMethodName(); Configuration conf = new Configuration(); - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false); conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), "ACCESS_KEY"); @@ -217,7 +217,7 @@ public class TestDelegationClientBuilder { @Test public void testOverwriteHttpConfig() throws IOException { Configuration conf = new Configuration(); - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY"); conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY"); @@ -243,7 +243,7 @@ public class TestDelegationClientBuilder { @Test public void testDynamicRefreshAkSk() throws IOException { Configuration conf = new Configuration(); - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), ENV_ACCESS_KEY); conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), ENV_SECRET_KEY); @@ -281,7 +281,7 @@ public class TestDelegationClientBuilder { @Test public void testCreateClientWithEnvironmentCredentials() throws IOException { Configuration conf = new Configuration(); - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, EnvironmentCredentialsProvider.NAME); DelegationClient tosV2 = @@ -300,7 +300,7 @@ public class TestDelegationClientBuilder { @Test public void testCreateClientWithSimpleCredentials() throws IOException { Configuration conf = new Configuration(); - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT); conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), ENV_ACCESS_KEY); conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), ENV_SECRET_KEY); @@ -331,7 +331,7 @@ public class TestDelegationClientBuilder { Function<String, Configuration> commonConf = bucket -> { Configuration conf = new Configuration(); - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT); conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), ENV_ACCESS_KEY); conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket), ENV_SECRET_KEY); diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSRetryPolicy.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSRetryPolicy.java index 59b12dcf791..03fb32f940a 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSRetryPolicy.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSRetryPolicy.java @@ -76,7 +76,7 @@ public class TestTOSRetryPolicy { private DelegationClient createRetryableDelegationClient() { Configuration conf = new Configuration(); - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://tos-cn-beijing.ivolces.com"); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://tos-cn-beijing.ivolces.com"); conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true); conf.set(TosKeys.FS_TOS_ACCESS_KEY_ID, "ACCESS_KEY"); diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestUtility.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestUtility.java index a944843a755..ad2e7ddc813 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestUtility.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestUtility.java @@ -145,7 +145,7 @@ public class TestUtility { Configuration conf = new Configuration(); String endpoint = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT, ""); if (!StringUtils.isEmpty(endpoint)) { - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(scheme()), endpoint); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(scheme()), endpoint); } return ObjectStorageFactory.createWithPrefix( @@ -159,7 +159,7 @@ public class TestUtility { } else { String endpoint = ParseUtils.envAsString(ENV_DIRECTORY_BUCKET_ENDPOINT, ""); if (!StringUtils.isEmpty(endpoint)) { - conf.set(ConfKeys.FS_TOS_ENDPOINT.key(scheme()), endpoint); + conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(scheme()), endpoint); } return ObjectStorageFactory.createWithPrefix( String.format("%s-%s/", scheme(), UUIDUtils.random()), scheme(), bucket, conf); @@ -171,7 +171,7 @@ public class TestUtility { // 1. FileStore Configuration fileStoreConf = new Configuration(); - fileStoreConf.set(ConfKeys.FS_TOS_ENDPOINT.key("filestore"), fileStoreRoot); + fileStoreConf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("filestore"), fileStoreRoot); storages.add(ObjectStorageFactory.create("filestore", TestUtility.bucket(), fileStoreConf)); // 2. General Bucket --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org