ettirapp commented on a change in pull request #12581:
URL: https://github.com/apache/beam/pull/12581#discussion_r471718807



##########
File path: 
sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
  */
 package org.apache.beam.sdk.io.azure.blobstore;
 
+import static java.nio.channels.Channels.newChannel;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  private Supplier<BlobServiceClient> client;
+  private final BlobstoreOptions options;
+
+  AzureBlobStoreFileSystem(BlobstoreOptions options) {
+    this.options = checkNotNull(options, "options");
+
+    BlobServiceClientBuilder builder =
+        InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+            .fromClass(options.getBlobstoreClientFactoryClass())
+            .build()
+            .createBuilder(options);
+
+    // The Supplier is to make sure we don't call .build() unless we are 
actually using Azure.
+    client = Suppliers.memoize(builder::buildClient);
+  }
+
+  @VisibleForTesting
+  void setClient(BlobServiceClient client) {
+    this.client = Suppliers.ofInstance(client);
+  }
+
+  @VisibleForTesting
+  BlobServiceClient getClient() {
+    return client.get();
+  }
+
   @Override
   protected String getScheme() {
-    return "azfs";
+    return AzfsResourceId.SCHEME;
   }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    // TODO
-    return null;
+  protected List<MatchResult> match(List<String> specs) {
+    List<AzfsResourceId> paths =
+        
specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+    List<AzfsResourceId> globs = new ArrayList<>();
+    List<AzfsResourceId> nonGlobs = new ArrayList<>();
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+
+    for (AzfsResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = 
matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expect globMatches has next.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has 
next.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expect no more elements in 
globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expect no more elements in 
nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  /**
+   * Expands glob expressions to regular expressions.
+   *
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
+   */
+  @VisibleForTesting
+  static String wildcardToRegexp(String globExp) {
+    StringBuilder dst = new StringBuilder();
+    char[] src = globExp.replace("**/*", "**").toCharArray();
+    int i = 0;
+    while (i < src.length) {
+      char c = src[i++];
+      switch (c) {
+        case '*':
+          // One char lookahead for **
+          if (i < src.length && src[i] == '*') {
+            dst.append(".*");
+            ++i;
+          } else {
+            dst.append("[^/]*");
+          }
+          break;
+        case '?':
+          dst.append("[^/]");
+          break;
+        case '.':
+        case '+':
+        case '{':
+        case '}':
+        case '(':
+        case ')':
+        case '|':
+        case '^':
+        case '$':
+          // These need to be escaped in regular expressions
+          dst.append('\\').append(c);
+          break;
+        case '\\':
+          i = doubleSlashes(dst, src, i);
+          break;
+        default:
+          dst.append(c);
+          break;
+      }
+    }
+    return dst.toString();
+  }
+
+  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+    // Emit the next character without special interpretation
+    dst.append("\\\\");
+    if ((i - 1) != src.length) {
+      dst.append(src[i]);
+      i++;
+    } else {
+      // A backslash at the very end is treated like an escaped backslash
+      dst.append('\\');
+    }
+    return i;
+  }
+
+  private List<MatchResult> matchGlobPaths(List<AzfsResourceId> globs) {
+    return FluentIterable.from(globs).transform(this::expand).toList();
+  }
+
+  /** Expands a pattern into {@link MatchResult}. */
+  @VisibleForTesting
+  MatchResult expand(AzfsResourceId azfsPattern) {
+
+    checkArgument(azfsPattern.isWildcard(), "is Wildcard");
+    String blobPrefix = azfsPattern.getBlobNonWildcardPrefix();
+    Pattern wildcardAsRegexp = 
Pattern.compile(wildcardToRegexp(azfsPattern.getBlob()));
+
+    LOG.debug(
+        "matching files in container {}, prefix {} against pattern {}",
+        azfsPattern.getContainer(),
+        blobPrefix,
+        wildcardAsRegexp.toString());
+
+    ListBlobsOptions listOptions = new 
ListBlobsOptions().setPrefix(blobPrefix);
+    Duration timeout = Duration.ZERO.plusMinutes(1);
+
+    String account = azfsPattern.getAccount();
+    String container = azfsPattern.getContainer();
+    BlobContainerClient blobContainerClient = 
client.get().getBlobContainerClient(container);
+    PagedIterable<BlobItem> blobs = blobContainerClient.listBlobs(listOptions, 
timeout);
+    List<MatchResult.Metadata> results = new ArrayList<>();
+
+    blobs.forEach(
+        blob -> {
+          String name = blob.getName();
+          if (wildcardAsRegexp.matcher(name).matches() && !name.endsWith("/")) 
{
+            LOG.debug("Matched object: {}", name);
+
+            BlobProperties properties = 
blobContainerClient.getBlobClient(name).getProperties();
+            AzfsResourceId rid =
+                AzfsResourceId.fromComponents(account, container, name)
+                    .withSize(properties.getBlobSize())
+                    
.withLastModified(Date.from(properties.getLastModified().toInstant()));
+
+            results.add(toMetadata(rid, properties.getContentEncoding()));
+          }
+        });
+
+    return MatchResult.create(MatchResult.Status.OK, results);
+  }
+
+  private MatchResult.Metadata toMetadata(AzfsResourceId path, String 
contentEncoding) {
+
+    checkArgument(path.getSize().isPresent(), "path has size");
+    boolean isReadSeekEfficient = 
!NON_READ_SEEK_EFFICIENT_ENCODINGS.contains(contentEncoding);
+
+    return MatchResult.Metadata.builder()
+        .setIsReadSeekEfficient(isReadSeekEfficient)
+        .setResourceId(path)
+        .setSizeBytes(path.getSize().get())
+        
.setLastModifiedMillis(path.getLastModified().transform(Date::getTime).or(0L))
+        .build();
+  }
+
+  /**
+   * Returns {@link MatchResult MatchResults} for the given {@link 
AzfsResourceId paths}.
+   *
+   * <p>The number of returned {@link MatchResult MatchResults} equals to the 
number of given {@link
+   * AzfsResourceId paths}. Each {@link MatchResult} contains one {@link 
MatchResult.Metadata}.
+   */
+  @VisibleForTesting
+  private Iterable<MatchResult> matchNonGlobPaths(List<AzfsResourceId> paths) {
+    ImmutableList.Builder<MatchResult> toReturn = ImmutableList.builder();
+    for (AzfsResourceId path : paths) {
+      toReturn.add(toMatchResult(path));
+    }
+    return toReturn.build();
+  }
+
+  private MatchResult toMatchResult(AzfsResourceId path) {
+    BlobClient blobClient =
+        
client.get().getBlobContainerClient(path.getContainer()).getBlobClient(path.getBlob());
+    BlobProperties blobProperties;
+
+    try {
+      blobProperties = blobClient.getProperties();
+    } catch (BlobStorageException e) {
+      if (e.getStatusCode() == 404) {
+        return MatchResult.create(MatchResult.Status.NOT_FOUND, new 
FileNotFoundException());
+      }
+      return MatchResult.create(MatchResult.Status.ERROR, new IOException(e));
+    }
+
+    return MatchResult.create(
+        MatchResult.Status.OK,
+        ImmutableList.of(
+            toMetadata(
+                path.withSize(blobProperties.getBlobSize())
+                    
.withLastModified(Date.from(blobProperties.getLastModified().toInstant())),
+                blobProperties.getContentEncoding())));
   }
 
   @Override
   protected WritableByteChannel create(AzfsResourceId resourceId, 
CreateOptions createOptions)
       throws IOException {
-    // TODO
-    return null;
+    BlobContainerClient blobContainerClient =
+        client.get().getBlobContainerClient(resourceId.getContainer());
+    if (!blobContainerClient.exists()) {
+      throw new UnsupportedOperationException("create does not create 
containers.");
+    }
+
+    BlobClient blobClient = 
blobContainerClient.getBlobClient(resourceId.getBlob());
+    // The getBlobOutputStream method overwrites existing blobs,
+    // so throw an error in this case to prevent data loss
+    if (blobClient.exists()) {
+      throw new IOException("This filename is already in use.");
+    }
+
+    OutputStream outputStream;
+    try {
+      outputStream = blobClient.getBlockBlobClient().getBlobOutputStream();
+    } catch (BlobStorageException e) {
+      throw (IOException) e.getCause();
+    }
+    return newChannel(outputStream);
   }
 
   @Override
   protected ReadableByteChannel open(AzfsResourceId resourceId) throws 
IOException {
-    // TODO
-    return null;
+    BlobClient blobClient =
+        client
+            .get()
+            .getBlobContainerClient(resourceId.getContainer())
+            .getBlobClient(resourceId.getBlob());
+    if (!blobClient.exists()) {
+      throw new FileNotFoundException("The requested file doesn't exist.");
+    }
+    return new AzureReadableSeekableByteChannel(blobClient);
   }
 
   @Override
   protected void copy(List<AzfsResourceId> srcPaths, List<AzfsResourceId> 
destPaths)
       throws IOException {
-    // TODO
+    checkArgument(
+        srcPaths.size() == destPaths.size(),
+        "sizes of source paths and destination paths do not match");
+
+    Iterator<AzfsResourceId> sourcePathsIterator = srcPaths.iterator();
+    Iterator<AzfsResourceId> destinationPathsIterator = destPaths.iterator();
+    while (sourcePathsIterator.hasNext()) {
+      final AzfsResourceId sourcePath = sourcePathsIterator.next();
+      final AzfsResourceId destinationPath = destinationPathsIterator.next();
+      copy(sourcePath, destinationPath);
+    }
   }
 
   @VisibleForTesting
   void copy(AzfsResourceId sourcePath, AzfsResourceId destinationPath) throws 
IOException {
-    // TODO
+    checkArgument(
+        sourcePath.getBlob() != null && destinationPath.getBlob() != null,
+        "This method is intended to copy file-like resources, not 
directories.");
+
+    // get source blob client
+    BlobClient srcBlobClient =
+        client
+            .get()
+            .getBlobContainerClient(sourcePath.getContainer())
+            .getBlobClient(sourcePath.getBlob());
+    if (!srcBlobClient.exists()) {
+      throw new FileNotFoundException("The copy source does not exist.");
+    }
+
+    // get destination blob client
+    BlobContainerClient destBlobContainerClient =
+        client.get().getBlobContainerClient(destinationPath.getContainer());
+    if (!destBlobContainerClient.exists()) {
+      client.get().createBlobContainer(destinationPath.getContainer());
+    }
+    BlobClient destBlobClient = 
destBlobContainerClient.getBlobClient(destinationPath.getBlob());
+
+    destBlobClient.copyFromUrl(srcBlobClient.getBlobUrl() + 
generateSasToken());
+  }
+
+  @VisibleForTesting
+  String generateSasToken() throws IOException {
+    SharedAccessAccountPolicy sharedAccessAccountPolicy = new 
SharedAccessAccountPolicy();
+    long date = new Date().getTime();
+    long expiryDate = new Date(date + 8640000).getTime();

Review comment:
       This is a typo, I meant 86400000. 86,400,000 millis = 1 day.  I can make 
it a final constant.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to