[
https://issues.apache.org/jira/browse/HADOOP-19767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18050572#comment-18050572
]
ASF GitHub Bot commented on HADOOP-19767:
-----------------------------------------
manika137 commented on code in PR #8153:
URL: https://github.com/apache/hadoop/pull/8153#discussion_r2671707670
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -937,6 +952,92 @@ public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs()
executePrefetchReadTest(tracingContext1, configuration1, false);
}
+ /**
+ * Test to verify that the correct AbfsInputStream instance is created
+ * based on the read policy set in AbfsConfiguration.
+ */
+ @Test
+ public void testAbfsInputStreamInstance() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ Path path = new Path("/testPath");
+ fs.create(path).close();
+
+ // Assert that Sequential Read Policy uses Prefetch Input Stream
+
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+ InputStream stream = fs.open(path).getWrappedStream();
+ assertThat(stream).isInstanceOf(AbfsPrefetchInputStream.class);
+ stream.close();
+
+ // Assert that Adaptive Read Policy uses Adaptive Input Stream
+
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+ stream = fs.open(path).getWrappedStream();
+ assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+ stream.close();
+
+ // Assert that Parquet Read Policy uses Random Input Stream
+
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_PARQUET);
+ stream = fs.open(path).getWrappedStream();
+ assertThat(stream).isInstanceOf(AbfsRandomInputStream.class);
+ stream.close();
+
+ // Assert that Avro Read Policy uses Adaptive Input Stream
+
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_AVRO);
+ stream = fs.open(path).getWrappedStream();
+ assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+ stream.close();
+ }
+
+ @Test
+ public void testRandomInputStreamDoesNotQueuePrefetches() throws Exception {
Review Comment:
nit: javadoc
> ABFS: [Read] Introduce Abfs Input Policy for detecting read patterns
> --------------------------------------------------------------------
>
> Key: HADOOP-19767
> URL: https://issues.apache.org/jira/browse/HADOOP-19767
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.2
> Reporter: Anuj Modi
> Assignee: Anuj Modi
> Priority: Major
> Labels: pull-request-available
>
> Since the inception of the ABFS driver, there has been a single implementation of
> AbfsInputStream. Different kinds of workloads require different heuristics to
> give the best performance for that type of workload. For example:
> # Sequential read workloads like DFSIO and DistCp gain a performance
> improvement from prefetching.
> # Random read workloads, on the other hand, do not need prefetches; enabling
> prefetches for them adds overhead and is TPS heavy.
> # Query workloads involving Parquet/ORC files benefit from improvements like
> Footer Read and Small Files Read.
> To accommodate this, we need to detect the read pattern and accordingly create
> the input stream implementation suited to that particular pattern.
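The policy-to-stream mapping the test above asserts can be sketched as a simple dispatch. This is an illustrative standalone sketch, not the actual hadoop-azure code: the enum values and the string names of the stream kinds merely mirror the classes asserted in the test (`AbfsPrefetchInputStream`, `AbfsAdaptiveInputStream`, `AbfsRandomInputStream`), and `streamFor` is a hypothetical helper.

```java
// Hypothetical sketch of read-policy dispatch; names mirror the test's
// assertions but are not the real hadoop-azure API.
public class ReadPolicySketch {

    // Illustrative stand-ins for the FS_OPTION_OPENFILE_READ_POLICY_* values.
    enum ReadPolicy { SEQUENTIAL, ADAPTIVE, PARQUET, AVRO }

    // Returns the stream kind a given policy would select, per the
    // expectations encoded in testAbfsInputStreamInstance().
    static String streamFor(ReadPolicy policy) {
        switch (policy) {
            case SEQUENTIAL:
                // Prefetching pays off for sequential scans (DFSIO, DistCp).
                return "AbfsPrefetchInputStream";
            case PARQUET:
                // Columnar query reads are random-access; skip prefetch queueing.
                return "AbfsRandomInputStream";
            case ADAPTIVE:
            case AVRO:
            default:
                // Detect the access pattern at runtime.
                return "AbfsAdaptiveInputStream";
        }
    }

    public static void main(String[] args) {
        System.out.println(streamFor(ReadPolicy.SEQUENTIAL)); // AbfsPrefetchInputStream
        System.out.println(streamFor(ReadPolicy.PARQUET));    // AbfsRandomInputStream
        System.out.println(streamFor(ReadPolicy.AVRO));       // AbfsAdaptiveInputStream
    }
}
```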
--
This message was sent by Atlassian Jira
(v8.20.10#820010)