[
https://issues.apache.org/jira/browse/HADOOP-19102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833708#comment-17833708
]
ASF GitHub Bot commented on HADOOP-19102:
-----------------------------------------
steveloughran commented on code in PR #6617:
URL: https://github.com/apache/hadoop/pull/6617#discussion_r1550219003
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java:
##########
@@ -114,6 +117,75 @@ public static <T> T awaitFuture(final Future<T> future,
}
}
+ /**
+ * Evaluates a collection of futures and returns their results as a list.
+ * <p>
+ * This method blocks until all futures in the collection have completed.
+ * If any future throws an exception during its execution, this method
+ * extracts and rethrows that exception.
+ * </p>
+ *
+ * @param collection collection of futures to be evaluated
+ * @param <T> type of the result.
+ * @return the list of futures' results, if all went well.
+ * @throws InterruptedIOException future was interrupted
+ * @throws IOException if something went wrong
+ * @throws RuntimeException any nested RTE thrown
+ */
+ public static <T> List<T> awaitFuture(final Collection<Future<T>> collection)
+ throws InterruptedIOException, IOException, RuntimeException {
+ List<T> results = new ArrayList<>();
+ try {
+ for (Future<T> future : collection) {
+ results.add(future.get());
+ }
+ return results;
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException(e.toString())
+ .initCause(e);
+ } catch (ExecutionException e) {
+ return raiseInnerCause(e);
Review Comment:
could you do a log at debug of this, as I've discovered how much of a PITA
it is debugging future-related failures. thanks.
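A minimal sketch of that debug log, assuming FutureIO has (or gains) an SLF4J
Logger field named LOG; the log message wording is illustrative:

    } catch (ExecutionException e) {
      // hypothetical: surface the inner failure at debug before unwrapping,
      // since future-related stack traces are otherwise painful to trace
      LOG.debug("Execution of future failed with exception", e.getCause());
      return raiseInnerCause(e);
    }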
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+
+import static org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest.SHORTENED_GUID_LEN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class AbfsInputStreamTestUtils {
+
+ public static final int HUNDRED = 100;
+
+ private final AbstractAbfsIntegrationTest abstractAbfsIntegrationTest;
+
+ public AbfsInputStreamTestUtils(AbstractAbfsIntegrationTest abstractAbfsIntegrationTest) {
+ this.abstractAbfsIntegrationTest = abstractAbfsIntegrationTest;
+ }
+
+ private Path path(String filepath) throws IOException {
+ return abstractAbfsIntegrationTest.getFileSystem().makeQualified(
+ new Path(getTestPath(), getUniquePath(filepath)));
+ }
+
+ private Path getTestPath() {
+ Path path = new Path(UriUtils.generateUniqueTestPath());
+ return path;
+ }
+
+ /**
+ * Generate a unique path using the given filepath.
+ * @param filepath path string
+ * @return unique path created from filepath and a GUID
+ */
+ private Path getUniquePath(String filepath) {
+ if (filepath.equals("/")) {
+ return new Path(filepath);
+ }
+ return new Path(filepath + StringUtils
+ .right(UUID.randomUUID().toString(), SHORTENED_GUID_LEN));
+ }
+
+ public AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
+ throws IOException {
+ final AzureBlobFileSystem fs = abstractAbfsIntegrationTest.getFileSystem();
+ abstractAbfsIntegrationTest.getAbfsStore(fs).getAbfsConfiguration()
+ .setReadSmallFilesCompletely(readSmallFilesCompletely);
+ abstractAbfsIntegrationTest.getAbfsStore(fs).getAbfsConfiguration()
+ .setOptimizeFooterRead(false);
+ abstractAbfsIntegrationTest.getAbfsStore(fs).getAbfsConfiguration()
+ .setIsChecksumValidationEnabled(true);
+ return fs;
+ }
+
+ public byte[] getRandomBytesArray(int length) {
+ final byte[] b = new byte[length];
+ new Random().nextBytes(b);
+ return b;
+ }
+
+ public Path createFileWithContent(FileSystem fs, String fileName,
+ byte[] fileContent) throws IOException {
+ Path testFilePath = path(fileName);
+ try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+ oStream.write(fileContent);
+ oStream.flush();
+ }
+ return testFilePath;
+ }
+
+ public AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
+ throws NoSuchFieldException, IllegalAccessException {
+ AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+ Field abfsStoreField = AzureBlobFileSystem.class
+ .getDeclaredField("abfsStore");
+ abfsStoreField.setAccessible(true);
+ return (AzureBlobFileSystemStore) abfsStoreField.get(abfs);
+ }
+
+ public Map<String, Long> getInstrumentationMap(FileSystem fs)
+ throws NoSuchFieldException, IllegalAccessException {
+ AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+ Field abfsCountersField = AzureBlobFileSystem.class
+ .getDeclaredField("abfsCounters");
+ abfsCountersField.setAccessible(true);
+ AbfsCounters abfsCounters = (AbfsCounters) abfsCountersField.get(abfs);
+ return abfsCounters.toMap();
+ }
+
+ public void assertContentReadCorrectly(byte[] actualFileContent, int from,
+ int len, byte[] contentRead, Path testFilePath) {
+ for (int i = 0; i < len; i++) {
Review Comment:
* start with an assert that the ranges are valid, e.g.
(from + len) <= actualContent.length; same for contentRead
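A sketch of such precondition asserts, using JUnit's assertTrue (which would
need an extra static import); bounds and failure messages are illustrative:

    // hypothetical range-validity checks before the byte-by-byte comparison
    assertTrue("Range [" + from + ", " + (from + len) + ") exceeds actual"
        + " content length " + actualFileContent.length + " for " + testFilePath,
        from + len <= actualFileContent.length);
    assertTrue("Length " + len + " exceeds bytes read " + contentRead.length
        + " for " + testFilePath,
        len <= contentRead.length);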
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java:
##########
@@ -0,0 +1,181 @@
+ public AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
Review Comment:
can you add javadocs for all these.
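For instance, a sketch of the kind of javadoc being asked for on
getFileSystem; wording is illustrative, not the final text:

    /**
     * Open the test filesystem with the given small-file read setting,
     * footer-read optimization disabled and checksum validation enabled.
     * @param readSmallFilesCompletely whether small files should be read fully
     * @return the configured AzureBlobFileSystem
     * @throws IOException failure opening or configuring the filesystem
     */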
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java:
##########
@@ -114,6 +117,75 @@ public static <T> T awaitFuture(final Future<T> future,
+ public static <T> List<T> awaitFuture(final Collection<Future<T>> collection)
Review Comment:
could you call the method awaitAllFutures()
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java:
##########
@@ -114,6 +117,75 @@ public static <T> T awaitFuture(final Future<T> future,
+ /**
+ * Evaluates a collection of futures and returns their results as a list,
+ * but only waits up to the specified timeout for each future to complete.
+ * <p>
+ * This method blocks until all futures in the collection have completed or
+ * the timeout expires, whichever happens first. If any future throws an
+ * exception during its execution, this method extracts and rethrows that
+ * exception.
+ * </p>
+ *
+ * @param collection collection of futures to be evaluated
+ * @param timeout timeout to wait
+ * @param unit time unit.
+ * @param <T> type of the result.
+ * @return the list of futures' results, if all went well.
+ * @throws InterruptedIOException future was interrupted
+ * @throws IOException if something went wrong
+ * @throws RuntimeException any nested RTE thrown
+ * @throws TimeoutException the future timed out.
+ */
+ public static <T> List<T> awaitFuture(final Collection<Future<T>> collection,
Review Comment:
* could you call the method awaitAllFutures()
* Prefer you pass in a Duration rather than timeout and unit; we can split
that up later. It is just a duration is *exactly* what we need
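A sketch of the suggested shape, renamed to awaitAllFutures() and taking a
java.time.Duration (needs imports of Duration, TimeUnit and TimeoutException);
the per-future timeout semantics are an assumption, as the timeout could
equally be a single deadline across the whole collection:

    public static <T> List<T> awaitAllFutures(
        final Collection<Future<T>> collection,
        final Duration timeout)
        throws InterruptedIOException, IOException, TimeoutException {
      List<T> results = new ArrayList<>();
      try {
        for (Future<T> future : collection) {
          // each future is given the full timeout
          results.add(future.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        }
        return results;
      } catch (InterruptedException e) {
        throw (InterruptedIOException) new InterruptedIOException(e.toString())
            .initCause(e);
      } catch (ExecutionException e) {
        return raiseInnerCause(e);
      }
    }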
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java:
##########
@@ -0,0 +1,181 @@
+ public AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
Review Comment:
kind of ugly. well, very ugly. but at least this way it stops it ever
getting into production code. So not going to suggest any changes
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java:
##########
@@ -54,9 +63,44 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
private static final int TEN = 10;
private static final int TWENTY = 20;
+ private static ExecutorService executorService;
+
+ private static final int SIZE_256_KB = 256 * ONE_KB;
+
+ private static final Integer[] FILE_SIZES = {
Review Comment:
having a smaller file size is good, but at the same time, so is the broader
coverage.
making it a scale test means that it'll run with the -Dscale option. you can
also set up your IDE or auth-keys to always run scale tests too, if you
haven't noticed
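For reference, a typical invocation that enables the scale tests (flags as in
the hadoop-azure testing docs; thread count illustrative):

    mvn -T 1C verify -Dparallel-tests=abfs -DtestsThreadCount=8 -Dscale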
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java:
##########
@@ -0,0 +1,181 @@
+ private void assertBufferEquality(byte[] actualContent, byte[] contentRead,
Review Comment:
needs javadocs which include the detail that only up to the read buffer size
of data is actually validated.
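A sketch of a javadoc carrying that detail; wording illustrative:

    /**
     * Assert that the two buffers match (or deliberately do not match),
     * comparing only the first min(actualContent.length, readBufferSize)
     * bytes: data beyond the configured read buffer size is not validated.
     */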
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java:
##########
@@ -0,0 +1,181 @@
+ public void verifyBeforeSeek(AbfsInputStream abfsInputStream) {
+ assertEquals(0, abfsInputStream.getFCursor());
Review Comment:
use AssertJ, or at least add a description of what each field is being
checked for, so that when there's a failure the error message is more
meaningful
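For example, a hedged AssertJ rewrite of that first check (requires
org.assertj.core.api.Assertions; the description text is illustrative):

    Assertions.assertThat(abfsInputStream.getFCursor())
        .describedAs("fCursor of the stream before any seek or read")
        .isEqualTo(0);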
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java:
##########
@@ -0,0 +1,181 @@
+ public void verifyBeforeSeek(AbfsInputStream abfsInputStream) {
Review Comment:
verify "what". Verifying that the steam has not yet done any seek? read?
> [ABFS]: FooterReadBufferSize should not be greater than readBufferSize
> ----------------------------------------------------------------------
>
> Key: HADOOP-19102
> URL: https://issues.apache.org/jira/browse/HADOOP-19102
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Pranav Saxena
> Assignee: Pranav Saxena
> Priority: Major
> Labels: pull-request-available
>
> The method `optimisedRead` creates a buffer array of size `readBufferSize`.
> If footerReadBufferSize is greater than readBufferSize, abfs will attempt to
> read more data than the buffer array can hold, which causes an exception.
> Change: To avoid this, we will keep footerBufferSize =
> min(readBufferSizeConfig, footerBufferSizeConfig)
>
>
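In code terms the proposed clamp is a one-line guard of the following shape
(names illustrative, not the actual patch):

    // never let the footer read exceed what the read buffer can hold
    int footerReadBufferSize = Math.min(readBufferSize, footerReadBufferSizeConfig);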
--
This message was sent by Atlassian Jira
(v8.20.10#820010)