anujmodi2021 commented on code in PR #8153: URL: https://github.com/apache/hadoop/pull/8153#discussion_r2674954172
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java: ########## @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +/** + * Input stream implementation optimized for adaptive read patterns. + * This is the default implementation used for cases where user does not specify any input policy. + * It switches between sequential and random read optimizations based on the detected read pattern. + * It also keeps footer read and small file optimizations enabled. + */ +public class AbfsAdaptiveInputStream extends AbfsInputStream { + + /** + * Constructs AbfsAdaptiveInputStream + * @param client AbfsClient to be used for read operations + * @param statistics to recordinput stream statistics Review Comment: Taken ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java: ########## @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +/** + * Input stream implementation optimized for random read patterns. + * This implementation disables prefetching of data blocks instead only + * reads ahead for a small range beyond what is requested by the caller. + */ +public class AbfsRandomInputStream extends AbfsInputStream { + + /** + * Constructs AbfsRandomInputStream + * @param client AbfsClient to be used for read operations + * @param statistics to record input stream statistics + * @param path file path + * @param contentLength file content length + * @param abfsInputStreamContext input stream context + * @param eTag file eTag + * @param tracingContext tracing context to trace the read operations + */ + public AbfsRandomInputStream( + final AbfsClient client, + final FileSystem.Statistics statistics, + final String path, + final long contentLength, + final AbfsInputStreamContext abfsInputStreamContext, + final String eTag, + TracingContext tracingContext) { + super(client, statistics, path, contentLength, + abfsInputStreamContext, eTag, tracingContext); + } + + /** + * inheritDoc Review Comment: Taken ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -937,6 +952,92 @@ public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs() executePrefetchReadTest(tracingContext1, configuration1, false); } + /** + * Test to verify that the correct AbfsInputStream instance is created + * based on the read policy set in AbfsConfiguration. + */ + @Test + public void testAbfsInputStreamInstance() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + Path path = new Path("/testPath"); + fs.create(path).close(); + + // Assert that Sequential Read Policy uses Prefetch Input Stream + getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL); + InputStream stream = fs.open(path).getWrappedStream(); + assertThat(stream).isInstanceOf(AbfsPrefetchInputStream.class); + stream.close(); + + // Assert that Adaptive Read Policy uses Adaptive Input Stream + getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE); + stream = fs.open(path).getWrappedStream(); + assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class); + stream.close(); + + // Assert that Parquet Read Policy uses Random Input Stream + getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_PARQUET); + stream = fs.open(path).getWrappedStream(); + assertThat(stream).isInstanceOf(AbfsRandomInputStream.class); + stream.close(); + + // Assert that Avro Read Policy uses Adaptive Input Stream + getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_AVRO); + stream = fs.open(path).getWrappedStream(); + assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class); + stream.close(); + } + + @Test + public void testRandomInputStreamDoesNotQueuePrefetches() throws Exception { Review Comment: Taken -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
