[
https://issues.apache.org/jira/browse/BEAM-10378?focusedWorklogId=472275&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-472275
]
ASF GitHub Bot logged work on BEAM-10378:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Aug/20 03:52
Start Date: 19/Aug/20 03:52
Worklog Time Spent: 10m
Work Description: ettirapp commented on a change in pull request #12581:
URL: https://github.com/apache/beam/pull/12581#discussion_r472655207
##########
File path:
sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
*/
package org.apache.beam.sdk.io.azure.blobstore;
+import static java.nio.channels.Channels.newChannel;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+ private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+ ImmutableSet.of("gzip");
+
+ private Supplier<BlobServiceClient> client;
+ private final BlobstoreOptions options;
+
+ AzureBlobStoreFileSystem(BlobstoreOptions options) {
+ this.options = checkNotNull(options, "options");
+
+ BlobServiceClientBuilder builder =
+ InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+ .fromClass(options.getBlobstoreClientFactoryClass())
+ .build()
+ .createBuilder(options);
+
+ // The Supplier is to make sure we don't call .build() unless we are
actually using Azure.
+ client = Suppliers.memoize(builder::buildClient);
+ }
+
+ @VisibleForTesting
+ void setClient(BlobServiceClient client) {
+ this.client = Suppliers.ofInstance(client);
+ }
+
+ @VisibleForTesting
+ BlobServiceClient getClient() {
+ return client.get();
+ }
+
@Override
protected String getScheme() {
- return "azfs";
+ return AzfsResourceId.SCHEME;
}
@Override
- protected List<MatchResult> match(List<String> specs) throws IOException {
- // TODO
- return null;
+ protected List<MatchResult> match(List<String> specs) {
+ List<AzfsResourceId> paths =
+
specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+ List<AzfsResourceId> globs = new ArrayList<>();
+ List<AzfsResourceId> nonGlobs = new ArrayList<>();
+ List<Boolean> isGlobBooleans = new ArrayList<>();
+
+ for (AzfsResourceId path : paths) {
+ if (path.isWildcard()) {
+ globs.add(path);
+ isGlobBooleans.add(true);
+ } else {
+ nonGlobs.add(path);
+ isGlobBooleans.add(false);
+ }
+ }
+
+ Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+ Iterator<MatchResult> nonGlobMatches =
matchNonGlobPaths(nonGlobs).iterator();
+
+ ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+ for (Boolean isGlob : isGlobBooleans) {
+ if (isGlob) {
+ checkState(globMatches.hasNext(), "Expect globMatches has next.");
+ matchResults.add(globMatches.next());
+ } else {
+ checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has
next.");
+ matchResults.add(nonGlobMatches.next());
+ }
+ }
+ checkState(!globMatches.hasNext(), "Expect no more elements in
globMatches.");
+ checkState(!nonGlobMatches.hasNext(), "Expect no more elements in
nonGlobMatches.");
+
+ return matchResults.build();
+ }
+
+ /**
+ * Expands glob expressions to regular expressions.
+ *
+ * @param globExp the glob expression to expand
+ * @return a string with the regular expression this glob expands to
+ */
+ @VisibleForTesting
+ static String wildcardToRegexp(String globExp) {
Review comment:
Sure, I filed a ticket at
https://issues.apache.org/jira/browse/BEAM-10758.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 472275)
Time Spent: 22h 10m (was: 22h)
> Implement an Azure blobstore filesystem for Java SDK
> ----------------------------------------------------
>
> Key: BEAM-10378
> URL: https://issues.apache.org/jira/browse/BEAM-10378
> Project: Beam
> Issue Type: Improvement
> Components: io-java-azure
> Reporter: Pablo Estrada
> Assignee: Etta Rapp
> Priority: P2
> Time Spent: 22h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)