ettirapp commented on a change in pull request #12581:
URL: https://github.com/apache/beam/pull/12581#discussion_r472655207
##########
File path:
sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
*/
package org.apache.beam.sdk.io.azure.blobstore;
+import static java.nio.channels.Channels.newChannel;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+ private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+ ImmutableSet.of("gzip");
+
+ private Supplier<BlobServiceClient> client;
+ private final BlobstoreOptions options;
+
+ AzureBlobStoreFileSystem(BlobstoreOptions options) {
+ this.options = checkNotNull(options, "options");
+
+ BlobServiceClientBuilder builder =
+ InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+ .fromClass(options.getBlobstoreClientFactoryClass())
+ .build()
+ .createBuilder(options);
+
+ // The Supplier is to make sure we don't call .build() unless we are
actually using Azure.
+ client = Suppliers.memoize(builder::buildClient);
+ }
+
+ @VisibleForTesting
+ void setClient(BlobServiceClient client) {
+ this.client = Suppliers.ofInstance(client);
+ }
+
+ @VisibleForTesting
+ BlobServiceClient getClient() {
+ return client.get();
+ }
+
@Override
protected String getScheme() {
- return "azfs";
+ return AzfsResourceId.SCHEME;
}
@Override
- protected List<MatchResult> match(List<String> specs) throws IOException {
- // TODO
- return null;
+ protected List<MatchResult> match(List<String> specs) {
+ List<AzfsResourceId> paths =
+
specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+ List<AzfsResourceId> globs = new ArrayList<>();
+ List<AzfsResourceId> nonGlobs = new ArrayList<>();
+ List<Boolean> isGlobBooleans = new ArrayList<>();
+
+ for (AzfsResourceId path : paths) {
+ if (path.isWildcard()) {
+ globs.add(path);
+ isGlobBooleans.add(true);
+ } else {
+ nonGlobs.add(path);
+ isGlobBooleans.add(false);
+ }
+ }
+
+ Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+ Iterator<MatchResult> nonGlobMatches =
matchNonGlobPaths(nonGlobs).iterator();
+
+ ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+ for (Boolean isGlob : isGlobBooleans) {
+ if (isGlob) {
+ checkState(globMatches.hasNext(), "Expect globMatches has next.");
+ matchResults.add(globMatches.next());
+ } else {
+ checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has
next.");
+ matchResults.add(nonGlobMatches.next());
+ }
+ }
+ checkState(!globMatches.hasNext(), "Expect no more elements in
globMatches.");
+ checkState(!nonGlobMatches.hasNext(), "Expect no more elements in
nonGlobMatches.");
+
+ return matchResults.build();
+ }
+
+ /**
+ * Expands glob expressions to regular expressions.
+ *
+ * @param globExp the glob expression to expand
+ * @return a string with the regular expression this glob expands to
+ */
+ @VisibleForTesting
+ static String wildcardToRegexp(String globExp) {
Review comment:
Sure, I filed a ticket at
https://issues.apache.org/jira/browse/BEAM-10758.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org