Repository: incubator-beam Updated Branches: refs/heads/master 8042d52fc -> b75a76459
[BEAM-747] Fix FileChecksumMatcher That Inconsistent With FS Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d08a9f12 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d08a9f12 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d08a9f12 Branch: refs/heads/master Commit: d08a9f1278e0bb4ec5b08e11f6267c516c8ea56e Parents: 8042d52 Author: Mark Liu <mark...@markliu0.mtv.corp.google.com> Authored: Tue Oct 25 14:57:13 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Nov 30 09:52:34 2016 -0800 ---------------------------------------------------------------------- .../org/apache/beam/examples/WordCountIT.java | 2 +- .../beam/sdk/testing/FileChecksumMatcher.java | 168 ++++++++++++++++--- .../sdk/testing/FileChecksumMatcherTest.java | 131 ++++++++++++++- 3 files changed, 268 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d08a9f12/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java index f2afe6a..01438de 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java @@ -65,7 +65,7 @@ public class WordCountIT { "output", "results")); options.setOnSuccessMatcher( - new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + "*")); + new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + "*-of-*")); WordCount.main(TestPipeline.convertToArgs(options)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d08a9f12/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java index de6cea3..4b249fe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java @@ -19,8 +19,14 @@ package org.apache.beam.sdk.testing; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; import com.google.common.io.CharStreams; @@ -28,14 +34,21 @@ import java.io.IOException; import java.io.Reader; import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nonnull; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,71 +56,172 @@ import org.slf4j.LoggerFactory; * Matcher to verify file checksum in E2E test. * * <p>For example: - * <pre>{@code [ - * assertTrue(job, new FileChecksumMatcher(checksumString, filePath)); - * ]}</pre> + * <pre>{@code + * assertThat(job, new FileChecksumMatcher(checksumString, filePath)); + * }</pre> + * or + * <pre>{@code + * assertThat(job, new FileChecksumMatcher(checksumString, filePath, shardTemplate)); + * }</pre> + * + * <p>Checksum of outputs is generated based on SHA-1 algorithm. If output file is empty, + * SHA-1 hash of empty string (da39a3ee5e6b4b0d3255bfef95601890afd80709) is used as expected. */ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult> implements SerializableMatcher<PipelineResult> { private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class); + static final int MAX_READ_RETRIES = 4; + static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private static final Pattern DEFAULT_SHARD_TEMPLATE = + Pattern.compile("(?x) \\S* (?<shardnum> \\d+) -of- (?<numshards> \\d+)"); + private final String expectedChecksum; private final String filePath; + private final Pattern shardTemplate; private String actualChecksum; + /** + * Constructor that uses default shard template. + * + * @param checksum expected checksum string used to verify file content. + * @param filePath path of files that's to be verified. + */ public FileChecksumMatcher(String checksum, String filePath) { + this(checksum, filePath, DEFAULT_SHARD_TEMPLATE); + } + + /** + * Constructor. + * + * @param checksum expected checksum string used to verify file content. + * @param filePath path of files that's to be verified. + * @param shardTemplate template of shard name to parse out the total number of shards + * which is used in I/O retry to avoid inconsistency of filesystem. + * Customized template should assign name "numshards" to capturing + * group - total shard number. + */ + public FileChecksumMatcher(String checksum, String filePath, Pattern shardTemplate) { checkArgument( !Strings.isNullOrEmpty(checksum), "Expected valid checksum, but received %s", checksum); checkArgument( !Strings.isNullOrEmpty(filePath), "Expected valid file path, but received %s", filePath); + checkNotNull( + shardTemplate, + "Expected non-null shard pattern. " + + "Please call the other constructor to use default pattern: %s", + DEFAULT_SHARD_TEMPLATE); this.expectedChecksum = checksum; this.filePath = filePath; + this.shardTemplate = shardTemplate; } @Override public boolean matchesSafely(PipelineResult pipelineResult) { + // Load output data + List<String> outputs; try { - // Load output data - List<String> outputs = readLines(filePath); + outputs = readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } catch (Exception e) { + throw new RuntimeException(String.format("Failed to read from: %s", filePath), e); + } - // Verify outputs. Checksum is computed using SHA-1 algorithm - actualChecksum = hashing(outputs); - LOG.info("Generated checksum for output data: {}", actualChecksum); + // Verify outputs. Checksum is computed using SHA-1 algorithm + actualChecksum = computeHash(outputs); + LOG.debug("Generated checksum: {}", actualChecksum); - return actualChecksum.equals(expectedChecksum); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to read from path: %s", filePath)); - } + return actualChecksum.equals(expectedChecksum); } - private List<String> readLines(String path) throws IOException { - List<String> readData = new ArrayList<>(); - IOChannelFactory factory = IOChannelUtils.getFactory(path); - - // Match inputPath which may contains glob - Collection<String> files = factory.match(path); + @VisibleForTesting + List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + IOChannelFactory factory = IOChannelUtils.getFactory(filePath); + IOException lastException = null; + + do { + try { + // Match inputPath which may contains glob + Collection<String> files = factory.match(filePath); + LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePath); + + if (files.isEmpty() || !checkTotalNumOfFiles(files)) { + continue; + } + + // Read data from file paths + return readLines(files, factory); + } catch (IOException e) { + // Ignore and retry + lastException = e; + LOG.warn("Error in file reading. Ignore and retry."); + } + } while(BackOffUtils.next(sleeper, backOff)); + // Failed after max retries + throw new IOException( + String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), + lastException); + } - // Read data from file paths - int i = 0; + @VisibleForTesting + List<String> readLines(Collection<String> files, IOChannelFactory factory) throws IOException { + List<String> allLines = Lists.newArrayList(); + int i = 1; for (String file : files) { try (Reader reader = - Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { + Channels.newReader(factory.open(file), StandardCharsets.UTF_8.name())) { List<String> lines = CharStreams.readLines(reader); - readData.addAll(lines); - LOG.info( - "[{} of {}] Read {} lines from file: {}", i, files.size() - 1, lines.size(), file); + allLines.addAll(lines); + LOG.debug( + "[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); } i++; } - return readData; + return allLines; + } + + /** + * Check if total number of files is correct by comparing with the number that + * is parsed from shard name using a name template. If no template is specified, + * "SSSS-of-NNNN" will be used as default, and "NNNN" will be the expected total + * number of files. + * + * @return {@code true} if at least one shard name matches template and total number + * of given files equals the number that is parsed from shard name. + */ + @VisibleForTesting + boolean checkTotalNumOfFiles(Collection<String> files) { + for (String filePath : files) { + Path fileName = Paths.get(filePath).getFileName(); + if (fileName == null) { + // this path has zero elements + continue; + } + Matcher matcher = shardTemplate.matcher(fileName.toString()); + if (!matcher.matches()) { + // shard name doesn't match the pattern, check with the next shard + continue; + } + // once match, extract total number of shards and compare to file list + return files.size() == Integer.parseInt(matcher.group("numshards")); + } + return false; } - private String hashing(List<String> strs) { + private String computeHash(@Nonnull List<String> strs) { + if (strs.isEmpty()) { + return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); + } + List<HashCode> hashCodes = new ArrayList<>(); for (String str : strs) { hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d08a9f12/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java index b2f2ec8..0dc307d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java @@ -19,12 +19,20 @@ package org.apache.beam.sdk.testing; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import com.google.api.client.util.BackOff; import com.google.common.io.Files; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.regex.Pattern; + import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.junit.Rule; import org.junit.Test; @@ -42,10 +50,14 @@ public class FileChecksumMatcherTest { public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule + public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class); + private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff(); + @Test public void testPreconditionChecksumIsNull() throws IOException { String tmpPath = tmpFolder.newFile().getPath(); @@ -79,8 +91,20 @@ public class FileChecksumMatcherTest { } @Test - public void testMatcherVerifySingleFile() throws IOException{ - File tmpFile = tmpFolder.newFile(); + public void testPreconditionShardTemplateIsNull() throws IOException { + String tmpPath = tmpFolder.newFile().getPath(); + + thrown.expect(NullPointerException.class); + thrown.expectMessage( + containsString( + "Expected non-null shard pattern. " + + "Please call the other constructor to use default pattern:")); + new FileChecksumMatcher("checksumString", tmpPath, null); + } + + @Test + public void testMatcherThatVerifiesSingleFile() throws IOException{ + File tmpFile = tmpFolder.newFile("result-000-of-001"); Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); FileChecksumMatcher matcher = new FileChecksumMatcher("a8772322f5d7b851777f820fc79d050f9d302915", tmpFile.getPath()); @@ -89,16 +113,113 @@ public class FileChecksumMatcherTest { } @Test - public void testMatcherVerifyMultipleFilesInOneDir() throws IOException { - File tmpFile1 = tmpFolder.newFile(); - File tmpFile2 = tmpFolder.newFile(); + public void testMatcherThatVerifiesMultipleFiles() throws IOException { + File tmpFile1 = tmpFolder.newFile("result-000-of-002"); + File tmpFile2 = tmpFolder.newFile("result-001-of-002"); + File tmpFile3 = tmpFolder.newFile("tmp"); Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8); Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8); + Files.write("tmp", tmpFile3, StandardCharsets.UTF_8); + FileChecksumMatcher matcher = new FileChecksumMatcher( "90552392c28396935fe4f123bd0b5c2d0f6260c8", + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "result-*")); + + assertThat(pResult, matcher); + } + + @Test + public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException { + File emptyFile = tmpFolder.newFile("result-000-of-001"); + Files.write("", emptyFile, StandardCharsets.UTF_8); + FileChecksumMatcher matcher = + new FileChecksumMatcher( + "da39a3ee5e6b4b0d3255bfef95601890afd80709", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); assertThat(pResult, matcher); } + + @Test + public void testMatcherThatUsesCustomizedTemplate() throws Exception { + // Customized template: resultSSS-totalNNN + File tmpFile1 = tmpFolder.newFile("result0-total2"); + File tmpFile2 = tmpFolder.newFile("result1-total2"); + Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8); + Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8); + + Pattern customizedTemplate = + Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)"); + FileChecksumMatcher matcher = new FileChecksumMatcher( + "90552392c28396935fe4f123bd0b5c2d0f6260c8", + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), + customizedTemplate); + + assertThat(pResult, matcher); + } + + @Test + public void testReadWithRetriesFailsWhenTemplateIncorrect() throws Exception { + File tmpFile = tmpFolder.newFile(); + Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); + + FileChecksumMatcher matcher = new FileChecksumMatcher( + "mock-checksum", + IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"), + Pattern.compile("incorrect-template")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); + matcher.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsSinceFilesystemError() throws Exception { + File tmpFile = tmpFolder.newFile(); + Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); + + FileChecksumMatcher matcher = + spy(new FileChecksumMatcher( + "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*"))); + doThrow(IOException.class) + .when(matcher).readLines(anyCollection(), any(IOChannelFactory.class)); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); + matcher.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception { + FileChecksumMatcher matcher = + new FileChecksumMatcher( + "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); + matcher.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsWhenRedundantFileLoaded() throws Exception { + tmpFolder.newFile("result-000-of-001"); + tmpFolder.newFile("tmp-result-000-of-001"); + + FileChecksumMatcher matcher = + new FileChecksumMatcher( + "mock-checksum", IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "*")); + + thrown.expect(IOException.class); + thrown.expectMessage( + containsString( + "Unable to read file(s) after retrying " + FileChecksumMatcher.MAX_READ_RETRIES)); + matcher.readFilesWithRetries(fastClock, backOff); + } }