This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch HADOOP-19236-original
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 42c88327e16dd2b4623f799e8d48bc80175bfc52 Author: lijinglun <lijing...@bytedance.com> AuthorDate: Tue Sep 10 18:01:59 2024 +0800 Integration of TOS: TOS test cases. 1. Add test cases TestChainTOSInputStream, TestDelegationClientBuilder, TestTOSInputStream, TestTOSObjectStorage, TestTOSRetryPolicy. 2. Add common utils: ThreadPools, Tasks. --- hadoop-cloud-storage-project/hadoop-tos/pom.xml | 5 + .../org/apache/hadoop/fs/tosfs/TosFileSystem.java | 22 + .../org/apache/hadoop/fs/tosfs/common/Bytes.java | 113 +++- .../org/apache/hadoop/fs/tosfs/common/Tasks.java | 590 +++++++++++++++++++++ .../apache/hadoop/fs/tosfs/common/ThreadPools.java | 142 +++++ .../tosfs/object/tos/TestChainTOSInputStream.java | 241 +++++++++ .../object/tos/TestDelegationClientBuilder.java | 450 ++++++++++++++++ .../fs/tosfs/object/tos/TestTOSInputStream.java | 150 ++++++ .../fs/tosfs/object/tos/TestTOSObjectStorage.java | 290 ++++++++++ .../fs/tosfs/object/tos/TestTOSRetryPolicy.java | 201 +++++++ 10 files changed, 2201 insertions(+), 3 deletions(-) diff --git a/hadoop-cloud-storage-project/hadoop-tos/pom.xml b/hadoop-cloud-storage-project/hadoop-tos/pom.xml index 63568057c55..6d6bb49d1a4 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/pom.xml +++ b/hadoop-cloud-storage-project/hadoop-tos/pom.xml @@ -66,6 +66,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosFileSystem.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosFileSystem.java new file mode 100644 index 00000000000..47c9096096a --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosFileSystem.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs; + +public class TosFileSystem { +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java index 5b8d8108640..c43bbf50359 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java @@ -16,7 +16,10 @@ package org.apache.hadoop.fs.tosfs.common; -// TODO: Remove this class? 
+import org.apache.hadoop.util.Preconditions; + +import java.nio.ByteBuffer; + public class Bytes { private Bytes() { } @@ -26,11 +29,11 @@ public class Bytes { // Encode basic Java types into big-endian binaries. public static byte[] toBytes(boolean b) { - return new byte[]{b ? (byte) -1 : (byte) 0}; + return new byte[] { b ? (byte) -1 : (byte) 0 }; } public static byte[] toBytes(byte b) { - return new byte[]{b}; + return new byte[] { b }; } public static byte[] toBytes(short val) { @@ -59,4 +62,108 @@ public class Bytes { } return b; } + + // Decode big-endian binaries into basic Java types. + + public static boolean toBoolean(byte[] b) { + return toBoolean(b, 0, 1); + } + + public static boolean toBoolean(byte[] b, int off) { + return toBoolean(b, off, 1); + } + + public static boolean toBoolean(byte[] b, int off, int len) { + Preconditions.checkArgument(len == 1, "Invalid len: %s", len); + return b[off] != (byte) 0; + } + + public static byte toByte(byte[] b) { + return b[0]; + } + + public static byte toByte(byte[] b, int off) { + return b[off]; + } + + public static short toShort(byte[] b) { + return toShort(b, 0, 2); + } + + public static short toShort(byte[] b, int off) { + return toShort(b, off, 2); + } + + public static short toShort(byte[] b, int off, int len) { + Preconditions.checkArgument(len == 2, "Invalid len: %s", len); + Preconditions.checkArgument(off >= 0 && off + len <= b.length, + "Invalid off: %s, len: %s, array size: %s", off, len, b.length); + short n = 0; + n = (short) ((n ^ b[off]) & 0xFF); + n = (short) (n << 8); + n ^= (short) (b[off + 1] & 0xFF); + return n; + } + + public static int toInt(byte[] b) { + return toInt(b, 0, 4); + } + + public static int toInt(byte[] b, int off) { + return toInt(b, off, 4); + } + + public static int toInt(byte[] b, int off, int len) { + Preconditions.checkArgument(len == 4, "Invalid len: %s", len); + Preconditions.checkArgument(off >= 0 && off + len <= b.length, + "Invalid off: %s, len: %s, array size: %s", off, len, b.length); + int n = 0; + for (int i = off; i < (off + len); i++) { + n <<= 8; + n ^= b[i] & 0xFF; + } + return n; + } + + public static int toInt(ByteBuffer b) { + Preconditions.checkArgument(4 <= b.remaining(), + "Invalid ByteBuffer which remaining must be >= 4, but is: %s", b.remaining()); + int n = 0; + int off = b.position(); + for (int i = off; i < (off + 4); i++) { + n <<= 8; + n ^= b.get(i) & 0xFF; + } + return n; + } + + public static long toLong(byte[] b) { + return toLong(b, 0, 8); + } + + public static long toLong(byte[] b, int off) { + return toLong(b, off, 8); + } + + public static long toLong(byte[] b, int off, int len) { + Preconditions.checkArgument(len == 8, "Invalid len: %s", len); + Preconditions.checkArgument(off >= 0 && off + len <= b.length, + "Invalid off: %s, len: %s, array size: %s", off, len, b.length); + long l = 0; + for (int i = off; i < off + len; i++) { + l <<= 8; + l ^= b[i] & 0xFF; + } + return l; + } + + public static byte[] toBytes(byte[] b, int off, int len) { + Preconditions.checkArgument(off >= 0, "off %s must be >=0", off); + Preconditions.checkArgument(len >= 0, "len %s must be >= 0", len); + Preconditions.checkArgument(off + len <= b.length, "off (%s) + len (%s) must be <= %s", off, + len, b.length); + byte[] data = new byte[len]; + System.arraycopy(b, off, data, 0, len); + return data; + } } diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Tasks.java 
b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Tasks.java new file mode 100644 index 00000000000..563d7813cd3 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Tasks.java @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.common; + +import org.apache.hadoop.util.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.stream.Stream; + +/** + * Copied from Apache Iceberg, please see: + * <a href="https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/util/Tasks.java"> + * Tasks</a> + */ +public class Tasks { + + private static final Logger LOG = LoggerFactory.getLogger(Tasks.class); + + private Tasks() { + } + + public static class UnrecoverableException extends RuntimeException { + public UnrecoverableException(String message) { + super(message); + } + + public UnrecoverableException(String message, Throwable cause) { + super(message, cause); + } + + public UnrecoverableException(Throwable cause) { + super(cause); + } + } + + public interface FailureTask<I, E extends Exception> { + void run(I item, Exception exception) throws E; + } + + public interface Task<I, E extends Exception> { + void run(I item) throws E; + } + + public static class Builder<I> { + private final Iterable<I> items; + private ExecutorService service = null; + private FailureTask<I, ?> onFailure = null; + private boolean stopOnFailure = false; + private boolean throwFailureWhenFinished = true; + private Task<I, ?> revertTask = null; + private boolean stopRevertsOnFailure = false; + private Task<I, ?> abortTask = null; + private boolean stopAbortsOnFailure = false; + + // retry settings + private List<Class<? extends Exception>> stopRetryExceptions = + Lists.newArrayList(UnrecoverableException.class); + private List<Class<? 
extends Exception>> onlyRetryExceptions = null; + private Predicate<Exception> shouldRetryPredicate = null; + private int maxAttempts = 1; // not all operations can be retried + private long minSleepTimeMs = 1000; // 1 second + private long maxSleepTimeMs = 600000; // 10 minutes + private long maxDurationMs = 600000; // 10 minutes + private double scaleFactor = 2.0; // exponential + + public Builder(Iterable<I> items) { + this.items = items; + } + + public Builder<I> executeWith(ExecutorService svc) { + this.service = svc; + return this; + } + + public Builder<I> onFailure(FailureTask<I, ?> task) { + this.onFailure = task; + return this; + } + + public Builder<I> stopOnFailure() { + this.stopOnFailure = true; + return this; + } + + public Builder<I> throwFailureWhenFinished() { + this.throwFailureWhenFinished = true; + return this; + } + + public Builder<I> throwFailureWhenFinished(boolean throwWhenFinished) { + this.throwFailureWhenFinished = throwWhenFinished; + return this; + } + + public Builder<I> suppressFailureWhenFinished() { + this.throwFailureWhenFinished = false; + return this; + } + + public Builder<I> revertWith(Task<I, ?> task) { + this.revertTask = task; + return this; + } + + public Builder<I> stopRevertsOnFailure() { + this.stopRevertsOnFailure = true; + return this; + } + + public Builder<I> abortWith(Task<I, ?> task) { + this.abortTask = task; + return this; + } + + public Builder<I> stopAbortsOnFailure() { + this.stopAbortsOnFailure = true; + return this; + } + + @SafeVarargs public final Builder<I> stopRetryOn(Class<? extends Exception>... exceptions) { + stopRetryExceptions.addAll(Arrays.asList(exceptions)); + return this; + } + + public Builder<I> shouldRetryTest(Predicate<Exception> shouldRetry) { + this.shouldRetryPredicate = shouldRetry; + return this; + } + + public Builder<I> noRetry() { + this.maxAttempts = 1; + return this; + } + + public Builder<I> retry(int nTimes) { + this.maxAttempts = nTimes + 1; + return this; + } + + public Builder<I> onlyRetryOn(Class<? extends Exception> exception) { + this.onlyRetryExceptions = Collections.singletonList(exception); + return this; + } + + @SafeVarargs public final Builder<I> onlyRetryOn(Class<? extends Exception>... 
exceptions) { + this.onlyRetryExceptions = Lists.newArrayList(exceptions); + return this; + } + + public Builder<I> exponentialBackoff(long backoffMinSleepTimeMs, long backoffMaxSleepTimeMs, + long backoffMaxRetryTimeMs, double backoffScaleFactor) { + this.minSleepTimeMs = backoffMinSleepTimeMs; + this.maxSleepTimeMs = backoffMaxSleepTimeMs; + this.maxDurationMs = backoffMaxRetryTimeMs; + this.scaleFactor = backoffScaleFactor; + return this; + } + + public boolean run(Task<I, RuntimeException> task) { + return run(task, RuntimeException.class); + } + + public <E extends Exception> boolean run(Task<I, E> task, Class<E> exceptionClass) throws E { + if (service != null) { + return runParallel(task, exceptionClass); + } else { + return runSingleThreaded(task, exceptionClass); + } + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") private <E extends Exception> boolean runSingleThreaded( + Task<I, E> task, Class<E> exceptionClass) throws E { + List<I> succeeded = Lists.newArrayList(); + List<Throwable> exceptions = Lists.newArrayList(); + + Iterator<I> iterator = items.iterator(); + boolean threw = true; + try { + while (iterator.hasNext()) { + I item = iterator.next(); + try { + runTaskWithRetry(task, item); + succeeded.add(item); + } catch (Exception e) { + exceptions.add(e); + + if (onFailure != null) { + tryRunOnFailure(item, e); + } + + if (stopOnFailure) { + break; + } + } + } + + threw = false; + + } finally { + // threw handles exceptions that were *not* caught by the catch block, + // and exceptions that were caught and possibly handled by onFailure + // are kept in exceptions. + if (threw || !exceptions.isEmpty()) { + if (revertTask != null) { + boolean failed = false; + for (I item : succeeded) { + try { + revertTask.run(item); + } catch (Exception e) { + failed = true; + LOG.error("Failed to revert task", e); + // keep going + } + if (stopRevertsOnFailure && failed) { + break; + } + } + } + + if (abortTask != null) { + boolean failed = false; + while (iterator.hasNext()) { + try { + abortTask.run(iterator.next()); + } catch (Exception e) { + failed = true; + LOG.error("Failed to abort task", e); + // keep going + } + if (stopAbortsOnFailure && failed) { + break; + } + } + } + } + } + + if (throwFailureWhenFinished && !exceptions.isEmpty()) { + Tasks.throwOne(exceptions, exceptionClass); + } else if (throwFailureWhenFinished && threw) { + throw new RuntimeException("Task set failed with an uncaught throwable"); + } + + return !threw; + } + + private void tryRunOnFailure(I item, Exception failure) { + try { + onFailure.run(item, failure); + } catch (Exception failException) { + failure.addSuppressed(failException); + LOG.error("Failed to clean up on failure", failException); + // keep going + } + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") private <E extends Exception> boolean runParallel( + final Task<I, E> task, Class<E> exceptionClass) throws E { + final Queue<I> succeeded = new ConcurrentLinkedQueue<>(); + final Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>(); + final AtomicBoolean taskFailed = new AtomicBoolean(false); + final AtomicBoolean abortFailed = new AtomicBoolean(false); + final AtomicBoolean revertFailed = new AtomicBoolean(false); + + List<Future<?>> futures = Lists.newArrayList(); + + for (final I item : items) { + // submit a task for each item that will either run or abort the task + futures.add(service.submit(() -> { + if (!(stopOnFailure && taskFailed.get())) { + // run the task with retries + boolean threw = true; + try { + 
runTaskWithRetry(task, item); + + succeeded.add(item); + + threw = false; + + } catch (Exception e) { + taskFailed.set(true); + exceptions.add(e); + + if (onFailure != null) { + tryRunOnFailure(item, e); + } + } finally { + if (threw) { + taskFailed.set(true); + } + } + + } else if (abortTask != null) { + // abort the task instead of running it + if (stopAbortsOnFailure && abortFailed.get()) { + return; + } + + boolean failed = true; + try { + abortTask.run(item); + failed = false; + } catch (Exception e) { + LOG.error("Failed to abort task", e); + // swallow the exception + } finally { + if (failed) { + abortFailed.set(true); + } + } + } + })); + } + + // let the above tasks complete (or abort) + exceptions.addAll(waitFor(futures)); + futures.clear(); + + if (taskFailed.get() && revertTask != null) { + // at least one task failed, revert any that succeeded + for (final I item : succeeded) { + futures.add(service.submit(() -> { + if (stopRevertsOnFailure && revertFailed.get()) { + return; + } + + boolean failed = true; + try { + revertTask.run(item); + failed = false; + } catch (Exception e) { + LOG.error("Failed to revert task", e); + // swallow the exception + } finally { + if (failed) { + revertFailed.set(true); + } + } + })); + } + + // let the revert tasks complete + exceptions.addAll(waitFor(futures)); + } + + if (throwFailureWhenFinished && !exceptions.isEmpty()) { + Tasks.throwOne(exceptions, exceptionClass); + } else if (throwFailureWhenFinished && taskFailed.get()) { + throw new RuntimeException("Task set failed with an uncaught throwable"); + } + + return !taskFailed.get(); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") private <E extends Exception> void runTaskWithRetry( + Task<I, E> task, I item) throws E { + long start = System.currentTimeMillis(); + int attempt = 0; + while (true) { + attempt += 1; + try { + task.run(item); + break; + + } catch (Exception e) { + long durationMs = System.currentTimeMillis() - start; + if (attempt >= maxAttempts || (durationMs > maxDurationMs && attempt > 1)) { + if (durationMs > maxDurationMs) { + LOG.info("Stopping retries after {} ms", durationMs); + } + throw e; + } + + if (shouldRetryPredicate != null) { + if (!shouldRetryPredicate.test(e)) { + throw e; + } + + } else if (onlyRetryExceptions != null) { + // if onlyRetryExceptions are present, then this retries if one is found + boolean matchedRetryException = false; + for (Class<? extends Exception> exClass : onlyRetryExceptions) { + if (exClass.isInstance(e)) { + matchedRetryException = true; + break; + } + } + if (!matchedRetryException) { + throw e; + } + + } else { + // otherwise, always retry unless one of the stop exceptions is found + for (Class<? 
extends Exception> exClass : stopRetryExceptions) { + if (exClass.isInstance(e)) { + throw e; + } + } + } + + int delayMs = + (int) Math.min(minSleepTimeMs * Math.pow(scaleFactor, attempt - 1), maxSleepTimeMs); + int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMs * 0.1))); + + LOG.warn("Retrying task after failure: {}", e.getMessage(), e); + + try { + TimeUnit.MILLISECONDS.sleep(delayMs + jitter); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } + } + } + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") private static Collection<Throwable> waitFor( + Collection<Future<?>> futures) { + while (true) { + int numFinished = 0; + for (Future<?> future : futures) { + if (future.isDone()) { + numFinished += 1; + } + } + + if (numFinished == futures.size()) { + List<Throwable> uncaught = Lists.newArrayList(); + // all of the futures are done, get any uncaught exceptions + for (Future<?> future : futures) { + try { + future.get(); + + } catch (InterruptedException e) { + LOG.warn("Interrupted while getting future results", e); + for (Throwable t : uncaught) { + e.addSuppressed(t); + } + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + + } catch (CancellationException e) { + // ignore cancellations + + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (Error.class.isInstance(cause)) { + for (Throwable t : uncaught) { + cause.addSuppressed(t); + } + throw (Error) cause; + } + + if (cause != null) { + uncaught.add(e); + } + + LOG.warn("Task threw uncaught exception", cause); + } + } + + return uncaught; + + } else { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for tasks to finish", e); + + for (Future<?> future : futures) { + future.cancel(true); + } + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + } + + /** + * A range, [ 0, size ) + */ + private static class Range implements Iterable<Integer> { + private final int size; + + Range(int size) { + this.size = size; + } + + @Override + public Iterator<Integer> iterator() { + return new Iterator<Integer>() { + private int current = 0; + + @Override + public boolean hasNext() { + return current < size; + } + + @Override + public Integer next() { + int ret = current; + current += 1; + return ret; + } + }; + } + } + + public static Builder<Integer> range(int upTo) { + return new Builder<>(new Range(upTo)); + } + + public static <I> Builder<I> foreach(Iterator<I> items) { + return new Builder<>(() -> items); + } + + public static <I> Builder<I> foreach(Iterable<I> items) { + return new Builder<>(items); + } + + @SafeVarargs public static <I> Builder<I> foreach(I... items) { + return new Builder<>(Arrays.asList(items)); + } + + @SuppressWarnings("StreamToIterable") public static <I> Builder<I> foreach(Stream<I> items) { + return new Builder<>(items::iterator); + } + + private static <E extends Exception> void throwOne(Collection<Throwable> exceptions, + Class<E> allowedException) throws E { + Iterator<Throwable> iter = exceptions.iterator(); + Throwable exception = iter.next(); + Class<? 
extends Throwable> exceptionClass = exception.getClass(); + + while (iter.hasNext()) { + Throwable other = iter.next(); + if (!exceptionClass.isInstance(other)) { + exception.addSuppressed(other); + } + } + + castAndThrow(exception, allowedException); + } + + @SuppressWarnings("unchecked") public static <E extends Exception> void castAndThrow( + Throwable exception, Class<E> exceptionClass) throws E { + if (exception instanceof RuntimeException) { + throw (RuntimeException) exception; + } else if (exception instanceof Error) { + throw (Error) exception; + } else if (exceptionClass.isInstance(exception)) { + throw (E) exception; + } + throw new RuntimeException(exception); + } +}
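Reviewer note: for readers new to this Iceberg-style utility, a minimal usage sketch of the Tasks builder above, assuming it sits in the same org.apache.hadoop.fs.tosfs.common package and using the ThreadPools helper added below; the pool name and the uploadPart task body are hypothetical placeholders, not part of this patch.

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class TasksExample {
  public static void main(String[] args) {
    ExecutorService pool = ThreadPools.newWorkerPool("example-worker-pool", 4);
    try {
      Tasks.foreach(Arrays.asList("part-0", "part-1", "part-2"))
          .executeWith(pool)                              // run items in parallel on the pool
          .retry(3)                                       // up to 4 attempts per item
          .exponentialBackoff(1000, 600000, 600000, 2.0)  // the builder defaults, spelled out
          .onFailure((item, e) -> System.err.println("failed " + item + ": " + e))
          .run(TasksExample::uploadPart);                 // hypothetical task body
    } finally {
      ThreadPools.shutdown(pool, 10, TimeUnit.SECONDS);
    }
  }

  private static void uploadPart(String part) {
    // hypothetical work; throwing here triggers retry with backoff
  }
}

With the defaults shown, runTaskWithRetry sleeps roughly min(1000 * 2.0^(attempt - 1), 600000) ms plus up to 10% jitter between attempts, i.e. about 1s, 2s, 4s for the three retries.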
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java new file mode 100644 index 00000000000..b69dd9c6d7f --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.common; + +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * Copied from Apache Iceberg, please see: + * <a href="https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/util/ThreadPools.java"> + * ThreadPools</a> + */ +public class ThreadPools { + + private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class); + + private ThreadPools() { + } + + public static final String WORKER_THREAD_POOL_SIZE_PROP = "proton.worker.num-threads"; + + public static final int WORKER_THREAD_POOL_SIZE = + poolSize(Math.max(2, Runtime.getRuntime().availableProcessors())); + + private static final ExecutorService WORKER_POOL = newWorkerPool("proton-default-worker-pool"); + + public static ExecutorService defaultWorkerPool() { + return WORKER_POOL; + } + + public static ExecutorService newWorkerPool(String namePrefix) { + return newWorkerPool(namePrefix, WORKER_THREAD_POOL_SIZE); + } + + public static ExecutorService newWorkerPool(String namePrefix, int poolSize) { + return Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(namePrefix)); + } + + public static ScheduledExecutorService newScheduleWorkerPool(String namePrefix, int poolSize) { + return Executors.newScheduledThreadPool(poolSize, newDaemonThreadFactory(namePrefix)); + } + + /** + * Helper routine to shut down an {@link ExecutorService}. Will wait up to a + * certain timeout for the ExecutorService to gracefully shut down. If the + * ExecutorService did not shut down and there are still unfinished tasks after + * the timeout period, the ExecutorService will be notified to forcibly shut + * down. Another timeout period will be waited before giving up. So, at most, + * a shutdown will be allowed to wait up to twice the timeout value before + * giving up. + * <p> + * This method is copied from + * {@link org.apache.hadoop.util.concurrent.HadoopExecutors#shutdown(ExecutorService, Logger, long, TimeUnit)}. + * + * @param executorService ExecutorService to shut down + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + */ + public static void shutdown(ExecutorService executorService, long timeout, TimeUnit unit) { + if (executorService == null) { + return; + } + + try { + executorService.shutdown(); + LOG.debug("Gracefully shutting down executor service. Waiting max {} {}", timeout, unit); + + if (!executorService.awaitTermination(timeout, unit)) { + LOG.debug( + "Executor service has not shut down yet. Forcing. Will wait up to an additional {} {} for shutdown", + timeout, unit);
+ executorService.shutdownNow(); + } + + if (executorService.awaitTermination(timeout, unit)) { + LOG.debug("Successfully shut down executor service"); + } else { + LOG.error("Unable to shutdown executor service after timeout {} {}", (2 * timeout), unit); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while attempting to shutdown", e); + executorService.shutdownNow(); + } catch (Exception e) { + LOG.warn("Exception closing executor service {}", e.getMessage()); + LOG.debug("Exception closing executor service", e); + throw e; + } + } + + private static int poolSize(int defaultSize) { + String value = System.getProperty(WORKER_THREAD_POOL_SIZE_PROP); + if (value != null) { + try { + return Integer.parseUnsignedInt(value); + } catch (NumberFormatException e) { + // will return the default + } + } + return defaultSize; + } + + public static ThreadFactory newDaemonThreadFactory(String namePrefix) { + return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d") + .setUncaughtExceptionHandler( + (t, e) -> LOG.error("Thread {} encountered uncaught exception", t, e)).build(); + } + + public static Thread newDaemonThread(String name, Runnable runnable, + UncaughtExceptionHandler handler) { + Thread t = new Thread(runnable); + t.setName(name); + t.setDaemon(true); + if (handler != null) { + t.setUncaughtExceptionHandler(handler); + } + return t; + } +}
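Reviewer note, hedged as my reading of the code above: WORKER_THREAD_POOL_SIZE is resolved once at class load, so the proton.worker.num-threads system property only takes effect if set before ThreadPools is first touched, and shutdown() may block for up to twice the given timeout (one graceful wait, one forced wait after shutdownNow()). A tiny illustrative driver:

public class ThreadPoolsExample {
  public static void main(String[] args) {
    // Hypothetical driver; the property must be set before the ThreadPools class initializes.
    System.setProperty("proton.worker.num-threads", "8");
    java.util.concurrent.ExecutorService pool = ThreadPools.newWorkerPool("demo");
    pool.submit(() -> System.out.println(Thread.currentThread().getName())); // prints demo-0, a daemon thread
    // Waits up to 5s gracefully, then up to another 5s after shutdownNow().
    ThreadPools.shutdown(pool, 5, java.util.concurrent.TimeUnit.SECONDS);
  }
}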
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestChainTOSInputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestChainTOSInputStream.java new file mode 100644 index 00000000000..0dd31d208c8 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestChainTOSInputStream.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object.tos; + +import com.volcengine.tos.model.object.GetObjectBasicOutput; +import com.volcengine.tos.model.object.GetObjectV2Output; +import org.apache.hadoop.fs.tosfs.common.Bytes; +import org.apache.hadoop.fs.tosfs.object.Constants; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; + +public class TestChainTOSInputStream { + + private static final int DATA_SIZE = 1 << 20; + private static final byte[] DATA = TestUtility.rand(DATA_SIZE); + + @Test + public void testRetryReadData() throws IOException { + int readLen = DATA_SIZE - 1; + int cutOff = readLen / 2; + try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024, + cutOff)) { + // The read length is more than the cut-off position, and equal to the data length, + // so the first stream will throw an IOException and fall back to the second stream. + byte[] data = new byte[readLen]; + int n = stream.read(data); + Assert.assertEquals(readLen, n); + Assert.assertArrayEquals(Bytes.toBytes(DATA, 0, readLen), data); + } + + try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024, + cutOff)) { + // The read length is more than the data length, so the first stream will throw an IOException + // and fall back to the second stream. + byte[] data = new byte[readLen + 2]; + int n = stream.read(data); + Assert.assertEquals(readLen, n); + Assert.assertArrayEquals(Bytes.toBytes(DATA, 0, readLen), Bytes.toBytes(data, 0, n)); + } + + readLen = DATA_SIZE / 3; + cutOff = DATA_SIZE / 2; + try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE, 1024, + cutOff)) { + for (int i = 0; i <= 3; i++) { + // The cut-off position is between (readLen, 2 * readLen), so the data of the first read comes from the first stream, + // and then the second read will hit an IOException and fall back to the second stream. + byte[] data = new byte[readLen]; + int n = stream.read(data); + + int off = i * readLen; + int len = Math.min(readLen, DATA_SIZE - off); + + Assert.assertEquals(len, n); + Assert.assertArrayEquals(Bytes.toBytes(DATA, off, len), Bytes.toBytes(data, 0, len)); + } + } + + int smallDataSize = 1 << 10; + cutOff = smallDataSize / 2; + byte[] smallData = TestUtility.rand(1 << 10); + try (ChainTOSInputStream stream = createTestChainTOSInputStream(smallData, 0, smallDataSize, + 1024, cutOff)) { + for (int i = 0; i < smallDataSize; i++) { + // The cut-off position is 512, so the 512th read operation will hit an IOException + // and then fall back to the second stream. + int read = stream.read(); + Assert.assertEquals(smallData[i] & 0xFF, read); + } + } + } + + @Test + public void testSkipAndRead() throws IOException { + int cutOff = (DATA_SIZE - 1) / 2; + try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024, + cutOff)) { + // The skip pos is equal to the cut-off pos; once the skip finishes, the first read operation will hit an IOException + // and fall back to the second stream.
+ int readPos = (DATA_SIZE - 1) / 2; + stream.skip(readPos); + + int readLen = 1024; + byte[] data = new byte[readLen]; + int n = stream.read(data); + Assert.assertEquals(readLen, n); + Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data); + } + + try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024, + cutOff)) { + // The skip pos is more than the cut-off pos, so the skip operation will throw an IOException, + // then fall back to the second stream and skip(readPos) again. + int readPos = cutOff + 1024; + stream.skip(readPos); + + int readLen = 1024; + byte[] data = new byte[readLen]; + int n = stream.read(data); + Assert.assertEquals(readLen, n); + Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data); + } + + try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024, + cutOff)) { + // The skip pos = cut-off pos - 1025, so the skip operation will succeed on the first stream, + // the 1024 bytes read operation also succeeds on the first stream, + // but the next 1024 bytes read operation will fail on the first stream and fall back to the second stream. + int readPos = cutOff - 1024 - 1; + stream.skip(readPos); + + int readLen = 1024; + byte[] data = new byte[readLen]; + int n = stream.read(data); + Assert.assertEquals(readLen, n); + Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data); + + n = stream.read(data); + Assert.assertEquals(readLen, n); + Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos + 1024, readLen), data); + } + + try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024, + cutOff)) { + // 1. Skip 1024 bytes and then read 1024 bytes from the first stream. + // 2. Then skip cut-off - 512 bytes; the target off = 1024 + 1024 + cut-off - 512, + // which is bigger than the cut-off pos, so the second skip operation will fail + // and then fall back to the second stream. + // 3. Read 1024 bytes. + int readPos = 1024; + stream.skip(readPos); + + int readLen = 1024; + byte[] data = new byte[readLen]; + int n = stream.read(data); + Assert.assertEquals(readLen, n); + Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data); + + int skipPos = cutOff - 512; + stream.skip(skipPos); + + n = stream.read(data); + Assert.assertEquals(readLen, n); + int targetOff = readPos + 1024 + skipPos; + Assert.assertArrayEquals(Bytes.toBytes(DATA, targetOff, readLen), data); + } + } + + /** + * The ChainTOSInputStream contains two streams created by TestObjectFactory. + * Once the read pos of the first stream is more than cutPos, that stream will throw an IOException with + * an unexpected end-of-stream error msg, but the second stream will contain the remaining data.
+ */ + private ChainTOSInputStream createTestChainTOSInputStream(byte[] data, long startOff, long endOff, + long maxDrainSize, long cutPos) { + String key = "dummy-key"; + TOS.GetObjectFactory factory = new TestObjectFactory(data, Arrays.asList(cutPos, -1L)); + return new ChainTOSInputStream(factory, key, startOff, endOff, maxDrainSize, 1); + } + + private static class TestObjectFactory implements TOS.GetObjectFactory { + private final byte[] data; + private final List<Long> streamBreakPoses; + private int streamIndex = 0; + + TestObjectFactory(byte[] data, List<Long> streamBreakPoses) { + this.data = data; + this.streamBreakPoses = streamBreakPoses; + } + + @Override + public GetObjectOutput create(String key, long offset, long end) { + long len = Math.min(end, data.length) - offset; + ByteArrayInputStream data = new ByteArrayInputStream(this.data, (int) offset, (int) len); + + if (streamIndex < streamBreakPoses.size()) { + return new GetObjectOutput(new GetObjectV2Output(new GetObjectBasicOutput(), + new UnExpectedEndOfStream(data, streamBreakPoses.get(streamIndex++))), + Constants.MAGIC_CHECKSUM); + } else { + throw new RuntimeException("No more output"); + } + } + } + + private static class UnExpectedEndOfStream extends InputStream { + private final ByteArrayInputStream delegate; + private final long breakPos; + private int readPos; + + UnExpectedEndOfStream(ByteArrayInputStream stream, long breakPos) { + delegate = stream; + this.breakPos = breakPos; + } + + @Override + public int read() throws IOException { + if (breakPos != -1 && readPos >= breakPos) { + throw new IOException("unexpected end of stream on dummy source."); + } else { + int n = delegate.read(); + readPos += 1; + return n; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (breakPos != -1 && readPos >= breakPos) { + throw new IOException("unexpected end of stream on dummy source."); + } else { + int n = delegate.read(b, off, len); + readPos += n; + return n; + } + } + } +}
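Reviewer note: to make the fallback contract these tests pin down concrete, here is an illustrative sketch of the pattern only; it is not the actual ChainTOSInputStream implementation, which is outside this patch, and all names in it are hypothetical. On an IOException, the chain re-creates a stream from the factory at the current position and continues reading there.

abstract class FallbackReadSketch {
  private java.io.InputStream current;
  private long pos;

  FallbackReadSketch(java.io.InputStream first) {
    this.current = first;
  }

  // In the real class this would be something like factory.create(key, pos, end).
  abstract java.io.InputStream reopenAt(long pos) throws java.io.IOException;

  int readWithFallback() throws java.io.IOException {
    try {
      int b = current.read();
      if (b >= 0) { pos++; }
      return b;
    } catch (java.io.IOException e) {
      current = reopenAt(pos); // fall back to the next stream in the chain
      int b = current.read();
      if (b >= 0) { pos++; }
      return b;
    }
  }
}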
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestDelegationClientBuilder.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestDelegationClientBuilder.java new file mode 100644 index 00000000000..e56dabd3751 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestDelegationClientBuilder.java @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object.tos; + +import com.volcengine.tos.TOSV2; +import com.volcengine.tos.TOSV2ClientBuilder; +import com.volcengine.tos.TosClientException; +import com.volcengine.tos.TosException; +import com.volcengine.tos.TosServerException; +import com.volcengine.tos.auth.Credential; +import com.volcengine.tos.auth.StaticCredentials; +import com.volcengine.tos.comm.HttpStatus; +import com.volcengine.tos.model.object.DeleteObjectInput; +import com.volcengine.tos.model.object.HeadObjectV2Input; +import com.volcengine.tos.model.object.HeadObjectV2Output; +import com.volcengine.tos.model.object.ListObjectsV2Input; +import com.volcengine.tos.model.object.ListObjectsV2Output; +import com.volcengine.tos.model.object.PutObjectInput; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.tosfs.common.Tasks; +import org.apache.hadoop.fs.tosfs.common.ThreadPools; +import org.apache.hadoop.fs.tosfs.conf.ConfKeys; +import org.apache.hadoop.fs.tosfs.conf.TosKeys; +import org.apache.hadoop.fs.tosfs.object.tos.auth.EnvironmentCredentialsProvider; +import org.apache.hadoop.fs.tosfs.object.tos.auth.SimpleCredentialsProvider; +import org.apache.hadoop.fs.tosfs.util.ParseUtils; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.apache.hadoop.fs.tosfs.util.UUIDUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.IntStream; +import javax.net.ssl.SSLException; + +import static org.apache.hadoop.fs.tosfs.object.tos.DelegationClient.isRetryableException; +import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestDelegationClientBuilder { + + private static final String TEST_KEY = UUIDUtils.random(); + private static final String TEST_DATA = "1234567890"; + private static final String ENV_ACCESS_KEY = + ParseUtils.envAsString(TOS.ENV_TOS_ACCESS_KEY_ID, false); + private static final String ENV_SECRET_KEY = + ParseUtils.envAsString(TOS.ENV_TOS_SECRET_ACCESS_KEY, false); + private static final String ENV_ENDPOINT = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT, false); + + @Rule + public TestName name = new TestName(); + + // Maximum retry times of the tos http client.
+ public static final String MAX_RETRY_COUNT_KEY = "fs.tos.http.maxRetryCount"; + public static final int MAX_RETRY_COUNT_DEFAULT = -1; + + @Before + public void setUp() { + TOSV2 tosSdkClientV2 = + new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(), + new StaticCredentials(ENV_ACCESS_KEY, ENV_SECRET_KEY)); + try (ByteArrayInputStream stream = new ByteArrayInputStream(TEST_DATA.getBytes())) { + PutObjectInput putObjectInput = + new PutObjectInput().setBucket(TestUtility.bucket()).setKey(TEST_KEY).setContent(stream); + tosSdkClientV2.putObject(putObjectInput); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testHeadApiRetry() throws IOException { + Configuration conf = new Configuration(); + conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); + conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false); + conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY"); + conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY"); + + DelegationClient tosV2 = new DelegationClientBuilder().bucket("test").conf(conf).build(); + TOSV2 mockClient = mock(TOSV2.class); + tosV2.setClient(mockClient); + tosV2.setMaxRetryTimes(5); + + HeadObjectV2Input input = HeadObjectV2Input.builder().bucket("test").build(); + when(tosV2.headObject(input)).thenThrow( + new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR), + new TosServerException(HttpStatus.TOO_MANY_REQUESTS), + new TosClientException("fake toe", new IOException("fake ioe")), + new TosException(new SocketException("fake msg")), + new TosException(new UnknownHostException("fake msg")), + new TosException(new SSLException("fake msg")), + new TosException(new InterruptedException("fake msg")), + new TosException(new InterruptedException("fake msg"))) + .thenReturn(new HeadObjectV2Output()); + + RuntimeException exception = + assertThrows(RuntimeException.class, () -> tosV2.headObject(input)); + assertTrue(exception instanceof TosException); + assertTrue(exception.getCause() instanceof UnknownHostException); + verify(tosV2.client(), times(5)).headObject(input); + + HeadObjectV2Input inputOneTime = HeadObjectV2Input.builder().bucket("inputOneTime").build(); + HeadObjectV2Output output = new HeadObjectV2Output(); + when(tosV2.headObject(inputOneTime)).thenReturn(output); + HeadObjectV2Output headObject = tosV2.headObject(inputOneTime); + Assert.assertEquals(headObject, output); + verify(tosV2.client(), times(1)).headObject(inputOneTime); + tosV2.close(); + + DelegationClient newClient = new DelegationClientBuilder().bucket("test").conf(conf).build(); + mockClient = mock(TOSV2.class); + newClient.setClient(mockClient); + newClient.setMaxRetryTimes(5); + when(newClient.headObject(input)).thenThrow( + new TosClientException("fake toe", new EOFException("fake eof")), + new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR), + new TosServerException(HttpStatus.TOO_MANY_REQUESTS)).thenReturn(new HeadObjectV2Output()); + + exception = assertThrows(RuntimeException.class, () -> newClient.headObject(input)); + assertTrue(exception instanceof TosClientException); + assertTrue(exception.getCause() instanceof EOFException); + verify(newClient.client(), times(1)).headObject(input); + newClient.close(); + } + + @Test + public void testEnableCrcCheck() throws IOException { + String bucket = name.getMethodName(); + Configuration conf = new 
Configuration(); + conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); + conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true); + conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY"); + conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY"); + + DelegationClient tosV2 = new DelegationClientBuilder().bucket(bucket).conf(conf).build(); + Assert.assertTrue(tosV2.config().isEnableCrc()); + + conf.setBoolean(TosKeys.FS_TOS_CRC_CHECK_ENABLED, false); + tosV2 = new DelegationClientBuilder().bucket(bucket).conf(conf).build(); + Assert.assertFalse(tosV2.config().isEnableCrc()); + + tosV2.close(); + } + + @Test + public void testClientCache() throws IOException { + String bucket = name.getMethodName(); + Configuration conf = new Configuration(); + conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); + conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false); + conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), "ACCESS_KEY_A"); + conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket), "SECRET_KEY_A"); + + DelegationClient tosV2 = new DelegationClientBuilder().bucket(bucket).conf(conf).build(); + DelegationClient tosV2Cached = new DelegationClientBuilder().bucket(bucket).conf(conf).build(); + Assert.assertEquals("client must be loaded from the cache", tosV2Cached, tosV2); + Assert.assertEquals("ACCESS_KEY_A", tosV2.usedCredential().getAccessKeyId()); + tosV2Cached.close(); + + String newBucket = "new-test-bucket"; + conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(newBucket), "ACCESS_KEY_B"); + conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(newBucket), "SECRET_KEY_B"); + DelegationClient changeBucketClient = + new DelegationClientBuilder().bucket(newBucket).conf(conf).build(); + Assert.assertNotEquals("client should be created entirely new", changeBucketClient, tosV2); + Assert.assertEquals("ACCESS_KEY_B", changeBucketClient.usedCredential().getAccessKeyId()); + changeBucketClient.close(); + + conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true); // disable cache: true + DelegationClient tosV2NotCached = + new DelegationClientBuilder().bucket(bucket).conf(conf).build(); + Assert.assertNotEquals("client should be created entirely new", tosV2NotCached, tosV2); + Assert.assertEquals("ACCESS_KEY_A", tosV2NotCached.usedCredential().getAccessKeyId()); + tosV2NotCached.close(); + + tosV2.close(); + } + + @Test + public void testOverwriteHttpConfig() throws IOException { + Configuration conf = new Configuration(); + conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); + conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY"); + conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY"); + conf.setInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, 24); + conf.setInt(MAX_RETRY_COUNT_KEY, 24); + conf.setInt(TosKeys.FS_TOS_REQUEST_MAX_RETRY_TIMES, 24); + conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true); + + DelegationClient tosV2 = new DelegationClientBuilder().bucket("test").conf(conf).build(); + Assert.assertEquals("ACCESS_KEY", tosV2.usedCredential().getAccessKeyId()); + Assert.assertEquals("http max connections overwritten to 24 from 1024, must be 24", 24,
+ tosV2.config().getTransportConfig().getMaxConnections()); + Assert.assertEquals("tos maxRetryCount disabled, must be -1", + DelegationClientBuilder.DISABLE_TOS_RETRY_VALUE, + tosV2.config().getTransportConfig().getMaxRetryCount()); + Assert.assertEquals("maxRetryTimes must be 24", 24, tosV2.maxRetryTimes()); + Assert.assertEquals("endpoint must be equal to https://tos-cn-beijing.ivolces.com", + "https://tos-cn-beijing.ivolces.com", tosV2.config().getEndpoint()); + + tosV2.close(); + } + + @Test + public void testDynamicRefreshAkSk() throws IOException { + Configuration conf = new Configuration(); + conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); + conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), ENV_ACCESS_KEY); + conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), ENV_SECRET_KEY); + conf.setInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, 24); + conf.setInt(MAX_RETRY_COUNT_KEY, 24); + + TOSV2 tosSdkClientV2 = + new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(), + new StaticCredentials("a", "b")); + DelegationClient delegationClientV2 = + new DelegationClientBuilder().bucket(TestUtility.bucket()).conf(conf).build(); + + ListObjectsV2Input inputV2 = + ListObjectsV2Input.builder().bucket(TestUtility.bucket()).prefix(TEST_KEY).marker("") + .maxKeys(10).build(); + + Assert.assertThrows(TosServerException.class, () -> tosSdkClientV2.listObjects(inputV2)); + + tosSdkClientV2.changeCredentials(new StaticCredentials(ENV_ACCESS_KEY, ENV_SECRET_KEY)); + + ListObjectsV2Output tosSdkOutput = tosSdkClientV2.listObjects(inputV2); + ListObjectsV2Output delegateOutput = delegationClientV2.listObjects(inputV2); + int nativeContentSize = + tosSdkOutput.getContents() == null ? -1 : tosSdkOutput.getContents().size(); + int delegateContentSize = + delegateOutput.getContents() == null ?
-1 : delegateOutput.getContents().size(); + + Assert.assertEquals("delegation client must be the same as native client", nativeContentSize, + delegateContentSize); + Assert.assertEquals(ENV_ACCESS_KEY, delegationClientV2.usedCredential().getAccessKeyId()); + + delegationClientV2.close(); + } + + @Test + public void testCreateClientWithEnvironmentCredentials() throws IOException { + Configuration conf = new Configuration(); + conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com"); + conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, EnvironmentCredentialsProvider.NAME); + + DelegationClient tosV2 = + new DelegationClientBuilder().bucket(TestUtility.bucket()).conf(conf).build(); + Credential cred = tosV2.usedCredential(); + + String assertMsg = + String.format("expect %s, but got %s", ENV_ACCESS_KEY, cred.getAccessKeyId()); + Assert.assertEquals(assertMsg, cred.getAccessKeyId(), ENV_ACCESS_KEY); + assertMsg = String.format("expect %s, but got %s", ENV_SECRET_KEY, cred.getAccessKeySecret()); + Assert.assertEquals(assertMsg, cred.getAccessKeySecret(), ENV_SECRET_KEY); + + tosV2.close(); + } + + @Test + public void testCreateClientWithSimpleCredentials() throws IOException { + Configuration conf = new Configuration(); + conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT); + conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); + conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), ENV_ACCESS_KEY); + conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), ENV_SECRET_KEY); + conf.setInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, 24); + conf.setInt(MAX_RETRY_COUNT_KEY, 24); + + ListObjectsV2Input input = + ListObjectsV2Input.builder().bucket(TestUtility.bucket()).prefix(TEST_KEY).marker("") + .maxKeys(10).build(); + + TOSV2 v2 = new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(), + new StaticCredentials(ENV_ACCESS_KEY, ENV_SECRET_KEY)); + ListObjectsV2Output outputV2 = v2.listObjects(input); + + DelegationClient tosV2 = + new DelegationClientBuilder().bucket(TestUtility.bucket()).conf(conf).build(); + + ListObjectsV2Output output = tosV2.listObjects(input); + Assert.assertEquals("delegation client must be the same as native client", + outputV2.getContents().size(), output.getContents().size()); + + tosV2.close(); + } + + @Test + public void testCachedConcurrently() { + String bucketName = name.getMethodName(); + + Function<String, Configuration> commonConf = bucket -> { + Configuration conf = new Configuration(); + conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT); + conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); + conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), ENV_ACCESS_KEY); + conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket), ENV_SECRET_KEY); + return conf; + }; + + // enable cache + Function<String, Configuration> enableCachedConf = bucket -> { + Configuration conf = commonConf.apply(bucket); + conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false); + return conf; + }; + + ExecutorService es = ThreadPools.newWorkerPool("testCachedConcurrently", 32); + int bucketCount = 5; + int taskCount = 10000; + + AtomicInteger success = new AtomicInteger(0); + AtomicInteger failure = new AtomicInteger(0); + Tasks.foreach(IntStream.range(0, taskCount).boxed().map(i -> bucketName + (i % bucketCount))) + .executeWith(es).run(bucket -> { + try { + Configuration conf = enableCachedConf.apply(bucket); + DelegationClient
client = new DelegationClientBuilder().bucket(bucket).conf(conf).build(); + client.close(); + success.incrementAndGet(); + } catch (Exception e) { + failure.incrementAndGet(); + } + }); + + Assert.assertEquals(bucketCount, DelegationClientBuilder.CACHE.size()); + Assert.assertEquals(taskCount, success.get()); + Assert.assertEquals(0, failure.get()); + + // clear cache + DelegationClientBuilder.CACHE.clear(); + + // disable cache + Function<String, Configuration> disableCachedConf = bucket -> { + Configuration conf = commonConf.apply(bucket); + conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true); + return conf; + }; + + success.set(0); + failure.set(0); + Tasks.foreach(IntStream.range(0, taskCount).boxed().map(i -> bucketName + (i % bucketCount))) + .executeWith(es).run(bucket -> { + try { + Configuration conf = disableCachedConf.apply(bucket); + DelegationClient client = new DelegationClientBuilder().bucket(bucket).conf(conf).build(); + client.close(); + success.incrementAndGet(); + } catch (Exception e) { + failure.incrementAndGet(); + } + }); + + Assert.assertTrue(DelegationClientBuilder.CACHE.isEmpty()); + Assert.assertEquals(taskCount, success.get()); + Assert.assertEquals(0, failure.get()); + + es.shutdown(); + } + + @After + public void deleteAllTestData() throws IOException { + TOSV2 tosSdkClientV2 = + new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(), + new StaticCredentials(ENV_ACCESS_KEY, ENV_SECRET_KEY)); + tosSdkClientV2.deleteObject( + DeleteObjectInput.builder().bucket(TestUtility.bucket()).key(TEST_KEY).build()); + + tosSdkClientV2.close(); + DelegationClientBuilder.CACHE.clear(); + } + + @Test + public void testRetryableException() { + assertTrue(retryableException(new TosServerException(500))); + assertTrue(retryableException(new TosServerException(501))); + assertTrue(retryableException(new TosServerException(429))); + assertFalse(retryableException(new TosServerException(404))); + + assertTrue(retryableException(new TosException(new SocketException()))); + assertTrue(retryableException(new TosException(new UnknownHostException()))); + assertTrue(retryableException(new TosException(new SSLException("fake ssl")))); + assertTrue(retryableException(new TosException(new SocketTimeoutException()))); + assertTrue(retryableException(new TosException(new InterruptedException()))); + + assertTrue(retryableException(new TosClientException("fake ioe", new IOException()))); + assertFalse(retryableException(new TosClientException("fake eof", new EOFException()))); + + assertTrue(retryableException(new TosServerException(409))); + assertTrue( + retryableException(new TosServerException(409).setEc(TOSErrorCodes.PATH_LOCK_CONFLICT))); + assertFalse( + retryableException(new TosServerException(409).setEc(TOSErrorCodes.DELETE_NON_EMPTY_DIR))); + assertFalse( + retryableException(new TosServerException(409).setEc(TOSErrorCodes.LOCATED_UNDER_A_FILE))); + assertFalse(retryableException( + new TosServerException(409).setEc(TOSErrorCodes.COPY_BETWEEN_DIR_AND_FILE))); + assertFalse(retryableException( + new TosServerException(409).setEc(TOSErrorCodes.RENAME_TO_AN_EXISTED_DIR))); + assertFalse( + retryableException(new TosServerException(409).setEc(TOSErrorCodes.RENAME_TO_SUB_DIR))); + assertFalse(retryableException( + new TosServerException(409).setEc(TOSErrorCodes.RENAME_BETWEEN_DIR_AND_FILE))); + } + + private boolean retryableException(TosException e) { + return isRetryableException(e, + 
Arrays.asList(TOSErrorCodes.FAST_FAILURE_CONFLICT_ERROR_CODES.split(","))); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSInputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSInputStream.java new file mode 100644 index 00000000000..59f9862d7de --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSInputStream.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object.tos; + +import com.volcengine.tos.internal.util.aborthook.AbortInputStreamHook; +import com.volcengine.tos.model.object.GetObjectBasicOutput; +import com.volcengine.tos.model.object.GetObjectV2Output; +import org.apache.hadoop.fs.tosfs.object.Constants; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.apache.hadoop.thirdparty.com.google.common.io.ByteStreams; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +public class TestTOSInputStream { + + private static final int DATA_SIZE = 1 << 20; + private static final byte[] DATA = TestUtility.rand(DATA_SIZE); + + @Test + public void testForceClose() throws IOException { + TOSInputStream stream = createStream(DATA, 0, DATA_SIZE - 1, 1024); + stream.close(); + Assert.assertTrue("Expected force close", cast(stream).isForceClose()); + + stream = createStream(DATA, 0, DATA_SIZE - 1, 1024); + ByteStreams.skipFully(stream, DATA_SIZE - 1024 - 1); + stream.close(); + Assert.assertTrue("Expected force close", cast(stream).isForceClose()); + + stream = createStream(DATA, 0, -1, 1024); + stream.close(); + Assert.assertTrue("Expected force close", cast(stream).isForceClose()); + + stream = createStream(DATA, 0, -1, 1024); + ByteStreams.skipFully(stream, DATA_SIZE - 1024 - 1); + stream.close(); + Assert.assertTrue("Expected force close", cast(stream).isForceClose()); + + stream = createStream(DATA, 0, -1, 1024); + ByteStreams.skipFully(stream, DATA_SIZE - 1024); + stream.close(); + Assert.assertTrue("Expected force close", cast(stream).isForceClose()); + } + + @Test + public void testClose() throws IOException { + TOSInputStream stream = createStream(DATA, 0, DATA_SIZE - 1, DATA_SIZE); + stream.close(); + Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose()); + + stream = createStream(DATA, 0, DATA_SIZE - 1, 1024); + ByteStreams.skipFully(stream, DATA_SIZE - 1024); + stream.close(); + Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose()); + + stream = createStream(DATA, 0, DATA_SIZE - 1, 1024); + 
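+ // Leaves 1023 unread bytes, just under maxDrainSize (1024), so close() should drain the remainder rather than abort the stream.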
ByteStreams.skipFully(stream, DATA_SIZE - 1023); + stream.close(); + Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose()); + + stream = createStream(DATA, 0, -1, DATA_SIZE + 1); + stream.close(); + Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose()); + + stream = createStream(DATA, 0, -1, 1024); + ByteStreams.skipFully(stream, DATA_SIZE - 1023); + stream.close(); + Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose()); + + stream = createStream(DATA, 0, -1, 1024); + ByteStreams.skipFully(stream, DATA_SIZE); + stream.close(); + Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose()); + } + + private TestInputStream cast(TOSInputStream stream) throws IOException { + InputStream content = stream.getObjectOutput().verifiedContent(Constants.MAGIC_CHECKSUM); + Assert.assertTrue("Not a TestInputStream", content instanceof TestInputStream); + return (TestInputStream) content; + } + + private TOSInputStream createStream(byte[] data, long startOff, long endOff, long maxDrainSize) + throws IOException { + TestInputStream stream = + new TestInputStream(data, (int) startOff, (int) (data.length - startOff)); + GetObjectV2Output output = new GetObjectV2Output(new GetObjectBasicOutput(), stream).setHook( + new ForceCloseHook(stream)); + + return new TOSInputStream(new GetObjectOutput(output, Constants.MAGIC_CHECKSUM), startOff, + endOff, maxDrainSize, Constants.MAGIC_CHECKSUM); + } + + private static class TestInputStream extends ByteArrayInputStream { + // -1 means close() was called + // 0 means neither close() nor forceClose() was called + // 1 means forceClose() was called + private int closeState = 0; + + private TestInputStream(byte[] buf, int off, int len) { + super(buf, off, len); + } + + @Override public void close() { + closeState = -1; + } + + public void forceClose() { + closeState = 1; + } + + boolean isForceClose() { + Assert.assertTrue("Neither close() nor forceClose() was called", closeState == -1 || closeState == 1); + return closeState == 1; + } + } + + private static class ForceCloseHook implements AbortInputStreamHook { + private final TestInputStream in; + + private ForceCloseHook(TestInputStream in) { + this.in = in; + } + + @Override public void abort() { + if (in != null) { + in.forceClose(); + } + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSObjectStorage.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSObjectStorage.java new file mode 100644 index 00000000000..6d2561769cd --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSObjectStorage.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object.tos; + +import com.volcengine.tos.internal.model.CRC64Checksum; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.tosfs.common.Bytes; +import org.apache.hadoop.fs.tosfs.conf.TosKeys; +import org.apache.hadoop.fs.tosfs.object.ChecksumType; +import org.apache.hadoop.fs.tosfs.object.Constants; +import org.apache.hadoop.fs.tosfs.object.MultipartUpload; +import org.apache.hadoop.fs.tosfs.object.ObjectInfo; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory; +import org.apache.hadoop.fs.tosfs.object.Part; +import org.apache.hadoop.fs.tosfs.object.exceptions.NotAppendableException; +import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest; +import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse; +import org.apache.hadoop.fs.tosfs.util.CommonUtils; +import org.apache.hadoop.fs.tosfs.util.TestUtility; +import org.apache.hadoop.fs.tosfs.util.UUIDUtils; +import org.apache.hadoop.util.PureJavaCrc32C; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.zip.Checksum; + +import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME; +import static org.apache.hadoop.fs.tosfs.util.TestUtility.directoryBucketObjectStorage; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +@RunWith(Parameterized.class) +public class TestTOSObjectStorage { + + private final ObjectStorage tos; + private final Checksum checksum; + private final ChecksumType type; + + public TestTOSObjectStorage(ObjectStorage tos, Checksum checksum, ChecksumType checksumType) { + this.tos = tos; + this.checksum = checksum; + this.type = checksumType; + } + + @Parameterized.Parameters(name = "ObjectStorage = {0}, Checksum = {1}, ChecksumType = {2}") + public static Iterable<Object[]> collections() { + List<Object[]> values = new ArrayList<>(); + + Configuration conf = new Configuration(); + conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC64ECMA.name()); + values.add(new Object[] { + ObjectStorageFactory.createWithPrefix(String.format("tos-%s/", UUIDUtils.random()), + TOS_SCHEME, TestUtility.bucket(), conf), new CRC64Checksum(), ChecksumType.CRC64ECMA }); + + conf = new Configuration(); + conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC32C.name()); + values.add(new Object[] { + ObjectStorageFactory.createWithPrefix(String.format("tos-%s/", UUIDUtils.random()), + TOS_SCHEME, TestUtility.bucket(), conf), new PureJavaCrc32C(), ChecksumType.CRC32C }); + + conf = new Configuration(); + conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC64ECMA.name()); + ObjectStorage directoryObjectStorage = directoryBucketObjectStorage(conf); + values.add( + new Object[] { directoryObjectStorage, new CRC64Checksum(), ChecksumType.CRC64ECMA }); + + conf = new Configuration(); + conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC32C.name()); + directoryObjectStorage = directoryBucketObjectStorage(conf); + values.add(new Object[] { directoryObjectStorage, new PureJavaCrc32C(), ChecksumType.CRC32C }); + + return values; + } + + @After + public void tearDown() throws 
Exception { + checksum.reset(); + + CommonUtils.runQuietly(() -> tos.deleteAll("")); + for (MultipartUpload upload : tos.listUploads("")) { + tos.abortMultipartUpload(upload.key(), upload.uploadId()); + } + tos.close(); + } + + @Test + public void testHeadObj() { + String key = "testHeadObj"; + byte[] data = TestUtility.rand(1024); + checksum.update(data, 0, data.length); + assertEquals(checksum.getValue(), parseChecksum(tos.put(key, data))); + + ObjectInfo objInfo = tos.head(key); + assertEquals(checksum.getValue(), parseChecksum(objInfo.checksum())); + } + + @Test + public void testGetFileStatus() { + Assume.assumeFalse(tos.bucket().isDirectory()); + + Configuration conf = new Configuration(tos.conf()); + conf.setBoolean(TosKeys.FS_TOS_GET_FILE_STATUS_ENABLED, true); + tos.initialize(conf, tos.bucket().name()); + + String key = "testFileStatus"; + byte[] data = TestUtility.rand(256); + byte[] putChecksum = tos.put(key, data); + + ObjectInfo obj1 = tos.objectStatus(key); + Assert.assertArrayEquals(putChecksum, obj1.checksum()); + Assert.assertEquals(key, obj1.key()); + Assert.assertEquals(obj1, tos.head(key)); + + ObjectInfo obj2 = tos.objectStatus(key + "/"); + Assert.assertNull(obj2); + + String dirKey = "testDirStatus/"; + putChecksum = tos.put(dirKey, new byte[0]); + + ObjectInfo obj3 = tos.objectStatus("testDirStatus"); + Assert.assertArrayEquals(putChecksum, obj3.checksum()); + Assert.assertEquals(dirKey, obj3.key()); + Assert.assertEquals(obj3, tos.head(dirKey)); + Assert.assertNull(tos.head("testDirStatus")); + ObjectInfo obj4 = tos.objectStatus(dirKey); + Assert.assertArrayEquals(putChecksum, obj4.checksum()); + Assert.assertEquals(dirKey, obj4.key()); + Assert.assertEquals(obj4, tos.head(dirKey)); + + String prefix = "testPrefix/"; + tos.put(prefix + "subfile", data); + ObjectInfo obj5 = tos.objectStatus(prefix); + Assert.assertEquals(prefix, obj5.key()); + Assert.assertArrayEquals(Constants.MAGIC_CHECKSUM, obj5.checksum()); + Assert.assertNull(tos.head(prefix)); + ObjectInfo obj6 = tos.objectStatus("testPrefix"); + Assert.assertEquals(prefix, obj6.key()); + Assert.assertArrayEquals(Constants.MAGIC_CHECKSUM, obj6.checksum()); + Assert.assertNull(tos.head("testPrefix")); + } + + @Test + public void testObjectStatus() { + Assume.assumeFalse(tos.bucket().isDirectory()); + + String key = "testObjectStatus"; + byte[] data = TestUtility.rand(1024); + checksum.update(data, 0, data.length); + assertEquals(checksum.getValue(), parseChecksum(tos.put(key, data))); + + ObjectInfo objInfo = tos.objectStatus(key); + assertEquals(checksum.getValue(), parseChecksum(objInfo.checksum())); + + objInfo = tos.head(key); + assertEquals(checksum.getValue(), parseChecksum(objInfo.checksum())); + + String dir = key + "/"; + tos.put(dir, new byte[0]); + objInfo = tos.objectStatus(dir); + Assert.assertArrayEquals(Constants.MAGIC_CHECKSUM, objInfo.checksum()); + + objInfo = tos.head(dir); + Assert.assertArrayEquals(Constants.MAGIC_CHECKSUM, objInfo.checksum()); + } + + @Test + public void testListObjs() { + String key = "testListObjs"; + byte[] data = TestUtility.rand(1024); + checksum.update(data, 0, data.length); + for (int i = 0; i < 5; i++) { + assertEquals(checksum.getValue(), parseChecksum(tos.put(key, data))); + } + + ListObjectsRequest request = + ListObjectsRequest.builder().prefix(key).startAfter(null).maxKeys(-1).delimiter("/") + .build(); + Iterator<ListObjectsResponse> iter = tos.list(request).iterator(); + while (iter.hasNext()) { + List<ObjectInfo> objs = iter.next().objects(); + for (ObjectInfo obj : objs) {
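+ // Every object listed under the prefix must carry the checksum of the uploaded data.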
assertEquals(checksum.getValue(), parseChecksum(obj.checksum())); + } + } + } + + @Test + public void testPutChecksum() { + String key = "testPutChecksum"; + byte[] data = TestUtility.rand(1024); + checksum.update(data, 0, data.length); + + byte[] checksumBytes = tos.put(key, data); + + assertEquals(checksum.getValue(), parseChecksum(checksumBytes)); + } + + @Test + public void testMPUChecksum() { + int partNumber = 2; + String key = "testMPUChecksum"; + MultipartUpload mpu = tos.createMultipartUpload(key); + byte[] data = TestUtility.rand(mpu.minPartSize() * partNumber); + checksum.update(data, 0, data.length); + + List<Part> parts = new ArrayList<>(); + for (int i = 0; i < partNumber; i++) { + final int index = i; + Part part = tos.uploadPart(key, mpu.uploadId(), index + 1, + () -> new ByteArrayInputStream(data, index * mpu.minPartSize(), mpu.minPartSize()), + mpu.minPartSize()); + parts.add(part); + } + + byte[] checksumBytes = tos.completeUpload(key, mpu.uploadId(), parts); + assertEquals(checksum.getValue(), parseChecksum(checksumBytes)); + } + + @Test + public void testAppendable() { + Assume.assumeFalse(tos.bucket().isDirectory()); + + // Create the object via append, then append to it again. + byte[] data = TestUtility.rand(256); + String prefix = "a/testAppendable/"; + String key = prefix + "object.txt"; + tos.append(key, data); + + tos.append(key, new byte[0]); + + // Recreate the object via put; a subsequent append must fail. + data = TestUtility.rand(256); + tos.put(key, data); + + assertThrows("Expect not appendable.", NotAppendableException.class, + () -> tos.append(key, new byte[0])); + + tos.delete(key); + } + + @Test + public void testDirectoryBucketAppendable() { + Assume.assumeTrue(tos.bucket().isDirectory()); + + byte[] data = TestUtility.rand(256); + String prefix = "a/testAppendable/"; + String key = prefix + "object.txt"; + tos.put(key, data); + + tos.append(key, new byte[1024]); + + tos.delete(key); + } + + private long parseChecksum(byte[] checksum) { + switch (type) { + case CRC32C: + case CRC64ECMA: + return Bytes.toLong(checksum); + default: + throw new IllegalArgumentException( + String.format("Checksum type %s is not supported by TOS.", type.name())); + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSRetryPolicy.java b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSRetryPolicy.java new file mode 100644 index 00000000000..59b12dcf791 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSRetryPolicy.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + package org.apache.hadoop.fs.tosfs.object.tos; + + import com.volcengine.tos.TOSV2; + import com.volcengine.tos.TosException; + import com.volcengine.tos.TosServerException; + import com.volcengine.tos.comm.HttpStatus; + import com.volcengine.tos.model.RequestInfo; + import com.volcengine.tos.model.object.PutObjectOutput; + import com.volcengine.tos.model.object.UploadPartV2Output; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.tosfs.conf.ConfKeys; + import org.apache.hadoop.fs.tosfs.conf.TosKeys; + import org.apache.hadoop.fs.tosfs.object.InputStreamProvider; + import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory; + import org.apache.hadoop.fs.tosfs.object.Part; + import org.apache.hadoop.fs.tosfs.object.tos.auth.SimpleCredentialsProvider; + import org.apache.hadoop.fs.tosfs.util.TestUtility; + import org.junit.After; + import org.junit.Before; + import org.junit.Test; + + import java.io.IOException; + import java.io.InputStream; + import java.net.SocketException; + import java.net.UnknownHostException; + import java.util.HashMap; + import java.util.Map; + import javax.net.ssl.SSLException; + + import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertThrows; + import static org.junit.Assert.assertTrue; + import static org.mockito.ArgumentMatchers.any; + import static org.mockito.Mockito.mock; + import static org.mockito.Mockito.times; + import static org.mockito.Mockito.verify; + import static org.mockito.Mockito.when; + + public class TestTOSRetryPolicy { + + private final String retryKey = "retryKey.txt"; + private TOSV2 tosClient; + private DelegationClient client; + + @Before + public void setUp() { + client = createRetryableDelegationClient(); + tosClient = mock(TOSV2.class); + client.setClient(tosClient); + } + + @After + public void tearDown() throws IOException { + tosClient.close(); + client.close(); + } + + private DelegationClient createRetryableDelegationClient() { + Configuration conf = new Configuration(); + conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://tos-cn-beijing.ivolces.com"); + conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME); + conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true); + conf.set(TosKeys.FS_TOS_ACCESS_KEY_ID, "ACCESS_KEY"); + conf.set(TosKeys.FS_TOS_SECRET_ACCESS_KEY, "SECRET_KEY"); + return new DelegationClientBuilder().bucket("test").conf(conf).build(); + } + + @Test + public void testShouldThrowExceptionAfterRunOut5RetryTimesIfNoRetryConfigSet() + throws IOException { + TOS storage = + (TOS) ObjectStorageFactory.create(TOS_SCHEME, TestUtility.bucket(), new Configuration()); + storage.setClient(client); + client.setMaxRetryTimes(5); + + PutObjectOutput response = mock(PutObjectOutput.class); + InputStreamProvider streamProvider = mock(InputStreamProvider.class); + + when(tosClient.putObject(any())).thenThrow( + new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR), + new TosServerException(HttpStatus.TOO_MANY_REQUESTS), + new TosException(new SocketException("fake msg")), + new TosException(new UnknownHostException("fake msg")), + new TosException(new SSLException("fake msg")), + new TosException(new InterruptedException("fake msg")), + new TosException(new InterruptedException("fake msg"))).thenReturn(response); + + // After the retry budget is exhausted, the last exception should be thrown. + RuntimeException exception = + assertThrows(RuntimeException.class, () -> storage.put(retryKey, streamProvider, 0));
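+ // Five attempts consume the first five queued failures, so the surfaced cause is the SSLException.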
assertTrue(exception instanceof TosException); + assertTrue(exception.getCause() instanceof SSLException); + + // the newStream method of the stream provider should be called 5 times + verify(streamProvider, times(5)).newStream(); + + storage.close(); + } + + @Test + public void testShouldReturnResultAfterRetry8TimesIfConfigured10TimesRetry() + throws IOException { + TOS storage = + (TOS) ObjectStorageFactory.create(TOS_SCHEME, TestUtility.bucket(), new Configuration()); + DelegationClient delegationClient = createRetryableDelegationClient(); + delegationClient.setClient(tosClient); + delegationClient.setMaxRetryTimes(10); + storage.setClient(delegationClient); + + UploadPartV2Output response = new UploadPartV2Output().setPartNumber(1).setEtag("etag"); + + InputStream in = mock(InputStream.class); + InputStreamProvider streamProvider = mock(InputStreamProvider.class); + when(streamProvider.newStream()).thenReturn(in); + + when(tosClient.uploadPart(any())).thenThrow( + new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR), + new TosServerException(HttpStatus.TOO_MANY_REQUESTS), + new TosException(new SocketException("fake msg")), + new TosException(new UnknownHostException("fake msg")), + new TosException(new SSLException("fake msg")), + new TosException(new InterruptedException("fake msg")), + new TosException(new InterruptedException("fake msg"))).thenReturn(response); + + // With a retry budget of 10, the eighth attempt succeeds and the result is returned. + Part part = storage.uploadPart(retryKey, "uploadId", 1, streamProvider, 0); + assertEquals(1, part.num()); + assertEquals("etag", part.eTag()); + + // the newStream method of the stream provider should be called 8 times + verify(streamProvider, times(8)).newStream(); + + storage.close(); + } + + @Test + public void testShouldReturnResultIfRetry3TimesSucceed() throws IOException { + TOS storage = + (TOS) ObjectStorageFactory.create(TOS_SCHEME, TestUtility.bucket(), new Configuration()); + storage.setClient(client); + + PutObjectOutput response = mock(PutObjectOutput.class); + InputStreamProvider streamProvider = mock(InputStreamProvider.class); + + RequestInfo requestInfo = mock(RequestInfo.class); + Map<String, String> header = new HashMap<>(); + when(response.getRequestInfo()).thenReturn(requestInfo); + when(requestInfo.getHeader()).thenReturn(header); + + when(tosClient.putObject(any())).thenThrow( + new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR), + new TosServerException(HttpStatus.TOO_MANY_REQUESTS)).thenReturn(response); + + storage.put(retryKey, streamProvider, 0); + // the newStream method of the stream provider should be called 3 times + verify(streamProvider, times(3)).newStream(); + + storage.close(); + } + + @Test + public void testShouldNotRetryIfThrowUnRetryException() throws IOException { + TOS storage = + (TOS) ObjectStorageFactory.create(TOS_SCHEME, TestUtility.bucket(), new Configuration()); + storage.setClient(client); + + InputStreamProvider streamProvider = mock(InputStreamProvider.class); + + when(tosClient.putObject(any())).thenThrow( + new TosException(new NullPointerException("fake msg."))); + + RuntimeException exception = + assertThrows(RuntimeException.class, () -> storage.put(retryKey, streamProvider, 0)); + assertTrue(exception instanceof TosException); + assertTrue(exception.getCause() instanceof NullPointerException); + + // the newStream method of the stream provider should only be called once.
+ verify(streamProvider, times(1)).newStream(); + + storage.close(); + } +}