WriteWithShardingFactoryTest: switch to FileSystems
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0504770 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0504770 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0504770 Branch: refs/heads/master Commit: d050477065cf8cf27b1334ed7bebad5b30e5d8e4 Parents: f4e7c02 Author: Dan Halperin <[email protected]> Authored: Wed May 3 18:11:34 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu May 4 09:32:45 2017 -0700 ---------------------------------------------------------------------- .../direct/WriteWithShardingFactoryTest.java | 23 +++++++++++--------- 1 file changed, 13 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d0504770/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 18940d2..f28c8cf 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -32,7 +32,6 @@ import java.io.FileReader; import java.io.Reader; import java.nio.CharBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -42,9 +41,11 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.DefaultFilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.LocalResources; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.WriteFiles; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.TestPipeline; @@ -53,7 +54,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; @@ -72,7 +72,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class WriteWithShardingFactoryTest { - public static final int INPUT_SIZE = 10000; + private static final int INPUT_SIZE = 10000; @Rule public TemporaryFolder tmp = new TemporaryFolder(); private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>(); @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @@ -86,19 +86,22 @@ public class WriteWithShardingFactoryTest { Collections.shuffle(strs); String fileName = "resharded_write"; - String outputPath = tmp.getRoot().getAbsolutePath(); - String targetLocation = IOChannelUtils.resolve(outputPath, fileName); + String targetLocation = tmp.getRoot().toPath().resolve(fileName).toString(); + String targetLocationGlob = targetLocation + '*'; + // TextIO is implemented in terms of the WriteFiles PTransform. When sharding is not specified, // resharding should be automatically applied p.apply(Create.of(strs)).apply(TextIO.write().to(targetLocation)); - p.run(); - Collection<String> files = IOChannelUtils.getFactory(outputPath).match(targetLocation + "*"); + List<Metadata> matches = FileSystems.match(targetLocationGlob).metadata(); List<String> actuals = new ArrayList<>(strs.size()); - for (String file : files) { - CharBuffer buf = CharBuffer.allocate((int) new File(file).length()); - try (Reader reader = new FileReader(file)) { + List<String> files = new ArrayList<>(strs.size()); + for (Metadata match : matches) { + String filename = match.resourceId().toString(); + files.add(filename); + CharBuffer buf = CharBuffer.allocate((int) new File(filename).length()); + try (Reader reader = new FileReader(filename)) { reader.read(buf); buf.flip(); }
