This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c514fdf  A relative directory should be applied (if specified) even 
when using a custom filenaming scheme
     new 69e9ac8  This closes #4710: Uses output directory with custom 
FileNaming in FileIO.write
c514fdf is described below

commit c514fdfdb71e91cdc5fe3a25139aa1a3e849be3e
Author: Gene Peters <[email protected]>
AuthorDate: Mon Feb 19 13:45:01 2018 -0800

    A relative directory should be applied (if specified) even when using a 
custom filenaming scheme
---
 .../main/java/org/apache/beam/sdk/io/FileIO.java   | 94 +++++++++++-----------
 .../java/org/apache/beam/sdk/io/FileIOTest.java    | 75 +++++++++++++++++
 2 files changed, 123 insertions(+), 46 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 76717ad..9295981 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -23,6 +23,7 @@ import static 
org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RE
 import static org.apache.beam.sdk.transforms.Contextful.fn;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 import java.io.IOException;
@@ -1147,6 +1148,52 @@ public class FileIO {
       return toBuilder().setIgnoreWindowing(true).build();
     }
 
+    @VisibleForTesting
+    Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
+      if (getDynamic()) {
+        checkArgument(
+                getConstantFileNaming() == null,
+                "when using writeDynamic(), must use versions of .withNaming() 
"
+                        + "that take functions from DestinationT");
+        checkArgument(getFilenamePrefix() == null, ".withPrefix() requires 
write()");
+        checkArgument(getFilenameSuffix() == null, ".withSuffix() requires 
write()");
+        checkArgument(
+                getFileNamingFn() != null,
+                "when using writeDynamic(), must specify "
+                        + ".withNaming() taking a function form DestinationT");
+        return fn(
+                        (element, c) -> {
+                          FileNaming naming = 
getFileNamingFn().getClosure().apply(element, c);
+                          return getOutputDirectory() == null
+                                  ? naming
+                                  : relativeFileNaming(getOutputDirectory(), 
naming);
+                        },
+                        getFileNamingFn().getRequirements());
+      } else {
+        checkArgument(getFileNamingFn() == null,
+                ".withNaming() taking a function from DestinationT requires 
writeDynamic()");
+        FileNaming constantFileNaming;
+        if (getConstantFileNaming() == null) {
+          constantFileNaming = defaultNaming(
+                  MoreObjects.firstNonNull(
+                          getFilenamePrefix(), 
StaticValueProvider.of("output")),
+                  MoreObjects.firstNonNull(getFilenameSuffix(), 
StaticValueProvider.of("")));
+        } else {
+          checkArgument(
+                  getFilenamePrefix() == null,
+                  ".to(FileNaming) is incompatible with .withSuffix()");
+          checkArgument(
+                  getFilenameSuffix() == null,
+                  ".to(FileNaming) is incompatible with .withPrefix()");
+          constantFileNaming = getConstantFileNaming();
+        }
+        if (getOutputDirectory() != null) {
+          constantFileNaming = relativeFileNaming(getOutputDirectory(), 
constantFileNaming);
+        }
+        return fn(SerializableFunctions.<DestinationT, 
FileNaming>constant(constantFileNaming));
+      }
+    }
+
     @Override
     public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
       Write.Builder<DestinationT, UserT> resolvedSpec = new 
AutoValue_FileIO_Write.Builder<>();
@@ -1172,52 +1219,7 @@ public class FileIO {
         resolvedSpec.setDestinationCoder((Coder) VoidCoder.of());
       }
 
-      // Resolve fileNamingFn
-      Contextful<Fn<DestinationT, FileNaming>> fileNamingFn;
-      if (getDynamic()) {
-        checkArgument(
-            getConstantFileNaming() == null,
-            "when using writeDynamic(), must use versions of .withNaming() "
-                + "that take functions from DestinationT");
-        checkArgument(getFilenamePrefix() == null, ".withPrefix() requires 
write()");
-        checkArgument(getFilenameSuffix() == null, ".withSuffix() requires 
write()");
-        checkArgument(
-            getFileNamingFn() != null,
-            "when using writeDynamic(), must specify "
-                + ".withNaming() taking a function form DestinationT");
-        fileNamingFn =
-            Contextful.fn(
-                (element, c) -> {
-                  FileNaming naming = 
getFileNamingFn().getClosure().apply(element, c);
-                  return getOutputDirectory() == null
-                      ? naming
-                      : relativeFileNaming(getOutputDirectory(), naming);
-                },
-                getFileNamingFn().getRequirements());
-      } else {
-        checkArgument(getFileNamingFn() == null,
-            ".withNaming() taking a function from DestinationT requires 
writeDynamic()");
-        FileNaming constantFileNaming;
-        if (getConstantFileNaming() == null) {
-          constantFileNaming = defaultNaming(
-              MoreObjects.firstNonNull(
-                  getFilenamePrefix(), StaticValueProvider.of("output")),
-              MoreObjects.firstNonNull(getFilenameSuffix(), 
StaticValueProvider.of("")));
-          if (getOutputDirectory() != null) {
-            constantFileNaming = relativeFileNaming(getOutputDirectory(), 
constantFileNaming);
-          }
-        } else {
-          checkArgument(
-              getFilenamePrefix() == null, ".to(FileNaming) is incompatible 
with .withSuffix()");
-          checkArgument(
-              getFilenameSuffix() == null, ".to(FileNaming) is incompatible 
with .withPrefix()");
-          constantFileNaming = getConstantFileNaming();
-        }
-        fileNamingFn =
-            fn(SerializableFunctions.<DestinationT, 
FileNaming>constant(constantFileNaming));
-      }
-
-      resolvedSpec.setFileNamingFn(fileNamingFn);
+      resolvedSpec.setFileNamingFn(resolveFileNamingFn());
       resolvedSpec.setEmptyWindowDestination(getEmptyWindowDestination());
       if (getTempDirectory() == null) {
         checkArgument(
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
index 608fd0a..36d0928 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static 
org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -42,6 +43,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.junit.Rule;
@@ -301,4 +304,76 @@ public class FileIOTest implements Serializable {
         .setSizeBytes(size)
         .build();
   }
+
+  private static FileIO.Write.FileNaming resolveFileNaming(FileIO.Write<?, ?> 
write)
+      throws Exception {
+    return write.resolveFileNamingFn().getClosure().apply(null, null);
+  }
+
+  private static String getDefaultFileName(FileIO.Write<?, ?> write) throws 
Exception {
+    return resolveFileNaming(write).getFilename(null, null, 0, 0, null);
+  }
+
+  @Test
+  public void testFilenameFnResolution() throws Exception {
+    FileIO.Write.FileNaming foo = (window, pane, numShards, shardIndex, 
compression) -> "foo";
+
+    String expected =
+        FileSystems.matchNewResource("test", true).resolve("foo", 
RESOLVE_FILE).toString();
+    assertEquals(
+        "Filenames should be resolved within a relative directory if '.to' is 
invoked",
+        expected,
+        getDefaultFileName(FileIO.writeDynamic().to("test").withNaming(o -> 
foo)));
+    assertEquals(
+        "Filenames should be resolved within a relative directory if '.to' is 
invoked",
+        expected,
+        getDefaultFileName(FileIO.write().to("test").withNaming(foo)));
+
+    assertEquals(
+        "Filenames should be resolved as the direct result of the filenaming 
function if '.to' "
+            + "is not invoked",
+        "foo",
+        getDefaultFileName(FileIO.writeDynamic().withNaming(o -> foo)));
+    assertEquals(
+        "Filenames should be resolved as the direct result of the filenaming 
function if '.to' "
+            + "is not invoked",
+        "foo",
+        getDefaultFileName(FileIO.write().withNaming(foo)));
+
+    assertEquals(
+        "Default to the defaultNaming if a filenaming isn't provided for a 
non-dynamic write",
+        "output-00000-of-00000",
+        resolveFileNaming(FileIO.write())
+            .getFilename(
+                GlobalWindow.INSTANCE,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                0,
+                0,
+                Compression.UNCOMPRESSED));
+
+    assertEquals(
+        "Default Naming should take prefix and suffix into account if 
provided",
+        "foo-00000-of-00000.bar",
+        resolveFileNaming(FileIO.write().withPrefix("foo").withSuffix(".bar"))
+            .getFilename(
+                GlobalWindow.INSTANCE,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                0,
+                0,
+                Compression.UNCOMPRESSED));
+
+    assertEquals(
+        "Filenames should be resolved within a relative directory if '.to' is 
invoked, "
+            + "even with default naming",
+        FileSystems.matchNewResource("test", true)
+            .resolve("output-00000-of-00000", RESOLVE_FILE)
+            .toString(),
+        resolveFileNaming(FileIO.write().to("test"))
+            .getFilename(
+                GlobalWindow.INSTANCE,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                0,
+                0,
+                Compression.UNCOMPRESSED));
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to