This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5795c32c91a6de02b6731eacb5eef8ae55f069f5 Author: Eugene Kirpichov <[email protected]> AuthorDate: Tue Oct 17 17:06:43 2017 -0700 Merges Writer.openWindowed/Unwindowed and removes result of close() --- .../java/org/apache/beam/sdk/io/FileBasedSink.java | 94 ++++++---------------- .../java/org/apache/beam/sdk/io/WriteFiles.java | 69 +++++++++------- .../org/apache/beam/sdk/io/FileBasedSinkTest.java | 15 ++-- 3 files changed, 71 insertions(+), 107 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index d4cb57d..2108253 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -103,7 +103,7 @@ import org.slf4j.LoggerFactory; * * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the * event of failure/retry or for redundancy). However, exactly one of these executions will have its - * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link + * result passed to the finalize method. Each call to {@link Writer#open} or {@link * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles * transform, so even redundant or retried bundles will have a unique way of identifying their * output. @@ -805,10 +805,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> /** Unique id for this output bundle. */ private @Nullable String id; - private @Nullable BoundedWindow window; - private @Nullable PaneInfo paneInfo; - private int shard = -1; - private @Nullable DestinationT destination; + private DestinationT destination; /** The output file for this bundle. May be null if opening failed. */ private @Nullable ResourceId outputFile; @@ -868,53 +865,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> * id populated for the case of static sharding. In cases where the runner is dynamically * picking sharding, shard might be set to -1. */ - public final void openWindowed( - String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination) - throws Exception { - if (!getWriteOperation().windowedWrites) { - throw new IllegalStateException("openWindowed called a non-windowed sink."); - } - open(uId, window, paneInfo, shard, destination); - } - - /** Called for each value in the bundle. */ - public abstract void write(OutputT value) throws Exception; - - /** - * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested. - */ - public final void openUnwindowed(String uId, int shard, DestinationT destination) - throws Exception { - if (getWriteOperation().windowedWrites) { - throw new IllegalStateException("openUnwindowed called a windowed sink."); - } - open(uId, null, null, shard, destination); - } - - // Helper function to close a channel, on exception cases. - // Always throws prior exception, with any new closing exception suppressed. - private static void closeChannelAndThrow( - WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception { - try { - channel.close(); - } catch (Exception e) { - LOG.error("Closing channel for {} failed.", filename, e); - prior.addSuppressed(e); - throw prior; - } - } - - private void open( - String uId, - @Nullable BoundedWindow window, - @Nullable PaneInfo paneInfo, - int shard, - DestinationT destination) + public final void open( + String uId, DestinationT destination) throws Exception { this.id = uId; - this.window = window; - this.paneInfo = paneInfo; - this.shard = shard; this.destination = destination; ResourceId tempDirectory = getWriteOperation().tempDirectory.get(); outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE); @@ -925,15 +879,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> getWriteOperation().getSink().writableByteChannelFactory; // The factory may force a MIME type or it may return null, indicating to use the sink's MIME. String channelMimeType = firstNonNull(factory.getMimeType(), mimeType); - LOG.info( - "Opening temporary file {} with MIME type {} " - + "to write destination {} shard {} window {} pane {}", - outputFile, - channelMimeType, - destination, - shard, - window, - paneInfo); WritableByteChannel tempChannel = FileSystems.create(outputFile, channelMimeType); try { channel = factory.create(tempChannel); @@ -960,6 +905,26 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> LOG.debug("Starting write of bundle {} to {}.", this.id, outputFile); } + /** Called for each value in the bundle. */ + public abstract void write(OutputT value) throws Exception; + + public ResourceId getOutputFile() { + return outputFile; + } + + // Helper function to close a channel, on exception cases. + // Always throws prior exception, with any new closing exception suppressed. + private static void closeChannelAndThrow( + WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception { + try { + channel.close(); + } catch (Exception e) { + LOG.error("Closing channel for {} failed.", filename, e); + prior.addSuppressed(e); + throw prior; + } + } + public final void cleanup() throws Exception { if (outputFile != null) { LOG.info("Deleting temporary file {}", outputFile); @@ -970,22 +935,19 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> } /** Closes the channel and returns the bundle result. */ - public final FileResult<DestinationT> close() throws Exception { + public final void close() throws Exception { checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile"); + LOG.debug("Closing {}", outputFile); - LOG.debug("Writing footer to {}.", outputFile); try { writeFooter(); } catch (Exception e) { - LOG.error("Writing footer to {} failed, closing channel.", outputFile, e); closeChannelAndThrow(channel, outputFile, e); } - LOG.debug("Finishing write to {}.", outputFile); try { finishWrite(); } catch (Exception e) { - LOG.error("Finishing write to {} failed, closing channel.", outputFile, e); closeChannelAndThrow(channel, outputFile, e); } @@ -1001,11 +963,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> } catch (Exception e) { throw new IOException(String.format("Failed closing channel to %s", outputFile), e); } - - FileResult<DestinationT> result = - new FileResult<>(outputFile, shard, window, paneInfo, destination); LOG.info("Successfully wrote temporary file {}", outputFile); - return result; } /** Return the WriteOperation that this Writer belongs to. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index c99abce..35b28a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -425,18 +425,13 @@ public class WriteFiles<UserT, DestinationT, OutputT> if (writers.size() <= maxNumWritersPerBundle) { String uuid = UUID.randomUUID().toString(); LOG.info( - "Opening writer {} for write operation {}, window {} pane {} destination {}", + "Opening writer {} for window {} pane {} destination {}", uuid, - writeOperation, window, paneInfo, destination); writer = writeOperation.createWriter(); - if (windowedWrites) { - writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination); - } else { - writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination); - } + writer.open(uuid, destination); writers.put(key, writer); LOG.debug("Done opening writer"); } else { @@ -461,17 +456,23 @@ public class WriteFiles<UserT, DestinationT, OutputT> public void finishBundle(FinishBundleContext c) throws Exception { for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> entry : writers.entrySet()) { + WriterKey<DestinationT> key = entry.getKey(); Writer<DestinationT, OutputT> writer = entry.getValue(); - FileResult<DestinationT> result; try { - result = writer.close(); + writer.close(); } catch (Exception e) { // If anything goes wrong, make sure to delete the temporary file. writer.cleanup(); throw e; } - BoundedWindow window = entry.getKey().window; - c.output(result, window.maxTimestamp(), window); + BoundedWindow window = key.window; + FileResult<DestinationT> res = + windowedWrites + ? new FileResult<>( + writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination) + : new FileResult<>( + writer.getOutputFile(), UNKNOWN_SHARDNUM, null, null, key.destination); + c.output(res, window.maxTimestamp(), window); } } @@ -505,20 +506,15 @@ public class WriteFiles<UserT, DestinationT, OutputT> DestinationT destination = sink.getDynamicDestinations().getDestination(input); Writer<DestinationT, OutputT> writer = writers.get(destination); if (writer == null) { - LOG.debug("Opening writer for write operation {}", writeOperation); + String uuid = UUID.randomUUID().toString(); + LOG.info( + "Opening writer {} for window {} pane {} destination {}", + uuid, + window, + c.pane(), + destination); writer = writeOperation.createWriter(); - int shardNumber = - shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING - ? c.element().getKey().getShardNumber() - : UNKNOWN_SHARDNUM; - if (windowedWrites) { - writer.openWindowed( - UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination); - } else { - writer.openUnwindowed( - UUID.randomUUID().toString(), shardNumber, destination); - } - LOG.debug("Done opening writer"); + writer.open(uuid, destination); writers.put(destination, writer); } writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input)); @@ -527,16 +523,26 @@ public class WriteFiles<UserT, DestinationT, OutputT> // Close all writers. for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) { Writer<DestinationT, OutputT> writer = entry.getValue(); - FileResult<DestinationT> result; try { // Close the writer; if this throws let the error propagate. - result = writer.close(); - c.output(result); + writer.close(); } catch (Exception e) { // If anything goes wrong, make sure to delete the temporary file. writer.cleanup(); throw e; } + int shardNumber = + shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING + ? c.element().getKey().getShardNumber() + : UNKNOWN_SHARDNUM; + if (windowedWrites) { + c.output( + new FileResult<>( + writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey())); + } else { + c.output( + new FileResult<>(writer.getOutputFile(), shardNumber, null, null, entry.getKey())); + } } } @@ -998,11 +1004,14 @@ public class WriteFiles<UserT, DestinationT, OutputT> existingResults.size(), destination); for (int shard : missingShardNums) { + String uuid = UUID.randomUUID().toString(); + LOG.info("Opening empty writer {} for destination {}", uuid, writeOperation, destination); Writer<DestinationT, ?> writer = writeOperation.createWriter(); // Currently this code path is only called in the unwindowed case. - writer.openUnwindowed(UUID.randomUUID().toString(), shard, destination); - FileResult<DestinationT> emptyWrite = writer.close(); - completeResults.add(emptyWrite); + writer.open(uuid, destination); + writer.close(); + completeResults.add( + new FileResult<>(writer.getOutputFile(), shard, null, null, destination)); } LOG.debug("Done creating extra shards for {}.", destination); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 29f3c1b..f7988bb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -102,14 +102,12 @@ public class FileBasedSinkTest { SimpleSink.SimpleWriter<Void> writer = buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter(); - writer.openUnwindowed(testUid, -1, null); + writer.open(testUid, null); for (String value : values) { writer.write(value); } - FileResult result = writer.close(); - - FileBasedSink sink = writer.getWriteOperation().getSink(); - assertEquals(expectedTempFile, result.getTempFilename()); + writer.close(); + assertEquals(expectedTempFile, writer.getOutputFile()); assertFileContains(expected, expectedTempFile); } @@ -514,12 +512,11 @@ public class FileBasedSinkTest { expected.add("footer"); expected.add("footer"); - writer.openUnwindowed(testUid, -1, null); + writer.open(testUid, null); writer.write("a"); writer.write("b"); - final FileResult result = writer.close(); - - assertEquals(expectedFile, result.getTempFilename()); + writer.close(); + assertEquals(expectedFile, writer.getOutputFile()); assertFileContains(expected, expectedFile); } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
