Use Runner API Encodings in the Fn API. Removes uses of Coder.toCloudObject.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e2b5d6ea Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e2b5d6ea Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e2b5d6ea Branch: refs/heads/master Commit: e2b5d6ea8a5d41ac27245a4999f59a73dfe24c43 Parents: ed0b63a Author: Thomas Groh <[email protected]> Authored: Fri Apr 28 18:06:38 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Apr 28 20:16:21 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/BeamFnDataReadRunner.java | 14 +++++----- .../runners/core/BeamFnDataWriteRunner.java | 15 +++++------ .../control/ProcessBundleHandlerTest.java | 28 ++++++++++++-------- .../runners/core/BeamFnDataReadRunnerTest.java | 12 ++++----- .../runners/core/BeamFnDataWriteRunnerTest.java | 12 ++++----- 5 files changed, 43 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java index 034ef84..805d480 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java @@ -21,7 +21,6 @@ package org.apache.beam.runners.core; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -30,8 +29,9 @@ import 
java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.runners.core.construction.Coders; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.util.Serializer; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; @@ -73,12 +73,12 @@ public class BeamFnDataReadRunner<OutputT> { this.beamFnDataClientFactory = beamFnDataClientFactory; this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values())); + MessageWithComponents runnerApiCoder = + coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class); @SuppressWarnings("unchecked") - Coder<WindowedValue<OutputT>> coder = Serializer.deserialize( - OBJECT_MAPPER.readValue( - coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(), - Map.class), - Coder.class); + Coder<WindowedValue<OutputT>> coder = + (Coder<WindowedValue<OutputT>>) + Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents()); this.coder = coder; } http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java index 54fd626..0ba09e3 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java @@ -19,15 +19,14 @@ package org.apache.beam.runners.core; import com.fasterxml.jackson.databind.ObjectMapper; -import 
com.google.protobuf.BytesValue; import java.io.IOException; -import java.util.Map; import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.runners.core.construction.Coders; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.util.Serializer; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -61,12 +60,12 @@ public class BeamFnDataWriteRunner<InputT> { this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier; this.outputTarget = outputTarget; + MessageWithComponents runnerApiCoder = + coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class); @SuppressWarnings("unchecked") - Coder<WindowedValue<InputT>> coder = Serializer.deserialize( - OBJECT_MAPPER.readValue( - coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(), - Map.class), - Coder.class); + Coder<WindowedValue<InputT>> coder = + (Coder<WindowedValue<InputT>>) + Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents()); this.coder = coder; } http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index bd2fba9..5987267 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -62,6 +62,7 @@ import 
org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.runners.core.construction.Coders; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -106,18 +107,23 @@ public class ProcessBundleHandlerTest { static { try { STRING_CODER_SPEC = - BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder() - .setId(STRING_CODER_SPEC_ID) - .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(STRING_CODER.asCloudObject()))).build()))) - .build(); + BeamFnApi.Coder.newBuilder() + .setFunctionSpec( + BeamFnApi.FunctionSpec.newBuilder() + .setId(STRING_CODER_SPEC_ID) + .setData(Any.pack(Coders.toProto(STRING_CODER)))) + .build(); LONG_CODER_SPEC = - BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder() - .setId(STRING_CODER_SPEC_ID) - .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(WindowedValue.getFullCoder( - VarLongCoder.of(), GlobalWindow.Coder.INSTANCE).asCloudObject()))).build()))) - .build(); + BeamFnApi.Coder.newBuilder() + .setFunctionSpec( + BeamFnApi.FunctionSpec.newBuilder() + .setId(STRING_CODER_SPEC_ID) + .setData( + Any.pack( + Coders.toProto( + WindowedValue.getFullCoder( + VarLongCoder.of(), GlobalWindow.Coder.INSTANCE))))) + .build(); } catch (IOException e) { throw new ExceptionInInitializerError(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java 
b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java index 0cc5ef9..0d036fe 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java @@ -32,8 +32,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -48,6 +46,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.test.TestExecutors; import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.runners.core.construction.Coders; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -76,10 +75,11 @@ public class BeamFnDataReadRunnerTest { private static final BeamFnApi.Coder CODER_SPEC; static { try { - CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder() - .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build()))) - .build(); + CODER_SPEC = + BeamFnApi.Coder.newBuilder() + .setFunctionSpec( + BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER)))) + .build(); } catch (IOException e) { throw new ExceptionInInitializerError(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java index 378567a..50fee7a 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java @@ -30,14 +30,13 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.runners.core.construction.Coders; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -64,10 +63,11 @@ public class BeamFnDataWriteRunnerTest { private static final BeamFnApi.Coder CODER_SPEC; static { try { - CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder() - .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build()))) - .build(); + CODER_SPEC = + BeamFnApi.Coder.newBuilder() + .setFunctionSpec( + BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER)))) + .build(); } catch (IOException e) { throw new ExceptionInInitializerError(e); }
