anmolanmol1234 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1909939089
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java:
##########
@@ -149,4 +197,954 @@ public void testCloseOfDataBlockOnAppendComplete() throws Exception {
}
}
}
-}
+
+ /**
+ * Creates a file over DFS and attempts to append over Blob.
+ * It should fallback to DFS when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateOverDfsAppendOverBlob() throws IOException {
+ Assume.assumeFalse(
+ getConfiguration().getBoolean(FS_AZURE_TEST_APPENDBLOB_ENABLED, false));
+ final AzureBlobFileSystem fs = getFileSystem();
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getDfsClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+ FSDataOutputStream outputStream = fs.append(testPath);
+ AzureIngressHandler ingressHandler
+ = ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient client = ingressHandler.getClient();
+ Assert.assertTrue("Blob client was not used before fallback",
+ client instanceof AbfsBlobClient);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.hsync();
+ AzureIngressHandler ingressHandlerFallback
+ = ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient clientFallback = ingressHandlerFallback.getClient();
+ Assert.assertTrue("DFS client was not used after fallback",
+ clientFallback instanceof AbfsDfsClient);
+ }
+
+ /**
+ * Creates a file over Blob and attempts to append over DFS.
+ * It should fallback to Blob when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateOverBlobAppendOverDfs() throws IOException {
+ Assume.assumeFalse(
+ getConfiguration().getBoolean(FS_AZURE_TEST_APPENDBLOB_ENABLED,
+ false));
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+ conf);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ fs.getAbfsStore().getAbfsConfiguration().set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true));
+ FSDataOutputStream outputStream = fs.append(testPath);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.hsync();
+ }
+
+ /**
+ * Creates an Append Blob over Blob and attempts to append over DFS.
+ * It should fallback to Blob when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ final AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(conf));
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
+
+ // Set abfsStore as our mocked value.
+ Field privateField = AzureBlobFileSystem.class.getDeclaredField(
+ "abfsStore");
+ privateField.setAccessible(true);
+ privateField.set(fs, store);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ fs.getAbfsStore().getAbfsConfiguration().set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, true, null,
+ null, getTestTracingContext(fs, true));
+ FSDataOutputStream outputStream = fs.append(testPath);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.hsync();
+ }
+
+ /**
+ * Creates an append Blob over DFS and attempts to append over Blob.
+ * It should fallback to DFS when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateAppendBlobOverDfsEndpointAppendOverBlob()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+ Assume.assumeTrue(
+ "FNS does not support append blob creation for DFS endpoint",
+ getIsNamespaceEnabled(getFileSystem()));
+ final AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
+
+ // Set abfsStore as our mocked value.
+ Field privateField = AzureBlobFileSystem.class.getDeclaredField(
+ "abfsStore");
+ privateField.setAccessible(true);
+ privateField.set(fs, store);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getDfsClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, true, null,
+ null, getTestTracingContext(fs, true));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+ FSDataOutputStream outputStream = fs.append(testPath);
+ AzureIngressHandler ingressHandler
+ = ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient client = ingressHandler.getClient();
+ Assert.assertTrue("Blob client was not used before fallback",
+ client instanceof AbfsBlobClient);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.flush();
+ AzureIngressHandler ingressHandlerFallback
+ = ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient clientFallback = ingressHandlerFallback.getClient();
+ Assert.assertTrue("DFS client was not used after fallback",
+ clientFallback instanceof AbfsDfsClient);
+ }
+
+
+ /**
+ * Tests the correct retrieval of the AzureIngressHandler based on the configured ingress service type.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Test
+ public void testValidateIngressHandler() throws IOException {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ AbfsServiceType.BLOB.name());
+ AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
Review Comment:
taken
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]