automated context removal or redirection
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b7f3341e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b7f3341e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b7f3341e Branch: refs/heads/master Commit: b7f3341ed4fc073a834a55773bcb4a2c7f821c52 Parents: 996dce3 Author: Robert Bradshaw <[email protected]> Authored: Fri May 5 17:24:02 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Mon May 8 20:17:56 2017 -0700 ---------------------------------------------------------------------- .../apex/translation/utils/ApexStreamTuple.java | 11 ++++++++ .../UnboundedReadFromBoundedSource.java | 4 +-- .../runners/core/construction/CodersTest.java | 4 +-- .../core/ElementAndRestrictionCoder.java | 4 +-- .../beam/runners/core/KeyedWorkItemCoder.java | 6 ++--- .../beam/runners/core/TimerInternals.java | 6 ++--- .../direct/CloningBundleFactoryTest.java | 20 ++++++-------- .../UnboundedReadEvaluatorFactoryTest.java | 5 ++-- .../streaming/SingletonKeyedWorkItemCoder.java | 11 ++++++++ .../runners/dataflow/BatchViewOverrides.java | 11 ++++++++ .../runners/dataflow/internal/IsmFormat.java | 28 +++++++++++++------- .../runners/dataflow/util/RandomAccessData.java | 11 ++++++++ .../org/apache/beam/sdk/coders/AvroCoder.java | 4 +-- .../apache/beam/sdk/coders/BigDecimalCoder.java | 11 ++++++++ .../beam/sdk/coders/BigEndianIntegerCoder.java | 4 +-- .../beam/sdk/coders/BigEndianLongCoder.java | 4 +-- .../apache/beam/sdk/coders/BigIntegerCoder.java | 11 ++++++++ .../org/apache/beam/sdk/coders/BitSetCoder.java | 11 ++++++++ .../apache/beam/sdk/coders/ByteArrayCoder.java | 11 ++++++++ .../org/apache/beam/sdk/coders/ByteCoder.java | 4 +-- .../apache/beam/sdk/coders/DelegateCoder.java | 11 ++++++++ .../org/apache/beam/sdk/coders/DoubleCoder.java | 4 +-- .../apache/beam/sdk/coders/DurationCoder.java | 4 +-- .../apache/beam/sdk/coders/InstantCoder.java | 4 +-- .../beam/sdk/coders/IterableLikeCoder.java | 6 ++--- .../org/apache/beam/sdk/coders/KvCoder.java | 11 ++++++++ .../beam/sdk/coders/LengthPrefixCoder.java | 4 +-- .../org/apache/beam/sdk/coders/MapCoder.java | 11 ++++++++ .../apache/beam/sdk/coders/NullableCoder.java | 11 ++++++++ .../beam/sdk/coders/SerializableCoder.java | 4 +-- .../beam/sdk/coders/StringDelegateCoder.java | 11 ++++++++ .../apache/beam/sdk/coders/StringUtf8Coder.java | 11 ++++++++ .../beam/sdk/coders/TextualIntegerCoder.java | 11 ++++++++ .../org/apache/beam/sdk/coders/VarIntCoder.java | 4 +-- .../apache/beam/sdk/coders/VarLongCoder.java | 4 +-- .../org/apache/beam/sdk/coders/VoidCoder.java | 4 +-- .../org/apache/beam/sdk/io/FileBasedSink.java | 11 ++++++++ .../sdk/transforms/ApproximateQuantiles.java | 4 +-- .../org/apache/beam/sdk/transforms/Combine.java | 22 +++++++++++++++ .../apache/beam/sdk/transforms/CombineFns.java | 13 +++++++-- .../org/apache/beam/sdk/transforms/Count.java | 4 +-- .../org/apache/beam/sdk/transforms/Mean.java | 4 +-- .../org/apache/beam/sdk/transforms/Top.java | 4 +-- .../beam/sdk/transforms/join/CoGbkResult.java | 6 ++--- .../beam/sdk/transforms/join/UnionCoder.java | 11 ++++++++ .../sdk/transforms/windowing/GlobalWindow.java | 4 +-- .../transforms/windowing/IntervalWindow.java | 4 +-- .../beam/sdk/transforms/windowing/PaneInfo.java | 4 +-- .../org/apache/beam/sdk/util/BitSetCoder.java | 11 ++++++++ .../org/apache/beam/sdk/util/WindowedValue.java | 24 +++++++++++++++-- .../beam/sdk/values/TimestampedValue.java | 5 ++-- .../beam/sdk/values/ValueInSingleWindow.java | 13 +++++++-- .../beam/sdk/values/ValueWithRecordId.java | 11 ++++++++ .../beam/sdk/coders/CoderRegistryTest.java | 8 +++--- .../apache/beam/sdk/coders/CustomCoderTest.java | 4 +-- .../beam/sdk/coders/NullableCoderTest.java | 11 ++++++++ .../beam/sdk/coders/StructuredCoderTest.java | 12 ++++----- .../apache/beam/sdk/testing/PAssertTest.java | 4 +-- .../sdk/testing/SerializableMatchersTest.java | 4 +-- .../beam/sdk/testing/WindowSupplierTest.java | 4 +-- .../beam/sdk/transforms/CombineFnsTest.java | 11 ++++++++ .../apache/beam/sdk/transforms/CombineTest.java | 22 +++++++++++++++ .../apache/beam/sdk/transforms/CreateTest.java | 9 +++---- .../beam/sdk/transforms/GroupByKeyTest.java | 4 +-- .../apache/beam/sdk/transforms/ParDoTest.java | 15 +++++++++-- .../apache/beam/sdk/transforms/ViewTest.java | 11 ++++++++ .../transforms/reflect/DoFnInvokersTest.java | 8 +++--- .../apache/beam/sdk/util/CoderUtilsTest.java | 4 +-- .../beam/sdk/util/SerializableUtilsTest.java | 4 +-- .../extensions/protobuf/ByteStringCoder.java | 11 ++++++++ .../sdk/extensions/protobuf/ProtoCoder.java | 11 ++++++++ .../io/gcp/bigquery/TableDestinationCoder.java | 2 +- .../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 11 ++++++++ .../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 11 ++++++++ .../io/gcp/bigquery/WriteBundlesToFiles.java | 4 +-- .../pubsub/PubsubMessagePayloadOnlyCoder.java | 11 ++++++++ .../PubsubMessageWithAttributesCoder.java | 11 ++++++++ .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 4 +-- .../io/gcp/pubsub/PubsubUnboundedSource.java | 11 ++++++++ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 11 ++++++++ .../beam/sdk/io/hadoop/WritableCoder.java | 4 +-- .../beam/sdk/io/hbase/HBaseMutationCoder.java | 6 ++--- .../beam/sdk/io/hbase/HBaseResultCoder.java | 4 +-- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 +-- .../beam/sdk/io/kafka/KafkaRecordCoder.java | 11 ++++++++ .../beam/sdk/io/kinesis/KinesisRecordCoder.java | 4 +-- .../org/apache/beam/sdk/io/xml/JAXBCoder.java | 25 ++++++++++------- .../apache/beam/sdk/io/xml/JAXBCoderTest.java | 13 +++++++-- 88 files changed, 598 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java index 4aa6ee8..1d402eb 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java @@ -162,6 +162,12 @@ public interface ApexStreamTuple<T> { } @Override + public void encode(ApexStreamTuple<T> value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context) throws CoderException, IOException { if (value instanceof WatermarkTuple) { @@ -175,6 +181,11 @@ public interface ApexStreamTuple<T> { } @Override + public ApexStreamTuple<T> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public ApexStreamTuple<T> decode(InputStream inStream, Context context) throws CoderException, IOException { int b = inStream.read(); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java index b74da80..24eb384 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -221,7 +221,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle } @Override - public void encode(Checkpoint<T> value, OutputStream outStream, Context context) + public void encode(Checkpoint<T> value, OutputStream outStream) throws CoderException, IOException { elemsCoder.encode(value.residualElements, outStream); sourceCoder.encode(value.residualSource, outStream); @@ -229,7 +229,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle @SuppressWarnings("unchecked") @Override - public Checkpoint<T> decode(InputStream inStream, Context context) + public Checkpoint<T> decode(InputStream inStream) throws CoderException, IOException { return new Checkpoint<>( elemsCoder.decode(inStream), http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java index 765723c..42fba7c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java @@ -151,11 +151,11 @@ public class CodersTest { private static class RecordCoder extends AtomicCoder<Record> { @Override - public void encode(Record value, OutputStream outStream, Context context) + public void encode(Record value, OutputStream outStream) throws CoderException, IOException {} @Override - public Record decode(InputStream inStream, Context context) + public Record decode(InputStream inStream) throws CoderException, IOException { return new Record(); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java index fcb1deb..4440b85 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java @@ -50,7 +50,7 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT> @Override public void encode( - ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream, Context context) + ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream) throws IOException { if (value == null) { throw new CoderException("cannot encode a null ElementAndRestriction"); @@ -60,7 +60,7 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT> } @Override - public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context) + public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream) throws IOException { ElementT key = elementCoder.decode(inStream); RestrictionT value = restrictionCoder.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java index 0869244..b1cb1a6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java @@ -69,18 +69,16 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem< } @Override - public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context) + public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); keyCoder.encode(value.key(), outStream); timersCoder.encode(value.timersIterable(), outStream); elemsCoder.encode(value.elementsIterable(), outStream); } @Override - public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context) + public KeyedWorkItem<K, ElemT> decode(InputStream inStream) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); K key = keyCoder.decode(inStream); Iterable<TimerData> timers = timersCoder.decode(inStream); Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index f0a62cd..f4a12d0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -238,9 +238,8 @@ public interface TimerInternals { } @Override - public void encode(TimerData timer, OutputStream outStream, Context context) + public void encode(TimerData timer, OutputStream outStream) throws CoderException, IOException { - Context nestedContext = context.nested(); STRING_CODER.encode(timer.getTimerId(), outStream); STRING_CODER.encode(timer.getNamespace().stringKey(), outStream); INSTANT_CODER.encode(timer.getTimestamp(), outStream); @@ -248,9 +247,8 @@ public interface TimerInternals { } @Override - public TimerData decode(InputStream inStream, Context context) + public TimerData decode(InputStream inStream) throws CoderException, IOException { - Context nestedContext = context.nested(); String timerId = STRING_CODER.decode(inStream); StateNamespace namespace = StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java index 33d171e..5bc8077 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java @@ -178,15 +178,14 @@ public class CloningBundleFactoryTest { @Override public void encode( Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws IOException { throw new CoderException("Encode not allowed"); } @Override public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws IOException { return null; } @@ -196,13 +195,12 @@ public class CloningBundleFactoryTest { @Override public void encode( Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws IOException {} @Override public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws IOException { throw new CoderException("Decode not allowed"); } @@ -212,13 +210,12 @@ public class CloningBundleFactoryTest { @Override public void encode( Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws CoderException, IOException {} @Override public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { return new Record() { @Override @@ -244,13 +241,12 @@ public class CloningBundleFactoryTest { @Override public void encode( Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws CoderException, IOException {} @Override public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { return new Record() { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index b9ba7f4..2a01db5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -590,15 +590,14 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public void encode( TestCheckpointMark value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws IOException { VarInt.encode(value.index, outStream); } @Override public TestCheckpointMark decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws IOException { TestCheckpointMark decoded = new TestCheckpointMark(VarInt.decodeInt(inStream)); decoded.decoded = true; http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java index d7bae7e..b62fc16 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java @@ -66,6 +66,12 @@ public class SingletonKeyedWorkItemCoder<K, ElemT> } @Override + public void encode(SingletonKeyedWorkItem<K, ElemT> value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(SingletonKeyedWorkItem<K, ElemT> value, OutputStream outStream, Context context) @@ -75,6 +81,11 @@ public class SingletonKeyedWorkItemCoder<K, ElemT> } @Override + public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context) throws CoderException, IOException { K key = keyCoder.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 0e60fa0..34609df 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -1351,6 +1351,12 @@ class BatchViewOverrides { } @Override + public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream, OutputStream outStream) + throws CoderException, IOException { + encode(outStream, outStream, Coder.Context.NESTED); + } + + @Override public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { transformCoder.encode(value.transform, outStream); @@ -1358,6 +1364,11 @@ class BatchViewOverrides { } @Override + public TransformedMap<K, V1, V2> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public TransformedMap<K, V1, V2> decode( InputStream inStream, Coder.Context context) throws CoderException, IOException { return new TransformedMap<>( http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index 0f0cd4d..8cfae81 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -231,8 +231,7 @@ public class IsmFormat { } @Override - public void encode(IsmRecord<V> value, OutputStream outStream, - Coder.Context context) throws CoderException, IOException { + public void encode(IsmRecord<V> value, OutputStream outStream) throws CoderException, IOException { if (value.getKeyComponents().size() != keyComponentCoders.size()) { throw new CoderException(String.format( "Expected %s key component(s) but received key component(s) %s.", @@ -249,7 +248,7 @@ public class IsmFormat { } @Override - public IsmRecord<V> decode(InputStream inStream, Coder.Context context) + public IsmRecord<V> decode(InputStream inStream) throws CoderException, IOException { List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size()); for (Coder<?> keyCoder : keyComponentCoders) { @@ -493,7 +492,7 @@ public class IsmFormat { } @Override - public void encode(K value, OutputStream outStream, Coder.Context context) + public void encode(K value, OutputStream outStream) throws CoderException, IOException { if (value == METADATA_KEY) { outStream.write(0); @@ -504,7 +503,7 @@ public class IsmFormat { } @Override - public K decode(InputStream inStream, Coder.Context context) + public K decode(InputStream inStream) throws CoderException, IOException { int marker = inStream.read(); if (marker == 0) { @@ -621,6 +620,12 @@ public class IsmFormat { private IsmShardCoder() {} @Override + public void encode(IsmShard value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Coder.Context.NESTED); + } + + @Override public void encode(IsmShard value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { checkState(value.getIndexOffset() >= 0, @@ -632,6 +637,11 @@ public class IsmFormat { } @Override + public IsmShard decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public IsmShard decode( InputStream inStream, Coder.Context context) throws CoderException, IOException { return IsmShard.of( @@ -683,14 +693,14 @@ public class IsmFormat { } @Override - public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context) + public void encode(KeyPrefix value, OutputStream outStream) throws CoderException, IOException { VarInt.encode(value.getSharedKeySize(), outStream); VarInt.encode(value.getUnsharedKeySize(), outStream); } @Override - public KeyPrefix decode(InputStream inStream, Coder.Context context) + public KeyPrefix decode(InputStream inStream) throws CoderException, IOException { return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream)); } @@ -755,7 +765,7 @@ public class IsmFormat { } @Override - public void encode(Footer value, OutputStream outStream, Coder.Context context) + public void encode(Footer value, OutputStream outStream) throws CoderException, IOException { DataOutputStream dataOut = new DataOutputStream(outStream); dataOut.writeLong(value.getIndexPosition()); @@ -765,7 +775,7 @@ public class IsmFormat { } @Override - public Footer decode(InputStream inStream, Coder.Context context) + public Footer decode(InputStream inStream) throws CoderException, IOException { DataInputStream dataIn = new DataInputStream(inStream); Footer footer = Footer.of(dataIn.readLong(), dataIn.readLong(), dataIn.readLong()); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java index f36bd78..5ea9f07 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java @@ -63,6 +63,12 @@ public class RandomAccessData { } @Override + public void encode(RandomAccessData value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Coder.Context.NESTED); + } + + @Override public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { if (value == POSITIVE_INFINITY) { @@ -75,6 +81,11 @@ public class RandomAccessData { } @Override + public RandomAccessData decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public RandomAccessData decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { RandomAccessData rval = new RandomAccessData(); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index f82c065..bba669d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -300,7 +300,7 @@ public class AvroCoder<T> extends CustomCoder<T> { } @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { + public void encode(T value, OutputStream outStream) throws IOException { // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it. BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get()); // Save the potentially-new instance for reuse later. @@ -310,7 +310,7 @@ public class AvroCoder<T> extends CustomCoder<T> { } @Override - public T decode(InputStream inStream, Context context) throws IOException { + public T decode(InputStream inStream) throws IOException { // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it. BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get()); // Save the potentially-new instance for later. http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java index e2166cf..e890d11 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java @@ -48,6 +48,12 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> { private BigDecimalCoder() {} @Override + public void encode(BigDecimal value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(BigDecimal value, OutputStream outStream, Context context) throws IOException, CoderException { checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName())); @@ -56,6 +62,11 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> { } @Override + public BigDecimal decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public BigDecimal decode(InputStream inStream, Context context) throws IOException, CoderException { int scale = VAR_INT_CODER.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java index a61f099..efb1e4b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java @@ -43,7 +43,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> { private BigEndianIntegerCoder() {} @Override - public void encode(Integer value, OutputStream outStream, Context context) + public void encode(Integer value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Integer"); @@ -52,7 +52,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> { } @Override - public Integer decode(InputStream inStream, Context context) + public Integer decode(InputStream inStream) throws IOException, CoderException { try { return new DataInputStream(inStream).readInt(); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java index 868160a..ab85e17 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java @@ -43,7 +43,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> { private BigEndianLongCoder() {} @Override - public void encode(Long value, OutputStream outStream, Context context) + public void encode(Long value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Long"); @@ -52,7 +52,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> { } @Override - public Long decode(InputStream inStream, Context context) + public Long decode(InputStream inStream) throws IOException, CoderException { try { return new DataInputStream(inStream).readLong(); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java index 3b038af..d54accf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java @@ -42,6 +42,12 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> { private BigIntegerCoder() {} @Override + public void encode(BigInteger value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(BigInteger value, OutputStream outStream, Context context) throws IOException, CoderException { checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName())); @@ -49,6 +55,11 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> { } @Override + public BigInteger decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public BigInteger decode(InputStream inStream, Context context) throws IOException, CoderException { return new BigInteger(BYTE_ARRAY_CODER.decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java index f49776b..8115017 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java @@ -36,6 +36,12 @@ public class BitSetCoder extends AtomicCoder<BitSet> { } @Override + public void encode(BitSet value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(BitSet value, OutputStream outStream, Context context) throws CoderException, IOException { if (value == null) { @@ -45,6 +51,11 @@ public class BitSetCoder extends AtomicCoder<BitSet> { } @Override + public BitSet decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public BitSet decode(InputStream inStream, Context context) throws CoderException, IOException { return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java index c9393a1..3b38388 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java @@ -52,6 +52,12 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> { private ByteArrayCoder() {} @Override + public void encode(byte[] value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(byte[] value, OutputStream outStream, Context context) throws IOException, CoderException { if (value == null) { @@ -86,6 +92,11 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> { } @Override + public byte[] decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public byte[] decode(InputStream inStream, Context context) throws IOException, CoderException { if (context.isWholeStream) { http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java index 7f449d6..2d23a64 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java @@ -41,7 +41,7 @@ public class ByteCoder extends AtomicCoder<Byte> { private ByteCoder() {} @Override - public void encode(Byte value, OutputStream outStream, Context context) + public void encode(Byte value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Byte"); @@ -50,7 +50,7 @@ public class ByteCoder extends AtomicCoder<Byte> { } @Override - public Byte decode(InputStream inStream, Context context) + public Byte decode(InputStream inStream) throws IOException, CoderException { try { // value will be between 0-255, -1 for EOF http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java index 86077eb..f51b156 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java @@ -66,12 +66,23 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> { } @Override + public void encode(T value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException { coder.encode(applyAndWrapExceptions(toFn, value), outStream, context); } @Override + public T decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public T decode(InputStream inStream, Context context) throws CoderException, IOException { return applyAndWrapExceptions(fromFn, coder.decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java index 8eff6ba..deb18f2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java @@ -43,7 +43,7 @@ public class DoubleCoder extends AtomicCoder<Double> { private DoubleCoder() {} @Override - public void encode(Double value, OutputStream outStream, Context context) + public void encode(Double value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Double"); @@ -52,7 +52,7 @@ public class DoubleCoder extends AtomicCoder<Double> { } @Override - public Double decode(InputStream inStream, Context context) + public Double decode(InputStream inStream) throws IOException, CoderException { try { return new DataInputStream(inStream).readDouble(); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java index b7db305..90de26f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java @@ -54,7 +54,7 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> { } @Override - public void encode(ReadableDuration value, OutputStream outStream, Context context) + public void encode(ReadableDuration value, OutputStream outStream) throws CoderException, IOException { if (value == null) { throw new CoderException("cannot encode a null ReadableDuration"); @@ -63,7 +63,7 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> { } @Override - public ReadableDuration decode(InputStream inStream, Context context) + public ReadableDuration decode(InputStream inStream) throws CoderException, IOException { return fromLong(LONG_CODER.decode(inStream)); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java index 22b11a3..648493e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java @@ -68,7 +68,7 @@ public class InstantCoder extends AtomicCoder<Instant> { } @Override - public void encode(Instant value, OutputStream outStream, Context context) + public void encode(Instant value, OutputStream outStream) throws CoderException, IOException { if (value == null) { throw new CoderException("cannot encode a null Instant"); @@ -77,7 +77,7 @@ public class InstantCoder extends AtomicCoder<Instant> { } @Override - public Instant decode(InputStream inStream, Context context) + public Instant decode(InputStream inStream) throws CoderException, IOException { return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream)); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java index 59d5424..248c26c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java @@ -84,12 +84,11 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> @Override public void encode( - IterableT iterable, OutputStream outStream, Context context) + IterableT iterable, OutputStream outStream) throws IOException, CoderException { if (iterable == null) { throw new CoderException("cannot encode a null " + iterableName); } - Context nestedContext = context.nested(); DataOutputStream dataOutStream = new DataOutputStream(outStream); if (iterable instanceof Collection) { // We can know the size of the Iterable. Use an encoding with a @@ -117,9 +116,8 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> } @Override - public IterableT decode(InputStream inStream, Context context) + public IterableT decode(InputStream inStream) throws IOException, CoderException { - Context nestedContext = context.nested(); DataInputStream dataInStream = new DataInputStream(inStream); int size = dataInStream.readInt(); if (size >= 0) { http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index 0bb53ec..9c01886 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -58,6 +58,12 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> { } @Override + public void encode(KV<K, V> kv, OutputStream outStream) + throws IOException, CoderException { + encode(kv, outStream, Context.NESTED); + } + + @Override public void encode(KV<K, V> kv, OutputStream outStream, Context context) throws IOException, CoderException { if (kv == null) { @@ -68,6 +74,11 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> { } @Override + public KV<K, V> decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public KV<K, V> decode(InputStream inStream, Context context) throws IOException, CoderException { K key = keyCoder.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java index 7dd2a32..b24f66d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -53,7 +53,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> { } @Override - public void encode(T value, OutputStream outStream, Context context) + public void encode(T value, OutputStream outStream) throws CoderException, IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); valueCoder.encode(value, bos, Context.OUTER); @@ -62,7 +62,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> { } @Override - public T decode(InputStream inStream, Context context) throws CoderException, IOException { + public T decode(InputStream inStream) throws CoderException, IOException { long size = VarInt.decodeLong(inStream); return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java index f20eb93..d8b3f1c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java @@ -69,6 +69,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> { } @Override + public void encode(Map<K, V> map, OutputStream outStream) + throws IOException, CoderException { + encode(map, outStream, Context.NESTED); + } + + @Override public void encode( Map<K, V> map, OutputStream outStream, @@ -100,6 +106,11 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> { } @Override + public Map<K, V> decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public Map<K, V> decode(InputStream inStream, Context context) throws IOException, CoderException { DataInputStream dataInStream = new DataInputStream(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java index e46591e..64229e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java @@ -61,6 +61,12 @@ public class NullableCoder<T> extends StructuredCoder<T> { } @Override + public void encode(@Nullable T value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(@Nullable T value, OutputStream outStream, Context context) throws IOException, CoderException { if (value == null) { @@ -72,6 +78,11 @@ public class NullableCoder<T> extends StructuredCoder<T> { } @Override + public T decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override @Nullable public T decode(InputStream inStream, Context context) throws IOException, CoderException { int b = inStream.read(); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java index e3b2959..9aa8493 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java @@ -119,7 +119,7 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> { } @Override - public void encode(T value, OutputStream outStream, Context context) + public void encode(T value, OutputStream outStream) throws IOException, CoderException { try { ObjectOutputStream oos = new ObjectOutputStream(outStream); @@ -131,7 +131,7 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> { } @Override - public T decode(InputStream inStream, Context context) + public T decode(InputStream inStream) throws IOException, CoderException { try { ObjectInputStream ois = new ObjectInputStream(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java index 1f4538f..2161291 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java @@ -100,12 +100,23 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> { } @Override + public void encode(T value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException { delegateCoder.encode(value, outStream, context); } @Override + public T decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public T decode(InputStream inStream, Context context) throws CoderException, IOException { return delegateCoder.decode(inStream, context); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java index 44856e8..3bbc983 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java @@ -67,6 +67,12 @@ public class StringUtf8Coder extends AtomicCoder<String> { private StringUtf8Coder() {} @Override + public void encode(String value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(String value, OutputStream outStream, Context context) throws IOException { if (value == null) { @@ -85,6 +91,11 @@ public class StringUtf8Coder extends AtomicCoder<String> { } @Override + public String decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public String decode(InputStream inStream, Context context) throws IOException { if (context.isWholeStream) { http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java index 718811c..6078fa3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java @@ -39,6 +39,12 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> { protected TextualIntegerCoder() {} @Override + public void encode(Integer value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(Integer value, OutputStream outStream, Context context) throws IOException, CoderException { if (value == null) { @@ -49,6 +55,11 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> { } @Override + public Integer decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public Integer decode(InputStream inStream, Context context) throws IOException, CoderException { String textualValue = StringUtf8Coder.of().decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java index bda66bb..3a9abe7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java @@ -44,7 +44,7 @@ public class VarIntCoder extends AtomicCoder<Integer> { private VarIntCoder() {} @Override - public void encode(Integer value, OutputStream outStream, Context context) + public void encode(Integer value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Integer"); @@ -53,7 +53,7 @@ public class VarIntCoder extends AtomicCoder<Integer> { } @Override - public Integer decode(InputStream inStream, Context context) + public Integer decode(InputStream inStream) throws IOException, CoderException { try { return VarInt.decodeInt(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java index bf651c3..37ad8f6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java @@ -45,7 +45,7 @@ public class VarLongCoder extends StructuredCoder<Long> { private VarLongCoder() {} @Override - public void encode(Long value, OutputStream outStream, Context context) + public void encode(Long value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Long"); @@ -54,7 +54,7 @@ public class VarLongCoder extends StructuredCoder<Long> { } @Override - public Long decode(InputStream inStream, Context context) + public Long decode(InputStream inStream) throws IOException, CoderException { try { return VarInt.decodeLong(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java index 4467faa..3e1ff7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java @@ -38,12 +38,12 @@ public class VoidCoder extends AtomicCoder<Void> { private VoidCoder() {} @Override - public void encode(Void value, OutputStream outStream, Context context) { + public void encode(Void value, OutputStream outStream) { // Nothing to write! } @Override - public Void decode(InputStream inStream, Context context) { + public Void decode(InputStream inStream) { // Nothing to read! return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index d8a98cd..3620c22 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -947,6 +947,12 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } @Override + public void encode(FileResult value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(FileResult value, OutputStream outStream, Context context) throws IOException { if (value == null) { @@ -961,6 +967,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } @Override + public FileResult decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public FileResult decode(InputStream inStream, Context context) throws IOException { String filename = stringCoder.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java index 9b9d3f8..d12d193 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java @@ -679,7 +679,7 @@ public class ApproximateQuantiles { @Override public void encode( - QuantileState<T, ComparatorT> state, OutputStream outStream, Coder.Context context) + QuantileState<T, ComparatorT> state, OutputStream outStream) throws CoderException, IOException { intCoder.encode(state.numQuantiles, outStream); intCoder.encode(state.bufferSize, outStream); @@ -695,7 +695,7 @@ public class ApproximateQuantiles { } @Override - public QuantileState<T, ComparatorT> decode(InputStream inStream, Coder.Context context) + public QuantileState<T, ComparatorT> decode(InputStream inStream) throws CoderException, IOException { int numQuantiles = intCoder.decode(inStream); int bufferSize = intCoder.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index b9cdbd5..7e43564 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -532,6 +532,12 @@ public class Combine { } @Override + public void encode(Holder<V> accumulator, OutputStream outStream) + throws CoderException, IOException { + encode(accumulator, outStream, Context.NESTED); + } + + @Override public void encode(Holder<V> accumulator, OutputStream outStream, Context context) throws CoderException, IOException { if (accumulator.present) { @@ -543,6 +549,11 @@ public class Combine { } @Override + public Holder<V> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public Holder<V> decode(InputStream inStream, Context context) throws CoderException, IOException { if (inStream.read() == 1) { @@ -1971,6 +1982,12 @@ public class Combine { } @Override + public void encode(InputOrAccum<InputT, AccumT> value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Coder.Context.NESTED); + } + + @Override public void encode( InputOrAccum<InputT, AccumT> value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { @@ -1984,6 +2001,11 @@ public class Combine { } @Override + public InputOrAccum<InputT, AccumT> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public InputOrAccum<InputT, AccumT> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { if (inStream.read() == 0) { http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index c45df04..c619783 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -534,6 +534,12 @@ public class CombineFns { } @Override + public void encode(Object[] value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(Object[] value, OutputStream outStream, Context context) throws CoderException, IOException { checkArgument(value.length == codersCount); @@ -541,7 +547,6 @@ public class CombineFns { return; } int lastIndex = codersCount - 1; - Context nestedContext = context.nested(); for (int i = 0; i < lastIndex; ++i) { coders.get(i).encode(value[i], outStream); } @@ -549,6 +554,11 @@ public class CombineFns { } @Override + public Object[] decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public Object[] decode(InputStream inStream, Context context) throws CoderException, IOException { Object[] ret = new Object[codersCount]; @@ -556,7 +566,6 @@ public class CombineFns { return ret; } int lastIndex = codersCount - 1; - Context nestedContext = context.nested(); for (int i = 0; i < lastIndex; ++i) { ret[i] = coders.get(i).decode(inStream); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java index 497d62b..b405dd1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java @@ -169,13 +169,13 @@ public class Count { Coder<T> inputCoder) { return new AtomicCoder<long[]>() { @Override - public void encode(long[] value, OutputStream outStream, Context context) + public void encode(long[] value, OutputStream outStream) throws IOException { VarInt.encode(value[0], outStream); } @Override - public long[] decode(InputStream inStream, Context context) + public long[] decode(InputStream inStream) throws IOException, CoderException { try { return new long[] {VarInt.decodeLong(inStream)}; http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java index c8e0d95..8932b03 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java @@ -185,14 +185,14 @@ public class Mean { private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of(); @Override - public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context) + public void encode(CountSum<NumT> value, OutputStream outStream) throws CoderException, IOException { LONG_CODER.encode(value.count, outStream); DOUBLE_CODER.encode(value.sum, outStream); } @Override - public CountSum<NumT> decode(InputStream inStream, Coder.Context context) + public CountSum<NumT> decode(InputStream inStream) throws CoderException, IOException { return new CountSum<>( LONG_CODER.decode(inStream), http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java index 7aec667..dd8bc5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java @@ -539,13 +539,13 @@ public class Top { @Override public void encode( - BoundedHeap<T, ComparatorT> value, OutputStream outStream, Context context) + BoundedHeap<T, ComparatorT> value, OutputStream outStream) throws CoderException, IOException { listCoder.encode(value.asList(), outStream); } @Override - public BoundedHeap<T, ComparatorT> decode(InputStream inStream, Coder.Context context) + public BoundedHeap<T, ComparatorT> decode(InputStream inStream) throws CoderException, IOException { return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream)); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 6603325..d42de82 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -242,8 +242,7 @@ public class CoGbkResult { @SuppressWarnings("unchecked") public void encode( CoGbkResult value, - OutputStream outStream, - Context context) throws CoderException, + OutputStream outStream) throws CoderException, IOException { if (!schema.equals(value.getSchema())) { throw new CoderException("input schema does not match coder schema"); @@ -258,8 +257,7 @@ public class CoGbkResult { @Override public CoGbkResult decode( - InputStream inStream, - Context context) + InputStream inStream) throws CoderException, IOException { if (schema.size() == 0) { return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of()); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java index 3194a37..66959d3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java @@ -54,6 +54,12 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> { return index; } + @Override + public void encode(RawUnionValue union, OutputStream outStream) + throws IOException, CoderException { + encode(union, outStream, Context.NESTED); + } + @SuppressWarnings("unchecked") @Override public void encode( @@ -74,6 +80,11 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> { } @Override + public RawUnionValue decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public RawUnionValue decode(InputStream inStream, Context context) throws IOException, CoderException { int index = VarInt.decodeInt(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java index 0bfb875..078cbee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java @@ -67,10 +67,10 @@ public class GlobalWindow extends BoundedWindow { public static final Coder INSTANCE = new Coder(); @Override - public void encode(GlobalWindow window, OutputStream outStream, Context context) {} + public void encode(GlobalWindow window, OutputStream outStream) {} @Override - public GlobalWindow decode(InputStream inStream, Context context) { + public GlobalWindow decode(InputStream inStream) { return GlobalWindow.INSTANCE; } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index 318dc4c..f25a208 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -179,14 +179,14 @@ public class IntervalWindow extends BoundedWindow } @Override - public void encode(IntervalWindow window, OutputStream outStream, Context context) + public void encode(IntervalWindow window, OutputStream outStream) throws IOException, CoderException { instantCoder.encode(window.end, outStream); durationCoder.encode(new Duration(window.start, window.end), outStream); } @Override - public IntervalWindow decode(InputStream inStream, Context context) + public IntervalWindow decode(InputStream inStream) throws IOException, CoderException { Instant end = instantCoder.decode(inStream); ReadableDuration duration = durationCoder.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index 79ce2f5..75df220 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -347,7 +347,7 @@ public final class PaneInfo { private PaneInfoCoder() {} @Override - public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context) + public void encode(PaneInfo value, final OutputStream outStream) throws CoderException, IOException { Encoding encoding = chooseEncoding(value); switch (chooseEncoding(value)) { @@ -369,7 +369,7 @@ public final class PaneInfo { } @Override - public PaneInfo decode(final InputStream inStream, Coder.Context context) + public PaneInfo decode(final InputStream inStream) throws CoderException, IOException { byte keyAndTag = (byte) inStream.read(); PaneInfo base = BYTE_TO_PANE_INFO.get((byte) (keyAndTag & 0x0F)); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java index a0896f5..b202065 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java @@ -43,12 +43,23 @@ public class BitSetCoder extends AtomicCoder<BitSet> { } @Override + public void encode(BitSet value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(BitSet value, OutputStream outStream, Context context) throws CoderException, IOException { BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context); } @Override + public BitSet decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public BitSet decode(InputStream inStream, Context context) throws CoderException, IOException { return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index e3e61cf..963886b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -636,11 +636,16 @@ public abstract class WindowedValue<T> { } @Override + public void encode(WindowedValue<T> windowedElem, OutputStream outStream) + throws CoderException, IOException { + encode(windowedElem, outStream, Context.NESTED); + } + + @Override public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context) throws CoderException, IOException { - Context nestedContext = context.nested(); InstantCoder.of().encode( windowedElem.getTimestamp(), outStream, nestedContext); windowsCoder.encode(windowedElem.getWindows(), outStream); @@ -649,9 +654,13 @@ public abstract class WindowedValue<T> { } @Override + public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public WindowedValue<T> decode(InputStream inStream, Context context) throws CoderException, IOException { - Context nestedContext = context.nested(); Instant timestamp = InstantCoder.of().decode(inStream); Collection<? extends BoundedWindow> windows = windowsCoder.decode(inStream); @@ -710,12 +719,23 @@ public abstract class WindowedValue<T> { } @Override + public void encode(WindowedValue<T> windowedElem, OutputStream outStream) + throws CoderException, IOException { + encode(windowedElem, outStream, Context.NESTED); + } + + @Override public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context) throws CoderException, IOException { valueCoder.encode(windowedElem.getValue(), outStream, context); } @Override + public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public WindowedValue<T> decode(InputStream inStream, Context context) throws CoderException, IOException { T value = valueCoder.decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java index 95a3152..a4c8b3f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java @@ -101,8 +101,7 @@ public class TimestampedValue<V> { @Override public void encode(TimestampedValue<T> windowedElem, - OutputStream outStream, - Context context) + OutputStream outStream) throws IOException { valueCoder.encode(windowedElem.getValue(), outStream); InstantCoder.of().encode( @@ -110,7 +109,7 @@ public class TimestampedValue<V> { } @Override - public TimestampedValue<T> decode(InputStream inStream, Context context) + public TimestampedValue<T> decode(InputStream inStream) throws IOException { T value = valueCoder.decode(inStream); Instant timestamp = InstantCoder.of().decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index e8a2dfd..24c3c38 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -75,9 +75,14 @@ public abstract class ValueInSingleWindow<T> { } @Override + public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream) + throws IOException { + encode(windowedElem, outStream, Context.NESTED); + } + + @Override public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context) throws IOException { - Context nestedContext = context.nested(); InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); windowCoder.encode(windowedElem.getWindow(), outStream); PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream); @@ -85,8 +90,12 @@ public abstract class ValueInSingleWindow<T> { } @Override + public ValueInSingleWindow<T> decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException { - Context nestedContext = context.nested(); Instant timestamp = InstantCoder.of().decode(inStream); BoundedWindow window = windowCoder.decode(inStream); PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java index f06317b..96a5f1d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java @@ -101,6 +101,12 @@ public class ValueWithRecordId<ValueT> { } @Override + public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context) throws IOException { valueCoder.encode(value.value, outStream); @@ -108,6 +114,11 @@ public class ValueWithRecordId<ValueT> { } @Override + public ValueWithRecordId<ValueT> decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context) throws IOException { return new ValueWithRecordId<ValueT>( http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index 7ca7fb9..c883ca0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -81,12 +81,12 @@ public class CoderRegistryTest { @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes private class MyListCoder extends AtomicCoder<List> { @Override - public void encode(List value, OutputStream outStream, Context context) + public void encode(List value, OutputStream outStream) throws CoderException, IOException { } @Override - public List decode(InputStream inStream, Context context) + public List decode(InputStream inStream) throws CoderException, IOException { return Collections.emptyList(); } @@ -375,12 +375,12 @@ public class CoderRegistryTest { } @Override - public void encode(MyValue value, OutputStream outStream, Context context) + public void encode(MyValue value, OutputStream outStream) throws CoderException, IOException { } @Override - public MyValue decode(InputStream inStream, Context context) + public MyValue decode(InputStream inStream) throws CoderException, IOException { return new MyValue(); }
