This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cb0fdbc [BEAM-6491] Remove race in FileIOTest.testMatchWatchForNewFiles
new 315b1bc Merge pull request #7599: [BEAM-6491] Remove race in FileIOTest.testMatchWatchForNewFiles
cb0fdbc is described below
commit cb0fdbc327145bf479015ae7f651eb99b5f8d2be
Author: Jeff Klukas <[email protected]>
AuthorDate: Wed Jan 23 08:36:50 2019 -0500
[BEAM-6491] Remove race in FileIOTest.testMatchWatchForNewFiles
This test was checking lastModificationTime for files that might not yet exist.
We now create the files beforehand so lastModificationTime is known, and have
the writer thread copy them into place, preserving the metadata.
---
.../java/org/apache/beam/sdk/io/FileIOTest.java | 47 +++++++++++++---------
1 file changed, 29 insertions(+), 18 deletions(-)
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
index 27ed431..70d77f7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -30,8 +30,10 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.Writer;
+import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.List;
@@ -55,7 +57,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
import org.joda.time.Duration;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -200,19 +201,26 @@ public class FileIOTest implements Serializable {
@Test
@Category(NeedsRunner.class)
- @Ignore("https://issues.apache.org/jira/browse/BEAM-6491")
public void testMatchWatchForNewFiles() throws IOException,
InterruptedException {
- final Path basePath = tmpFolder.getRoot().toPath().resolve("watch");
- basePath.toFile().mkdir();
+ // Write some files to a "source" directory.
+ final Path sourcePath = tmpFolder.getRoot().toPath().resolve("source");
+ sourcePath.toFile().mkdir();
+ Files.write(sourcePath.resolve("first"), new byte[42]);
+ Files.write(sourcePath.resolve("second"), new byte[37]);
+ Files.write(sourcePath.resolve("third"), new byte[99]);
+
+ // Create a "watch" directory that the pipeline will copy files into.
+ final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch");
+ watchPath.toFile().mkdir();
PCollection<MatchResult.Metadata> matchMetadata =
p.apply(
FileIO.match()
- .filepattern(basePath.resolve("*").toString())
+ .filepattern(watchPath.resolve("*").toString())
.continuously(
Duration.millis(100),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3))));
PCollection<MatchResult.Metadata> matchAllMetadata =
- p.apply(Create.of(basePath.resolve("*").toString()))
+ p.apply(Create.of(watchPath.resolve("*").toString()))
.apply(
FileIO.matchAll()
.continuously(
@@ -221,36 +229,35 @@ public class FileIOTest implements Serializable {
assertEquals(PCollection.IsBounded.UNBOUNDED, matchMetadata.isBounded());
assertEquals(PCollection.IsBounded.UNBOUNDED,
matchAllMetadata.isBounded());
+ // Copy the files to the "watch" directory, preserving the
lastModifiedTime;
+ // the COPY_ATTRIBUTES option ensures that we will at a minimum copy
lastModifiedTime.
+ CopyOption[] copyOptions = {StandardCopyOption.COPY_ATTRIBUTES};
Thread writer =
new Thread(
() -> {
try {
Thread.sleep(1000);
- Files.write(basePath.resolve("first"), new byte[42]);
+ Files.copy(sourcePath.resolve("first"),
watchPath.resolve("first"), copyOptions);
Thread.sleep(300);
- Files.write(basePath.resolve("second"), new byte[37]);
+ Files.copy(sourcePath.resolve("second"),
watchPath.resolve("second"), copyOptions);
Thread.sleep(300);
- Files.write(basePath.resolve("third"), new byte[99]);
+ Files.copy(sourcePath.resolve("third"),
watchPath.resolve("third"), copyOptions);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
});
writer.start();
+ // We fetch lastModifiedTime from the files in the "source" directory to
avoid a race condition
+ // with the writer thread.
List<MatchResult.Metadata> expected =
Arrays.asList(
metadata(
- basePath.resolve("first"),
- 42,
-
Files.getLastModifiedTime(basePath.resolve("first")).toMillis()),
+ watchPath.resolve("first"), 42,
lastModifiedMillis(sourcePath.resolve("first"))),
metadata(
- basePath.resolve("second"),
- 37,
-
Files.getLastModifiedTime(basePath.resolve("second")).toMillis()),
+ watchPath.resolve("second"), 37,
lastModifiedMillis(sourcePath.resolve("second"))),
metadata(
- basePath.resolve("third"),
- 99,
-
Files.getLastModifiedTime(basePath.resolve("third")).toMillis()));
+ watchPath.resolve("third"), 99,
lastModifiedMillis(sourcePath.resolve("third"))));
PAssert.that(matchMetadata).containsInAnyOrder(expected);
PAssert.that(matchAllMetadata).containsInAnyOrder(expected);
p.run();
@@ -334,6 +341,10 @@ public class FileIOTest implements Serializable {
.build();
}
+ private static long lastModifiedMillis(Path path) throws IOException {
+ return Files.getLastModifiedTime(path).toMillis();
+ }
+
private static FileIO.Write.FileNaming resolveFileNaming(FileIO.Write<?, ?>
write)
throws Exception {
return write.resolveFileNamingFn().getClosure().apply(null, null);