[BEAM-2166] Use contextless encode/decode by default.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/09d1affd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/09d1affd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/09d1affd Branch: refs/heads/release-2.0.0 Commit: 09d1affd5d64cc3693eddc8e54c5e0c7f90069b2 Parents: f476d8a Author: Robert Bradshaw <[email protected]> Authored: Fri May 5 16:20:37 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Tue May 9 10:46:22 2017 -0700 ---------------------------------------------------------------------- .../apex/translation/utils/ApexStreamTuple.java | 11 +++++ .../UnboundedReadFromBoundedSource.java | 12 ++--- .../runners/core/construction/CodersTest.java | 4 +- .../core/construction/PCollectionsTest.java | 12 ++--- .../core/ElementAndRestrictionCoder.java | 12 ++--- .../beam/runners/core/KeyedWorkItemCoder.java | 18 +++---- .../beam/runners/core/TimerInternals.java | 22 ++++----- .../direct/CloningBundleFactoryTest.java | 20 +++----- .../beam/runners/direct/DirectRunnerTest.java | 4 +- .../UnboundedReadEvaluatorFactoryTest.java | 5 +- .../translation/types/CoderTypeSerializer.java | 4 +- .../streaming/SingletonKeyedWorkItemCoder.java | 16 +++++- .../state/FlinkKeyGroupStateInternals.java | 9 ++-- .../runners/dataflow/BatchViewOverrides.java | 16 +++--- .../runners/dataflow/internal/IsmFormat.java | 51 ++++++++++---------- .../runners/dataflow/util/RandomAccessData.java | 11 +++++ .../runners/dataflow/util/CloudObjectsTest.java | 8 +-- .../spark/aggregators/NamedAggregators.java | 4 +- .../beam/sdk/annotations/Experimental.java | 3 ++ .../org/apache/beam/sdk/coders/AvroCoder.java | 4 +- .../apache/beam/sdk/coders/BigDecimalCoder.java | 15 +++++- .../beam/sdk/coders/BigEndianIntegerCoder.java | 4 +- .../beam/sdk/coders/BigEndianLongCoder.java | 4 +- .../apache/beam/sdk/coders/BigIntegerCoder.java | 11 +++++ .../org/apache/beam/sdk/coders/BitSetCoder.java | 11 +++++ .../apache/beam/sdk/coders/ByteArrayCoder.java | 11 +++++ .../org/apache/beam/sdk/coders/ByteCoder.java | 4 +- .../java/org/apache/beam/sdk/coders/Coder.java | 38 +++++---------- .../org/apache/beam/sdk/coders/CustomCoder.java | 47 ------------------ .../apache/beam/sdk/coders/DelegateCoder.java | 11 +++++ .../org/apache/beam/sdk/coders/DoubleCoder.java | 4 +- .../apache/beam/sdk/coders/DurationCoder.java | 8 +-- .../apache/beam/sdk/coders/InstantCoder.java | 8 +-- .../beam/sdk/coders/IterableLikeCoder.java | 14 +++--- .../org/apache/beam/sdk/coders/KvCoder.java | 15 +++++- .../beam/sdk/coders/LengthPrefixCoder.java | 4 +- .../org/apache/beam/sdk/coders/MapCoder.java | 23 ++++++--- .../apache/beam/sdk/coders/NullableCoder.java | 11 +++++ .../beam/sdk/coders/SerializableCoder.java | 4 +- .../beam/sdk/coders/StringDelegateCoder.java | 11 +++++ .../apache/beam/sdk/coders/StringUtf8Coder.java | 11 +++++ .../apache/beam/sdk/coders/StructuredCoder.java | 47 ------------------ .../beam/sdk/coders/TextualIntegerCoder.java | 11 +++++ .../org/apache/beam/sdk/coders/VarIntCoder.java | 4 +- .../apache/beam/sdk/coders/VarLongCoder.java | 4 +- .../org/apache/beam/sdk/coders/VoidCoder.java | 4 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 15 +++--- .../sdk/transforms/ApproximateQuantiles.java | 44 ++++++++--------- .../org/apache/beam/sdk/transforms/Combine.java | 23 +++++++++ .../apache/beam/sdk/transforms/CombineFns.java | 17 +++++-- .../org/apache/beam/sdk/transforms/Count.java | 4 +- .../org/apache/beam/sdk/transforms/Mean.java | 12 ++--- .../org/apache/beam/sdk/transforms/Top.java | 8 +-- .../beam/sdk/transforms/join/CoGbkResult.java | 18 +++---- .../beam/sdk/transforms/join/UnionCoder.java | 11 +++++ .../sdk/transforms/windowing/GlobalWindow.java | 4 +- .../transforms/windowing/IntervalWindow.java | 12 ++--- .../beam/sdk/transforms/windowing/PaneInfo.java | 5 +- .../org/apache/beam/sdk/util/BitSetCoder.java | 11 +++++ .../org/apache/beam/sdk/util/WindowedValue.java | 37 ++++++++++---- .../beam/sdk/values/TimestampedValue.java | 13 +++-- .../beam/sdk/values/ValueInSingleWindow.java | 25 +++++++--- .../beam/sdk/values/ValueWithRecordId.java | 15 +++++- .../beam/sdk/coders/CoderRegistryTest.java | 16 ++++-- .../apache/beam/sdk/coders/CustomCoderTest.java | 4 +- .../beam/sdk/coders/NullableCoderTest.java | 11 +++++ .../beam/sdk/coders/SerializableCoderTest.java | 28 +++++------ .../beam/sdk/coders/StructuredCoderTest.java | 12 ++--- .../beam/sdk/testing/CoderPropertiesTest.java | 36 +++++++------- .../apache/beam/sdk/testing/PAssertTest.java | 4 +- .../sdk/testing/SerializableMatchersTest.java | 5 +- .../beam/sdk/testing/WindowSupplierTest.java | 4 +- .../beam/sdk/transforms/CombineFnsTest.java | 11 +++++ .../apache/beam/sdk/transforms/CombineTest.java | 25 +++++++--- .../apache/beam/sdk/transforms/CreateTest.java | 13 +++-- .../beam/sdk/transforms/GroupByKeyTest.java | 4 +- .../apache/beam/sdk/transforms/ParDoTest.java | 12 ++--- .../apache/beam/sdk/transforms/ViewTest.java | 11 +++++ .../transforms/reflect/DoFnInvokersTest.java | 8 +-- .../transforms/windowing/GlobalWindowTest.java | 2 +- ...BufferedElementCountingOutputStreamTest.java | 5 +- .../apache/beam/sdk/util/CoderUtilsTest.java | 4 +- .../beam/sdk/util/SerializableUtilsTest.java | 4 +- .../extensions/protobuf/ByteStringCoder.java | 11 +++++ .../sdk/extensions/protobuf/ProtoCoder.java | 11 +++++ .../BeamFnDataBufferingOutboundObserver.java | 3 +- .../harness/data/BeamFnDataInboundObserver.java | 3 +- ...BeamFnDataBufferingOutboundObserverTest.java | 3 +- .../data/BeamFnDataInboundObserverTest.java | 3 +- .../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 11 +++-- .../io/gcp/bigquery/TableDestinationCoder.java | 12 ++--- .../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 15 +++++- .../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 11 +++++ .../io/gcp/bigquery/WriteBundlesToFiles.java | 16 +++--- .../pubsub/PubsubMessagePayloadOnlyCoder.java | 11 +++++ .../PubsubMessageWithAttributesCoder.java | 15 +++++- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 20 ++++---- .../io/gcp/pubsub/PubsubUnboundedSource.java | 13 +++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 11 +++++ .../beam/sdk/io/hadoop/WritableCoder.java | 4 +- .../beam/sdk/io/hbase/HBaseMutationCoder.java | 6 +-- .../beam/sdk/io/hbase/HBaseResultCoder.java | 4 +- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 +- .../beam/sdk/io/kafka/KafkaRecordCoder.java | 11 +++++ .../beam/sdk/io/kinesis/KinesisRecordCoder.java | 38 +++++++-------- .../org/apache/beam/sdk/io/xml/JAXBCoder.java | 25 ++++++---- .../apache/beam/sdk/io/xml/JAXBCoderTest.java | 21 +++++--- 107 files changed, 809 insertions(+), 551 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java index 4aa6ee8..1d402eb 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java @@ -162,6 +162,12 @@ public interface ApexStreamTuple<T> { } @Override + public void encode(ApexStreamTuple<T> value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context) throws CoderException, IOException { if (value instanceof WatermarkTuple) { @@ -175,6 +181,11 @@ public interface ApexStreamTuple<T> { } @Override + public ApexStreamTuple<T> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public ApexStreamTuple<T> decode(InputStream inStream, Context context) throws CoderException, IOException { int b = inStream.read(); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java index 1424b8b..24eb384 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -221,19 +221,19 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle } @Override - public void encode(Checkpoint<T> value, OutputStream outStream, Context context) + public void encode(Checkpoint<T> value, OutputStream outStream) throws CoderException, IOException { - elemsCoder.encode(value.residualElements, outStream, context.nested()); - sourceCoder.encode(value.residualSource, outStream, context); + elemsCoder.encode(value.residualElements, outStream); + sourceCoder.encode(value.residualSource, outStream); } @SuppressWarnings("unchecked") @Override - public Checkpoint<T> decode(InputStream inStream, Context context) + public Checkpoint<T> decode(InputStream inStream) throws CoderException, IOException { return new Checkpoint<>( - elemsCoder.decode(inStream, context.nested()), - sourceCoder.decode(inStream, context)); + elemsCoder.decode(inStream), + sourceCoder.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java index 765723c..42fba7c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java @@ -151,11 +151,11 @@ public class CodersTest { private static class RecordCoder extends AtomicCoder<Record> { @Override - public void encode(Record value, OutputStream outStream, Context context) + public void encode(Record value, OutputStream outStream) throws CoderException, IOException {} @Override - public Record decode(InputStream inStream, Context context) + public Record decode(InputStream inStream) throws CoderException, IOException { return new Record(); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java index 2c45cbd..c38dbc0 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java @@ -130,13 +130,13 @@ public class PCollectionsTest { @AutoValue abstract static class CustomIntCoder extends CustomCoder<Integer> { @Override - public void encode(Integer value, OutputStream outStream, Context context) throws IOException { - VarInt.encode(value, outStream); + public Integer decode(InputStream inStream) throws IOException { + return VarInt.decodeInt(inStream); } @Override - public Integer decode(InputStream inStream, Context context) throws IOException { - return VarInt.decodeInt(inStream); + public void encode(Integer value, OutputStream outStream) throws IOException { + VarInt.encode(value, outStream); } } @@ -163,13 +163,13 @@ public class PCollectionsTest { @Override public void verifyDeterministic() {} @Override - public void encode(BoundedWindow value, OutputStream outStream, Context context) + public void encode(BoundedWindow value, OutputStream outStream) throws IOException { VarInt.encode(value.maxTimestamp().getMillis(), outStream); } @Override - public BoundedWindow decode(InputStream inStream, Context context) throws IOException { + public BoundedWindow decode(InputStream inStream) throws IOException { final Instant ts = new Instant(VarInt.decodeLong(inStream)); return new BoundedWindow() { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java index 83c4e62..4440b85 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java @@ -50,20 +50,20 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT> @Override public void encode( - ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream, Context context) + ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream) throws IOException { if (value == null) { throw new CoderException("cannot encode a null ElementAndRestriction"); } - elementCoder.encode(value.element(), outStream, context.nested()); - restrictionCoder.encode(value.restriction(), outStream, context); + elementCoder.encode(value.element(), outStream); + restrictionCoder.encode(value.restriction(), outStream); } @Override - public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context) + public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream) throws IOException { - ElementT key = elementCoder.decode(inStream, context.nested()); - RestrictionT value = restrictionCoder.decode(inStream, context); + ElementT key = elementCoder.decode(inStream); + RestrictionT value = restrictionCoder.decode(inStream); return ElementAndRestriction.of(key, value); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java index e1872b5..b1cb1a6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java @@ -69,21 +69,19 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem< } @Override - public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context) + public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); - keyCoder.encode(value.key(), outStream, nestedContext); - timersCoder.encode(value.timersIterable(), outStream, nestedContext); - elemsCoder.encode(value.elementsIterable(), outStream, context); + keyCoder.encode(value.key(), outStream); + timersCoder.encode(value.timersIterable(), outStream); + elemsCoder.encode(value.elementsIterable(), outStream); } @Override - public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context) + public KeyedWorkItem<K, ElemT> decode(InputStream inStream) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); - K key = keyCoder.decode(inStream, nestedContext); - Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext); - Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, context); + K key = keyCoder.decode(inStream); + Iterable<TimerData> timers = timersCoder.decode(inStream); + Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream); return KeyedWorkItems.workItem(key, timers, elems); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 888c11f..f4a12d0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -238,24 +238,22 @@ public interface TimerInternals { } @Override - public void encode(TimerData timer, OutputStream outStream, Context context) + public void encode(TimerData timer, OutputStream outStream) throws CoderException, IOException { - Context nestedContext = context.nested(); - STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext); - STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext); - INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext); - STRING_CODER.encode(timer.getDomain().name(), outStream, context); + STRING_CODER.encode(timer.getTimerId(), outStream); + STRING_CODER.encode(timer.getNamespace().stringKey(), outStream); + INSTANT_CODER.encode(timer.getTimestamp(), outStream); + STRING_CODER.encode(timer.getDomain().name(), outStream); } @Override - public TimerData decode(InputStream inStream, Context context) + public TimerData decode(InputStream inStream) throws CoderException, IOException { - Context nestedContext = context.nested(); - String timerId = STRING_CODER.decode(inStream, nestedContext); + String timerId = STRING_CODER.decode(inStream); StateNamespace namespace = - StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder); - Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext); - TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context)); + StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder); + Instant timestamp = INSTANT_CODER.decode(inStream); + TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream)); return TimerData.of(timerId, namespace, timestamp, domain); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java index 33d171e..5bc8077 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java @@ -178,15 +178,14 @@ public class CloningBundleFactoryTest { @Override public void encode( Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws IOException { throw new CoderException("Encode not allowed"); } @Override public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws IOException { return null; } @@ -196,13 +195,12 @@ public class CloningBundleFactoryTest { @Override public void encode( Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws IOException {} @Override public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws IOException { throw new CoderException("Decode not allowed"); } @@ -212,13 +210,12 @@ public class CloningBundleFactoryTest { @Override public void encode( Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws CoderException, IOException {} @Override public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { return new Record() { @Override @@ -244,13 +241,12 @@ public class CloningBundleFactoryTest { @Override public void encode( Record value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws CoderException, IOException {} @Override public Record decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { return new Record() { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 85e55eb..943d27c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -526,11 +526,11 @@ public class DirectRunnerTest implements Serializable { private static class LongNoDecodeCoder extends AtomicCoder<Long> { @Override public void encode( - Long value, OutputStream outStream, Context context) throws IOException { + Long value, OutputStream outStream) throws IOException { } @Override - public Long decode(InputStream inStream, Context context) throws IOException { + public Long decode(InputStream inStream) throws IOException { throw new CoderException("Cannot decode a long"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index b9ba7f4..2a01db5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -590,15 +590,14 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public void encode( TestCheckpointMark value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws IOException { VarInt.encode(value.index, outStream); } @Override public TestCheckpointMark decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws IOException { TestCheckpointMark decoded = new TestCheckpointMark(VarInt.decodeInt(inStream)); decoded.decoded = true; http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index e210ed9..e003119 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -77,14 +77,14 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> { @Override public void serialize(T t, DataOutputView dataOutputView) throws IOException { DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView); - coder.encode(t, outputWrapper, Coder.Context.NESTED); + coder.encode(t, outputWrapper); } @Override public T deserialize(DataInputView dataInputView) throws IOException { try { DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView); - return coder.decode(inputWrapper, Coder.Context.NESTED); + return coder.decode(inputWrapper); } catch (CoderException e) { Throwable cause = e.getCause(); if (cause instanceof EOFException) { http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java index f218693..2ed2055 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java @@ -66,18 +66,30 @@ public class SingletonKeyedWorkItemCoder<K, ElemT> } @Override + public void encode(SingletonKeyedWorkItem<K, ElemT> value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(SingletonKeyedWorkItem<K, ElemT> value, OutputStream outStream, Context context) throws CoderException, IOException { - keyCoder.encode(value.key(), outStream, context.nested()); + keyCoder.encode(value.key(), outStream); valueCoder.encode(value.value, outStream, context); } @Override + public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream) + throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context) throws CoderException, IOException { - K key = keyCoder.decode(inStream, context.nested()); + K key = keyCoder.decode(inStream); WindowedValue<ElemT> value = valueCoder.decode(inStream, context); return new SingletonKeyedWorkItem<>(key, value); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java index d6af4f9..512e4ef 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java @@ -31,7 +31,6 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -430,8 +429,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals { Map<String, ?> map = entry.getValue().f1; out.writeInt(map.size()); for (Map.Entry<String, ?> entry1 : map.entrySet()) { - StringUtf8Coder.of().encode(entry1.getKey(), out, Context.NESTED); - coder.encode(entry1.getValue(), out, Context.NESTED); + StringUtf8Coder.of().encode(entry1.getKey(), out); + coder.encode(entry1.getValue(), out); } } } @@ -463,8 +462,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals { Map<String, Object> map = (Map<String, Object>) tuple2.f1; int mapSize = in.readInt(); for (int j = 0; j < mapSize; j++) { - String namespace = StringUtf8Coder.of().decode(in, Context.NESTED); - Object value = coder.decode(in, Context.NESTED); + String namespace = StringUtf8Coder.of().decode(in); + Object value = coder.decode(in); map.put(namespace, value); } } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index ecd0365..b4a6e64 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -1351,18 +1351,18 @@ class BatchViewOverrides { } @Override - public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream, - Coder.Context context) throws CoderException, IOException { - transformCoder.encode(value.transform, outStream, context.nested()); - originalMapCoder.encode(value.originalMap, outStream, context); + public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream) + throws CoderException, IOException { + transformCoder.encode(value.transform, outStream); + originalMapCoder.encode(value.originalMap, outStream); } @Override - public TransformedMap<K, V1, V2> decode( - InputStream inStream, Coder.Context context) throws CoderException, IOException { + public TransformedMap<K, V1, V2> decode(InputStream inStream) + throws CoderException, IOException { return new TransformedMap<>( - transformCoder.decode(inStream, context.nested()), - originalMapCoder.decode(inStream, context)); + transformCoder.decode(inStream), + originalMapCoder.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index 00e0c54..81ac2a2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -231,35 +231,35 @@ public class IsmFormat { } @Override - public void encode(IsmRecord<V> value, OutputStream outStream, - Coder.Context context) throws CoderException, IOException { + public void encode(IsmRecord<V> value, OutputStream outStream) + throws CoderException, IOException { if (value.getKeyComponents().size() != keyComponentCoders.size()) { throw new CoderException(String.format( "Expected %s key component(s) but received key component(s) %s.", keyComponentCoders.size(), value.getKeyComponents())); } for (int i = 0; i < keyComponentCoders.size(); ++i) { - getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested()); + getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream); } if (isMetadataKey(value.getKeyComponents())) { - ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested()); + ByteArrayCoder.of().encode(value.getMetadata(), outStream); } else { - valueCoder.encode(value.getValue(), outStream, context.nested()); + valueCoder.encode(value.getValue(), outStream); } } @Override - public IsmRecord<V> decode(InputStream inStream, Coder.Context context) + public IsmRecord<V> decode(InputStream inStream) throws CoderException, IOException { List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size()); for (Coder<?> keyCoder : keyComponentCoders) { - keyComponents.add(keyCoder.decode(inStream, context.nested())); + keyComponents.add(keyCoder.decode(inStream)); } if (isMetadataKey(keyComponents)) { return IsmRecord.<V>meta( - keyComponents, ByteArrayCoder.of().decode(inStream, context.nested())); + keyComponents, ByteArrayCoder.of().decode(inStream)); } else { - return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested())); + return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream)); } } @@ -493,24 +493,24 @@ public class IsmFormat { } @Override - public void encode(K value, OutputStream outStream, Coder.Context context) + public void encode(K value, OutputStream outStream) throws CoderException, IOException { if (value == METADATA_KEY) { outStream.write(0); } else { outStream.write(1); - keyCoder.encode(value, outStream, context.nested()); + keyCoder.encode(value, outStream); } } @Override - public K decode(InputStream inStream, Coder.Context context) + public K decode(InputStream inStream) throws CoderException, IOException { int marker = inStream.read(); if (marker == 0) { return (K) getMetadataKey(); } else if (marker == 1) { - return keyCoder.decode(inStream, context.nested()); + return keyCoder.decode(inStream); } else { throw new CoderException(String.format("Expected marker but got %s.", marker)); } @@ -621,23 +621,22 @@ public class IsmFormat { private IsmShardCoder() {} @Override - public void encode(IsmShard value, OutputStream outStream, Coder.Context context) + public void encode(IsmShard value, OutputStream outStream) throws CoderException, IOException { checkState(value.getIndexOffset() >= 0, "%s attempting to be written without index offset.", value); - VarIntCoder.of().encode(value.getId(), outStream, context.nested()); - VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested()); - VarLongCoder.of().encode(value.getIndexOffset(), outStream, context); + VarIntCoder.of().encode(value.getId(), outStream); + VarLongCoder.of().encode(value.getBlockOffset(), outStream); + VarLongCoder.of().encode(value.getIndexOffset(), outStream); } @Override - public IsmShard decode( - InputStream inStream, Coder.Context context) throws CoderException, IOException { + public IsmShard decode(InputStream inStream) throws CoderException, IOException { return IsmShard.of( - VarIntCoder.of().decode(inStream, context.nested()), - VarLongCoder.of().decode(inStream, context.nested()), - VarLongCoder.of().decode(inStream, context)); + VarIntCoder.of().decode(inStream), + VarLongCoder.of().decode(inStream), + VarLongCoder.of().decode(inStream)); } @Override @@ -683,14 +682,14 @@ public class IsmFormat { } @Override - public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context) + public void encode(KeyPrefix value, OutputStream outStream) throws CoderException, IOException { VarInt.encode(value.getSharedKeySize(), outStream); VarInt.encode(value.getUnsharedKeySize(), outStream); } @Override - public KeyPrefix decode(InputStream inStream, Coder.Context context) + public KeyPrefix decode(InputStream inStream) throws CoderException, IOException { return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream)); } @@ -755,7 +754,7 @@ public class IsmFormat { } @Override - public void encode(Footer value, OutputStream outStream, Coder.Context context) + public void encode(Footer value, OutputStream outStream) throws CoderException, IOException { DataOutputStream dataOut = new DataOutputStream(outStream); dataOut.writeLong(value.getIndexPosition()); @@ -765,7 +764,7 @@ public class IsmFormat { } @Override - public Footer decode(InputStream inStream, Coder.Context context) + public Footer decode(InputStream inStream) throws CoderException, IOException { DataInputStream dataIn = new DataInputStream(inStream); Footer footer = Footer.of(dataIn.readLong(), dataIn.readLong(), dataIn.readLong()); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java index f36bd78..5ea9f07 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java @@ -63,6 +63,12 @@ public class RandomAccessData { } @Override + public void encode(RandomAccessData value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Coder.Context.NESTED); + } + + @Override public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { if (value == POSITIVE_INFINITY) { @@ -75,6 +81,11 @@ public class RandomAccessData { } @Override + public RandomAccessData decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public RandomAccessData decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { RandomAccessData rval = new RandomAccessData(); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index 64c0dbd..59a5431 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -171,12 +171,12 @@ public class CloudObjectsTest { private static class ObjectCoder extends CustomCoder<Object> { @Override - public void encode(Object value, OutputStream outStream, Context context) + public void encode(Object value, OutputStream outStream) throws CoderException, IOException { } @Override - public Object decode(InputStream inStream, Context context) + public Object decode(InputStream inStream) throws CoderException, IOException { return new Object(); } @@ -197,11 +197,11 @@ public class CloudObjectsTest { */ private static class ArbitraryCoder extends StructuredCoder<Record> { @Override - public void encode(Record value, OutputStream outStream, Context context) + public void encode(Record value, OutputStream outStream) throws CoderException, IOException {} @Override - public Record decode(InputStream inStream, Context context) throws CoderException, IOException { + public Record decode(InputStream inStream) throws CoderException, IOException { return new Record(); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java index c836ca5..27f2ec8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java @@ -207,7 +207,7 @@ public class NamedAggregators implements Serializable { oos.writeObject(inCoder); try { combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder) - .encode(state, oos, Coder.Context.NESTED); + .encode(state, oos); } catch (CannotProvideCoderException e) { throw new IllegalStateException("Could not determine coder for accumulator", e); } @@ -220,7 +220,7 @@ public class NamedAggregators implements Serializable { inCoder = (Coder<InputT>) ois.readObject(); try { state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder) - .decode(ois, Coder.Context.NESTED); + .decode(ois); } catch (CannotProvideCoderException e) { throw new IllegalStateException("Could not determine coder for accumulator", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index 7255a01..2e3a711 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -84,6 +84,9 @@ public @interface Experimental { /** Metrics-related experimental APIs. */ METRICS, + /** Experimental feature related to alternative, unnested encodings for coders. */ + CODER_CONTEXT, + /** Experimental runner APIs. Should not be used by pipeline authors. */ CORE_RUNNERS_ONLY, http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index f82c065..bba669d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -300,7 +300,7 @@ public class AvroCoder<T> extends CustomCoder<T> { } @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { + public void encode(T value, OutputStream outStream) throws IOException { // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it. BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get()); // Save the potentially-new instance for reuse later. @@ -310,7 +310,7 @@ public class AvroCoder<T> extends CustomCoder<T> { } @Override - public T decode(InputStream inStream, Context context) throws IOException { + public T decode(InputStream inStream) throws IOException { // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it. BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get()); // Save the potentially-new instance for later. http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java index 97559a9..e890d11 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java @@ -48,17 +48,28 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> { private BigDecimalCoder() {} @Override + public void encode(BigDecimal value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(BigDecimal value, OutputStream outStream, Context context) throws IOException, CoderException { checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName())); - VAR_INT_CODER.encode(value.scale(), outStream, context.nested()); + VAR_INT_CODER.encode(value.scale(), outStream); BIG_INT_CODER.encode(value.unscaledValue(), outStream, context); } @Override + public BigDecimal decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public BigDecimal decode(InputStream inStream, Context context) throws IOException, CoderException { - int scale = VAR_INT_CODER.decode(inStream, context.nested()); + int scale = VAR_INT_CODER.decode(inStream); BigInteger bigInteger = BIG_INT_CODER.decode(inStream, context); return new BigDecimal(bigInteger, scale); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java index a61f099..efb1e4b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java @@ -43,7 +43,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> { private BigEndianIntegerCoder() {} @Override - public void encode(Integer value, OutputStream outStream, Context context) + public void encode(Integer value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Integer"); @@ -52,7 +52,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> { } @Override - public Integer decode(InputStream inStream, Context context) + public Integer decode(InputStream inStream) throws IOException, CoderException { try { return new DataInputStream(inStream).readInt(); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java index 868160a..ab85e17 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java @@ -43,7 +43,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> { private BigEndianLongCoder() {} @Override - public void encode(Long value, OutputStream outStream, Context context) + public void encode(Long value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Long"); @@ -52,7 +52,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> { } @Override - public Long decode(InputStream inStream, Context context) + public Long decode(InputStream inStream) throws IOException, CoderException { try { return new DataInputStream(inStream).readLong(); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java index 3b038af..d54accf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java @@ -42,6 +42,12 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> { private BigIntegerCoder() {} @Override + public void encode(BigInteger value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(BigInteger value, OutputStream outStream, Context context) throws IOException, CoderException { checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName())); @@ -49,6 +55,11 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> { } @Override + public BigInteger decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public BigInteger decode(InputStream inStream, Context context) throws IOException, CoderException { return new BigInteger(BYTE_ARRAY_CODER.decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java index f49776b..8115017 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java @@ -36,6 +36,12 @@ public class BitSetCoder extends AtomicCoder<BitSet> { } @Override + public void encode(BitSet value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(BitSet value, OutputStream outStream, Context context) throws CoderException, IOException { if (value == null) { @@ -45,6 +51,11 @@ public class BitSetCoder extends AtomicCoder<BitSet> { } @Override + public BitSet decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public BitSet decode(InputStream inStream, Context context) throws CoderException, IOException { return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java index c9393a1..3b38388 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java @@ -52,6 +52,12 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> { private ByteArrayCoder() {} @Override + public void encode(byte[] value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(byte[] value, OutputStream outStream, Context context) throws IOException, CoderException { if (value == null) { @@ -86,6 +92,11 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> { } @Override + public byte[] decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public byte[] decode(InputStream inStream, Context context) throws IOException, CoderException { if (context.isWholeStream) { http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java index 7f449d6..2d23a64 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java @@ -41,7 +41,7 @@ public class ByteCoder extends AtomicCoder<Byte> { private ByteCoder() {} @Override - public void encode(Byte value, OutputStream outStream, Context context) + public void encode(Byte value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Byte"); @@ -50,7 +50,7 @@ public class ByteCoder extends AtomicCoder<Byte> { } @Override - public Byte decode(InputStream inStream, Context context) + public Byte decode(InputStream inStream) throws IOException, CoderException { try { // value will be between 0-255, -1 for EOF http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index d140e89..2ee532d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -64,6 +64,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; public abstract class Coder<T> implements Serializable { /** The context in which encoding or decoding is being done. */ @Deprecated + @Experimental(Kind.CODER_CONTEXT) public static class Context { /** * The outer context: the value being encoded or decoded takes @@ -127,18 +128,6 @@ public abstract class Coder<T> implements Serializable { /** * Encodes the given value of type {@code T} onto the given output stream - * in the outer context. - * - * @throws IOException if writing to the {@code OutputStream} fails - * for some reason - * @throws CoderException if the value could not be encoded for some reason - */ - @Deprecated - public abstract void encodeOuter(T value, OutputStream outStream) - throws CoderException, IOException; - - /** - * Encodes the given value of type {@code T} onto the given output stream * in the given context. * * @throws IOException if writing to the {@code OutputStream} fails @@ -146,8 +135,11 @@ public abstract class Coder<T> implements Serializable { * @throws CoderException if the value could not be encoded for some reason */ @Deprecated - public abstract void encode(T value, OutputStream outStream, Context context) - throws CoderException, IOException; + @Experimental(Kind.CODER_CONTEXT) + public void encode(T value, OutputStream outStream, Context context) + throws CoderException, IOException { + encode(value, outStream); + } /** * Decodes a value of type {@code T} from the given input stream in @@ -161,17 +153,6 @@ public abstract class Coder<T> implements Serializable { /** * Decodes a value of type {@code T} from the given input stream in - * the outer context. Returns the decoded value. - * - * @throws IOException if reading from the {@code InputStream} fails - * for some reason - * @throws CoderException if the value could not be decoded for some reason - */ - @Deprecated - public abstract T decodeOuter(InputStream inStream) throws CoderException, IOException; - - /** - * Decodes a value of type {@code T} from the given input stream in * the given context. Returns the decoded value. * * @throws IOException if reading from the {@code InputStream} fails @@ -179,8 +160,11 @@ public abstract class Coder<T> implements Serializable { * @throws CoderException if the value could not be decoded for some reason */ @Deprecated - public abstract T decode(InputStream inStream, Context context) - throws CoderException, IOException; + @Experimental(Kind.CODER_CONTEXT) + public T decode(InputStream inStream, Context context) + throws CoderException, IOException { + return decode(inStream); + } /** * If this is a {@code Coder} for a parameterized type, returns the http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java index edbaa7f..c581923 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java @@ -17,9 +17,6 @@ */ package org.apache.beam.sdk.coders; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.io.Serializable; import java.util.Collections; import java.util.List; @@ -39,50 +36,6 @@ import java.util.List; public abstract class CustomCoder<T> extends Coder<T> implements Serializable { - @Override - public void encode(T value, OutputStream outStream) - throws CoderException, IOException { - encode(value, outStream, Coder.Context.NESTED); - } - - @Deprecated - @Override - public void encodeOuter(T value, OutputStream outStream) - throws CoderException, IOException { - encode(value, outStream, Coder.Context.OUTER); - } - - @Deprecated - public void encode(T value, OutputStream outStream, Coder.Context context) - throws CoderException, IOException { - if (context == Coder.Context.NESTED) { - encode(value, outStream); - } else { - encodeOuter(value, outStream); - } - } - - @Override - public T decode(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Coder.Context.NESTED); - } - - @Deprecated - @Override - public T decodeOuter(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Coder.Context.OUTER); - } - - @Deprecated - public T decode(InputStream inStream, Coder.Context context) - throws CoderException, IOException { - if (context == Coder.Context.NESTED) { - return decode(inStream); - } else { - return decodeOuter(inStream); - } - } - /** * {@inheritDoc}. * http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java index 86077eb..f51b156 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java @@ -66,12 +66,23 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> { } @Override + public void encode(T value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException { coder.encode(applyAndWrapExceptions(toFn, value), outStream, context); } @Override + public T decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public T decode(InputStream inStream, Context context) throws CoderException, IOException { return applyAndWrapExceptions(fromFn, coder.decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java index 8eff6ba..deb18f2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java @@ -43,7 +43,7 @@ public class DoubleCoder extends AtomicCoder<Double> { private DoubleCoder() {} @Override - public void encode(Double value, OutputStream outStream, Context context) + public void encode(Double value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Double"); @@ -52,7 +52,7 @@ public class DoubleCoder extends AtomicCoder<Double> { } @Override - public Double decode(InputStream inStream, Context context) + public Double decode(InputStream inStream) throws IOException, CoderException { try { return new DataInputStream(inStream).readDouble(); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java index 8b4ae1d..90de26f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java @@ -54,18 +54,18 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> { } @Override - public void encode(ReadableDuration value, OutputStream outStream, Context context) + public void encode(ReadableDuration value, OutputStream outStream) throws CoderException, IOException { if (value == null) { throw new CoderException("cannot encode a null ReadableDuration"); } - LONG_CODER.encode(toLong(value), outStream, context); + LONG_CODER.encode(toLong(value), outStream); } @Override - public ReadableDuration decode(InputStream inStream, Context context) + public ReadableDuration decode(InputStream inStream) throws CoderException, IOException { - return fromLong(LONG_CODER.decode(inStream, context)); + return fromLong(LONG_CODER.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java index 000f406..648493e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java @@ -68,18 +68,18 @@ public class InstantCoder extends AtomicCoder<Instant> { } @Override - public void encode(Instant value, OutputStream outStream, Context context) + public void encode(Instant value, OutputStream outStream) throws CoderException, IOException { if (value == null) { throw new CoderException("cannot encode a null Instant"); } - LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context); + LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream); } @Override - public Instant decode(InputStream inStream, Context context) + public Instant decode(InputStream inStream) throws CoderException, IOException { - return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream, context)); + return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java index 9994b3f..248c26c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java @@ -84,12 +84,11 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> @Override public void encode( - IterableT iterable, OutputStream outStream, Context context) + IterableT iterable, OutputStream outStream) throws IOException, CoderException { if (iterable == null) { throw new CoderException("cannot encode a null " + iterableName); } - Context nestedContext = context.nested(); DataOutputStream dataOutStream = new DataOutputStream(outStream); if (iterable instanceof Collection) { // We can know the size of the Iterable. Use an encoding with a @@ -97,7 +96,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> Collection<T> collection = (Collection<T>) iterable; dataOutStream.writeInt(collection.size()); for (T elem : collection) { - elementCoder.encode(elem, dataOutStream, nestedContext); + elementCoder.encode(elem, dataOutStream); } } else { // We don't know the size without traversing it so use a fixed size buffer @@ -108,7 +107,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> new BufferedElementCountingOutputStream(dataOutStream); for (T elem : iterable) { countingOutputStream.markElementStart(); - elementCoder.encode(elem, countingOutputStream, nestedContext); + elementCoder.encode(elem, countingOutputStream); } countingOutputStream.finish(); } @@ -117,15 +116,14 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> } @Override - public IterableT decode(InputStream inStream, Context context) + public IterableT decode(InputStream inStream) throws IOException, CoderException { - Context nestedContext = context.nested(); DataInputStream dataInStream = new DataInputStream(inStream); int size = dataInStream.readInt(); if (size >= 0) { List<T> elements = new ArrayList<>(size); for (int i = 0; i < size; i++) { - elements.add(elementCoder.decode(dataInStream, nestedContext)); + elements.add(elementCoder.decode(dataInStream)); } return decodeToIterable(elements); } @@ -134,7 +132,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> // each block of elements. long count = VarInt.decodeLong(dataInStream); while (count > 0L) { - elements.add(elementCoder.decode(dataInStream, nestedContext)); + elements.add(elementCoder.decode(dataInStream)); --count; if (count == 0L) { count = VarInt.decodeLong(dataInStream); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index 1df4460..9c01886 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -58,19 +58,30 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> { } @Override + public void encode(KV<K, V> kv, OutputStream outStream) + throws IOException, CoderException { + encode(kv, outStream, Context.NESTED); + } + + @Override public void encode(KV<K, V> kv, OutputStream outStream, Context context) throws IOException, CoderException { if (kv == null) { throw new CoderException("cannot encode a null KV"); } - keyCoder.encode(kv.getKey(), outStream, context.nested()); + keyCoder.encode(kv.getKey(), outStream); valueCoder.encode(kv.getValue(), outStream, context); } @Override + public KV<K, V> decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public KV<K, V> decode(InputStream inStream, Context context) throws IOException, CoderException { - K key = keyCoder.decode(inStream, context.nested()); + K key = keyCoder.decode(inStream); V value = valueCoder.decode(inStream, context); return KV.of(key, value); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java index 7dd2a32..b24f66d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -53,7 +53,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> { } @Override - public void encode(T value, OutputStream outStream, Context context) + public void encode(T value, OutputStream outStream) throws CoderException, IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); valueCoder.encode(value, bos, Context.OUTER); @@ -62,7 +62,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> { } @Override - public T decode(InputStream inStream, Context context) throws CoderException, IOException { + public T decode(InputStream inStream) throws CoderException, IOException { long size = VarInt.decodeLong(inStream); return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java index 7df9ca9..d8b3f1c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java @@ -69,6 +69,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> { } @Override + public void encode(Map<K, V> map, OutputStream outStream) + throws IOException, CoderException { + encode(map, outStream, Context.NESTED); + } + + @Override public void encode( Map<K, V> map, OutputStream outStream, @@ -89,17 +95,22 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> { Iterator<Entry<K, V>> iterator = map.entrySet().iterator(); Entry<K, V> entry = iterator.next(); while (iterator.hasNext()) { - keyCoder.encode(entry.getKey(), outStream, context.nested()); - valueCoder.encode(entry.getValue(), outStream, context.nested()); + keyCoder.encode(entry.getKey(), outStream); + valueCoder.encode(entry.getValue(), outStream); entry = iterator.next(); } - keyCoder.encode(entry.getKey(), outStream, context.nested()); + keyCoder.encode(entry.getKey(), outStream); valueCoder.encode(entry.getValue(), outStream, context); // no flush needed as DataOutputStream does not buffer } @Override + public Map<K, V> decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public Map<K, V> decode(InputStream inStream, Context context) throws IOException, CoderException { DataInputStream dataInStream = new DataInputStream(inStream); @@ -110,12 +121,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> { Map<K, V> retval = Maps.newHashMapWithExpectedSize(size); for (int i = 0; i < size - 1; ++i) { - K key = keyCoder.decode(inStream, context.nested()); - V value = valueCoder.decode(inStream, context.nested()); + K key = keyCoder.decode(inStream); + V value = valueCoder.decode(inStream); retval.put(key, value); } - K key = keyCoder.decode(inStream, context.nested()); + K key = keyCoder.decode(inStream); V value = valueCoder.decode(inStream, context); retval.put(key, value); return retval; http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java index e46591e..64229e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java @@ -61,6 +61,12 @@ public class NullableCoder<T> extends StructuredCoder<T> { } @Override + public void encode(@Nullable T value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(@Nullable T value, OutputStream outStream, Context context) throws IOException, CoderException { if (value == null) { @@ -72,6 +78,11 @@ public class NullableCoder<T> extends StructuredCoder<T> { } @Override + public T decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override @Nullable public T decode(InputStream inStream, Context context) throws IOException, CoderException { int b = inStream.read(); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java index e3b2959..9aa8493 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java @@ -119,7 +119,7 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> { } @Override - public void encode(T value, OutputStream outStream, Context context) + public void encode(T value, OutputStream outStream) throws IOException, CoderException { try { ObjectOutputStream oos = new ObjectOutputStream(outStream); @@ -131,7 +131,7 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> { } @Override - public T decode(InputStream inStream, Context context) + public T decode(InputStream inStream) throws IOException, CoderException { try { ObjectInputStream ois = new ObjectInputStream(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java index 1f4538f..2161291 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java @@ -100,12 +100,23 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> { } @Override + public void encode(T value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException { delegateCoder.encode(value, outStream, context); } @Override + public T decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public T decode(InputStream inStream, Context context) throws CoderException, IOException { return delegateCoder.decode(inStream, context); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java index 44856e8..3bbc983 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java @@ -67,6 +67,12 @@ public class StringUtf8Coder extends AtomicCoder<String> { private StringUtf8Coder() {} @Override + public void encode(String value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(String value, OutputStream outStream, Context context) throws IOException { if (value == null) { @@ -85,6 +91,11 @@ public class StringUtf8Coder extends AtomicCoder<String> { } @Override + public String decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public String decode(InputStream inStream, Context context) throws IOException { if (context.isWholeStream) { http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java index 437f10d..42c0598 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -18,9 +18,6 @@ package org.apache.beam.sdk.coders; import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -101,50 +98,6 @@ public abstract class StructuredCoder<T> extends Coder<T> { return builder.toString(); } - @Override - public void encode(T value, OutputStream outStream) - throws CoderException, IOException { - encode(value, outStream, Coder.Context.NESTED); - } - - @Deprecated - @Override - public void encodeOuter(T value, OutputStream outStream) - throws CoderException, IOException { - encode(value, outStream, Coder.Context.OUTER); - } - - @Deprecated - public void encode(T value, OutputStream outStream, Coder.Context context) - throws CoderException, IOException { - if (context == Coder.Context.NESTED) { - encode(value, outStream); - } else { - encodeOuter(value, outStream); - } - } - - @Override - public T decode(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Coder.Context.NESTED); - } - - @Deprecated - @Override - public T decodeOuter(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Coder.Context.OUTER); - } - - @Deprecated - public T decode(InputStream inStream, Coder.Context context) - throws CoderException, IOException { - if (context == Coder.Context.NESTED) { - return decode(inStream); - } else { - return decodeOuter(inStream); - } - } - protected void verifyDeterministic(String message, Iterable<Coder<?>> coders) throws NonDeterministicException { for (Coder<?> coder : coders) { http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java index 718811c..6078fa3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java @@ -39,6 +39,12 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> { protected TextualIntegerCoder() {} @Override + public void encode(Integer value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(Integer value, OutputStream outStream, Context context) throws IOException, CoderException { if (value == null) { @@ -49,6 +55,11 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> { } @Override + public Integer decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public Integer decode(InputStream inStream, Context context) throws IOException, CoderException { String textualValue = StringUtf8Coder.of().decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java index bda66bb..3a9abe7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java @@ -44,7 +44,7 @@ public class VarIntCoder extends AtomicCoder<Integer> { private VarIntCoder() {} @Override - public void encode(Integer value, OutputStream outStream, Context context) + public void encode(Integer value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Integer"); @@ -53,7 +53,7 @@ public class VarIntCoder extends AtomicCoder<Integer> { } @Override - public Integer decode(InputStream inStream, Context context) + public Integer decode(InputStream inStream) throws IOException, CoderException { try { return VarInt.decodeInt(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java index bf651c3..37ad8f6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java @@ -45,7 +45,7 @@ public class VarLongCoder extends StructuredCoder<Long> { private VarLongCoder() {} @Override - public void encode(Long value, OutputStream outStream, Context context) + public void encode(Long value, OutputStream outStream) throws IOException, CoderException { if (value == null) { throw new CoderException("cannot encode a null Long"); @@ -54,7 +54,7 @@ public class VarLongCoder extends StructuredCoder<Long> { } @Override - public Long decode(InputStream inStream, Context context) + public Long decode(InputStream inStream) throws IOException, CoderException { try { return VarInt.decodeLong(inStream);
