get it compiling
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/45e09b2d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/45e09b2d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/45e09b2d Branch: refs/heads/master Commit: 45e09b2df7a97cfbbe0d8013c16cfdd7a55afbde Parents: b7f3341 Author: Robert Bradshaw <[email protected]> Authored: Fri May 5 17:27:13 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Mon May 8 20:17:56 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/BatchViewOverrides.java | 4 +-- .../org/apache/beam/sdk/transforms/Combine.java | 3 +- .../beam/sdk/transforms/join/CoGbkResult.java | 2 +- .../beam/sdk/transforms/windowing/PaneInfo.java | 1 - .../org/apache/beam/sdk/util/WindowedValue.java | 3 +- .../beam/sdk/coders/CoderRegistryTest.java | 6 ++++ .../beam/sdk/testing/CoderPropertiesTest.java | 36 ++++++++++---------- .../sdk/testing/SerializableMatchersTest.java | 1 - .../apache/beam/sdk/transforms/CombineTest.java | 19 +++-------- .../apache/beam/sdk/transforms/ParDoTest.java | 17 ++------- 10 files changed, 37 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 34609df..d640f6e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -1351,9 +1351,9 @@ class BatchViewOverrides { } @Override - public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream, OutputStream outStream) + public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream) throws CoderException, IOException { - encode(outStream, outStream, Coder.Context.NESTED); + encode(value, outStream, Coder.Context.NESTED); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 7e43564..9e1cc71 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -2001,7 +2001,8 @@ public class Combine { } @Override - public InputOrAccum<InputT, AccumT> decode(InputStream inStream) throws CoderException, IOException { + public InputOrAccum<InputT, AccumT> decode(InputStream inStream) + throws CoderException, IOException { return decode(inStream, Coder.Context.NESTED); } http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index d42de82..877bb07 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -264,7 +264,7 @@ public class CoGbkResult { } List<Iterable<?>> valueMap = Lists.newArrayListWithExpectedSize(schema.size()); for (int unionTag = 0; unionTag < schema.size(); unionTag++) { - valueMap.add(tagListCoder(unionTag).decode(inStream, Coder.Context.NESTED)); + valueMap.add(tagListCoder(unionTag).decode(inStream)); } return new CoGbkResult(schema, valueMap); } http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index 75df220..1e9a187 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -27,7 +27,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Objects; import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.WindowedContext; http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 963886b..444521a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -646,8 +646,7 @@ public abstract class WindowedValue<T> { OutputStream outStream, Context context) throws CoderException, IOException { - InstantCoder.of().encode( - windowedElem.getTimestamp(), outStream, nestedContext); + InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); windowsCoder.encode(windowedElem.getWindows(), outStream); PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream); valueCoder.encode(windowedElem.getValue(), outStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index c883ca0..b199a06 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -438,6 +438,12 @@ public class CoderRegistryTest { private static class AutoRegistrationClassCoder extends CustomCoder<AutoRegistrationClass> { private static final AutoRegistrationClassCoder INSTANCE = new AutoRegistrationClassCoder(); + + public void encode(AutoRegistrationClass value, OutputStream outStream) {} + + public AutoRegistrationClass decode(InputStream inStream) { + return null; + } } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java index 164d221..ce78411 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java @@ -50,15 +50,15 @@ public class CoderPropertiesTest { /** A coder that says it is not deterministic but actually is. */ public static class NonDeterministicCoder extends AtomicCoder<String> { @Override - public void encode(String value, OutputStream outStream, Context context) + public void encode(String value, OutputStream outStream) throws CoderException, IOException { - StringUtf8Coder.of().encode(value, outStream, context); + StringUtf8Coder.of().encode(value, outStream); } @Override - public String decode(InputStream inStream, Context context) + public String decode(InputStream inStream) throws CoderException, IOException { - return StringUtf8Coder.of().decode(inStream, context); + return StringUtf8Coder.of().decode(inStream); } public void verifyDeterministic() throws NonDeterministicException { @@ -96,15 +96,15 @@ public class CoderPropertiesTest { } @Override - public void encode(String value, OutputStream outStream, Context context) + public void encode(String value, OutputStream outStream) throws IOException, CoderException { - StringUtf8Coder.of().encode(value + System.nanoTime(), outStream, context); + StringUtf8Coder.of().encode(value + System.nanoTime(), outStream); } @Override - public String decode(InputStream inStream, Context context) + public String decode(InputStream inStream) throws CoderException, IOException { - return StringUtf8Coder.of().decode(inStream, context); + return StringUtf8Coder.of().decode(inStream); } @Override @@ -136,16 +136,16 @@ public class CoderPropertiesTest { } @Override - public void encode(String value, OutputStream outStream, Context context) + public void encode(String value, OutputStream outStream) throws CoderException, IOException { changedState += 1; - StringUtf8Coder.of().encode(value + Strings.repeat("A", changedState), outStream, context); + StringUtf8Coder.of().encode(value + Strings.repeat("A", changedState), outStream); } @Override - public String decode(InputStream inStream, Context context) + public String decode(InputStream inStream) throws CoderException, IOException { - String decodedValue = StringUtf8Coder.of().decode(inStream, context); + String decodedValue = StringUtf8Coder.of().decode(inStream); return decodedValue.substring(0, decodedValue.length() - changedState); } @@ -180,18 +180,18 @@ public class CoderPropertiesTest { } @Override - public void encode(String value, OutputStream outStream, Context context) + public void encode(String value, OutputStream outStream) throws CoderException, IOException { if (lostState == 0) { throw new RuntimeException("I forgot something..."); } - StringUtf8Coder.of().encode(value, outStream, context); + StringUtf8Coder.of().encode(value, outStream); } @Override - public String decode(InputStream inStream, Context context) + public String decode(InputStream inStream) throws CoderException, IOException { - return StringUtf8Coder.of().decode(inStream, context); + return StringUtf8Coder.of().decode(inStream); } @Override @@ -216,12 +216,12 @@ public class CoderPropertiesTest { /** A coder which closes the underlying stream during encoding and decoding. */ public static class ClosingCoder extends AtomicCoder<String> { @Override - public void encode(String value, OutputStream outStream, Context context) throws IOException { + public void encode(String value, OutputStream outStream) throws IOException { outStream.close(); } @Override - public String decode(InputStream inStream, Context context) throws IOException { + public String decode(InputStream inStream) throws IOException { inStream.close(); return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java index 375be33..6b17696 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java @@ -30,7 +30,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.KV; import org.hamcrest.Matchers; http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index bd8aee4..dc9788f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -876,28 +876,17 @@ public class CombineTest implements Serializable { */ private class CountSumCoder extends AtomicCoder<CountSum> { @Override - public void encode(CountSum value, OutputStream outStream, OutputStream outStream) + public void encode(CountSum value, OutputStream outStream) throws CoderException, IOException { - encode(outStream, outStream, Context.NESTED); - } - - @Override - public void encode(CountSum value, OutputStream outStream, - Context context) throws CoderException, IOException { LONG_CODER.encode(value.count, outStream); - DOUBLE_CODER.encode(value.sum, outStream, context); - } - - @Override - public CountSum decode(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Coder.Context.NESTED); + DOUBLE_CODER.encode(value.sum, outStream); } @Override - public CountSum decode(InputStream inStream, Coder.Context context) + public CountSum decode(InputStream inStream) throws CoderException, IOException { long count = LONG_CODER.decode(inStream); - double sum = DOUBLE_CODER.decode(inStream, context); + double sum = DOUBLE_CODER.decode(inStream); return new CountSum(count, sum); } http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 3697211..ef27f4c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -1092,24 +1092,13 @@ public class ParDoTest implements Serializable { @Override public void encode(MyInteger value, OutputStream outStream) throws CoderException, IOException { - encode(value, outStream, Context.NESTED); + delegate.encode(value.getValue(), outStream); } @Override - public void encode(MyInteger value, OutputStream outStream, Context context) - throws CoderException, IOException { - delegate.encode(value.getValue(), outStream, context); - } - - @Override - public MyInteger decode(InputStream inStream) throws CoderException { - return decode(inStream, Context.NESTED); - } - - @Override - public MyInteger decode(InputStream inStream, Context context) throws CoderException, + public MyInteger decode(InputStream inStream) throws CoderException, IOException { - return new MyInteger(delegate.decode(inStream, context)); + return new MyInteger(delegate.decode(inStream)); } }
