anmolanmol1234 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1909961269
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java:
##########
@@ -149,4 +197,954 @@ public void testCloseOfDataBlockOnAppendComplete() throws
Exception {
}
}
}
-}
+
+ /**
+ * Creates a file over DFS and attempts to append over Blob.
+ * It should fallback to DFS when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateOverDfsAppendOverBlob() throws IOException {
+ Assume.assumeFalse(
+ getConfiguration().getBoolean(FS_AZURE_TEST_APPENDBLOB_ENABLED,
false));
+ final AzureBlobFileSystem fs = getFileSystem();
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getDfsClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+ FSDataOutputStream outputStream = fs.append(testPath);
+ AzureIngressHandler ingressHandler
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient client = ingressHandler.getClient();
+ Assert.assertTrue("Blob client was not used before fallback",
+ client instanceof AbfsBlobClient);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.hsync();
+ AzureIngressHandler ingressHandlerFallback
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient clientFallback = ingressHandlerFallback.getClient();
+ Assert.assertTrue("DFS client was not used after fallback",
+ clientFallback instanceof AbfsDfsClient);
+ }
+
+ /**
+ * Creates a file over Blob and attempts to append over DFS.
+ * It should fallback to Blob when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateOverBlobAppendOverDfs() throws IOException {
+ Assume.assumeFalse(
+ getConfiguration().getBoolean(FS_AZURE_TEST_APPENDBLOB_ENABLED,
+ false));
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ final AzureBlobFileSystem fs = (AzureBlobFileSystem)
FileSystem.newInstance(
+ conf);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ fs.getAbfsStore().getAbfsConfiguration().set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true));
+ FSDataOutputStream outputStream = fs.append(testPath);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.hsync();
+ }
+
+ /**
+ * Creates an Append Blob over Blob and attempts to append over DFS.
+ * It should fallback to Blob when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ final AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(conf));
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
+
+ // Set abfsStore as our mocked value.
+ Field privateField = AzureBlobFileSystem.class.getDeclaredField(
+ "abfsStore");
+ privateField.setAccessible(true);
+ privateField.set(fs, store);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ fs.getAbfsStore().getAbfsConfiguration().set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, true, null,
+ null, getTestTracingContext(fs, true));
+ FSDataOutputStream outputStream = fs.append(testPath);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.hsync();
+ }
+
+ /**
+ * Creates an append Blob over DFS and attempts to append over Blob.
+ * It should fallback to DFS when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateAppendBlobOverDfsEndpointAppendOverBlob()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+ Assume.assumeTrue(
+ "FNS does not support append blob creation for DFS endpoint",
+ getIsNamespaceEnabled(getFileSystem()));
+ final AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
+
+ // Set abfsStore as our mocked value.
+ Field privateField = AzureBlobFileSystem.class.getDeclaredField(
+ "abfsStore");
+ privateField.setAccessible(true);
+ privateField.set(fs, store);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getDfsClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, true, null,
+ null, getTestTracingContext(fs, true));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+ FSDataOutputStream outputStream = fs.append(testPath);
+ AzureIngressHandler ingressHandler
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient client = ingressHandler.getClient();
+ Assert.assertTrue("Blob client was not used before fallback",
+ client instanceof AbfsBlobClient);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.flush();
+ AzureIngressHandler ingressHandlerFallback
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient clientFallback = ingressHandlerFallback.getClient();
+ Assert.assertTrue("DFS client was not used after fallback",
+ clientFallback instanceof AbfsDfsClient);
+ }
+
+
+ /**
+ * Tests the correct retrieval of the AzureIngressHandler based on the
configured ingress service type.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Test
+ public void testValidateIngressHandler() throws IOException {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ AbfsServiceType.BLOB.name());
+ AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+ configuration);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true,
+ false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true), getIsNamespaceEnabled(fs));
+ FSDataOutputStream outputStream = fs.append(testPath);
+ AzureIngressHandler ingressHandler
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ Assert.assertTrue("Ingress handler instance is not correct",
+ ingressHandler instanceof AzureBlobIngressHandler);
+ AbfsClient client = ingressHandler.getClient();
+ Assert.assertTrue("Blob client was not used correctly",
+ client instanceof AbfsBlobClient);
+
+ Path testPath1 = new Path("testFile1");
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath1).toUri().getPath(), true,
+ false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true), getIsNamespaceEnabled(fs));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.DFS.name());
+ FSDataOutputStream outputStream1 = fs.append(testPath1);
+ AzureIngressHandler ingressHandler1
+ = ((AbfsOutputStream)
outputStream1.getWrappedStream()).getIngressHandler();
+ Assert.assertTrue("Ingress handler instance is not correct",
+ ingressHandler1 instanceof AzureDFSIngressHandler);
+ AbfsClient client1 = ingressHandler1.getClient();
+ Assert.assertTrue("DFS client was not used correctly",
+ client1 instanceof AbfsDfsClient);
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testAppendImplicitDirectory() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ final Path folderPath = new Path(TEST_FOLDER_PATH);
+ fs.mkdirs(folderPath);
+ fs.append(folderPath.getParent());
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testAppendFileNotExists() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ final Path folderPath = new Path(TEST_FOLDER_PATH);
+ fs.append(folderPath);
+ }
+
+ /**
+ * Create directory over dfs endpoint and append over blob endpoint.
+ * Should return error as append is not supported for directory.
+ * **/
+ @Test(expected = IOException.class)
+ public void testCreateExplicitDirectoryOverDfsAppendOverBlob()
+ throws IOException {
+ final AzureBlobFileSystem fs = getFileSystem();
+ final Path folderPath = path(TEST_FOLDER_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getDfsClient().
+ createPath(makeQualified(folderPath).toUri().getPath(), false, false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true));
+ FSDataOutputStream outputStream = fs.append(folderPath);
+ outputStream.write(10);
+ outputStream.hsync();
+ }
+
+ /**
+ * Recreate file between append and flush. Etag mismatch happens.
+ **/
+ @Test(expected = IOException.class)
+ public void testRecreateAppendAndFlush() throws IOException {
+ final AzureBlobFileSystem fs = getFileSystem();
+ final Path filePath = path(TEST_FILE_PATH);
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+ fs.create(filePath);
+ AbfsClient abfsClient = fs.getAbfsStore()
+ .getClientHandler()
+ .getIngressClient();
+ Assume.assumeTrue("Skipping for DFS client",
+ abfsClient instanceof AbfsBlobClient);
+ FSDataOutputStream outputStream = fs.append(filePath);
+ outputStream.write(10);
+ final AzureBlobFileSystem fs1
Review Comment:
taken
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]