[
https://issues.apache.org/jira/browse/BEAM-3268?focusedWorklogId=93367&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93367
]
ASF GitHub Bot logged work on BEAM-3268:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Apr/18 18:52
Start Date: 20/Apr/18 18:52
Worklog Time Spent: 10m
Work Description: jkff closed pull request #5159: [BEAM-3268] Reshuffle
filenames before returning them from WriteFilesResult
URL: https://github.com/apache/beam/pull/5159
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index cc71c7f3a83..7c16eed5ef9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -762,7 +762,10 @@ private FinalizeTempFileBundles(
PCollection<KV<DestinationT, String>> outputFilenames =
input
.apply("Finalize", ParDo.of(new
FinalizeFn()).withSideInputs(finalizeSideInputs))
- .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
+ .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()))
+ // Reshuffle the filenames to make sure they are observable
downstream
+ // only after each one is done finalizing.
+ .apply(Reshuffle.viaRandomKey());
TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
new TupleTag<>("perDestinationOutputFilenames");
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index ff16eaad747..8ad5ace6c99 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -88,16 +88,16 @@ public SimpleSink(
static final class SimpleWriteOperation<DestinationT>
extends WriteOperation<DestinationT, String> {
- public SimpleWriteOperation(SimpleSink sink, ResourceId
tempOutputDirectory) {
+ public SimpleWriteOperation(SimpleSink<DestinationT> sink, ResourceId
tempOutputDirectory) {
super(sink, tempOutputDirectory);
}
- public SimpleWriteOperation(SimpleSink sink) {
+ public SimpleWriteOperation(SimpleSink<DestinationT> sink) {
super(sink);
}
@Override
- public SimpleWriter<DestinationT> createWriter() throws Exception {
+ public SimpleWriter<DestinationT> createWriter() {
return new SimpleWriter<>(this);
}
}
@@ -108,16 +108,16 @@ public SimpleWriteOperation(SimpleSink sink) {
private WritableByteChannel channel;
- public SimpleWriter(SimpleWriteOperation writeOperation) {
+ public SimpleWriter(SimpleWriteOperation<DestinationT> writeOperation) {
super(writeOperation, MimeTypes.TEXT);
}
- private static ByteBuffer wrap(String value) throws Exception {
+ private static ByteBuffer wrap(String value) {
return ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8));
}
@Override
- protected void prepareWrite(WritableByteChannel channel) throws Exception {
+ protected void prepareWrite(WritableByteChannel channel) {
this.channel = channel;
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index f458f9ee7ab..f31956e437b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -26,7 +26,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import com.google.common.base.Function;
import com.google.common.base.Functions;
@@ -365,30 +364,6 @@ private void runTestWrite(String[] elems, String header,
String footer) throws E
runTestWrite(elems, header, footer, 1);
}
- private static class MatchesFilesystem implements
SerializableFunction<Iterable<String>, Void> {
- private final ResourceId baseFilename;
-
- MatchesFilesystem(ResourceId baseFilename) {
- this.baseFilename = baseFilename;
- }
-
- @Override
- public Void apply(Iterable<String> values) {
- try {
- String pattern = baseFilename.toString() + "*";
- List<String> matches = Lists.newArrayList();
- for (Metadata match :Iterables.getOnlyElement(
- FileSystems.match(Collections.singletonList(pattern))).metadata())
{
- matches.add(match.resourceId().toString());
- }
- assertThat(values, containsInAnyOrder(Iterables.toArray(matches,
String.class)));
- } catch (Exception e) {
- fail("Exception caught " + e);
- }
- return null;
- }
- }
-
private void runTestWrite(String[] elems, String header, String footer, int
numShards)
throws Exception {
String outputName = "file.txt";
@@ -408,9 +383,8 @@ private void runTestWrite(String[] elems, String header,
String footer, int numS
write =
write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
}
- WriteFilesResult<Void> result = input.apply(write);
-
PAssert.that(result.getPerDestinationOutputFilenames().apply("GetFilenames",
Values.create()))
- .satisfies(new MatchesFilesystem(baseFilename));
+ input.apply(write);
+
p.run();
assertOutputFiles(
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index e4e7790906f..2cfb622bc5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -54,6 +54,7 @@
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.SimpleSink.SimpleWriter;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -71,6 +72,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -83,6 +85,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
import org.apache.commons.compress.utils.Sets;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
@@ -158,6 +161,16 @@ public void processElement(ProcessContext c) {
}
}
+ private static class VerifyFilesExist<DestinationT> extends
+ PTransform<PCollection<KV<DestinationT, String>>, PDone> {
+ @Override
+ public PDone expand(PCollection<KV<DestinationT, String>> input) {
+ input.apply(Values.create())
+
.apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW));
+ return PDone.in(input.getPipeline());
+ }
+ }
+
private String getBaseOutputFilename() {
return getBaseOutputDirectory().resolve("file",
StandardResolveOptions.RESOLVE_FILE).toString();
}
@@ -239,7 +252,8 @@ public void testCustomShardedWrite() throws IOException {
WriteFiles<String, ?, String> write = WriteFiles.to(sink).withSharding(new
LargestInt());
p.apply(Create.timestamped(inputs,
timestamps).withCoder(StringUtf8Coder.of()))
.apply(IDENTITY_MAP)
- .apply(write);
+ .apply(write)
+ .getPerDestinationOutputFilenames().apply(new VerifyFilesExist<>());
p.run();
@@ -323,6 +337,7 @@ public void testWriteSpilling() throws IOException {
.withNumShards(1));
}
+ @Test
public void testBuildWrite() {
SimpleSink<Void> sink = makeSimpleSink();
WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(3);
@@ -477,13 +492,15 @@ private void testDynamicDestinationsHelper(boolean
bounded, boolean emptyShards)
WriteFiles<String, Integer, String> writeFiles =
WriteFiles.to(sink).withNumShards(numShards);
PCollection<String> input = p.apply(Create.timestamped(inputs,
timestamps));
+ WriteFilesResult<Integer> res;
if (!bounded) {
input.setIsBoundedInternal(IsBounded.UNBOUNDED);
input =
input.apply(Window.into(FixedWindows.of(Duration.standardDays(1))));
- input.apply(writeFiles.withWindowedWrites());
+ res = input.apply(writeFiles.withWindowedWrites());
} else {
- input.apply(writeFiles);
+ res = input.apply(writeFiles);
}
+ res.getPerDestinationOutputFilenames().apply(new VerifyFilesExist<>());
p.run();
for (int i = 0; i < 5; ++i) {
@@ -661,7 +678,8 @@ private void runShardedWrite(
}
p.apply(Create.timestamped(inputs,
timestamps).withCoder(StringUtf8Coder.of()))
.apply(transform)
- .apply(write);
+ .apply(write)
+ .getPerDestinationOutputFilenames().apply(new VerifyFilesExist<>());
p.run();
Optional<Integer> numShards =
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index 9686e75130e..bcc239c991e 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -36,7 +36,6 @@
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
@@ -109,8 +108,7 @@ public void writeThenReadAll() {
.withOutputFilenames()
.withSuffix(".avro"))
.getPerDestinationOutputFilenames()
- .apply(Values.create())
- .apply(Reshuffle.viaRandomKey());
+ .apply(Values.create());
PCollection<String> consolidatedHashcode = testFilenames
.apply("Read all files", AvroIO.readAllGenericRecords(AVRO_SCHEMA))
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index c7808bc0c39..a7a55730de3 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -34,7 +34,6 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
@@ -98,8 +97,7 @@ public void writeThenReadAll() {
ParDo.of(new
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
.apply("Write content to files", write)
.getPerDestinationOutputFilenames()
- .apply(Values.create())
- .apply(Reshuffle.viaRandomKey());
+ .apply(Values.create());
PCollection<String> consolidatedHashcode = testFilenames
.apply("Read all files", TextIO.readAll().withCompression(AUTO))
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
index 73d028fd4fd..d32b1d67e5f 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -38,7 +38,6 @@
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
@@ -111,7 +110,6 @@ public void writeThenReadAll() {
.withPrefix("birds")
.withSuffix(".xml"))
.getPerDestinationOutputFilenames()
- .apply("Prevent fusion", Reshuffle.viaRandomKey())
.apply("Get file names", Values.create());
PCollection<Bird> birds = testFileNames
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 93367)
Time Spent: 2h 10m (was: 2h)
> getPerDestinationOutputFilenames() is getting processed before write is
> finished on dataflow runner
> ---------------------------------------------------------------------------------------------------
>
> Key: BEAM-3268
> URL: https://issues.apache.org/jira/browse/BEAM-3268
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.3.0
> Reporter: Kamil Szewczyk
> Assignee: Eugene Kirpichov
> Priority: Major
> Attachments: comparison.png
>
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> While running filebased-io-tests we found the dataflow-runner misbehaving. We run
> tests using a single pipeline, and without using Reshuffle between the writing and
> reading steps, Dataflow jobs are unsuccessful because the runner tries to access
> files that have not been created yet.
> The attached picture shows the difference in the execution of the write step. On
> the left is a working example with Reshuffle added, and on the right one
> without it.
> !comparison.png|thumbnail!
> Steps to reproduce: substitute your-bucket-name with your valid bucket name.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests
> -DintegrationTestPipelineOptions='["--runner=dataflow",
> "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
> {code}
> Then look on the cloud console and job should fail.
> Now add Reshuffling to
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
> as in the example.
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.<String>create())
> .apply(Reshuffle.<String>viaRandomKey());
> PCollection<String> consolidatedHashcode = testFilenames
> {code}
> and re-run the previously used Maven command to see it working in the console
> now.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)