This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c514fdf A relative directory should be applied (if specified) even
when using a custom filenaming scheme
new 69e9ac8 This closes #4710: Uses output directory with custom
FileNaming in FileIO.write
c514fdf is described below
commit c514fdfdb71e91cdc5fe3a25139aa1a3e849be3e
Author: Gene Peters <[email protected]>
AuthorDate: Mon Feb 19 13:45:01 2018 -0800
A relative directory should be applied (if specified) even when using a
custom filenaming scheme
---
.../main/java/org/apache/beam/sdk/io/FileIO.java | 94 +++++++++++-----------
.../java/org/apache/beam/sdk/io/FileIOTest.java | 75 +++++++++++++++++
2 files changed, 123 insertions(+), 46 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 76717ad..9295981 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -23,6 +23,7 @@ import static
org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RE
import static org.apache.beam.sdk.transforms.Contextful.fn;
import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import java.io.IOException;
@@ -1147,6 +1148,52 @@ public class FileIO {
return toBuilder().setIgnoreWindowing(true).build();
}
+ @VisibleForTesting
+ Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
+ if (getDynamic()) {
+ checkArgument(
+ getConstantFileNaming() == null,
+ "when using writeDynamic(), must use versions of .withNaming() "
+ + "that take functions from DestinationT");
+ checkArgument(getFilenamePrefix() == null, ".withPrefix() requires write()");
+ checkArgument(getFilenameSuffix() == null, ".withSuffix() requires write()");
+ checkArgument(
+ getFileNamingFn() != null,
+ "when using writeDynamic(), must specify "
+ + ".withNaming() taking a function form DestinationT");
+ return fn(
+ (element, c) -> {
+ FileNaming naming =
getFileNamingFn().getClosure().apply(element, c);
+ return getOutputDirectory() == null
+ ? naming
+ : relativeFileNaming(getOutputDirectory(),
naming);
+ },
+ getFileNamingFn().getRequirements());
+ } else {
+ checkArgument(getFileNamingFn() == null,
+ ".withNaming() taking a function from DestinationT requires writeDynamic()");
+ FileNaming constantFileNaming;
+ if (getConstantFileNaming() == null) {
+ constantFileNaming = defaultNaming(
+ MoreObjects.firstNonNull(
+ getFilenamePrefix(),
StaticValueProvider.of("output")),
+ MoreObjects.firstNonNull(getFilenameSuffix(),
StaticValueProvider.of("")));
+ } else {
+ checkArgument(
+ getFilenamePrefix() == null,
+ ".to(FileNaming) is incompatible with .withSuffix()");
+ checkArgument(
+ getFilenameSuffix() == null,
+ ".to(FileNaming) is incompatible with .withPrefix()");
+ constantFileNaming = getConstantFileNaming();
+ }
+ if (getOutputDirectory() != null) {
+ constantFileNaming = relativeFileNaming(getOutputDirectory(),
constantFileNaming);
+ }
+ return fn(SerializableFunctions.<DestinationT,
FileNaming>constant(constantFileNaming));
+ }
+ }
+
@Override
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
Write.Builder<DestinationT, UserT> resolvedSpec = new
AutoValue_FileIO_Write.Builder<>();
@@ -1172,52 +1219,7 @@ public class FileIO {
resolvedSpec.setDestinationCoder((Coder) VoidCoder.of());
}
- // Resolve fileNamingFn
- Contextful<Fn<DestinationT, FileNaming>> fileNamingFn;
- if (getDynamic()) {
- checkArgument(
- getConstantFileNaming() == null,
- "when using writeDynamic(), must use versions of .withNaming() "
- + "that take functions from DestinationT");
- checkArgument(getFilenamePrefix() == null, ".withPrefix() requires write()");
- checkArgument(getFilenameSuffix() == null, ".withSuffix() requires write()");
- checkArgument(
- getFileNamingFn() != null,
- "when using writeDynamic(), must specify "
- + ".withNaming() taking a function form DestinationT");
- fileNamingFn =
- Contextful.fn(
- (element, c) -> {
- FileNaming naming =
getFileNamingFn().getClosure().apply(element, c);
- return getOutputDirectory() == null
- ? naming
- : relativeFileNaming(getOutputDirectory(), naming);
- },
- getFileNamingFn().getRequirements());
- } else {
- checkArgument(getFileNamingFn() == null,
- ".withNaming() taking a function from DestinationT requires writeDynamic()");
- FileNaming constantFileNaming;
- if (getConstantFileNaming() == null) {
- constantFileNaming = defaultNaming(
- MoreObjects.firstNonNull(
- getFilenamePrefix(), StaticValueProvider.of("output")),
- MoreObjects.firstNonNull(getFilenameSuffix(),
StaticValueProvider.of("")));
- if (getOutputDirectory() != null) {
- constantFileNaming = relativeFileNaming(getOutputDirectory(),
constantFileNaming);
- }
- } else {
- checkArgument(
- getFilenamePrefix() == null, ".to(FileNaming) is incompatible with .withSuffix()");
- checkArgument(
- getFilenameSuffix() == null, ".to(FileNaming) is incompatible with .withPrefix()");
- constantFileNaming = getConstantFileNaming();
- }
- fileNamingFn =
- fn(SerializableFunctions.<DestinationT,
FileNaming>constant(constantFileNaming));
- }
-
- resolvedSpec.setFileNamingFn(fileNamingFn);
+ resolvedSpec.setFileNamingFn(resolveFileNamingFn());
resolvedSpec.setEmptyWindowDestination(getEmptyWindowDestination());
if (getTempDirectory() == null) {
checkArgument(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
index 608fd0a..36d0928 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static
org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -42,6 +43,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesSplittableParDo;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.Rule;
@@ -301,4 +304,76 @@ public class FileIOTest implements Serializable {
.setSizeBytes(size)
.build();
}
+
+ private static FileIO.Write.FileNaming resolveFileNaming(FileIO.Write<?, ?>
write)
+ throws Exception {
+ return write.resolveFileNamingFn().getClosure().apply(null, null);
+ }
+
+ private static String getDefaultFileName(FileIO.Write<?, ?> write) throws
Exception {
+ return resolveFileNaming(write).getFilename(null, null, 0, 0, null);
+ }
+
+ @Test
+ public void testFilenameFnResolution() throws Exception {
+ FileIO.Write.FileNaming foo = (window, pane, numShards, shardIndex,
compression) -> "foo";
+
+ String expected =
+ FileSystems.matchNewResource("test", true).resolve("foo",
RESOLVE_FILE).toString();
+ assertEquals(
+ "Filenames should be resolved within a relative directory if '.to' is
invoked",
+ expected,
+ getDefaultFileName(FileIO.writeDynamic().to("test").withNaming(o ->
foo)));
+ assertEquals(
+ "Filenames should be resolved within a relative directory if '.to' is
invoked",
+ expected,
+ getDefaultFileName(FileIO.write().to("test").withNaming(foo)));
+
+ assertEquals(
+ "Filenames should be resolved as the direct result of the filenaming
function if '.to' "
+ + "is not invoked",
+ "foo",
+ getDefaultFileName(FileIO.writeDynamic().withNaming(o -> foo)));
+ assertEquals(
+ "Filenames should be resolved as the direct result of the filenaming
function if '.to' "
+ + "is not invoked",
+ "foo",
+ getDefaultFileName(FileIO.write().withNaming(foo)));
+
+ assertEquals(
+ "Default to the defaultNaming if a filenaming isn't provided for a
non-dynamic write",
+ "output-00000-of-00000",
+ resolveFileNaming(FileIO.write())
+ .getFilename(
+ GlobalWindow.INSTANCE,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ 0,
+ 0,
+ Compression.UNCOMPRESSED));
+
+ assertEquals(
+ "Default Naming should take prefix and suffix into account if
provided",
+ "foo-00000-of-00000.bar",
+ resolveFileNaming(FileIO.write().withPrefix("foo").withSuffix(".bar"))
+ .getFilename(
+ GlobalWindow.INSTANCE,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ 0,
+ 0,
+ Compression.UNCOMPRESSED));
+
+ assertEquals(
+ "Filenames should be resolved within a relative directory if '.to' is
invoked, "
+ + "even with default naming",
+ FileSystems.matchNewResource("test", true)
+ .resolve("output-00000-of-00000", RESOLVE_FILE)
+ .toString(),
+ resolveFileNaming(FileIO.write().to("test"))
+ .getFilename(
+ GlobalWindow.INSTANCE,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ 0,
+ 0,
+ Compression.UNCOMPRESSED));
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].