Repository: beam Updated Branches: refs/heads/master 727253ee3 -> 867d81684
Adds logging at INFO for all creation, deletion and copying of files in WriteFiles Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/feab6043 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/feab6043 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/feab6043 Branch: refs/heads/master Commit: feab6043f5ff3f78c974c4c3438e55b1b55a39f8 Parents: 727253e Author: Eugene Kirpichov <[email protected]> Authored: Wed Nov 8 16:13:25 2017 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Wed Nov 8 20:54:43 2017 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSink.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/feab6043/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index d577fea..78ba071 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -694,6 +694,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> for (Map.Entry<ResourceId, ResourceId> srcDestPair : filenames.entrySet()) { srcFiles.add(srcDestPair.getKey()); dstFiles.add(srcDestPair.getValue()); + LOG.info( + "Will copy temporary file {} to final location {}", + srcDestPair.getKey(), + srcDestPair.getValue()); } // During a failure case, files may have been deleted in an earlier step. Thus // we ignore missing files here. @@ -734,6 +738,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> FileSystems.match(Collections.singletonList(tempDir.toString() + "*"))); for (Metadata matchResult : singleMatch.metadata()) { matches.add(matchResult.resourceId()); + LOG.info("Will remove temporary file {}", matchResult.resourceId()); } } catch (Exception e) { LOG.warn("Failed to match temporary files under: [{}].", tempDir); @@ -921,7 +926,15 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> getWriteOperation().getSink().writableByteChannelFactory; // The factory may force a MIME type or it may return null, indicating to use the sink's MIME. String channelMimeType = firstNonNull(factory.getMimeType(), mimeType); - LOG.debug("Opening {} for write with MIME type {}.", outputFile, channelMimeType); + LOG.info( + "Opening temporary file {} with MIME type {} " + + "to write destination {} shard {} window {} pane {}", + outputFile, + channelMimeType, + destination, + shard, + window, + paneInfo); WritableByteChannel tempChannel = FileSystems.create(outputFile, channelMimeType); try { channel = factory.create(tempChannel); @@ -950,6 +963,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> public final void cleanup() throws Exception { if (outputFile != null) { + LOG.info("Deleting temporary file {}", outputFile); // outputFile may be null if open() was not called or failed. FileSystems.delete( Collections.singletonList(outputFile), StandardMoveOptions.IGNORE_MISSING_FILES); @@ -991,7 +1005,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> FileResult<DestinationT> result = new FileResult<>(outputFile, shard, window, paneInfo, destination); - LOG.debug("Result for bundle {}: {}", this.id, outputFile); + LOG.info("Successfully wrote temporary file {}", outputFile); return result; }
