Repository: beam
Updated Branches:
  refs/heads/master d04a88e7e -> 763fb50b5


[BEAM-59] Beam GcsFileSystem: port fileSizes() from GcsUtil for batch-getting StorageObjects.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85f04085
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85f04085
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85f04085

Branch: refs/heads/master
Commit: 85f04085dbfbe130cfc10e0b7451a4aa9d8d8df2
Parents: d04a88e
Author: Pei He <[email protected]>
Authored: Wed Feb 15 14:07:45 2017 -0800
Committer: Pei He <[email protected]>
Committed: Thu Feb 16 15:17:09 2017 -0800

----------------------------------------------------------------------
 .../src/main/resources/beam/findbugs-filter.xml | 11 +++
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 85 +++++++++++++++++---
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 27 +++++--
 .../beam/sdk/io/gcp/storage/GcsFileSystem.java  | 49 ++++++++---
 .../sdk/io/gcp/storage/GcsFileSystemTest.java   | 35 ++++++++
 5 files changed, 179 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml 
b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index edbdb14..2ffd648 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -357,4 +357,15 @@
     <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
     <!--[BEAM-421] Class doesn't override equals in superclass-->
   </Match>
+  <Match>
+    <Class 
name="org.apache.beam.sdk.util.AutoValue_GcsUtil_StorageObjectOrIOException"/>
+    <Bug pattern="NM_CLASS_NOT_EXCEPTION"/>
+    <!-- It is clear from the name that this class holds either StorageObject 
or IOException. -->
+  </Match>
+
+  <Match>
+    <Class name="org.apache.beam.sdk.util.GcsUtil$StorageObjectOrIOException"/>
+    <Bug pattern="NM_CLASS_NOT_EXCEPTION"/>
+    <!-- It is clear from the name that this class holds either StorageObject 
or IOException. -->
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 6345867..ea0cf9e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.client.googleapis.batch.BatchRequest;
 import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
@@ -31,6 +32,7 @@ import com.google.api.services.storage.Storage;
 import com.google.api.services.storage.model.Bucket;
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
+import com.google.auto.value.AutoValue;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
 import com.google.cloud.hadoop.gcsio.ObjectWriteConditions;
@@ -52,6 +54,7 @@ import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -336,6 +339,21 @@ public class GcsUtil {
   }
 
   /**
+   * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} 
for the given
+   * {@link GcsPath GcsPaths}.
+   */
+  public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths)
+      throws IOException {
+    List<StorageObjectOrIOException[]> results = new ArrayList<>();
+    executeBatches(makeGetBatches(gcsPaths, results));
+    ImmutableList.Builder<StorageObjectOrIOException> ret = 
ImmutableList.builder();
+    for (StorageObjectOrIOException[] result : results) {
+      ret.add(result[0]);
+    }
+    return ret.build();
+  }
+
+  /**
    * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code 
pageToken}.
    */
   public Objects listObjects(String bucket, String prefix, @Nullable String 
pageToken)
@@ -367,17 +385,25 @@ public class GcsUtil {
    * if the resource does not exist.
    */
   @VisibleForTesting
-  List<Long> fileSizes(Collection<GcsPath> paths) throws IOException {
-    List<StorageObject[]> results = Lists.newArrayList();
-    executeBatches(makeGetBatches(paths, results));
+  List<Long> fileSizes(List<GcsPath> paths) throws IOException {
+    List<StorageObjectOrIOException> results = getObjects(paths);
 
     ImmutableList.Builder<Long> ret = ImmutableList.builder();
-    for (StorageObject[] result : results) {
-      ret.add(result[0].getSize().longValue());
+    for (StorageObjectOrIOException result : results) {
+      ret.add(toFileSize(result));
     }
     return ret.build();
   }
 
+  private Long toFileSize(StorageObjectOrIOException 
storageObjectOrIOException)
+      throws IOException {
+    if (storageObjectOrIOException.ioException() != null) {
+      throw storageObjectOrIOException.ioException();
+    } else {
+      return storageObjectOrIOException.storageObject().getSize().longValue();
+    }
+  }
+
   /**
    * Opens an object in GCS.
    *
@@ -589,7 +615,7 @@ public class GcsUtil {
   @VisibleForTesting
   List<BatchRequest> makeGetBatches(
       Collection<GcsPath> paths,
-      List<StorageObject[]> results) throws IOException {
+      List<StorageObjectOrIOException[]> results) throws IOException {
     List<BatchRequest> batches = new LinkedList<>();
     for (List<GcsPath> filesToGet :
         Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
@@ -648,28 +674,63 @@ public class GcsUtil {
     executeBatches(makeRemoveBatches(filenames));
   }
 
-  private StorageObject[] enqueueGetFileSize(final GcsPath path, BatchRequest 
batch)
+  private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, 
BatchRequest batch)
       throws IOException {
-    final StorageObject[] storageObject = new StorageObject[1];
+    final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1];
 
     Storage.Objects.Get getRequest = storageClient.objects()
         .get(path.getBucket(), path.getObject());
     getRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
       @Override
       public void onSuccess(StorageObject response, HttpHeaders httpHeaders) 
throws IOException {
-        storageObject[0] = response;
+        ret[0] = StorageObjectOrIOException.create(response);
       }
 
       @Override
       public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws 
IOException {
+        IOException ioException;
         if (errorExtractor.itemNotFound(e)) {
-          throw new FileNotFoundException(path.toString());
+          ioException = new FileNotFoundException(path.toString());
         } else {
-          throw new IOException(String.format("Error trying to get %s: %s", 
path, e));
+          ioException = new IOException(String.format("Error trying to get %s: 
%s", path, e));
         }
+        ret[0] = StorageObjectOrIOException.create(ioException);
       }
     });
-    return storageObject;
+    return ret;
+  }
+
+  /**
+   * A class that holds either a {@link StorageObject} or an {@link 
IOException}.
+   */
+  @AutoValue
+  public abstract static class StorageObjectOrIOException {
+
+    /**
+     * Returns the {@link StorageObject}.
+     */
+    @Nullable
+    public abstract StorageObject storageObject();
+
+    /**
+     * Returns the {@link IOException}.
+     */
+    @Nullable
+    public abstract IOException ioException();
+
+    @VisibleForTesting
+    public static StorageObjectOrIOException create(StorageObject 
storageObject) {
+      return new AutoValue_GcsUtil_StorageObjectOrIOException(
+          checkNotNull(storageObject, "storageObject"),
+          null /* ioException */);
+    }
+
+    @VisibleForTesting
+    public static StorageObjectOrIOException create(IOException ioException) {
+      return new AutoValue_GcsUtil_StorageObjectOrIOException(
+          null /* storageObject */,
+          checkNotNull(ioException, "ioException"));
+    }
   }
 
   private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest 
batch)

http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 920e593..03668ce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -76,6 +76,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.junit.Rule;
 import org.junit.Test;
@@ -394,18 +395,24 @@ public class GcsUtilTest {
     JsonFactory jsonFactory = new JacksonFactory();
 
     String contentBoundary = "batch_foobarbaz";
+    String contentBoundaryLine = "--" + contentBoundary;
+    String endOfContentBoundaryLine = "--" + contentBoundary + "--";
 
     GenericJson error = new GenericJson()
         .set("error", new GenericJson().set("code", 404));
     error.setFactory(jsonFactory);
 
-    String content = contentBoundary + "\n"
+    String content = contentBoundaryLine + "\n"
         + "Content-Type: application/http\n"
         + "\n"
         + "HTTP/1.1 404 Not Found\n"
-        + "Content-Length: 105\n"
+        + "Content-Length: -1\n"
         + "\n"
-        + error.toString();
+        + error.toString()
+        + "\n"
+        + "\n"
+        + endOfContentBoundaryLine
+        + "\n";
     thrown.expect(FileNotFoundException.class);
     MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse()
         .setContentType("multipart/mixed; boundary=" + contentBoundary)
@@ -426,18 +433,24 @@ public class GcsUtilTest {
     JsonFactory jsonFactory = new JacksonFactory();
 
     String contentBoundary = "batch_foobarbaz";
+    String contentBoundaryLine = "--" + contentBoundary;
+    String endOfContentBoundaryLine = "--" + contentBoundary + "--";
 
     GenericJson error = new GenericJson()
         .set("error", new GenericJson().set("code", 404));
     error.setFactory(jsonFactory);
 
-    String content = contentBoundary + "\n"
+    String content = contentBoundaryLine + "\n"
         + "Content-Type: application/http\n"
         + "\n"
         + "HTTP/1.1 404 Not Found\n"
-        + "Content-Length: 105\n"
+        + "Content-Length: -1\n"
+        + "\n"
+        + error.toString()
+        + "\n"
         + "\n"
-        + error.toString();
+        + endOfContentBoundaryLine
+        + "\n";
     thrown.expect(FileNotFoundException.class);
 
     final LowLevelHttpResponse mockResponse = 
Mockito.mock(LowLevelHttpResponse.class);
@@ -755,7 +768,7 @@ public class GcsUtilTest {
     GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
 
     // Small number of files fits in 1 batch
-    List<StorageObject[]> results = Lists.newArrayList();
+    List<StorageObjectOrIOException[]> results = Lists.newArrayList();
     List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), 
results);
     assertThat(batches.size(), equalTo(1));
     assertThat(sumBatchSizes(batches), equalTo(3));

http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
index 1811fec..b2a712d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
@@ -24,6 +24,8 @@ import com.google.api.services.storage.model.StorageObject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.channels.ReadableByteChannel;
@@ -39,6 +41,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,15 +128,32 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
     return MatchResult.create(Status.OK, results.toArray(new 
Metadata[results.size()]));
   }
 
-  private List<String> toFilenames(Collection<GcsResourceId> resources) {
-    return FluentIterable.from(resources)
-        .transform(
-            new Function<GcsResourceId, String>() {
-              @Override
-              public String apply(GcsResourceId resource) {
-                return resource.getGcsPath().toString();
-              }})
-        .toList();
+  /**
+   * Returns {@link MatchResult MatchResults} for the given {@link GcsPath 
GcsPaths}.
+   *
+   *<p>The number of returned {@link MatchResult MatchResults} equals to the 
number of given
+   * {@link GcsPath GcsPaths}. Each {@link MatchResult} contains one {@link 
Metadata}.
+   */
+  @VisibleForTesting
+  List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException {
+    List<StorageObjectOrIOException> results = 
options.getGcsUtil().getObjects(gcsPaths);
+
+    ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
+    for (StorageObjectOrIOException result : results) {
+      ret.add(toMatchResult(result));
+    }
+    return ret.build();
+  }
+
+  private MatchResult toMatchResult(StorageObjectOrIOException 
objectOrException) {
+    if (objectOrException.ioException() instanceof FileNotFoundException) {
+      return MatchResult.create(Status.NOT_FOUND, 
objectOrException.ioException());
+    } else if (objectOrException.ioException() != null) {
+      return MatchResult.create(Status.ERROR, objectOrException.ioException());
+    } else {
+      return MatchResult.create(
+          Status.OK, new 
Metadata[]{toMetadata(objectOrException.storageObject())});
+    }
   }
 
   private Metadata toMetadata(StorageObject storageObject) {
@@ -148,4 +168,15 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
     }
     return ret.build();
   }
+
+  private List<String> toFilenames(Collection<GcsResourceId> resources) {
+    return FluentIterable.from(resources)
+        .transform(
+            new Function<GcsResourceId, String>() {
+              @Override
+              public String apply(GcsResourceId resource) {
+                return resource.getGcsPath().toString();
+              }})
+        .toList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
index 4deb7b3..8b8a788 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.storage;
 
 import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -29,15 +30,18 @@ import com.google.api.services.storage.model.StorageObject;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.junit.Before;
 import org.junit.Rule;
@@ -168,6 +172,37 @@ public class GcsFileSystemTest {
     gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**"));
   }
 
+  @Test
+  public void testMatchNonGlobs() throws Exception {
+    List<StorageObjectOrIOException> items = new ArrayList<>();
+    // Files within the directory
+    items.add(StorageObjectOrIOException.create(
+        createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* 
fileSize */)));
+    items.add(StorageObjectOrIOException.create(new FileNotFoundException()));
+    items.add(StorageObjectOrIOException.create(new IOException()));
+    items.add(StorageObjectOrIOException.create(
+        createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* 
fileSize */)));
+
+    List<GcsPath> gcsPaths = ImmutableList.of(
+        GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+        GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+        GcsPath.fromUri("gs://testbucket/testdirectory/file3name"),
+        GcsPath.fromUri("gs://testbucket/testdirectory/file4name"));
+
+    when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(items);
+    List<MatchResult> matchResults = gcsFileSystem.matchNonGlobs(gcsPaths);
+
+    assertEquals(4, matchResults.size());
+    assertThat(
+        ImmutableList.of("gs://testbucket/testdirectory/file1name"),
+        contains(toFilenames(matchResults.get(0)).toArray()));
+    assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
+    assertEquals(Status.ERROR, matchResults.get(2).status());
+    assertThat(
+        ImmutableList.of("gs://testbucket/testdirectory/file4name"),
+        contains(toFilenames(matchResults.get(3)).toArray()));
+  }
+
   private StorageObject createStorageObject(String gcsFilename, long fileSize) 
{
     GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
     return new StorageObject()

Reply via email to