[ 
https://issues.apache.org/jira/browse/HADOOP-19767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18050762#comment-18050762
 ] 

ASF GitHub Bot commented on HADOOP-19767:
-----------------------------------------

anujmodi2021 commented on code in PR #8153:
URL: https://github.com/apache/hadoop/pull/8153#discussion_r2674954172


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation used for cases where user does not 
specify any input policy.
+ * It switches between sequential and random read optimizations based on the 
detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+  /**
+   * Constructs AbfsAdaptiveInputStream
+   * @param client AbfsClient to be used for read operations
+   * @param statistics to record input stream statistics

Review Comment:
   Taken



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java:
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for random read patterns.
+ * This implementation disables prefetching of data blocks and instead only
+ * reads ahead for a small range beyond what is requested by the caller.
+ */
+public class AbfsRandomInputStream extends AbfsInputStream {
+
+  /**
+   * Constructs AbfsRandomInputStream
+   * @param client AbfsClient to be used for read operations
+   * @param statistics to record input stream statistics
+   * @param path file path
+   * @param contentLength file content length
+   * @param abfsInputStreamContext input stream context
+   * @param eTag file eTag
+   * @param tracingContext tracing context to trace the read operations
+   */
+  public AbfsRandomInputStream(
+      final AbfsClient client,
+      final FileSystem.Statistics statistics,
+      final String path,
+      final long contentLength,
+      final AbfsInputStreamContext abfsInputStreamContext,
+      final String eTag,
+      TracingContext tracingContext) {
+    super(client, statistics, path, contentLength,
+        abfsInputStreamContext, eTag, tracingContext);
+  }
+
+  /**
+   * inheritDoc

Review Comment:
   Taken



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -937,6 +952,92 @@ public void 
testPrefetchReadAddsPriorityHeaderWithDifferentConfigs()
     executePrefetchReadTest(tracingContext1, configuration1, false);
   }
 
+  /**
+   * Test to verify that the correct AbfsInputStream instance is created
+   * based on the read policy set in AbfsConfiguration.
+   */
+  @Test
+  public void testAbfsInputStreamInstance() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path path = new Path("/testPath");
+    fs.create(path).close();
+
+    // Assert that Sequential Read Policy uses Prefetch Input Stream
+    
getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+    InputStream stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsPrefetchInputStream.class);
+    stream.close();
+
+    // Assert that Adaptive Read Policy uses Adaptive Input Stream
+    
getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+    stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+    stream.close();
+
+    // Assert that Parquet Read Policy uses Random Input Stream
+    
getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_PARQUET);
+    stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsRandomInputStream.class);
+    stream.close();
+
+    // Assert that Avro Read Policy uses Adaptive Input Stream
+    
getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_AVRO);
+    stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+    stream.close();
+  }
+
+  @Test
+  public void testRandomInputStreamDoesNotQueuePrefetches() throws Exception {

Review Comment:
   Taken





> ABFS: [Read] Introduce Abfs Input Policy for detecting read patterns
> --------------------------------------------------------------------
>
>                 Key: HADOOP-19767
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19767
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.2
>            Reporter: Anuj Modi
>            Assignee: Anuj Modi
>            Priority: Major
>              Labels: pull-request-available
>
> Since the onset of ABFS Driver, there has been a single implementation of 
> AbfsInputStream. Different kinds of workloads require different heuristics to 
> give the best performance for that type of workload. For example: 
>  # Sequential Read Workloads like DFSIO and DistCP gain performance 
> improvement from prefetching 
>  # Random Read Workloads on the other hand do not need Prefetches and enabling 
> prefetches for them is an overhead and TPS heavy 
>  # Query Workloads involving Parquet/ORC files benefit from improvements like 
> Footer Read and Small Files Reads
> To accommodate this we need to determine the pattern and accordingly create 
> Input Streams implemented for that particular pattern.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to