This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1e21ffe530 Add support for copy file with subdirectories for
ADLSGen2PinotFS (#14860)
1e21ffe530 is described below
commit 1e21ffe5307a6ebf978a11459d1d83558daf086f
Author: aishikbh <[email protected]>
AuthorDate: Wed Jan 22 20:42:38 2025 +0530
Add support for copy file with subdirectories for ADLSGen2PinotFS (#14860)
* fixCopyFile
* add tests
* modify to be in line with other PinotFs implementations.
---
.../pinot/plugin/filesystem/ADLSGen2PinotFS.java | 4 +
.../filesystem/test/ADLSGen2PinotFSTest.java | 119 +++++++++++++++++++++
2 files changed, 123 insertions(+)
diff --git
a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index d3ac056026..ed40b34cc0 100644
---
a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++
b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -480,6 +480,10 @@ public class ADLSGen2PinotFS extends BasePinotFS {
public void copyToLocalFile(URI srcUri, File dstFile)
throws Exception {
LOGGER.debug("copyToLocalFile is called with srcUri='{}', dstFile='{}'",
srcUri, dstFile);
+
+ // Create parent directories if they don't exist.
+ FileUtils.forceMkdir(dstFile.getParentFile());
+
if (dstFile.exists()) {
if (dstFile.isDirectory()) {
FileUtils.deleteDirectory(dstFile);
diff --git
a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
index acc5373664..ab857b588e 100644
---
a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
+++
b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java
@@ -29,16 +29,20 @@ import
com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.PathItem;
import com.azure.storage.file.datalake.models.PathProperties;
+import java.io.ByteArrayInputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.file.Files;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS;
import org.apache.pinot.plugin.filesystem.AzurePinotFSUtil;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -51,6 +55,9 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
/**
@@ -445,4 +452,116 @@ public class ADLSGen2PinotFSTest {
verify(_mockFileClient).openInputStream();
verify(_mockFileOpenInputStreamResult).getInputStream();
}
+
+ @Test
+ public void testCopyToLocalFileWithSubdirectories() throws Exception {
+   // Purpose: copyToLocalFile must create the destination's missing parent directories itself.
+   // A unique temp root keeps this test isolated from other runs sharing java.io.tmpdir.
+   File tempDir = Files.createTempDirectory("pinot_test").toFile();
+
+   // Destination nested under subdirectories that are intentionally NOT created up front;
+   // pre-creating them would make the test pass even without the parent-directory handling.
+   File parentFile = new File(new File(tempDir, "sub_dir"), "nested_dir");
+   File mockDstFile = new File(parentFile, "test_file.txt");
+   assertFalse(parentFile.exists());
+
+   // Mock file stream
+   byte[] testData = "test data".getBytes();
+   InputStream mockInputStream = new ByteArrayInputStream(testData);
+   when(_mockFileSystemClient.getFileClient(any())).thenReturn(_mockFileClient);
+   when(_mockFileClient.openInputStream()).thenReturn(_mockFileOpenInputStreamResult);
+   when(_mockFileOpenInputStreamResult.getInputStream()).thenReturn(mockInputStream);
+
+   try {
+     // Execute
+     _adlsGen2PinotFsUnderTest.copyToLocalFile(_mockURI, mockDstFile);
+
+     // Verify the expected client calls were made
+     verify(_mockFileSystemClient).getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(_mockURI));
+     verify(_mockFileClient).openInputStream();
+     verify(_mockFileOpenInputStreamResult).getInputStream();
+
+     // Verify the missing parent directories were created by the call under test
+     assertTrue(parentFile.isDirectory());
+
+     // Verify file was created
+     assertTrue(mockDstFile.exists());
+
+     // Verify content was written correctly
+     byte[] writtenContent = Files.readAllBytes(mockDstFile.toPath());
+     assertArrayEquals(testData, writtenContent);
+   } finally {
+     // Cleanup: deleting the temp root removes the nested directories and the file with it
+     FileUtils.deleteQuietly(tempDir);
+   }
+ }
+
+ @Test
+ public void testCopyToLocalFileWithoutSubdirectories() throws Exception {
+   // Destination lives directly under the java.io.tmpdir location
+   String tmpDirPath = System.getProperty("java.io.tmpdir");
+   File tempFile = new File(tmpDirPath, "test_file.txt");
+
+   // Payload that the stubbed download stream will serve
+   byte[] expectedBytes = "test data".getBytes();
+   InputStream payloadStream = new ByteArrayInputStream(expectedBytes);
+
+   // Stub the client chain: file system client -> file client -> open result -> stream
+   when(_mockFileOpenInputStreamResult.getInputStream()).thenReturn(payloadStream);
+   when(_mockFileClient.openInputStream()).thenReturn(_mockFileOpenInputStreamResult);
+   when(_mockFileSystemClient.getFileClient(any())).thenReturn(_mockFileClient);
+
+   try {
+     // Run the copy against the stubbed clients
+     _adlsGen2PinotFsUnderTest.copyToLocalFile(_mockURI, tempFile);
+
+     // Each client in the chain must have been consulted
+     verify(_mockFileSystemClient).getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(_mockURI));
+     verify(_mockFileClient).openInputStream();
+     verify(_mockFileOpenInputStreamResult).getInputStream();
+
+     // The destination must now exist and hold exactly the served payload
+     assertTrue(tempFile.exists());
+     assertArrayEquals(expectedBytes, Files.readAllBytes(tempFile.toPath()));
+   } finally {
+     // Remove the file written by the test
+     FileUtils.deleteQuietly(tempFile);
+   }
+ }
+
+ @Test
+ public void testCopyToLocalFileExistingDirectory() throws Exception {
+   // The destination path is deliberately occupied by a directory before the copy runs
+   String tmpDirPath = System.getProperty("java.io.tmpdir");
+   File tempDir = new File(tmpDirPath, "existing_dir");
+   tempDir.mkdirs();
+
+   // Payload that the stubbed download stream will serve
+   byte[] expectedBytes = "test data".getBytes();
+   InputStream payloadStream = new ByteArrayInputStream(expectedBytes);
+
+   // Stub the client chain: file system client -> file client -> open result -> stream
+   when(_mockFileOpenInputStreamResult.getInputStream()).thenReturn(payloadStream);
+   when(_mockFileClient.openInputStream()).thenReturn(_mockFileOpenInputStreamResult);
+   when(_mockFileSystemClient.getFileClient(any())).thenReturn(_mockFileClient);
+
+   try {
+     // Run the copy with the directory path as the destination
+     _adlsGen2PinotFsUnderTest.copyToLocalFile(_mockURI, tempDir);
+
+     // Each client in the chain must have been consulted
+     verify(_mockFileSystemClient).getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(_mockURI));
+     verify(_mockFileClient).openInputStream();
+     verify(_mockFileOpenInputStreamResult).getInputStream();
+
+     // The path must still exist, but as a regular file rather than a directory
+     assertTrue(tempDir.exists());
+     assertFalse(tempDir.isDirectory());
+
+     // The file that replaced the directory must hold exactly the served payload
+     assertArrayEquals(expectedBytes, Files.readAllBytes(tempDir.toPath()));
+   } finally {
+     // Remove whatever is left at the destination path
+     if (tempDir.exists()) {
+       FileUtils.deleteQuietly(tempDir);
+     }
+   }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]