Remove contexts from coders where they'll never be used.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/996dce37 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/996dce37 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/996dce37 Branch: refs/heads/master Commit: 996dce37b76b103f104328b7caa65f73a1bcb15a Parents: 27e9a06 Author: Robert Bradshaw <[email protected]> Authored: Fri May 5 16:36:47 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Mon May 8 20:17:56 2017 -0700 ---------------------------------------------------------------------- .../UnboundedReadFromBoundedSource.java | 4 +-- .../core/ElementAndRestrictionCoder.java | 4 +-- .../beam/runners/core/KeyedWorkItemCoder.java | 4 +-- .../beam/runners/core/TimerInternals.java | 4 +-- .../apache/beam/sdk/coders/DurationCoder.java | 4 +-- .../apache/beam/sdk/coders/InstantCoder.java | 4 +-- .../sdk/transforms/ApproximateQuantiles.java | 20 +++++------- .../org/apache/beam/sdk/transforms/Mean.java | 4 +-- .../org/apache/beam/sdk/transforms/Top.java | 4 +-- .../beam/sdk/transforms/join/CoGbkResult.java | 10 ++---- .../transforms/windowing/IntervalWindow.java | 4 +-- .../beam/sdk/values/TimestampedValue.java | 4 +-- .../beam/sdk/io/kinesis/KinesisRecordCoder.java | 34 +++++++++----------- 13 files changed, 47 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java index ae28e3a..b74da80 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -224,7 +224,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle public void encode(Checkpoint<T> value, OutputStream outStream, Context context) throws CoderException, IOException { elemsCoder.encode(value.residualElements, outStream); - sourceCoder.encode(value.residualSource, outStream, context); + sourceCoder.encode(value.residualSource, outStream); } @SuppressWarnings("unchecked") @@ -233,7 +233,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle throws CoderException, IOException { return new Checkpoint<>( elemsCoder.decode(inStream), - sourceCoder.decode(inStream, context)); + sourceCoder.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java index 5ddd865..fcb1deb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java @@ -56,14 +56,14 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT> throw new CoderException("cannot encode a null ElementAndRestriction"); } elementCoder.encode(value.element(), outStream); - restrictionCoder.encode(value.restriction(), outStream, context); + restrictionCoder.encode(value.restriction(), outStream); } @Override public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context) throws IOException { ElementT key = elementCoder.decode(inStream); - RestrictionT value = restrictionCoder.decode(inStream, context); + RestrictionT value = restrictionCoder.decode(inStream); return ElementAndRestriction.of(key, value); } http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java index ac8a34c..0869244 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java @@ -74,7 +74,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem< Coder.Context nestedContext = context.nested(); keyCoder.encode(value.key(), outStream); timersCoder.encode(value.timersIterable(), outStream); - elemsCoder.encode(value.elementsIterable(), outStream, context); + elemsCoder.encode(value.elementsIterable(), outStream); } @Override @@ -83,7 +83,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem< Coder.Context nestedContext = context.nested(); K key = keyCoder.decode(inStream); Iterable<TimerData> timers = timersCoder.decode(inStream); - Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, context); + Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream); return KeyedWorkItems.workItem(key, timers, elems); } http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 3607fdd..f0a62cd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -244,7 +244,7 @@ public interface TimerInternals { STRING_CODER.encode(timer.getTimerId(), outStream); STRING_CODER.encode(timer.getNamespace().stringKey(), outStream); INSTANT_CODER.encode(timer.getTimestamp(), outStream); - STRING_CODER.encode(timer.getDomain().name(), outStream, context); + STRING_CODER.encode(timer.getDomain().name(), outStream); } @Override @@ -255,7 +255,7 @@ public interface TimerInternals { StateNamespace namespace = StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder); Instant timestamp = INSTANT_CODER.decode(inStream); - TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context)); + TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream)); return TimerData.of(timerId, namespace, timestamp, domain); } http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java index 8b4ae1d..b7db305 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java @@ -59,13 +59,13 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> { if (value == null) { throw new CoderException("cannot encode a null ReadableDuration"); } - LONG_CODER.encode(toLong(value), outStream, context); + LONG_CODER.encode(toLong(value), outStream); } @Override public ReadableDuration decode(InputStream inStream, Context context) throws CoderException, IOException { - return fromLong(LONG_CODER.decode(inStream, context)); + return fromLong(LONG_CODER.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java index 000f406..22b11a3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java @@ -73,13 +73,13 @@ public class InstantCoder extends AtomicCoder<Instant> { if (value == null) { throw new CoderException("cannot encode a null Instant"); } - LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context); + LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream); } @Override public Instant decode(InputStream inStream, Context context) throws CoderException, IOException { - return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream, context)); + return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java index 348cc5f..9b9d3f8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java @@ -681,24 +681,22 @@ public class ApproximateQuantiles { public void encode( QuantileState<T, ComparatorT> state, OutputStream outStream, Coder.Context context) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); intCoder.encode(state.numQuantiles, outStream); intCoder.encode(state.bufferSize, outStream); elementCoder.encode(state.min, outStream); elementCoder.encode(state.max, outStream); elementListCoder.encode( - state.unbufferedElements, outStream, nestedContext); + state.unbufferedElements, outStream); BigEndianIntegerCoder.of().encode( - state.buffers.size(), outStream, nestedContext); + state.buffers.size(), outStream); for (QuantileBuffer<T> buffer : state.buffers) { - encodeBuffer(buffer, outStream, nestedContext); + encodeBuffer(buffer, outStream); } } @Override public QuantileState<T, ComparatorT> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); int numQuantiles = intCoder.decode(inStream); int bufferSize = intCoder.decode(inStream); T min = elementCoder.decode(inStream); @@ -709,29 +707,27 @@ public class ApproximateQuantiles { BigEndianIntegerCoder.of().decode(inStream); List<QuantileBuffer<T>> buffers = new ArrayList<>(numBuffers); for (int i = 0; i < numBuffers; i++) { - buffers.add(decodeBuffer(inStream, nestedContext)); + buffers.add(decodeBuffer(inStream)); } return new QuantileState<T, ComparatorT>( compareFn, numQuantiles, min, max, numBuffers, bufferSize, unbufferedElements, buffers); } - private void encodeBuffer( - QuantileBuffer<T> buffer, OutputStream outStream, Coder.Context context) + private void encodeBuffer(QuantileBuffer<T> buffer, OutputStream outStream) throws CoderException, IOException { DataOutputStream outData = new DataOutputStream(outStream); outData.writeInt(buffer.level); outData.writeLong(buffer.weight); - elementListCoder.encode(buffer.elements, outStream, context); + elementListCoder.encode(buffer.elements, outStream); } - private QuantileBuffer<T> decodeBuffer( - InputStream inStream, Coder.Context context) + private QuantileBuffer<T> decodeBuffer(InputStream inStream) throws IOException, CoderException { DataInputStream inData = new DataInputStream(inStream); return new QuantileBuffer<>( inData.readInt(), inData.readLong(), - elementListCoder.decode(inStream, context)); + elementListCoder.decode(inStream)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java index a46a21f..c8e0d95 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java @@ -188,7 +188,7 @@ public class Mean { public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { LONG_CODER.encode(value.count, outStream); - DOUBLE_CODER.encode(value.sum, outStream, context); + DOUBLE_CODER.encode(value.sum, outStream); } @Override @@ -196,7 +196,7 @@ public class Mean { throws CoderException, IOException { return new CountSum<>( LONG_CODER.decode(inStream), - DOUBLE_CODER.decode(inStream, context)); + DOUBLE_CODER.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java index c0381a7..7aec667 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java @@ -541,13 +541,13 @@ public class Top { public void encode( BoundedHeap<T, ComparatorT> value, OutputStream outStream, Context context) throws CoderException, IOException { - listCoder.encode(value.asList(), outStream, context); + listCoder.encode(value.asList(), outStream); } @Override public BoundedHeap<T, ComparatorT> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { - return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream, context)); + return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index bd669ef..6603325 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -251,11 +251,9 @@ public class CoGbkResult { if (schema.size() == 0) { return; } - int lastIndex = schema.size() - 1; - for (int unionTag = 0; unionTag < lastIndex; unionTag++) { + for (int unionTag = 0; unionTag < schema.size(); unionTag++) { tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream); } - tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, context); } @Override @@ -266,12 +264,10 @@ public class CoGbkResult { if (schema.size() == 0) { return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of()); } - int lastIndex = schema.size() - 1; List<Iterable<?>> valueMap = Lists.newArrayListWithExpectedSize(schema.size()); - for (int unionTag = 0; unionTag < lastIndex; unionTag++) { - valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested())); + for (int unionTag = 0; unionTag < schema.size(); unionTag++) { + valueMap.add(tagListCoder(unionTag).decode(inStream, Coder.Context.NESTED)); } - valueMap.add(tagListCoder(lastIndex).decode(inStream, context)); return new CoGbkResult(schema, valueMap); } http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index cb5a7cf..318dc4c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -182,14 +182,14 @@ public class IntervalWindow extends BoundedWindow public void encode(IntervalWindow window, OutputStream outStream, Context context) throws IOException, CoderException { instantCoder.encode(window.end, outStream); - durationCoder.encode(new Duration(window.start, window.end), outStream, context); + durationCoder.encode(new Duration(window.start, window.end), outStream); } @Override public IntervalWindow decode(InputStream inStream, Context context) throws IOException, CoderException { Instant end = instantCoder.decode(inStream); - ReadableDuration duration = durationCoder.decode(inStream, context); + ReadableDuration duration = durationCoder.decode(inStream); return new IntervalWindow(end.minus(duration), end); } http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java index 89747a7..95a3152 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java @@ -106,14 +106,14 @@ public class TimestampedValue<V> { throws IOException { valueCoder.encode(windowedElem.getValue(), outStream); InstantCoder.of().encode( - windowedElem.getTimestamp(), outStream, context); + windowedElem.getTimestamp(), outStream); } @Override public TimestampedValue<T> decode(InputStream inStream, Context context) throws IOException { T value = valueCoder.decode(inStream); - Instant timestamp = InstantCoder.of().decode(inStream, context); + Instant timestamp = InstantCoder.of().decode(inStream); return TimestampedValue.of(value, timestamp); } http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java index 77fe127..c6a0174 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java @@ -45,28 +45,26 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> { @Override public void encode(KinesisRecord value, OutputStream outStream, Context context) throws IOException { - Context nested = context.nested(); - BYTE_ARRAY_CODER.encode(value.getData().array(), outStream, nested); - STRING_CODER.encode(value.getSequenceNumber(), outStream, nested); - STRING_CODER.encode(value.getPartitionKey(), outStream, nested); - INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream, nested); - VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested); - INSTANT_CODER.encode(value.getReadTime(), outStream, nested); - STRING_CODER.encode(value.getStreamName(), outStream, nested); - STRING_CODER.encode(value.getShardId(), outStream, context); + BYTE_ARRAY_CODER.encode(value.getData().array(), outStream); + STRING_CODER.encode(value.getSequenceNumber(), outStream); + STRING_CODER.encode(value.getPartitionKey(), outStream); + INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream); + VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream); + INSTANT_CODER.encode(value.getReadTime(), outStream); + STRING_CODER.encode(value.getStreamName(), outStream); + STRING_CODER.encode(value.getShardId(), outStream); } @Override public KinesisRecord decode(InputStream inStream, Context context) throws IOException { - Context nested = context.nested(); - ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream, nested)); - String sequenceNumber = STRING_CODER.decode(inStream, nested); - String partitionKey = STRING_CODER.decode(inStream, nested); - Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream, nested); - long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested); - Instant readTimestamp = INSTANT_CODER.decode(inStream, nested); - String streamName = STRING_CODER.decode(inStream, nested); - String shardId = STRING_CODER.decode(inStream, context); + ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream)); + String sequenceNumber = STRING_CODER.decode(inStream); + String partitionKey = STRING_CODER.decode(inStream); + Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream); + long subSequenceNumber = VAR_LONG_CODER.decode(inStream); + Instant readTimestamp = INSTANT_CODER.decode(inStream); + String streamName = STRING_CODER.decode(inStream); + String shardId = STRING_CODER.decode(inStream); return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey, approximateArrivalTimestamp, readTimestamp, streamName, shardId );
