Repository: beam Updated Branches: refs/heads/master 185dc4798 -> 47aaf1125
[BEAM-59] Remove IOChannelFactory usage from NumberedShardedFile/ExplicitShardedFile Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34c3ee7b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34c3ee7b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34c3ee7b Branch: refs/heads/master Commit: 34c3ee7bfe0e0b6b95d0e9ec421a061116fc19b0 Parents: 185dc47 Author: Vikas Kedigehalli <[email protected]> Authored: Thu Apr 27 14:58:24 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu Apr 27 23:02:47 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/util/ExplicitShardedFile.java | 23 ++++++----- .../beam/sdk/util/NumberedShardedFile.java | 31 +++++++++------ .../beam/sdk/util/NumberedShardedFileTest.java | 41 ++++++++++---------- 3 files changed, 54 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/34c3ee7b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java index 5f5bf1f..0f184de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java @@ -23,7 +23,6 @@ import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.CharStreams; import java.io.IOException; @@ -32,7 +31,10 @@ import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,11 +51,14 @@ public class ExplicitShardedFile implements ShardedFile { .withInitialBackoff(DEFAULT_SLEEP_DURATION) .withMaxRetries(MAX_READ_RETRIES); - private final Collection<String> files; + private final List<Metadata> files; /** Constructs an {@link ExplicitShardedFile} for the given files. */ - public ExplicitShardedFile(Collection<String> files) { - this.files = files; + public ExplicitShardedFile(Collection<String> files) throws IOException { + this.files = new LinkedList<>(); + for (String file: files) { + this.files.add(FileSystems.matchSingleFileSpec(file)); + } } @Override @@ -63,13 +68,12 @@ public class ExplicitShardedFile implements ShardedFile { return Collections.emptyList(); } - IOChannelFactory factory = IOChannelUtils.getFactory(Iterables.get(files, 0)); IOException lastException = null; do { try { // Read data from file paths - return readLines(files, factory); + return readLines(files); } catch (IOException e) { // Ignore and retry lastException = e; @@ -104,11 +108,12 @@ public class ExplicitShardedFile implements ShardedFile { * than can be reasonably processed serially, in-memory, by a single thread. */ @VisibleForTesting - List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException { + List<String> readLines(Collection<Metadata> files) throws IOException { List<String> allLines = Lists.newArrayList(); int i = 1; - for (String file : files) { - try (Reader reader = Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { + for (Metadata file : files) { + try (Reader reader = Channels.newReader(FileSystems.open(file.resourceId()), + StandardCharsets.UTF_8.name())) { List<String> lines = CharStreams.readLines(reader); allLines.addAll(lines); LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); http://git-wip-us.apache.org/repos/asf/beam/blob/34c3ee7b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java index 740aa46..fa22586 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java @@ -26,6 +26,8 @@ import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; @@ -34,14 +36,15 @@ import java.io.IOException; import java.io.Reader; import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nonnull; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,13 +115,14 @@ public class NumberedShardedFile implements ShardedFile { @Override public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff) throws IOException, InterruptedException { - IOChannelFactory factory = IOChannelUtils.getFactory(filePattern); IOException lastException = null; do { try { // Match inputPath which may contains glob - Collection<String> files = factory.match(filePattern); + Collection<Metadata> files = Arrays.asList(Iterables.getOnlyElement( + FileSystems.match(ImmutableList.of(filePattern))).metadata()); + LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern); if (files.isEmpty() || !checkTotalNumOfFiles(files)) { @@ -126,7 +130,7 @@ public class NumberedShardedFile implements ShardedFile { } // Read data from file paths - return readLines(files, factory); + return readLines(files); } catch (IOException e) { // Ignore and retry lastException = e; @@ -162,12 +166,13 @@ public class NumberedShardedFile implements ShardedFile { * than can be reasonably processed serially, in-memory, by a single thread. */ @VisibleForTesting - List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException { + List<String> readLines(Collection<Metadata> files) throws IOException { List<String> allLines = Lists.newArrayList(); int i = 1; - for (String file : files) { + for (Metadata file : files) { try (Reader reader = - Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { + Channels.newReader(FileSystems.open(file.resourceId()), + StandardCharsets.UTF_8.name())) { List<String> lines = CharStreams.readLines(reader); allLines.addAll(lines); LOG.debug( @@ -188,14 +193,16 @@ public class NumberedShardedFile implements ShardedFile { * of given files equals the number that is parsed from shard name. */ @VisibleForTesting - boolean checkTotalNumOfFiles(Collection<String> files) { - for (String filePath : files) { - Path fileName = Paths.get(filePath).getFileName(); + boolean checkTotalNumOfFiles(Collection<Metadata> files) { + for (Metadata fileMedadata : files) { + String fileName = fileMedadata.resourceId().toString().substring( + fileMedadata.resourceId().getCurrentDirectory().toString().length()); + if (fileName == null) { // this path has zero elements continue; } - Matcher matcher = shardTemplate.matcher(fileName.toString()); + Matcher matcher = shardTemplate.matcher(fileName); if (!matcher.matches()) { // shard name doesn't match the pattern, check with the next shard continue; http://git-wip-us.apache.org/repos/asf/beam/blob/34c3ee7b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java index 475e459..43a9166 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollection; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; @@ -33,7 +32,10 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.regex.Pattern; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.LocalResources; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -53,6 +55,13 @@ public class NumberedShardedFileTest { @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class); private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff(); + private String filePattern; + + @Before + public void setup() throws IOException { + filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve( + "*", StandardResolveOptions.RESOLVE_FILE).toString(); + } @Test public void testPreconditionFilePathIsNull() { @@ -82,8 +91,9 @@ public class NumberedShardedFileTest { Files.write(contents2, tmpFile2, StandardCharsets.UTF_8); Files.write(contents3, tmpFile3, StandardCharsets.UTF_8); - NumberedShardedFile shardedFile = - new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "result-*")); + filePattern = LocalResources.fromFile(tmpFolder.getRoot(), true).resolve( + "result-*", StandardResolveOptions.RESOLVE_FILE).toString(); + NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern); assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); } @@ -92,8 +102,7 @@ public class NumberedShardedFileTest { public void testReadEmpty() throws Exception { File emptyFile = tmpFolder.newFile("result-000-of-001"); Files.write("", emptyFile, StandardCharsets.UTF_8); - NumberedShardedFile shardedFile = - new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern); assertThat(shardedFile.readFilesWithRetries(), empty()); } @@ -110,9 +119,7 @@ public class NumberedShardedFileTest { Pattern customizedTemplate = Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)"); - NumberedShardedFile shardedFile = - new NumberedShardedFile( - IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), customizedTemplate); + NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern, customizedTemplate); assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); } @@ -122,10 +129,8 @@ public class NumberedShardedFileTest { File tmpFile = tmpFolder.newFile(); Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); - NumberedShardedFile shardedFile = - new NumberedShardedFile( - IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), - Pattern.compile("incorrect-template")); + NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern, + Pattern.compile("incorrect-template")); thrown.expect(IOException.class); thrown.expectMessage( @@ -138,12 +143,10 @@ public class NumberedShardedFileTest { public void testReadWithRetriesFailsSinceFilesystemError() throws Exception { File tmpFile = tmpFolder.newFile(); Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); - - NumberedShardedFile shardedFile = - spy(new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"))); + NumberedShardedFile shardedFile = spy(new NumberedShardedFile(filePattern)); doThrow(IOException.class) .when(shardedFile) - .readLines(anyCollection(), any(IOChannelFactory.class)); + .readLines(anyCollection()); thrown.expect(IOException.class); thrown.expectMessage( @@ -154,8 +157,7 @@ public class NumberedShardedFileTest { @Test public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception { - NumberedShardedFile shardedFile = - new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern); thrown.expect(IOException.class); thrown.expectMessage( @@ -169,8 +171,7 @@ public class NumberedShardedFileTest { tmpFolder.newFile("result-000-of-001"); tmpFolder.newFile("tmp-result-000-of-001"); - NumberedShardedFile shardedFile = - new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + NumberedShardedFile shardedFile = new NumberedShardedFile(filePattern); thrown.expect(IOException.class); thrown.expectMessage(
