[
https://issues.apache.org/jira/browse/HADOOP-19220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vinay Devadiga updated HADOOP-19220:
------------------------------------
Issue Type: Test (was: Bug)
> S3A : S3AInputStream positioned readFully Expectation
> -----------------------------------------------------
>
> Key: HADOOP-19220
> URL: https://issues.apache.org/jira/browse/HADOOP-19220
> Project: Hadoop Common
> Issue Type: Test
> Components: fs/s3
> Reporter: Vinay Devadiga
> Priority: Major
>
> I was writing some unit tests for the positioned readFully method of
> S3AInputStream:
> package org.apache.hadoop.fs.s3a;
> import java.io.EOFException;
> import java.io.FilterInputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.net.SocketException;
> import java.net.URI;
> import java.nio.ByteBuffer;
> import java.nio.charset.Charset;
> import java.nio.charset.StandardCharsets;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import org.apache.commons.io.IOUtils;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
> import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
> import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
> import org.apache.hadoop.util.functional.CallableRaisingIOE;
> import org.assertj.core.api.Assertions;
> import org.junit.Before;
> import org.junit.Test;
> import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
> import software.amazon.awssdk.awscore.exception.AwsServiceException;
> import software.amazon.awssdk.core.ResponseInputStream;
> import software.amazon.awssdk.http.AbortableInputStream;
> import software.amazon.awssdk.services.s3.S3Client;
> import software.amazon.awssdk.services.s3.model.GetObjectRequest;
> import software.amazon.awssdk.services.s3.model.GetObjectResponse;
> import static java.lang.Math.min;
> import static java.nio.charset.StandardCharsets.UTF_8;
> import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
> import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
> import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
> import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
> import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
> import static org.apache.hadoop.util.functional.FutureIO.eval;
> import static org.assertj.core.api.Assertions.assertThat;
> import static
> org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
> import static org.mockito.ArgumentMatchers.any;
> import static org.mockito.Mockito.never;
> import static org.mockito.Mockito.verify;
> public class TestReadFullyAndPositionalRead {
> private S3AFileSystem fs;
> private S3AInputStream input;
> private S3Client s3;
> private static final String EMPTY = "";
> private static final String INPUT = "test_content";
> @Before
> public void setUp() throws IOException {
> Configuration conf = createConfiguration();
> fs = new S3AFileSystem();
> URI uri = URI.create(FS_S3A + "://" + MockS3AFileSystem.BUCKET);
> // Unset S3CSE property from config to avoid pathIOE.
> conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
> fs.initialize(uri, conf);
> s3 = fs.getS3AInternals().getAmazonS3Client("mocking");
> }
> public Configuration createConfiguration() {
> Configuration conf = new Configuration();
> conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
> S3ClientFactory.class);
> // use minimum multipart size for faster triggering
> conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
> conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
> // this is so stream draining is always blocking, allowing assertions
> to be safely made without worrying about any race conditions
> conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
> // set the region to avoid the getBucketLocation on FS init.
> conf.set(AWS_REGION, "eu-west-1");
> return conf;
> }
> @Test
> public void testReadFullyFromBeginning() throws IOException {
> input = getMockedS3AInputStream(INPUT);
> byte[] byteArray = new byte[INPUT.length()];
> input.readFully(0, byteArray, 0, byteArray.length);
> assertThat(new String(byteArray, UTF_8)).isEqualTo(INPUT);
> }
> @Test
> public void testReadFullyWithOffsetAndLength() throws IOException {
> input = getMockedS3AInputStream(INPUT);
> byte[] byteArray = new byte[4];
> input.readFully(5, byteArray, 0, 4);
> assertThat(new String(byteArray, UTF_8)).isEqualTo("cont");
> }
> @Test
> public void testReadFullyWithOffsetBeyondStream() throws IOException {
> input = getMockedS3AInputStream(INPUT);
> byte[] byteArray = new byte[10];
> assertThatExceptionOfType(EOFException.class)
> .isThrownBy(() -> input.readFully(20, byteArray, 0, 10));
> }
> private S3AInputStream getMockedS3AInputStream(String input) {
> Path path = new Path("test-path");
> String eTag = "test-etag";
> String versionId = "test-version-id";
> String owner = "test-owner";
> S3AFileStatus s3AFileStatus = new S3AFileStatus(input.length(), 0,
> path, input.length(), owner, eTag, versionId);
> S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
> fs.getBucket(), path, fs.pathToKey(path),
> fs.getS3EncryptionAlgorithm(), new EncryptionSecrets().getEncryptionKey(),
> eTag, versionId, input.length());
> S3AReadOpContext s3AReadOpContext =
> fs.createReadContext(s3AFileStatus, NoopSpan.INSTANCE);
> return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes,
> getMockedInputStreamCallback(input),
> s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
> BlockingThreadPoolExecutorService.newInstance(2, 40, 60, TimeUnit.SECONDS,
> "s3a-bounded"));
> }
> private S3AInputStream.InputStreamCallbacks
> getMockedInputStreamCallback(String input) {
> GetObjectResponse objectResponse =
> GetObjectResponse.builder().eTag("test-etag").build();
> ResponseInputStream<GetObjectResponse>[] responseInputStreams = new
> ResponseInputStream[] {
> getMockedInputStream(objectResponse, true, input),
> getMockedInputStream(objectResponse, true, input),
> getMockedInputStream(objectResponse, false, input)
> };
> return new S3AInputStream.InputStreamCallbacks() {
> private Integer mockedS3ObjectIndex = 0;
> @Override
> public ResponseInputStream<GetObjectResponse>
> getObject(GetObjectRequest request) {
> mockedS3ObjectIndex++;
> if (mockedS3ObjectIndex == 3) {
> throw AwsServiceException.builder()
> .message("Failed to get S3Object")
>
> .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
> .build();
> }
> return responseInputStreams[min(mockedS3ObjectIndex,
> responseInputStreams.length) - 1];
> }
> @Override
> public GetObjectRequest.Builder newGetRequestBuilder(String key) {
> return
> GetObjectRequest.builder().bucket(fs.getBucket()).key(key);
> }
> @Override
> public <T> CompletableFuture<T> submit(final
> CallableRaisingIOE<T> task) {
> return eval(task);
> }
> @Override
> public void close() {
> }
> };
> }
> private ResponseInputStream<GetObjectResponse> getMockedInputStream(
> GetObjectResponse response, boolean success, String input) {
> FilterInputStream stream = new
> FilterInputStream(AbortableInputStream.create(
> IOUtils.toInputStream(input, StandardCharsets.UTF_8), () -> {
> })) {
> @Override
> public void close() throws IOException {
> super.close();
> if (!success) {
> throw new SocketException("Socket closed");
> }
> }
> };
> return new ResponseInputStream<>(response, stream);
> }
> }
> Running the tests fails with:
> [ERROR]
> TestReadFullyAndPositionalRead.testPositionalReadWithOffsetAndLength:136
> expected:<"[con]t"> but was:<"[tes]t">
> The read does not seem to honor the position parameter; it returns the
> initial bytes of the object instead.
> What is the expected behavior of the readFully method in S3AInputStream?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]