xdelox commented on code in PR #4144:
URL: https://github.com/apache/hop/pull/4144#discussion_r1709048490
##########
plugins/tech/azure/src/main/java/org/apache/hop/vfs/azure/AzureFileObject.java:
##########
@@ -88,138 +87,156 @@ public void close() throws IOException {
}
}
- private final CloudBlobClient service;
+ private final DataLakeServiceClient service;
private boolean attached = false;
private long size;
private long lastModified;
private FileType type;
private List<String> children = null;
- private CloudBlobContainer container;
- private String containerPath;
- private CloudBlob cloudBlob;
- private CloudBlobDirectory cloudDir;
+ private DataLakeFileClient dataLakeFileClient;
+ private String currentFilePath;
+ private PathItem pathItem;
+ private PathItem dirPathItem;
private final String markerFileName = ".cvfs.temp";
private OutputStream blobOutputStream;
+ private String containerName;
public AzureFileObject(
- AbstractFileName fileName, AzureFileSystem fileSystem, CloudBlobClient
service)
+ AbstractFileName fileName, AzureFileSystem fileSystem,
DataLakeServiceClient service)
throws FileSystemException {
super(fileName, fileSystem);
this.service = service;
}
@Override
- protected void doAttach() throws URISyntaxException, StorageException {
+ protected void doAttach() throws HopException {
if (!attached) {
- if (getName().getPath().equals("/")) {
- children = new ArrayList<>();
- for (CloudBlobContainer container : service.listContainers()) {
- children.add(container.getName());
- }
+ containerName = ((AzureFileName) getName()).getContainer();
+ String fullPath = ((AzureFileName) getName()).getPath();
+ DataLakeFileSystemClient fileSystemClient =
service.getFileSystemClient(containerName);
+ ListPathsOptions lpo = new ListPathsOptions();
+ children = new ArrayList<>();
+ if (isFileSystemRoot(fullPath)) { // ROOT of the filesystem
+
+ lpo.setPath(fullPath);
+ // TODO SR Evaluate using lpo.setRecursive
+
+ service
+ .listFileSystems()
+ .iterator()
+ .forEachRemaining(
+ item -> {
+ children.add(StringUtils.substringAfterLast(item.getName(),
"/"));
+ });
+
size = children.size();
lastModified = 0;
type = FileType.FOLDER;
- container = null;
- containerPath = "";
- } else {
- String containerName = ((AzureFileName) getName()).getContainer();
- container = service.getContainerReference(containerName);
- containerPath = ((AzureFileName) getName()).getPathAfterContainer();
- String thisPath = "/" + containerName + containerPath;
- if (container.exists()) {
- children = new ArrayList<>();
- if (containerPath.equals("")) {
- if (container.exists()) {
- for (ListBlobItem item : container.listBlobs()) {
- StringBuilder path = new
StringBuilder(item.getUri().getPath());
- UriParser.extractFirstElement(path);
- children.add(path.substring(1));
- }
- type = FileType.FOLDER;
- } else {
- type = FileType.IMAGINARY;
- }
- lastModified = 0;
+ dataLakeFileClient = null;
+ currentFilePath = "";
+ } else if (isContainer(fullPath)) {
+ if (containerExists()) {
+ type = FileType.FOLDER;
+ } else {
+ type = FileType.IMAGINARY;
+ throw new HopException("Container does not exist: " + fullPath);
+ }
+ } else { // this is a subdirectory or file or a container
+
+ currentFilePath = ((AzureFileName) getName()).getPathAfterContainer();
+ if (StringUtils.isEmpty(currentFilePath)) {
+ type = FileType.FOLDER;
+ fileSystemClient.listPaths().forEach(pi ->
children.add(pi.getName()));
+ } else {
+ lpo.setPath(currentFilePath);
+ DataLakeDirectoryClient directoryClient =
+ fileSystemClient.getDirectoryClient(currentFilePath);
+ final Boolean exists = directoryClient.exists();
+
+ // TODO SR Evaluate using lpo.setRecursive
+ // dataLakeFileClient =
+ // fileSystemClient.getFileClient(((AzureFileName)
getName()).getContainer());
+
+ final Boolean isDirectory =
+ exists
+ && fileSystemClient
+ .getDirectoryClient(currentFilePath)
+ .getProperties()
+ .isDirectory();
+ final Boolean isFile = !isDirectory;
+ if (exists && isDirectory) {
+ children = new ArrayList<>();
+ PagedIterable<PathItem> pathItems =
fileSystemClient.listPaths(lpo, null);
+ pathItems.forEach(
+ item -> {
+ children.add(
+ item.getName().replace("small/", "")); // TODO SDL
replace with actual path
+ });
size = children.size();
+ type = FileType.FOLDER;
+ lastModified =
directoryClient.getProperties().getLastModified().toEpochSecond();
+ } else if (exists && isFile) {
+ dataLakeFileClient =
fileSystemClient.getFileClient(currentFilePath);
+ size = dataLakeFileClient.getProperties().getFileSize();
+ type = FileType.FILE;
+ lastModified =
dataLakeFileClient.getProperties().getLastModified().toEpochSecond();
} else {
- /*
- * Look in the parent path for this filename AND and
- * direct descendents
- */
- cloudBlob = null;
- cloudDir = null;
- String relpath =
- removeLeadingSlash(
- ((AzureFileName)
(getName().getParent())).getPathAfterContainer());
- for (ListBlobItem item :
- relpath.equals("") ? container.listBlobs() :
container.listBlobs(relpath + "/")) {
- String itemPath = removeTrailingSlash(item.getUri().getPath());
- if (pathsMatch(itemPath, thisPath)) {
- if (item instanceof CloudBlob) {
- cloudBlob = (CloudBlob) item;
- } else {
- cloudDir = (CloudBlobDirectory) item;
- for (ListBlobItem blob : cloudDir.listBlobs()) {
- URI blobUri = blob.getUri();
- String path = blobUri.getPath();
- while (path.endsWith("/")) path = path.substring(0,
path.length() - 1);
- int idx = path.lastIndexOf('/');
- if (idx != -1) path = path.substring(idx + 1);
- children.add(path);
- }
- }
- break;
- }
- }
- if (cloudBlob != null) {
- type = FileType.FILE;
- size = cloudBlob.getProperties().getLength();
- if (cloudBlob.getMetadata().containsKey("ActualLength")) {
- size =
Long.parseLong(cloudBlob.getMetadata().get("ActualLength"));
- }
- String disp = cloudBlob.getProperties().getContentDisposition();
- if (disp != null && disp.startsWith("vfs ; length=\"")) {
- size = Long.parseLong(disp.substring(14, disp.length() - 1));
- }
- Date lastModified2 = cloudBlob.getProperties().getLastModified();
- lastModified = lastModified2 == null ? 0 :
lastModified2.getTime();
- } else if (cloudDir != null) {
- type = FileType.FOLDER;
- size = children.size();
- lastModified = 0;
- } else {
- lastModified = 0;
- type = FileType.IMAGINARY;
- size = 0;
- }
+ lastModified = 0;
+ type = FileType.IMAGINARY;
+ size = 0;
+ pathItem = null;
+ dirPathItem = null;
}
- } else {
- lastModified = 0;
- type = FileType.IMAGINARY;
- size = 0;
- cloudBlob = null;
- cloudDir = null;
}
}
}
}
- private boolean pathsMatch(String path, String currentPath) {
- return path.replace("/" + ((AzureFileSystem)
getFileSystem()).getAccount(), "")
- .equals(currentPath);
+ private boolean containerExists() {
+ String containerName = ((AzureFileName) getName()).getContainer();
+ ListFileSystemsOptions fileSystemsOptions = new ListFileSystemsOptions();
+ fileSystemsOptions.setPrefix(containerName);
+
+ final DataLakeFileSystemClient fileSystemClient =
service.getFileSystemClient(containerName);
+
+ try {
+ return fileSystemClient.existsWithResponse(Duration.ofSeconds(5),
Context.NONE).getValue();
Review Comment:
we need to check whether the container/filesystem already exists, in order to
go ahead or throw an exception; since `fileSystemClient.exists()` returns in a
few seconds when the container exists, but the request is left hanging without
an answer when the container does not exist, I've adopted this workaround.
This behavior doesn't occur in our sandbox, so it must depend on the
dependencies we bring into Hop.
Of course it's awful, but I couldn't see any alternative without resolving
all the dependency issues. Once we're all set, we can get rid of the timeout
logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]