anujmodi2021 commented on code in PR #7837:
URL: https://github.com/apache/hadoop/pull/7837#discussion_r2250349750
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java:
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+public enum TracingHeaderVersion {
+
+  V0("", 8),
+  V1("v1", 13);

Review Comment:
   We will have simple version strings like v0, v1, v2, and so on, which keeps the char count added to the clientReqId small. Whenever the schema of the tracing header changes (fields added, deleted, or rearranged), we need to bump the version, update the schema, and update the getCurrentVersion method to return the latest version.
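   To make the bump-the-version workflow concrete, here is a minimal sketch of how the
   full enum might look. The constructor shape and getFieldCount() are taken from the
   diff and the test below; getVersion() and the commented-out V2 line are assumptions
   added for illustration, not the PR's actual code.

       public enum TracingHeaderVersion {

         V0("", 8),
         V1("v1", 13);
         // A future schema change (add/delete/rearrange fields) would add, e.g.:
         // V2("v2", 14);  // hypothetical next version

         private final String version;
         private final int fieldCount;

         TracingHeaderVersion(String version, int fieldCount) {
           this.version = version;
           this.fieldCount = fieldCount;
         }

         public String getVersion() {  // assumed accessor
           return version;
         }

         public int getFieldCount() {  // referenced by the test below
           return fieldCount;
         }

         // Must be updated to return the latest version on every schema change.
         public static TracingHeaderVersion getCurrentVersion() {
           return V1;
         }
       }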
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -781,6 +794,132 @@ public void testDefaultReadaheadQueueDepth() throws Exception {
     in.close();
   }
 
+  @Test
+  public void testReadTypeInTracingContextHeader() throws Exception {
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsConfiguration spiedConfig = Mockito.spy(spiedStore.getAbfsConfiguration());
+    AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadBufferSize();
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadAheadBlockSize();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedConfig).when(spiedStore).getAbfsConfiguration();
+    int numOfReadCalls = 0;
+    int fileSize = 0;
+
+    /*
+     * Test to verify Normal Read Type.
+     * Disabling read ahead ensures that the read type is a normal read.
+     */
+    fileSize = 3 * ONE_MB; // To make sure multiple blocks are read.
+    numOfReadCalls += 3; // 3 blocks of 1MB each.
+    doReturn(false).when(spiedConfig).isReadAheadV2Enabled();
+    doReturn(false).when(spiedConfig).isReadAheadEnabled();
+    testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, NORMAL_READ, numOfReadCalls);
+
+    /*
+     * Test to verify Missed Cache Read Type.
+     * Setting the read ahead queue depth to 0 ensures that nothing can be served from prefetch.
+     * In such a case the input stream will do a sequential read with the missed cache read type.
+     */
+    fileSize = ONE_MB; // To make sure only one block is read.
+    numOfReadCalls += 1; // 1 block of 1MB.
+    Mockito.doReturn(0).when(spiedConfig).getReadAheadQueueDepth();
+    doReturn(true).when(spiedConfig).isReadAheadEnabled();
+    testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, MISSEDCACHE_READ, numOfReadCalls);
+
+    /*
+     * Test to verify Prefetch Read Type.
+     * Setting the read ahead queue depth to 3 with prefetch enabled ensures that prefetch is done.
+     * The first read here might be Normal or Missed Cache, but the remaining 2 should be Prefetched Read.
+     */
+    fileSize = 3 * ONE_MB; // To make sure multiple blocks are read.
+    numOfReadCalls += 3;
+    doReturn(true).when(spiedConfig).isReadAheadEnabled();
+    Mockito.doReturn(3).when(spiedConfig).getReadAheadQueueDepth();
+    testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, PREFETCH_READ, numOfReadCalls);
+
+    /*
+     * Test to verify Footer Read Type.
+     * Having a file size less than the footer read size and disabling the small file
+     * optimization ensures that the footer read path is taken.
+     */
+    fileSize = 8 * ONE_KB;
+    numOfReadCalls += 1; // Full file will be read along with the footer.
+    doReturn(false).when(spiedConfig).readSmallFilesCompletely();
+    doReturn(true).when(spiedConfig).optimizeFooterRead();
+    testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, FOOTER_READ, numOfReadCalls);
+
+    /*
+     * Test to verify Small File Read Type.
+     * Enabling small file reads and disabling the footer read optimization ensures
+     * that the small file read path is taken.
+     */
+    fileSize = 8 * ONE_KB;
+    numOfReadCalls += 1; // Full file will be read in a single call.
+    doReturn(true).when(spiedConfig).readSmallFilesCompletely();
+    doReturn(false).when(spiedConfig).optimizeFooterRead();
+    testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, SMALLFILE_READ, numOfReadCalls);
+  }
+
+  private void testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem fs,
+      int fileSize, ReadType readType, int numOfReadCalls) throws Exception {
+    Path testPath = new Path("testFile");
+    byte[] fileContent = getRandomBytesArray(fileSize);
+    try (FSDataOutputStream oStream = fs.create(testPath)) {
+      oStream.write(fileContent);
+      oStream.flush();
+    }
+    try (FSDataInputStream iStream = fs.open(testPath)) {
+      int bytesRead = iStream.read(new byte[fileSize], 0, fileSize);
+      Assertions.assertThat(fileSize)
+          .describedAs("Read size should match file size")
+          .isEqualTo(bytesRead);
+    }
+
+    ArgumentCaptor<String> captor1 = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> captor2 = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<byte[]> captor3 = ArgumentCaptor.forClass(byte[].class);
+    ArgumentCaptor<Integer> captor4 = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> captor5 = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<String> captor6 = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> captor7 = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<ContextEncryptionAdapter> captor8 = ArgumentCaptor.forClass(ContextEncryptionAdapter.class);
+    ArgumentCaptor<TracingContext> captor9 = ArgumentCaptor.forClass(TracingContext.class);
+
+    verify(fs.getAbfsStore().getClient(), times(numOfReadCalls)).read(
+        captor1.capture(), captor2.capture(), captor3.capture(),
+        captor4.capture(), captor5.capture(), captor6.capture(),
+        captor7.capture(), captor8.capture(), captor9.capture());
+    TracingContext tracingContext = captor9.getAllValues().get(numOfReadCalls - 1);
+    verifyHeaderForReadTypeInTracingContextHeader(tracingContext, readType);
+  }
+
+  private void verifyHeaderForReadTypeInTracingContextHeader(TracingContext tracingContext,
+      ReadType readType) {
+    AbfsHttpOperation mockOp = Mockito.mock(AbfsHttpOperation.class);
+    doReturn(EMPTY_STRING).when(mockOp).getTracingContextSuffix();
+    tracingContext.constructHeader(mockOp, null, null);
+    String[] idList = tracingContext.getHeader().split(COLON, SPLIT_NO_LIMIT);
+    Assertions.assertThat(idList)
+        .describedAs("Client Request Id should have all fields")
+        .hasSize(TracingHeaderVersion.getCurrentVersion().getFieldCount());
+    Assertions.assertThat(tracingContext.getHeader())
+        .describedAs("Operation Type Should Be Read")
+        .contains(FSOperationType.READ.toString());
+    Assertions.assertThat(tracingContext.getHeader())
+        .describedAs("Read type in tracing context header should match")
+        .contains(readType.toString());
+  }
+
+// private testReadTypeInTracingContextHeaderInternal(ReadType readType) throws Exception {

Review Comment:
   Removed
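   One subtlety in verifyHeaderForReadTypeInTracingContextHeader above: the field-count
   assertion only holds because the split keeps trailing empty fields. A self-contained
   sketch of that Java behavior, assuming SPLIT_NO_LIMIT resolves to a negative split
   limit (the header string here is made up for illustration):

       public class SplitDemo {
         public static void main(String[] args) {
           // Made-up header with empty trailing fields.
           String header = "v1:a:b:::";
           // The default limit of 0 drops trailing empty strings.
           System.out.println(header.split(":").length);      // prints 3
           // A negative limit (what SPLIT_NO_LIMIT is assumed to be) keeps them.
           System.out.println(header.split(":", -1).length);  // prints 6
         }
       }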