Repository: beam
Updated Branches:
  refs/heads/master 185dc4798 -> 47aaf1125


[BEAM-59] Remove IOChannelFactory usage from NumberedShardedFile/ExplicitShardedFile


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34c3ee7b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34c3ee7b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34c3ee7b

Branch: refs/heads/master
Commit: 34c3ee7bfe0e0b6b95d0e9ec421a061116fc19b0
Parents: 185dc47
Author: Vikas Kedigehalli <[email protected]>
Authored: Thu Apr 27 14:58:24 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Thu Apr 27 23:02:47 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/util/ExplicitShardedFile.java      | 23 ++++++-----
 .../beam/sdk/util/NumberedShardedFile.java      | 31 +++++++++------
 .../beam/sdk/util/NumberedShardedFileTest.java  | 41 ++++++++++----------
 3 files changed, 54 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/34c3ee7b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
index 5f5bf1f..0f184de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java
@@ -23,7 +23,6 @@ import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.CharStreams;
 import java.io.IOException;
@@ -32,7 +31,10 @@ import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,11 +51,14 @@ public class ExplicitShardedFile implements ShardedFile {
           .withInitialBackoff(DEFAULT_SLEEP_DURATION)
           .withMaxRetries(MAX_READ_RETRIES);
 
-  private final Collection<String> files;
+  private final List<Metadata> files;
 
   /** Constructs an {@link ExplicitShardedFile} for the given files. */
-  public ExplicitShardedFile(Collection<String> files) {
-    this.files = files;
+  public ExplicitShardedFile(Collection<String> files) throws IOException {
+    this.files = new LinkedList<>();
+    for (String file: files) {
+      this.files.add(FileSystems.matchSingleFileSpec(file));
+    }
   }
 
   @Override
@@ -63,13 +68,12 @@ public class ExplicitShardedFile implements ShardedFile {
       return Collections.emptyList();
     }
 
-    IOChannelFactory factory = IOChannelUtils.getFactory(Iterables.get(files, 
0));
     IOException lastException = null;
 
     do {
       try {
         // Read data from file paths
-        return readLines(files, factory);
+        return readLines(files);
       } catch (IOException e) {
         // Ignore and retry
         lastException = e;
@@ -104,11 +108,12 @@ public class ExplicitShardedFile implements ShardedFile {
    * than can be reasonably processed serially, in-memory, by a single thread.
    */
   @VisibleForTesting
-  List<String> readLines(Collection<String> files, IOChannelFactory factory) 
throws IOException {
+  List<String> readLines(Collection<Metadata> files) throws IOException {
     List<String> allLines = Lists.newArrayList();
     int i = 1;
-    for (String file : files) {
-      try (Reader reader = Channels.newReader(factory.open(file), 
StandardCharsets.UTF_8.name())) {
+    for (Metadata file : files) {
+      try (Reader reader = 
Channels.newReader(FileSystems.open(file.resourceId()),
+          StandardCharsets.UTF_8.name())) {
         List<String> lines = CharStreams.readLines(reader);
         allLines.addAll(lines);
         LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), 
lines.size(), file);

http://git-wip-us.apache.org/repos/asf/beam/blob/34c3ee7b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
index 740aa46..fa22586 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -26,6 +26,8 @@ import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
@@ -34,14 +36,15 @@ import java.io.IOException;
 import java.io.Reader;
 import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,13 +115,14 @@ public class NumberedShardedFile implements ShardedFile {
   @Override
   public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
       throws IOException, InterruptedException {
-    IOChannelFactory factory = IOChannelUtils.getFactory(filePattern);
     IOException lastException = null;
 
     do {
       try {
         // Match inputPath which may contains glob
-        Collection<String> files = factory.match(filePattern);
+        Collection<Metadata> files = Arrays.asList(Iterables.getOnlyElement(
+            FileSystems.match(ImmutableList.of(filePattern))).metadata());
+
         LOG.debug("Found {} file(s) by matching the path: {}", files.size(), 
filePattern);
 
         if (files.isEmpty() || !checkTotalNumOfFiles(files)) {
@@ -126,7 +130,7 @@ public class NumberedShardedFile implements ShardedFile {
         }
 
         // Read data from file paths
-        return readLines(files, factory);
+        return readLines(files);
       } catch (IOException e) {
         // Ignore and retry
         lastException = e;
@@ -162,12 +166,13 @@ public class NumberedShardedFile implements ShardedFile {
    * than can be reasonably processed serially, in-memory, by a single thread.
    */
   @VisibleForTesting
-  List<String> readLines(Collection<String> files, IOChannelFactory factory) 
throws IOException {
+  List<String> readLines(Collection<Metadata> files) throws IOException {
     List<String> allLines = Lists.newArrayList();
     int i = 1;
-    for (String file : files) {
+    for (Metadata file : files) {
       try (Reader reader =
-               Channels.newReader(factory.open(file), 
StandardCharsets.UTF_8.name())) {
+               Channels.newReader(FileSystems.open(file.resourceId()),
+                   StandardCharsets.UTF_8.name())) {
         List<String> lines = CharStreams.readLines(reader);
         allLines.addAll(lines);
         LOG.debug(
@@ -188,14 +193,16 @@ public class NumberedShardedFile implements ShardedFile {
    * of given files equals the number that is parsed from shard name.
    */
   @VisibleForTesting
-  boolean checkTotalNumOfFiles(Collection<String> files) {
-    for (String filePath : files) {
-      Path fileName = Paths.get(filePath).getFileName();
+  boolean checkTotalNumOfFiles(Collection<Metadata> files) {
+    for (Metadata fileMedadata : files) {
+      String fileName = fileMedadata.resourceId().toString().substring(
+          fileMedadata.resourceId().getCurrentDirectory().toString().length());
+
       if (fileName == null) {
         // this path has zero elements
         continue;
       }
-      Matcher matcher = shardTemplate.matcher(fileName.toString());
+      Matcher matcher = shardTemplate.matcher(fileName);
       if (!matcher.matches()) {
         // shard name doesn't match the pattern, check with the next shard
         continue;

http://git-wip-us.apache.org/repos/asf/beam/blob/34c3ee7b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
index 475e459..43a9166 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java
@@ -21,7 +21,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -33,7 +32,10 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.LocalResources;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -53,6 +55,13 @@ public class NumberedShardedFileTest {
   @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class);
 
   private final BackOff backOff = 
NumberedShardedFile.BACK_OFF_FACTORY.backoff();
+  private String filePattern;
+
+  @Before
+  public void setup() throws IOException {
+    filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve(
+            "*", StandardResolveOptions.RESOLVE_FILE).toString();
+  }
 
   @Test
   public void testPreconditionFilePathIsNull() {
@@ -82,8 +91,9 @@ public class NumberedShardedFileTest {
     Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
     Files.write(contents3, tmpFile3, StandardCharsets.UTF_8);
 
-    NumberedShardedFile shardedFile =
-        new 
NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), 
"result-*"));
+    filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve(
+        "result-*", StandardResolveOptions.RESOLVE_FILE).toString();
+    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);
 
     assertThat(shardedFile.readFilesWithRetries(), 
containsInAnyOrder(contents1, contents2));
   }
@@ -92,8 +102,7 @@ public class NumberedShardedFileTest {
   public void testReadEmpty() throws Exception {
     File emptyFile = tmpFolder.newFile("result-000-of-001");
     Files.write("", emptyFile, StandardCharsets.UTF_8);
-    NumberedShardedFile shardedFile =
-        new 
NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);
 
     assertThat(shardedFile.readFilesWithRetries(), empty());
   }
@@ -110,9 +119,7 @@ public class NumberedShardedFileTest {
 
     Pattern customizedTemplate =
         Pattern.compile("(?x) result (?<shardnum>\\d+) - total 
(?<numshards>\\d+)");
-    NumberedShardedFile shardedFile =
-        new NumberedShardedFile(
-            IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), 
customizedTemplate);
+    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern, 
customizedTemplate);
 
     assertThat(shardedFile.readFilesWithRetries(), 
containsInAnyOrder(contents1, contents2));
   }
@@ -122,10 +129,8 @@ public class NumberedShardedFileTest {
     File tmpFile = tmpFolder.newFile();
     Files.write("Test for file checksum verifier.", tmpFile, 
StandardCharsets.UTF_8);
 
-    NumberedShardedFile shardedFile =
-        new NumberedShardedFile(
-            IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"),
-            Pattern.compile("incorrect-template"));
+    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern,
+        Pattern.compile("incorrect-template"));
 
     thrown.expect(IOException.class);
     thrown.expectMessage(
@@ -138,12 +143,10 @@ public class NumberedShardedFileTest {
   public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
     File tmpFile = tmpFolder.newFile();
     Files.write("Test for file checksum verifier.", tmpFile, 
StandardCharsets.UTF_8);
-
-    NumberedShardedFile shardedFile =
-        spy(new 
NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), 
"*")));
+    NumberedShardedFile shardedFile = spy(new 
NumberedShardedFile(filePattern));
     doThrow(IOException.class)
         .when(shardedFile)
-        .readLines(anyCollection(), any(IOChannelFactory.class));
+        .readLines(anyCollection());
 
     thrown.expect(IOException.class);
     thrown.expectMessage(
@@ -154,8 +157,7 @@ public class NumberedShardedFileTest {
 
   @Test
   public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
-    NumberedShardedFile shardedFile =
-        new 
NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);
 
     thrown.expect(IOException.class);
     thrown.expectMessage(
@@ -169,8 +171,7 @@ public class NumberedShardedFileTest {
     tmpFolder.newFile("result-000-of-001");
     tmpFolder.newFile("tmp-result-000-of-001");
 
-    NumberedShardedFile shardedFile =
-        new 
NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"));
+    NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern);
 
     thrown.expect(IOException.class);
     thrown.expectMessage(

Reply via email to