Repository: beam Updated Branches: refs/heads/master 7e603d5c7 -> 571631a5e
Use SdkComponents in WindowingStrategy.toProto Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0fd0a22 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0fd0a22 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0fd0a22 Branch: refs/heads/master Commit: e0fd0a222510b733b98d6c33c694431139bbc40d Parents: 7e603d5 Author: Thomas Groh <[email protected]> Authored: Fri Apr 7 13:41:29 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Apr 12 09:20:43 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/construction/Coders.java | 6 +- .../core/construction/SdkComponents.java | 5 +- .../core/construction/WindowingStrategies.java | 94 ++++++++------------ .../core/construction/SdkComponentsTest.java | 6 +- .../construction/WindowingStrategiesTest.java | 18 ++++ 5 files changed, 66 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java index d890de7..7b96240 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -76,8 +76,10 @@ public class Coders { private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components) throws IOException { List<String> componentIds = new ArrayList<>(); - for (Coder<?> componentCoder : coder.getCoderArguments()) { - componentIds.add(components.registerCoder(componentCoder)); + if (coder.getCoderArguments() != null) { + for (Coder<?> componentCoder : coder.getCoderArguments()) { + componentIds.add(components.registerCoder(componentCoder)); + } } return RunnerApi.Coder.newBuilder() .addAllComponentCoderIds(componentIds) http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 5cb0a00..03f3a03 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -96,7 +96,7 @@ class SdkComponents { * unique ID for the {@link WindowingStrategy}. Multiple registrations of the same {@link * WindowingStrategy} will return the same unique ID. */ - String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) { + String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) throws IOException { String existing = windowingStrategyIds.get(windowingStrategy); if (existing != null) { return existing; @@ -108,6 +108,9 @@ class SdkComponents { NameUtils.approximateSimpleName(windowingStrategy.getWindowFn())); String name = uniqify(baseName, windowingStrategyIds.values()); windowingStrategyIds.put(windowingStrategy, name); + RunnerApi.WindowingStrategy windowingStrategyProto = + WindowingStrategies.toProto(windowingStrategy, this); + componentsBuilder.putWindowingStrategies(name, windowingStrategyProto); return name; } http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java index 353be05..6d721b0 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java @@ -19,15 +19,12 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.Serializable; -import java.util.UUID; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; @@ -127,62 +124,30 @@ public class WindowingStrategies implements Serializable { } } - // This URN says that the coder is just a UDF blob the indicated SDK understands - // TODO: standardize such things - public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1"; - // This URN says that the WindowFn is just a UDF blob the indicated SDK understands // TODO: standardize such things public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - /** - * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where - * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link FunctionSpec} - * for the input {@link WindowFn}. + * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link + * RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} for the + * input {@link WindowFn}. */ - public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn) + public static SdkFunctionSpec toProto( + WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) throws IOException { - Coder<?> windowCoder = windowFn.windowCoder(); - - // TODO: re-use components - String windowCoderId = UUID.randomUUID().toString(); - - SdkFunctionSpec windowFnSpec = - SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(CUSTOM_WINDOWFN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(windowFn))) - .build()))) - .build(); - - RunnerApi.Coder windowCoderProto = - RunnerApi.Coder.newBuilder() - .setSpec( - SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(CUSTOM_CODER_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes( - windowCoder.asCloudObject()))) - .build())))) - .build(); - - return RunnerApi.MessageWithComponents.newBuilder() - .setSdkFunctionSpec(windowFnSpec) - .setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto)) + return SdkFunctionSpec.newBuilder() + // TODO: Set environment ID + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_WINDOWFN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(windowFn))) + .build()))) .build(); } @@ -194,9 +159,22 @@ public class WindowingStrategies implements Serializable { */ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy) throws IOException { + SdkComponents components = SdkComponents.create(); + RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components); - RunnerApi.MessageWithComponents windowFnWithComponents = - toProto(windowingStrategy.getWindowFn()); + return RunnerApi.MessageWithComponents.newBuilder() + .setWindowingStrategy(windowingStrategyProto) + .setComponents(components.toComponents()) + .build(); + } + + /** + * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering + * any components in the provided {@link SdkComponents}. + */ + public static RunnerApi.WindowingStrategy toProto( + WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException { + SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components); RunnerApi.WindowingStrategy.Builder windowingStrategyProto = RunnerApi.WindowingStrategy.newBuilder() @@ -205,11 +183,11 @@ public class WindowingStrategies implements Serializable { .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior())) .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) .setTrigger(Triggers.toProto(windowingStrategy.getTrigger())) - .setWindowFn(windowFnWithComponents.getSdkFunctionSpec()); + .setWindowFn(windowFnSpec) + .setWindowCoderId( + components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); - return RunnerApi.MessageWithComponents.newBuilder() - .setWindowingStrategy(windowingStrategyProto) - .setComponents(windowFnWithComponents.getComponents()).build(); + return windowingStrategyProto.build(); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index 28b4911..ef4b16b 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -132,15 +132,17 @@ public class SdkComponentsTest { } @Test - public void registerWindowingStrategy() { + public void registerWindowingStrategy() throws IOException { WindowingStrategy<?, ?> strategy = WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); String name = components.registerWindowingStrategy(strategy); assertThat(name, not(isEmptyOrNullString())); + + components.toComponents().getWindowingStrategiesOrThrow(name); } @Test - public void registerWindowingStrategyIdEqualStrategies() { + public void registerWindowingStrategyIdEqualStrategies() throws IOException { WindowingStrategy<?, ?> strategy = WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); String name = components.registerWindowingStrategy(strategy); http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java index b603d65..62bba8e 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat; import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; @@ -30,6 +31,7 @@ import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.hamcrest.Matchers; import org.joda.time.Duration; import org.junit.Test; import org.junit.runner.RunWith; @@ -89,4 +91,20 @@ public class WindowingStrategiesTest { toProtoAndBackWindowingStrategy, equalTo((WindowingStrategy) windowingStrategy.fixDefaults())); } + + @Test + public void testToProtoAndBackWithComponents() throws Exception { + WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy(); + SdkComponents components = SdkComponents.create(); + RunnerApi.WindowingStrategy proto = + WindowingStrategies.toProto(windowingStrategy, components); + RunnerApi.Components protoComponents = components.toComponents(); + + assertThat( + WindowingStrategies.fromProto(proto, protoComponents).fixDefaults(), + Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults())); + + protoComponents.getCodersOrThrow( + components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); + } }
