Repository: beam
Updated Branches:
  refs/heads/master a9bcdcedf -> 5fe11a2fc


[BEAM-59] GcsUtil: refactor and expose getObject() and listObjects() APIs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51a8c37a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51a8c37a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51a8c37a

Branch: refs/heads/master
Commit: 51a8c37aa0cdf002629787c342b3d21a20d4404c
Parents: a9bcdce
Author: Pei He <[email protected]>
Authored: Fri Feb 10 14:40:55 2017 -0800
Committer: Pei He <[email protected]>
Committed: Fri Feb 10 21:35:32 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 124 +++++++++----------
 .../org/apache/beam/sdk/util/GcsUtilTest.java   |  11 +-
 2 files changed, 67 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/51a8c37a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 5e83584..44c49bc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.client.googleapis.batch.BatchRequest;
 import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
@@ -198,26 +197,14 @@ public class GcsUtil {
     String prefix = null;
     if (!m.matches()) {
       // Not a glob.
-      Storage.Objects.Get getObject = storageClient.objects().get(
-          gcsPattern.getBucket(), gcsPattern.getObject());
       try {
-        // Use a get request to fetch the metadata of the object,
-        // the request has strong global consistency.
-        ResilientOperation.retry(
-            ResilientOperation.getGoogleRequestCallable(getObject),
-            BACKOFF_FACTORY.backoff(),
-            RetryDeterminer.SOCKET_ERRORS,
-            IOException.class);
+        // Use a get request to fetch the metadata of the object, and ignore the return value.
+        // The request has strong global consistency.
+        getObject(gcsPattern);
         return ImmutableList.of(gcsPattern);
-      } catch (IOException | InterruptedException e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
-          // If the path was not found, return an empty list.
-          return ImmutableList.of();
-        }
-        throw new IOException("Unable to match files for pattern " + gcsPattern, e);
+      } catch (FileNotFoundException e) {
+        // If the path was not found, return an empty list.
+        return ImmutableList.of();
       }
     } else {
       // Part before the first wildcard character.
@@ -228,32 +215,10 @@ public class GcsUtil {
     LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
         prefix, p.toString());
 
-    // List all objects that start with the prefix (including objects in sub-directories).
-    Storage.Objects.List listObject = storageClient.objects().list(gcsPattern.getBucket());
-    listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
-    listObject.setPrefix(prefix);
-
     String pageToken = null;
     List<GcsPath> results = new LinkedList<>();
     do {
-      if (pageToken != null) {
-        listObject.setPageToken(pageToken);
-      }
-
-      Objects objects;
-      try {
-        objects = ResilientOperation.retry(
-            ResilientOperation.getGoogleRequestCallable(listObject),
-            BACKOFF_FACTORY.backoff(),
-            RetryDeterminer.SOCKET_ERRORS,
-            IOException.class);
-      } catch (Exception e) {
-        throw new IOException("Unable to match files in bucket " + gcsPattern.getBucket()
-            +  ", prefix " + prefix + " against pattern " + p.toString(), e);
-      }
-      //Objects objects = listObject.execute();
-      checkNotNull(objects);
-
+      Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);
       if (objects.getItems() == null) {
         break;
       }
@@ -267,7 +232,6 @@ public class GcsUtil {
           results.add(GcsPath.fromObject(o));
         }
       }
-
       pageToken = objects.getNextPageToken();
     } while (pageToken != null);
 
@@ -285,33 +249,64 @@ public class GcsUtil {
    * if the resource does not exist.
    */
   public long fileSize(GcsPath path) throws IOException {
-    return fileSize(
-        path,
-        BACKOFF_FACTORY.backoff(),
-        Sleeper.DEFAULT);
+    return getObject(path).getSize().longValue();
   }
 
   /**
-   * Returns the file size from GCS or throws {@link FileNotFoundException}
-   * if the resource does not exist.
+   * Returns the {@link StorageObject} for the given {@link GcsPath}.
    */
+  public StorageObject getObject(GcsPath gcsPath) throws IOException {
+    return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+  }
+
   @VisibleForTesting
-  long fileSize(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+  StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {
     Storage.Objects.Get getObject =
-            storageClient.objects().get(path.getBucket(), path.getObject());
+        storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
     try {
-      StorageObject object = ResilientOperation.retry(
+      return ResilientOperation.retry(
           ResilientOperation.getGoogleRequestCallable(getObject),
           backoff,
           RetryDeterminer.SOCKET_ERRORS,
           IOException.class,
           sleeper);
-      return object.getSize().longValue();
-    } catch (Exception e) {
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
-        throw new FileNotFoundException(path.toString());
+        throw new FileNotFoundException(gcsPath.toString());
       }
-      throw new IOException("Unable to get file size", e);
+      throw new IOException(
+          String.format("Unable to get the file object for path %s.", gcsPath),
+          e);
+    }
+  }
+
+  /**
+   * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}.
+   */
+  public Objects listObjects(String bucket, String prefix, @Nullable String pageToken)
+      throws IOException {
+    // List all objects that start with the prefix (including objects in sub-directories).
+    Storage.Objects.List listObject = storageClient.objects().list(bucket);
+    listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
+    listObject.setPrefix(prefix);
+
+    if (pageToken != null) {
+      listObject.setPageToken(pageToken);
+    }
+
+    try {
+      return ResilientOperation.retry(
+          ResilientOperation.getGoogleRequestCallable(listObject),
+          BACKOFF_FACTORY.backoff(),
+          RetryDeterminer.SOCKET_ERRORS,
+          IOException.class);
+    } catch (Exception e) {
+      throw new IOException(
+          String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix),
+          e);
     }
   }
 
@@ -321,12 +316,12 @@ public class GcsUtil {
    */
   @VisibleForTesting
   List<Long> fileSizes(Collection<GcsPath> paths) throws IOException {
-    List<long[]> results = Lists.newArrayList();
+    List<StorageObject[]> results = Lists.newArrayList();
     executeBatches(makeGetBatches(paths, results));
 
     ImmutableList.Builder<Long> ret = ImmutableList.builder();
-    for (long[] result : results) {
-      ret.add(result[0]);
+    for (StorageObject[] result : results) {
+      ret.add(result[0].getSize().longValue());
     }
     return ret.build();
   }
@@ -542,7 +537,7 @@ public class GcsUtil {
   @VisibleForTesting
   List<BatchRequest> makeGetBatches(
       Collection<GcsPath> paths,
-      List<long[]> results) throws IOException {
+      List<StorageObject[]> results) throws IOException {
     List<BatchRequest> batches = new LinkedList<>();
     for (List<GcsPath> filesToGet :
         Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
@@ -601,15 +596,16 @@ public class GcsUtil {
     executeBatches(makeRemoveBatches(filenames));
   }
 
-  private long[] enqueueGetFileSize(final GcsPath path, BatchRequest batch) throws IOException {
-    final long[] fileSize = new long[1];
+  private StorageObject[] enqueueGetFileSize(final GcsPath path, BatchRequest batch)
+      throws IOException {
+    final StorageObject[] storageObject = new StorageObject[1];
 
     Storage.Objects.Get getRequest = storageClient.objects()
         .get(path.getBucket(), path.getObject());
     getRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
       @Override
       public void onSuccess(StorageObject response, HttpHeaders httpHeaders) throws IOException {
-        fileSize[0] = response.getSize().longValue();
+        storageObject[0] = response;
       }
 
       @Override
@@ -621,7 +617,7 @@ public class GcsUtil {
         }
       }
     });
-    return fileSize;
+    return storageObject;
   }
 
   private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch)

http://git-wip-us.apache.org/repos/asf/beam/blob/51a8c37a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index d592761..920e593 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -320,7 +320,7 @@ public class GcsUtilTest {
     when(mockStorageGet.execute()).thenThrow(expectedException);
 
     thrown.expect(IOException.class);
-    thrown.expectMessage("Unable to match files for pattern");
+    thrown.expectMessage("Unable to get the file object for path");
     gcsUtil.expand(pattern);
   }
 
@@ -381,8 +381,11 @@ public class GcsUtilTest {
             .thenThrow(new SocketTimeoutException("SocketException"))
             .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
 
-    assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"),
-            mockBackOff, new FastNanoClockAndSleeper()));
+    assertEquals(1000,
+        gcsUtil.getObject(
+            GcsPath.fromComponents("testbucket", "testobject"),
+            mockBackOff,
+            new FastNanoClockAndSleeper()).getSize().longValue());
     assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
   }
 
@@ -752,7 +755,7 @@ public class GcsUtilTest {
     GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
 
     // Small number of files fits in 1 batch
-    List<long[]> results = Lists.newArrayList();
+    List<StorageObject[]> results = Lists.newArrayList();
     List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results);
     assertThat(batches.size(), equalTo(1));
     assertThat(sumBatchSizes(batches), equalTo(3));

Reply via email to