Repository: incubator-beam Updated Branches: refs/heads/master 59f1fb26a -> d9657ffc3
Factor out ShardedFile from FileChecksumMatcher Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/db41940f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/db41940f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/db41940f Branch: refs/heads/master Commit: db41940f977bf3315ea7e5460d188d8f9b4fa119 Parents: 9678b1c Author: Kenneth Knowles <[email protected]> Authored: Mon Dec 5 14:32:12 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Mon Dec 12 15:12:05 2016 -0800 ---------------------------------------------------------------------- .../beam/sdk/testing/FileChecksumMatcher.java | 114 ++-------- .../beam/sdk/util/ExplicitShardedFile.java | 120 ++++++++++ .../beam/sdk/util/NumberedShardedFile.java | 220 +++++++++++++++++++ .../org/apache/beam/sdk/util/ShardedFile.java | 42 ++++ .../sdk/testing/FileChecksumMatcherTest.java | 77 ------- .../beam/sdk/util/NumberedShardedFileTest.java | 181 +++++++++++++++ 6 files changed, 581 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java index 4b249fe..82a6b71 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java @@ -21,31 +21,19 @@ package org.apache.beam.sdk.testing; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import 
com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.google.common.collect.Lists; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; -import com.google.common.io.CharStreams; -import java.io.IOException; -import java.io.Reader; -import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nonnull; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.NumberedShardedFile; +import org.apache.beam.sdk.util.ShardedFile; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; import org.joda.time.Duration; @@ -83,9 +71,8 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult> Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)"); private final String expectedChecksum; - private final String filePath; - private final Pattern shardTemplate; private String actualChecksum; + private final ShardedFile shardedFile; /** * Constructor that uses default shard template. @@ -98,7 +85,7 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult> } /** - * Constructor. + * Constructor using a custom shard template. * * @param checksum expected checksum string used to verify file content. * @param filePath path of files that's to be verified.
@@ -121,8 +108,17 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult> DEFAULT_SHARD_TEMPLATE); this.expectedChecksum = checksum; - this.filePath = filePath; - this.shardTemplate = shardTemplate; + this.shardedFile = new NumberedShardedFile(filePath, shardTemplate); + } + + /** + * Constructor using an entirely custom {@link ShardedFile} implementation. + * + * <p>For internal use only. Unlike the other constructors, the arguments are not validated. + */ + public FileChecksumMatcher(String expectedChecksum, ShardedFile shardedFile) { + this.expectedChecksum = expectedChecksum; + this.shardedFile = shardedFile; } @Override @@ -130,9 +126,10 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult> // Load output data List<String> outputs; try { - outputs = readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + outputs = shardedFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); } catch (Exception e) { - throw new RuntimeException(String.format("Failed to read from: %s", filePath), e); + throw new RuntimeException( + String.format("Failed to read from: %s", shardedFile), e); } // Verify outputs. Checksum is computed using SHA-1 algorithm @@ -142,81 +139,6 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult> return actualChecksum.equals(expectedChecksum); } - @VisibleForTesting - List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff) - throws IOException, InterruptedException { - IOChannelFactory factory = IOChannelUtils.getFactory(filePath); - IOException lastException = null; - - do { - try { - // Match inputPath which may contains glob - Collection<String> files = factory.match(filePath); - LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath); - - if (files.isEmpty() || !checkTotalNumOfFiles(files)) { - continue; - } - - // Read data from file paths - return readLines(files, factory); - } catch (IOException e) { - // Ignore and retry - lastException = e; - LOG.warn("Error in file reading.
Ignore and retry."); - } - } while(BackOffUtils.next(sleeper, backOff)); - // Failed after max retries - throw new IOException( - String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), - lastException); - } - - @VisibleForTesting - List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException { - List<String> allLines = Lists.newArrayList(); - int i = 1; - for (String file : files) { - try (Reader reader = - Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { - List<String> lines = CharStreams.readLines(reader); - allLines.addAll(lines); - LOG.debug( - "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); - } - i++; - } - return allLines; - } - - /** - * Check if total number of files is correct by comparing with the number that - * is parsed from shard name using a name template. If no template is specified, - * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total - * number of files. - * - * @return {@code true} if at least one shard name matches template and total number - * of given files equals the number that is parsed from shard name.
- */ - @VisibleForTesting - boolean checkTotalNumOfFiles(Collection<String> files) { - for (String filePath : files) { - Path fileName = Paths.get(filePath).getFileName(); - if (fileName == null) { - // this path has zero elements - continue; - } - Matcher matcher = shardTemplate.matcher(fileName.toString()); - if (!matcher.matches()) { - // shard name doesn't match the pattern, check with the next shard - continue; - } - // once match, extract total number of shards and compare to file list - return files.size() == Integer.parseInt(matcher.group("numshards")); - } - return false; - } - private String computeHash(@Nonnull List<String> strs) { if (strs.isEmpty()) { return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java new file mode 100644 index 0000000..5f5bf1f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.io.CharStreams; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A sharded file whose file names are provided explicitly instead of being matched. */ +public class ExplicitShardedFile implements ShardedFile { + + private static final Logger LOG = LoggerFactory.getLogger(ExplicitShardedFile.class); + + private static final int MAX_READ_RETRIES = 4; + private static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private final Collection<String> files; // NOTE(review): must be a Serializable collection. + + /** Constructs an {@link ExplicitShardedFile} for the given files.
*/ + public ExplicitShardedFile(Collection<String> files) { + this.files = files; + } + + @Override + public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + if (files.isEmpty()) { + return Collections.emptyList(); + } + + IOChannelFactory factory = IOChannelUtils.getFactory(Iterables.get(files, 0)); + IOException lastException = null; + + do { + try { + // Read data from file paths; every file is opened with the first file's factory + return readLines(files, factory); + } catch (IOException e) { + // Ignore and retry + lastException = e; + LOG.warn("Error in file reading. Ignore and retry."); + } + } while (BackOffUtils.next(sleeper, backOff)); + // Failed after max retries; the message reports MAX_READ_RETRIES even for a custom BackOff + throw new IOException( + String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), + lastException); + } + + /** + * Reads all the lines of all the files using {@link Sleeper#DEFAULT} and the default backoff. + * + * <p>A read that fails with an {@link IOException} is retried until the backoff is exhausted; + * the files are neither matched nor counted, so no shard template is involved. + */ + public List<String> readFilesWithRetries() throws IOException, InterruptedException { + return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } + + @Override + public String toString() { + return String.format("explicit sharded file (%s)", Joiner.on(", ").join(files)); + } + + /** + * Reads all the lines of all the files. + * + * <p>Not suitable for use except in testing of small data, since the data size may be far more + * than can be reasonably processed serially, in-memory, by a single thread.
+ */ + @VisibleForTesting + List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException { + List<String> allLines = Lists.newArrayList(); + int i = 1; + for (String file : files) { + try (Reader reader = Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { + List<String> lines = CharStreams.readLines(reader); + allLines.addAll(lines); + LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); + } + i++; + } + return allLines; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java new file mode 100644 index 0000000..f9f2d6d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import com.google.common.io.CharStreams; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nonnull; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A sharded file whose shard names encode the total number of shards. For internal use only; + * many parameters are just hardcoded to allow existing uses to work OK. + */ +public class NumberedShardedFile implements ShardedFile { + + private static final Logger LOG = LoggerFactory.getLogger(NumberedShardedFile.class); + + static final int MAX_READ_RETRIES = 4; + static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private static final Pattern DEFAULT_SHARD_TEMPLATE = + Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)"); + + private final String filePath; + private final Pattern shardTemplate; + + /** + * Constructor that uses default shard template.
+ * + * @param filePath path or glob of files to include + */ + public NumberedShardedFile(String filePath) { + this(filePath, DEFAULT_SHARD_TEMPLATE); + } + + /** + * Constructor using a custom shard template. + * + * @param filePath path or glob of files to include + * @param shardTemplate template of shard name to parse out the total number of shards + * which is used in I/O retry to detect an inconsistent file listing. + * A custom template must name the capturing group for the total shard count + * "numshards", and is matched against the whole file name (not the path). + */ + public NumberedShardedFile(String filePath, Pattern shardTemplate) { + checkArgument( + !Strings.isNullOrEmpty(filePath), + "Expected valid file path, but received %s", filePath); + checkNotNull( + shardTemplate, + "Expected non-null shard pattern. " + + "Please call the other constructor to use default pattern: %s", + DEFAULT_SHARD_TEMPLATE); + + this.filePath = filePath; + this.shardTemplate = shardTemplate; + } + + public String getFilePath() { + return filePath; + } + + /** + * Reads all shards of this file, retrying with the provided {@link Sleeper} and {@link BackOff}. + * + * <p>Because of eventual consistency, a read may discover no files or a number of files other + * than the shard template implies. In this case the attempt fails and is retried. + */ + @Override + public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + IOChannelFactory factory = IOChannelUtils.getFactory(filePath); + IOException lastException = null; + + do { + try { + // Match filePath, which may contain a glob + Collection<String> files = factory.match(filePath); + LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath); + + if (files.isEmpty() || !checkTotalNumOfFiles(files)) { + continue; + } + + // Read data from file paths + return readLines(files, factory); + } catch (IOException e) { + // Ignore and retry + lastException = e; + LOG.warn("Error in file reading.
Ignore and retry."); + } + } while(BackOffUtils.next(sleeper, backOff)); + // Failed after max retries; the message reports MAX_READ_RETRIES even for a custom BackOff + throw new IOException( + String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), + lastException); + } + + /** + * Reads all shards of this file using {@link Sleeper#DEFAULT} and the default backoff. + * + * <p>Because of eventual consistency, a read may discover no files or a number of files other + * than the shard template implies. In this case the attempt fails and is retried. + */ + public List<String> readFilesWithRetries() + throws IOException, InterruptedException { + return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } + + @Override + public String toString() { + return String.format("%s with shard template '%s'", filePath, shardTemplate); + } + + /** + * Reads all the lines of all the files. + * + * <p>Not suitable for use except in testing of small data, since the data size may be far more + * than can be reasonably processed serially, in-memory, by a single thread. + */ + @VisibleForTesting + List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException { + List<String> allLines = Lists.newArrayList(); + int i = 1; + for (String file : files) { + try (Reader reader = + Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { + List<String> lines = CharStreams.readLines(reader); + allLines.addAll(lines); + LOG.debug( + "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); + } + i++; + } + return allLines; + } + + /** + * Checks whether the number of files is correct by comparing it with the total shard count + * parsed from the first file name that matches the shard template. If no template is specified, + * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total + * number of files.
+ * + * @return {@code true} if some file name matches the template and the number of given files + * equals the shard count parsed from the first such name. + */ + @VisibleForTesting + boolean checkTotalNumOfFiles(Collection<String> files) { + for (String filePath : files) { + Path fileName = Paths.get(filePath).getFileName(); + if (fileName == null) { + // this path has zero elements + continue; + } + Matcher matcher = shardTemplate.matcher(fileName.toString()); + if (!matcher.matches()) { + // shard name doesn't match the pattern, check with the next shard + continue; + } + // on the first match, extract the total number of shards and compare it to the file count + return files.size() == Integer.parseInt(matcher.group("numshards")); + } + return false; + } + + private String computeHash(@Nonnull List<String> strs) { // NOTE(review): unused; never called + if (strs.isEmpty()) { + return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); + } + + List<HashCode> hashCodes = new ArrayList<>(); + for (String str : strs) { + hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8)); + } + return Hashing.combineUnordered(hashCodes).toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java new file mode 100644 index 0000000..ec9ed64 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.testing.SerializableMatcher; + +/** + * Bare-bones interface for reading the lines of sharded files. + * + * <p>For internal use only; used only in SDK tests. Must be {@link Serializable} so it can be + * shipped as part of a {@link SerializableMatcher}. + */ +public interface ShardedFile extends Serializable { + + /** + * Reads the lines from all shards of this file using the provided {@link Sleeper} and {@link + * BackOff}.
+ */ + List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java index 0dc307d..5438479 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java @@ -19,10 +19,6 @@ package org.apache.beam.sdk.testing; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyCollection; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; import com.google.api.client.util.BackOff; import com.google.common.io.Files; @@ -30,9 +26,7 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.regex.Pattern; - import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.junit.Rule; import org.junit.Test; @@ -77,13 +71,6 @@ public class FileChecksumMatcherTest { } @Test - public void testPreconditionFilePathIsNull() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(containsString("Expected valid file path, but received")); - new FileChecksumMatcher("checksumString", null); - } - - @Test public void testPreconditionFilePathIsEmpty() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString("Expected valid file path, but received")); @@ -158,68 +145,4
@@ public class FileChecksumMatcherTest { assertThat(pResult, matcher); } - - @Test - public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception { - File tmpFile = tmpFolder.newFile(); - Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); - - FileChecksumMatcher matcher = new FileChecksumMatcher( - "mock-checksum", - IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), - Pattern.compile("incorrect-template")); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); - matcher.readFilesWithRetries(fastClock, backOff); - } - - @Test - public void testReadWithRetriesFailsSinceFilesystemError() throws Exception { - File tmpFile = tmpFolder.newFile(); - Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); - - FileChecksumMatcher matcher = - spy(new FileChecksumMatcher( - "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"))); - doThrow(IOException.class) - .when(matcher).readLines(anyCollection(), any(IOChannelFactory.class)); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); - matcher.readFilesWithRetries(fastClock, backOff); - } - - @Test - public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception { - FileChecksumMatcher matcher = - new FileChecksumMatcher( - "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); - matcher.readFilesWithRetries(fastClock, backOff); - } - - @Test - public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception { - tmpFolder.newFile("result-000-of-001"); -
tmpFolder.newFile("tmp-result-000-of-001"); - - FileChecksumMatcher matcher = - new FileChecksumMatcher( - "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); - - thrown.expect(IOException.class); - thrown.expectMessage( - containsString( - "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); - matcher.readFilesWithRetries(fastClock, backOff); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/db41940f/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java new file mode 100644 index 0000000..475e459 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +import com.google.api.client.util.BackOff; +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.regex.Pattern; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; + +/** Tests for {@link NumberedShardedFile}.
*/ +@RunWith(JUnit4.class) +public class NumberedShardedFileTest { + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); + + @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class); // NOTE(review): unused + + private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff(); + + @Test + public void testPreconditionFilePathIsNull() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(containsString("Expected valid file path, but received")); + new NumberedShardedFile(null); + } + + @Test + public void testPreconditionFilePathIsEmpty() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(containsString("Expected valid file path, but received")); + new NumberedShardedFile(""); + } + + @Test + public void testReadMultipleShards() throws Exception { + String + contents1 = "To be or not to be, ", + contents2 = "it is not a question.", + contents3 = "should not be included"; + + File tmpFile1 = tmpFolder.newFile("result-000-of-002"); + File tmpFile2 = tmpFolder.newFile("result-001-of-002"); + File tmpFile3 = tmpFolder.newFile("tmp"); + Files.write(contents1, tmpFile1, StandardCharsets.UTF_8); + Files.write(contents2, tmpFile2, StandardCharsets.UTF_8); + Files.write(contents3, tmpFile3, StandardCharsets.UTF_8); + + NumberedShardedFile shardedFile = + new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "result-*")); + + assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); + } + + @Test + public void testReadEmpty() throws Exception { + File emptyFile = tmpFolder.newFile("result-000-of-001"); + Files.write("", emptyFile, StandardCharsets.UTF_8); + NumberedShardedFile shardedFile = + new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + +
assertThat(shardedFile.readFilesWithRetries(), empty()); + } + + @Test + public void testReadCustomTemplate() throws Exception { + String contents1 = "To be or not to be, ", contents2 = "it is not a question."; + + // Customized template: resultSSS-totalNNN + File tmpFile1 = tmpFolder.newFile("result0-total2"); + File tmpFile2 = tmpFolder.newFile("result1-total2"); + Files.write(contents1, tmpFile1, StandardCharsets.UTF_8); + Files.write(contents2, tmpFile2, StandardCharsets.UTF_8); + + Pattern customizedTemplate = + Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)"); + NumberedShardedFile shardedFile = + new NumberedShardedFile( + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), customizedTemplate); + + assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); + } + + @Test + public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception { + File tmpFile = tmpFolder.newFile(); + Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); + + NumberedShardedFile shardedFile = + new NumberedShardedFile( + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), + Pattern.compile("incorrect-template")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); + shardedFile.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsSinceFilesystemError() throws Exception { + File tmpFile = tmpFolder.newFile(); + Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); + + NumberedShardedFile shardedFile = + spy(new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"))); + doThrow(IOException.class) + .when(shardedFile) + .readLines(anyCollection(), any(IOChannelFactory.class)); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to
read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); + shardedFile.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception { + NumberedShardedFile shardedFile = + new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); + shardedFile.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception { + tmpFolder.newFile("result-000-of-001"); + tmpFolder.newFile("tmp-result-000-of-001"); + + NumberedShardedFile shardedFile = + new NumberedShardedFile(IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + NumberedShardedFile.MAX_READ_RETRIES)); + shardedFile.readFilesWithRetries(fastClock, backOff); + } +}
