http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java index 4467faa..3e1ff7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java @@ -38,12 +38,12 @@ public class VoidCoder extends AtomicCoder<Void> { private VoidCoder() {} @Override - public void encode(Void value, OutputStream outStream, Context context) { + public void encode(Void value, OutputStream outStream) { // Nothing to write! } @Override - public Void decode(InputStream inStream, Context context) { + public Void decode(InputStream inStream) { // Nothing to read! return null; }
http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 20fab9b..32aa9c3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -947,25 +947,24 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { } @Override - public void encode(FileResult value, OutputStream outStream, Context context) + public void encode(FileResult value, OutputStream outStream) throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); } - stringCoder.encode(value.getFilename().toString(), outStream, context.nested()); + stringCoder.encode(value.getFilename().toString(), outStream); if (value.getDestinationFilename() == null) { - stringCoder.encode(null, outStream, context); + stringCoder.encode(null, outStream); } else { - stringCoder.encode(value.getDestinationFilename().toString(), outStream, context); + stringCoder.encode(value.getDestinationFilename().toString(), outStream); } } @Override - public FileResult decode(InputStream inStream, Context context) - throws IOException { - String filename = stringCoder.decode(inStream, context.nested()); + public FileResult decode(InputStream inStream) throws IOException { + String filename = stringCoder.decode(inStream); assert filename != null; // fixes a compiler warning - @Nullable String destinationFilename = stringCoder.decode(inStream, context); + @Nullable String destinationFilename = stringCoder.decode(inStream); return new FileResult( FileSystems.matchNewResource(filename, false /* isDirectory */), destinationFilename == null http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java index 37d5a55..d12d193 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java @@ -679,59 +679,55 @@ public class ApproximateQuantiles { @Override public void encode( - QuantileState<T, ComparatorT> state, OutputStream outStream, Coder.Context context) + QuantileState<T, ComparatorT> state, OutputStream outStream) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); - intCoder.encode(state.numQuantiles, outStream, nestedContext); - intCoder.encode(state.bufferSize, outStream, nestedContext); - elementCoder.encode(state.min, outStream, nestedContext); - elementCoder.encode(state.max, outStream, nestedContext); + intCoder.encode(state.numQuantiles, outStream); + intCoder.encode(state.bufferSize, outStream); + elementCoder.encode(state.min, outStream); + elementCoder.encode(state.max, outStream); elementListCoder.encode( - state.unbufferedElements, outStream, nestedContext); + state.unbufferedElements, outStream); BigEndianIntegerCoder.of().encode( - state.buffers.size(), outStream, nestedContext); + state.buffers.size(), outStream); for (QuantileBuffer<T> buffer : state.buffers) { - encodeBuffer(buffer, outStream, nestedContext); + encodeBuffer(buffer, outStream); } } @Override - public QuantileState<T, ComparatorT> decode(InputStream inStream, Coder.Context context) + public QuantileState<T, ComparatorT> decode(InputStream inStream) throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); - int numQuantiles = intCoder.decode(inStream, nestedContext); - int bufferSize = intCoder.decode(inStream, nestedContext); - T min = elementCoder.decode(inStream, nestedContext); - T max = elementCoder.decode(inStream, nestedContext); + int numQuantiles = intCoder.decode(inStream); + int bufferSize = intCoder.decode(inStream); + T min = elementCoder.decode(inStream); + T max = elementCoder.decode(inStream); List<T> unbufferedElements = - elementListCoder.decode(inStream, nestedContext); + elementListCoder.decode(inStream); int numBuffers = - BigEndianIntegerCoder.of().decode(inStream, nestedContext); + BigEndianIntegerCoder.of().decode(inStream); List<QuantileBuffer<T>> buffers = new ArrayList<>(numBuffers); for (int i = 0; i < numBuffers; i++) { - buffers.add(decodeBuffer(inStream, nestedContext)); + buffers.add(decodeBuffer(inStream)); } return new QuantileState<T, ComparatorT>( compareFn, numQuantiles, min, max, numBuffers, bufferSize, unbufferedElements, buffers); } - private void encodeBuffer( - QuantileBuffer<T> buffer, OutputStream outStream, Coder.Context context) + private void encodeBuffer(QuantileBuffer<T> buffer, OutputStream outStream) throws CoderException, IOException { DataOutputStream outData = new DataOutputStream(outStream); outData.writeInt(buffer.level); outData.writeLong(buffer.weight); - elementListCoder.encode(buffer.elements, outStream, context); + elementListCoder.encode(buffer.elements, outStream); } - private QuantileBuffer<T> decodeBuffer( - InputStream inStream, Coder.Context context) + private QuantileBuffer<T> decodeBuffer(InputStream inStream) throws IOException, CoderException { DataInputStream inData = new DataInputStream(inStream); return new QuantileBuffer<>( inData.readInt(), inData.readLong(), - elementListCoder.decode(inStream, context)); + elementListCoder.decode(inStream)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index b9cdbd5..9e1cc71 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -532,6 +532,12 @@ public class Combine { } @Override + public void encode(Holder<V> accumulator, OutputStream outStream) + throws CoderException, IOException { + encode(accumulator, outStream, Context.NESTED); + } + + @Override public void encode(Holder<V> accumulator, OutputStream outStream, Context context) throws CoderException, IOException { if (accumulator.present) { @@ -543,6 +549,11 @@ public class Combine { } @Override + public Holder<V> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public Holder<V> decode(InputStream inStream, Context context) throws CoderException, IOException { if (inStream.read() == 1) { @@ -1971,6 +1982,12 @@ public class Combine { } @Override + public void encode(InputOrAccum<InputT, AccumT> value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Coder.Context.NESTED); + } + + @Override public void encode( InputOrAccum<InputT, AccumT> value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { @@ -1984,6 +2001,12 @@ public class Combine { } @Override + public InputOrAccum<InputT, AccumT> decode(InputStream inStream) + throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public InputOrAccum<InputT, AccumT> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { if (inStream.read() == 0) { http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index 0515ed5..c619783 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -534,6 +534,12 @@ public class CombineFns { } @Override + public void encode(Object[] value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(Object[] value, OutputStream outStream, Context context) throws CoderException, IOException { checkArgument(value.length == codersCount); @@ -541,14 +547,18 @@ public class CombineFns { return; } int lastIndex = codersCount - 1; - Context nestedContext = context.nested(); for (int i = 0; i < lastIndex; ++i) { - coders.get(i).encode(value[i], outStream, nestedContext); + coders.get(i).encode(value[i], outStream); } coders.get(lastIndex).encode(value[lastIndex], outStream, context); } @Override + public Object[] decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public Object[] decode(InputStream inStream, Context context) throws CoderException, IOException { Object[] ret = new Object[codersCount]; @@ -556,9 +566,8 @@ public class CombineFns { return ret; } int lastIndex = codersCount - 1; - Context nestedContext = context.nested(); for (int i = 0; i < lastIndex; ++i) { - ret[i] = coders.get(i).decode(inStream, nestedContext); + ret[i] = coders.get(i).decode(inStream); } ret[lastIndex] = coders.get(lastIndex).decode(inStream, context); return ret; http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java index 497d62b..b405dd1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java @@ -169,13 +169,13 @@ public class Count { Coder<T> inputCoder) { return new AtomicCoder<long[]>() { @Override - public void encode(long[] value, OutputStream outStream, Context context) + public void encode(long[] value, OutputStream outStream) throws IOException { VarInt.encode(value[0], outStream); } @Override - public long[] decode(InputStream inStream, Context context) + public long[] decode(InputStream inStream) throws IOException, CoderException { try { return new long[] {VarInt.decodeLong(inStream)}; http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java index a309954..8932b03 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java @@ -185,18 +185,18 @@ public class Mean { private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of(); @Override - public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context) + public void encode(CountSum<NumT> value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.count, outStream, context.nested()); - DOUBLE_CODER.encode(value.sum, outStream, context); + LONG_CODER.encode(value.count, outStream); + DOUBLE_CODER.encode(value.sum, outStream); } @Override - public CountSum<NumT> decode(InputStream inStream, Coder.Context context) + public CountSum<NumT> decode(InputStream inStream) throws CoderException, IOException { return new CountSum<>( - LONG_CODER.decode(inStream, context.nested()), - DOUBLE_CODER.decode(inStream, context)); + LONG_CODER.decode(inStream), + DOUBLE_CODER.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java index c0381a7..dd8bc5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java @@ -539,15 +539,15 @@ public class Top { @Override public void encode( - BoundedHeap<T, ComparatorT> value, OutputStream outStream, Context context) + BoundedHeap<T, ComparatorT> value, OutputStream outStream) throws CoderException, IOException { - listCoder.encode(value.asList(), outStream, context); + listCoder.encode(value.asList(), outStream); } @Override - public BoundedHeap<T, ComparatorT> decode(InputStream inStream, Coder.Context context) + public BoundedHeap<T, ComparatorT> decode(InputStream inStream) throws CoderException, IOException { - return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream, context)); + return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index e9a3571..877bb07 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -242,8 +242,7 @@ public class CoGbkResult { @SuppressWarnings("unchecked") public void encode( CoGbkResult value, - OutputStream outStream, - Context context) throws CoderException, + OutputStream outStream) throws CoderException, IOException { if (!schema.equals(value.getSchema())) { throw new CoderException("input schema does not match coder schema"); @@ -251,27 +250,22 @@ public class CoGbkResult { if (schema.size() == 0) { return; } - int lastIndex = schema.size() - 1; - for (int unionTag = 0; unionTag < lastIndex; unionTag++) { - tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested()); + for (int unionTag = 0; unionTag < schema.size(); unionTag++) { + tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream); } - tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, context); } @Override public CoGbkResult decode( - InputStream inStream, - Context context) + InputStream inStream) throws CoderException, IOException { if (schema.size() == 0) { return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of()); } - int lastIndex = schema.size() - 1; List<Iterable<?>> valueMap = Lists.newArrayListWithExpectedSize(schema.size()); - for (int unionTag = 0; unionTag < lastIndex; unionTag++) { - valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested())); + for (int unionTag = 0; unionTag < schema.size(); unionTag++) { + valueMap.add(tagListCoder(unionTag).decode(inStream)); } - valueMap.add(tagListCoder(lastIndex).decode(inStream, context)); return new CoGbkResult(schema, valueMap); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java index 3194a37..66959d3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java @@ -54,6 +54,12 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> { return index; } + @Override + public void encode(RawUnionValue union, OutputStream outStream) + throws IOException, CoderException { + encode(union, outStream, Context.NESTED); + } + @SuppressWarnings("unchecked") @Override public void encode( @@ -74,6 +80,11 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> { } @Override + public RawUnionValue decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public RawUnionValue decode(InputStream inStream, Context context) throws IOException, CoderException { int index = VarInt.decodeInt(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java index 0bfb875..078cbee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java @@ -67,10 +67,10 @@ public class GlobalWindow extends BoundedWindow { public static final Coder INSTANCE = new Coder(); @Override - public void encode(GlobalWindow window, OutputStream outStream, Context context) {} + public void encode(GlobalWindow window, OutputStream outStream) {} @Override - public GlobalWindow decode(InputStream inStream, Context context) { + public GlobalWindow decode(InputStream inStream) { return GlobalWindow.INSTANCE; } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index 46ece09..f25a208 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -179,17 +179,17 @@ public class IntervalWindow extends BoundedWindow } @Override - public void encode(IntervalWindow window, OutputStream outStream, Context context) + public void encode(IntervalWindow window, OutputStream outStream) throws IOException, CoderException { - instantCoder.encode(window.end, outStream, context.nested()); - durationCoder.encode(new Duration(window.start, window.end), outStream, context); + instantCoder.encode(window.end, outStream); + durationCoder.encode(new Duration(window.start, window.end), outStream); } @Override - public IntervalWindow decode(InputStream inStream, Context context) + public IntervalWindow decode(InputStream inStream) throws IOException, CoderException { - Instant end = instantCoder.decode(inStream, context.nested()); - ReadableDuration duration = durationCoder.decode(inStream, context); + Instant end = instantCoder.decode(inStream); + ReadableDuration duration = durationCoder.decode(inStream); return new IntervalWindow(end.minus(duration), end); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index 79ce2f5..1e9a187 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -27,7 +27,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Objects; import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.WindowedContext; @@ -347,7 +346,7 @@ public final class PaneInfo { private PaneInfoCoder() {} @Override - public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context) + public void encode(PaneInfo value, final OutputStream outStream) throws CoderException, IOException { Encoding encoding = chooseEncoding(value); switch (chooseEncoding(value)) { @@ -369,7 +368,7 @@ public final class PaneInfo { } @Override - public PaneInfo decode(final InputStream inStream, Coder.Context context) + public PaneInfo decode(final InputStream inStream) throws CoderException, IOException { byte keyAndTag = (byte) inStream.read(); PaneInfo base = BYTE_TO_PANE_INFO.get((byte) (keyAndTag & 0x0F)); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java index a0896f5..b202065 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java @@ -43,12 +43,23 @@ public class BitSetCoder extends AtomicCoder<BitSet> { } @Override + public void encode(BitSet value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(BitSet value, OutputStream outStream, Context context) throws CoderException, IOException { BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context); } @Override + public BitSet decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public BitSet decode(InputStream inStream, Context context) throws CoderException, IOException { return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 1b7e335..444521a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -636,26 +636,34 @@ public abstract class WindowedValue<T> { } @Override + public void encode(WindowedValue<T> windowedElem, OutputStream outStream) + throws CoderException, IOException { + encode(windowedElem, outStream, Context.NESTED); + } + + @Override public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context) throws CoderException, IOException { - Context nestedContext = context.nested(); - InstantCoder.of().encode( - windowedElem.getTimestamp(), outStream, nestedContext); - windowsCoder.encode(windowedElem.getWindows(), outStream, nestedContext); - PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext); + InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); + windowsCoder.encode(windowedElem.getWindows(), outStream); + PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream); valueCoder.encode(windowedElem.getValue(), outStream, context); } @Override + public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public WindowedValue<T> decode(InputStream inStream, Context context) throws CoderException, IOException { - Context nestedContext = context.nested(); - Instant timestamp = InstantCoder.of().decode(inStream, nestedContext); + Instant timestamp = InstantCoder.of().decode(inStream); Collection<? extends BoundedWindow> windows = - windowsCoder.decode(inStream, nestedContext); - PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext); + windowsCoder.decode(inStream); + PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream); T value = valueCoder.decode(inStream, context); return WindowedValue.of(value, timestamp, windows, pane); } @@ -710,12 +718,23 @@ public abstract class WindowedValue<T> { } @Override + public void encode(WindowedValue<T> windowedElem, OutputStream outStream) + throws CoderException, IOException { + encode(windowedElem, outStream, Context.NESTED); + } + + @Override public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context) throws CoderException, IOException { valueCoder.encode(windowedElem.getValue(), outStream, context); } @Override + public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public WindowedValue<T> decode(InputStream inStream, Context context) throws CoderException, IOException { T value = valueCoder.decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java index c172885..a4c8b3f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java @@ -101,19 +101,18 @@ public class TimestampedValue<V> { @Override public void encode(TimestampedValue<T> windowedElem, - OutputStream outStream, - Context context) + OutputStream outStream) throws IOException { - valueCoder.encode(windowedElem.getValue(), outStream, context.nested()); + valueCoder.encode(windowedElem.getValue(), outStream); InstantCoder.of().encode( - windowedElem.getTimestamp(), outStream, context); + windowedElem.getTimestamp(), outStream); } @Override - public TimestampedValue<T> decode(InputStream inStream, Context context) + public TimestampedValue<T> decode(InputStream inStream) throws IOException { - T value = valueCoder.decode(inStream, context.nested()); - Instant timestamp = InstantCoder.of().decode(inStream, context); + T value = valueCoder.decode(inStream); + Instant timestamp = InstantCoder.of().decode(inStream); return TimestampedValue.of(value, timestamp); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index 3ecbaa2..24c3c38 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -75,21 +75,30 @@ public abstract class ValueInSingleWindow<T> { } @Override + public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream) + throws IOException { + encode(windowedElem, outStream, Context.NESTED); + } + + @Override public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context) throws IOException { - Context nestedContext = context.nested(); - InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext); - windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext); - PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext); + InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); + windowCoder.encode(windowedElem.getWindow(), outStream); + PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream); valueCoder.encode(windowedElem.getValue(), outStream, context); } @Override + public ValueInSingleWindow<T> decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException { - Context nestedContext = context.nested(); - Instant timestamp = InstantCoder.of().decode(inStream, nestedContext); - BoundedWindow window = windowCoder.decode(inStream, nestedContext); - PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext); + Instant timestamp = InstantCoder.of().decode(inStream); + BoundedWindow window = windowCoder.decode(inStream); + PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream); T value = valueCoder.decode(inStream, context); return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java index 3f057e1..96a5f1d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java @@ -101,17 +101,28 @@ public class ValueWithRecordId<ValueT> { } @Override + public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context) throws IOException { - valueCoder.encode(value.value, outStream, context.nested()); + valueCoder.encode(value.value, outStream); idCoder.encode(value.id, outStream, context); } @Override + public ValueWithRecordId<ValueT> decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context) throws IOException { return new ValueWithRecordId<ValueT>( - valueCoder.decode(inStream, context.nested()), + valueCoder.decode(inStream), idCoder.decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index 7ca7fb9..d1113f7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -81,12 +81,12 @@ public class CoderRegistryTest { @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes private class MyListCoder extends AtomicCoder<List> { @Override - public void encode(List value, OutputStream outStream, Context context) + public void encode(List value, OutputStream outStream) throws CoderException, IOException { } @Override - public List decode(InputStream inStream, Context context) + public List decode(InputStream inStream) throws CoderException, IOException { return Collections.emptyList(); } @@ -375,12 +375,12 @@ public class CoderRegistryTest { } @Override - public void encode(MyValue value, OutputStream outStream, Context context) + public void encode(MyValue value, OutputStream outStream) throws CoderException, IOException { } @Override - public MyValue decode(InputStream inStream, Context context) + public MyValue decode(InputStream inStream) throws CoderException, IOException { return new MyValue(); } @@ -438,6 +438,14 @@ public class CoderRegistryTest { private static class AutoRegistrationClassCoder extends CustomCoder<AutoRegistrationClass> { private static final AutoRegistrationClassCoder INSTANCE = new AutoRegistrationClassCoder(); + + @Override + public void encode(AutoRegistrationClass value, OutputStream outStream) {} + + @Override + public AutoRegistrationClass decode(InputStream inStream) { + return null; + } } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java index dfd4ea2..13a7261 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java @@ -48,13 +48,13 @@ public class CustomCoderTest { } @Override - public void encode(KV<String, Long> kv, OutputStream out, Context context) + public void encode(KV<String, Long> kv, OutputStream out) throws IOException { new DataOutputStream(out).writeLong(kv.getValue()); } @Override - public KV<String, Long> decode(InputStream inStream, Context context) + public KV<String, Long> decode(InputStream inStream) throws IOException { return KV.of(key, new DataInputStream(inStream).readLong()); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java index d6d7de8..9fb0b82 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java @@ -167,6 +167,12 @@ public class NullableCoderTest { private static class EntireStreamExpectingCoder extends AtomicCoder<String> { @Override + public void encode(String value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode( String value, OutputStream outStream, Context context) throws IOException { checkArgument(context.isWholeStream, "Expected to get entire stream"); @@ -174,6 +180,11 @@ public class NullableCoderTest { } @Override + public String decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public String decode(InputStream inStream, Context context) throws CoderException, IOException { checkArgument(context.isWholeStream, "Expected to get entire stream"); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java index d97eea6..adb6652 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java @@ -182,15 +182,15 @@ public class SerializableCoderTest implements Serializable { // Encode both strings into NESTED form. byte[] nestedEncoding; try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { - coder.encode(source, os, Coder.Context.NESTED); - coder.encode(source2, os, Coder.Context.NESTED); + coder.encode(source, os); + coder.encode(source2, os); nestedEncoding = os.toByteArray(); } // Decode from NESTED form. try (ByteArrayInputStream is = new ByteArrayInputStream(nestedEncoding)) { - assertEquals(source, coder.decode(is, Coder.Context.NESTED)); - assertEquals(source2, coder.decode(is, Coder.Context.NESTED)); + assertEquals(source, coder.decode(is)); + assertEquals(source2, coder.decode(is)); assertEquals(0, is.available()); } } @@ -207,20 +207,20 @@ public class SerializableCoderTest implements Serializable { Coder<String> coder = SerializableCoder.of(String.class); byte[] encodedBytes; try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { - coder.encode(null, os, Coder.Context.NESTED); - coder.encode("TestValue", os, Coder.Context.NESTED); - coder.encode(null, os, Coder.Context.NESTED); - coder.encode("TestValue2", os, Coder.Context.NESTED); - coder.encode(null, os, Coder.Context.NESTED); + coder.encode(null, os); + coder.encode("TestValue", os); + coder.encode(null, os); + coder.encode("TestValue2", os); + coder.encode(null, os); encodedBytes = os.toByteArray(); } try (ByteArrayInputStream is = new ByteArrayInputStream(encodedBytes)) { - assertNull(coder.decode(is, Coder.Context.NESTED)); - assertEquals("TestValue", coder.decode(is, Coder.Context.NESTED)); - assertNull(coder.decode(is, Coder.Context.NESTED)); - assertEquals("TestValue2", coder.decode(is, Coder.Context.NESTED)); - assertNull(coder.decode(is, Coder.Context.NESTED)); + assertNull(coder.decode(is)); + assertEquals("TestValue", coder.decode(is)); + assertNull(coder.decode(is)); + assertEquals("TestValue2", coder.decode(is)); + assertNull(coder.decode(is)); assertEquals(0, is.available()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java index af2c94e..7aa2080 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java @@ -47,7 +47,7 @@ public class StructuredCoderTest { private static final long serialVersionUID = 0L; @Override - public void encode(@Nullable Boolean value, OutputStream outStream, Context context) + public void encode(@Nullable Boolean value, OutputStream outStream) throws CoderException, IOException { if (value == null) { outStream.write(2); @@ -61,7 +61,7 @@ public class StructuredCoderTest { @Override @Nullable public Boolean decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { int value = inStream.read(); if (value == 0) { @@ -110,7 +110,7 @@ public class StructuredCoderTest { @Override public void encode( - @Nullable ObjectIdentityBoolean value, OutputStream outStream, Context context) + @Nullable ObjectIdentityBoolean value, OutputStream outStream) throws CoderException, IOException { if (value == null) { outStream.write(2); @@ -124,7 +124,7 @@ public class StructuredCoderTest { @Override @Nullable public ObjectIdentityBoolean decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { int value = inStream.read(); if (value == 0) { @@ -213,13 +213,13 @@ public class StructuredCoderTest { private static class Foo<T> extends StructuredCoder<T> { @Override - public void encode(T value, OutputStream outStream, Coder.Context context) + public void encode(T value, OutputStream outStream) throws CoderException, IOException { throw new UnsupportedOperationException(); } @Override - public T decode(InputStream inStream, Coder.Context context) + public T decode(InputStream inStream) throws CoderException, IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java index 164d221..ce78411 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java @@ -50,15 +50,15 @@ public class CoderPropertiesTest { /** A coder that says it is not deterministic but actually is. */ public static class NonDeterministicCoder extends AtomicCoder<String> { @Override - public void encode(String value, OutputStream outStream, Context context) + public void encode(String value, OutputStream outStream) throws CoderException, IOException { - StringUtf8Coder.of().encode(value, outStream, context); + StringUtf8Coder.of().encode(value, outStream); } @Override - public String decode(InputStream inStream, Context context) + public String decode(InputStream inStream) throws CoderException, IOException { - return StringUtf8Coder.of().decode(inStream, context); + return StringUtf8Coder.of().decode(inStream); } public void verifyDeterministic() throws NonDeterministicException { @@ -96,15 +96,15 @@ public class CoderPropertiesTest { } @Override - public void encode(String value, OutputStream outStream, Context context) + public void encode(String value, OutputStream outStream) throws IOException, CoderException { - StringUtf8Coder.of().encode(value + System.nanoTime(), outStream, context); + StringUtf8Coder.of().encode(value + System.nanoTime(), outStream); } @Override - public String decode(InputStream inStream, Context context) + public String decode(InputStream inStream) throws CoderException, IOException { - return StringUtf8Coder.of().decode(inStream, context); + return StringUtf8Coder.of().decode(inStream); } @Override @@ -136,16 +136,16 @@ public class CoderPropertiesTest { } @Override - public void encode(String value, OutputStream outStream, Context context) + public void encode(String value, OutputStream outStream) throws CoderException, IOException { changedState += 1; - StringUtf8Coder.of().encode(value + Strings.repeat("A", changedState), outStream, context); + StringUtf8Coder.of().encode(value + Strings.repeat("A", changedState), outStream); } @Override - public String decode(InputStream inStream, Context context) + public String decode(InputStream inStream) throws CoderException, IOException { - String decodedValue = StringUtf8Coder.of().decode(inStream, context); + String decodedValue = StringUtf8Coder.of().decode(inStream); return decodedValue.substring(0, decodedValue.length() - changedState); } @@ -180,18 +180,18 @@ public class CoderPropertiesTest { } @Override - public void encode(String value, OutputStream outStream, Context context) + public void encode(String value, OutputStream outStream) throws CoderException, IOException { if (lostState == 0) { throw new RuntimeException("I forgot something..."); } - StringUtf8Coder.of().encode(value, outStream, context); + StringUtf8Coder.of().encode(value, outStream); } @Override - public String decode(InputStream inStream, Context context) + public String decode(InputStream inStream) throws CoderException, IOException { - return StringUtf8Coder.of().decode(inStream, context); + return StringUtf8Coder.of().decode(inStream); } @Override @@ -216,12 +216,12 @@ public class CoderPropertiesTest { /** A coder which closes the underlying stream during encoding and decoding. */ public static class ClosingCoder extends AtomicCoder<String> { @Override - public void encode(String value, OutputStream outStream, Context context) throws IOException { + public void encode(String value, OutputStream outStream) throws IOException { outStream.close(); } @Override - public String decode(InputStream inStream, Context context) throws IOException { + public String decode(InputStream inStream) throws IOException { inStream.close(); return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index 83f348c..37db4ef 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -98,12 +98,12 @@ public class PAssertTest implements Serializable { } @Override - public void encode(NotSerializableObject value, OutputStream outStream, Context context) + public void encode(NotSerializableObject value, OutputStream outStream) throws CoderException, IOException { } @Override - public NotSerializableObject decode(InputStream inStream, Context context) + public NotSerializableObject decode(InputStream inStream) throws CoderException, IOException { return new NotSerializableObject(); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java index db5ff2e..6b17696 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java @@ -30,7 +30,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.KV; import org.hamcrest.Matchers; @@ -153,11 +152,11 @@ public class SerializableMatchersTest implements Serializable { private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> { @Override - public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) { + public void encode(NotSerializableClass value, OutputStream outStream) { } @Override - public NotSerializableClass decode(InputStream inStream, Coder.Context context) { + public NotSerializableClass decode(InputStream inStream) { return new NotSerializableClass(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java index 546683b..3939800 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java @@ -75,14 +75,14 @@ public class WindowSupplierTest { private static class FailingCoder extends AtomicCoder<BoundedWindow> { @Override public void encode( - BoundedWindow value, OutputStream outStream, Context context) + BoundedWindow value, OutputStream outStream) throws CoderException, IOException { throw new CoderException("Test Encode Exception"); } @Override public BoundedWindow decode( - InputStream inStream, Context context) throws CoderException, IOException { + InputStream inStream) throws CoderException, IOException { throw new CoderException("Test Decode Exception"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index 8a4d563..33c652a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -336,12 +336,23 @@ public class CombineFnsTest { private static final UserStringCoder INSTANCE = new UserStringCoder(); @Override + public void encode(UserString value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(UserString value, OutputStream outStream, Context context) throws CoderException, IOException { StringUtf8Coder.of().encode(value.strValue, outStream, context); } @Override + public UserString decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public UserString decode(InputStream inStream, Context context) throws CoderException, IOException { return UserString.of(StringUtf8Coder.of().decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index a70af94..dc9788f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -876,17 +876,17 @@ public class CombineTest implements Serializable { */ private class CountSumCoder extends AtomicCoder<CountSum> { @Override - public void encode(CountSum value, OutputStream outStream, - Context context) throws CoderException, IOException { - LONG_CODER.encode(value.count, outStream, context.nested()); - DOUBLE_CODER.encode(value.sum, outStream, context); + public void encode(CountSum value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.count, outStream); + DOUBLE_CODER.encode(value.sum, outStream); } @Override - public CountSum decode(InputStream inStream, Coder.Context context) + public CountSum decode(InputStream inStream) throws CoderException, IOException { - long count = LONG_CODER.decode(inStream, context.nested()); - double sum = DOUBLE_CODER.decode(inStream, context); + long count = LONG_CODER.decode(inStream); + double sum = DOUBLE_CODER.decode(inStream); return new CountSum(count, sum); } @@ -925,12 +925,23 @@ public class CombineTest implements Serializable { public static Coder<Accumulator> getCoder() { return new AtomicCoder<Accumulator>() { @Override + public void encode(Accumulator accumulator, OutputStream outStream) + throws CoderException, IOException { + encode(accumulator, outStream, Coder.Context.NESTED); + } + + @Override public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context) throws CoderException, IOException { StringUtf8Coder.of().encode(accumulator.value, outStream, context); } @Override + public Accumulator decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public Accumulator decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { return new Accumulator(StringUtf8Coder.of().decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index a458812..a05d31c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -134,11 +134,11 @@ public class CreateTest { private static class RecordCoder extends AtomicCoder<Record> { @Override - public void encode(Record value, OutputStream outStream, Context context) + public void encode(Record value, OutputStream outStream) throws CoderException, IOException {} @Override - public Record decode(InputStream inStream, Context context) throws CoderException, IOException { + public Record decode(InputStream inStream) throws CoderException, IOException { return null; } } @@ -207,17 +207,16 @@ public class CreateTest { @Override public void encode( UnserializableRecord value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws CoderException, IOException { - stringCoder.encode(value.myString, outStream, context.nested()); + stringCoder.encode(value.myString, outStream); } @Override public UnserializableRecord decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { - return new UnserializableRecord(stringCoder.decode(inStream, context.nested())); + return new UnserializableRecord(stringCoder.decode(inStream)); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index aba33eb..0cd885c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -469,13 +469,13 @@ public class GroupByKeyTest { private DeterministicKeyCoder() {} @Override - public void encode(BadEqualityKey value, OutputStream outStream, Context context) + public void encode(BadEqualityKey value, OutputStream outStream) throws IOException { new DataOutputStream(outStream).writeLong(value.key); } @Override - public BadEqualityKey decode(InputStream inStream, Context context) + public BadEqualityKey decode(InputStream inStream) throws IOException { return new BadEqualityKey(new DataInputStream(inStream).readLong()); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index d2cb980..ef27f4c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -986,12 +986,12 @@ public class ParDoTest implements Serializable { } @Override - public void encode(TestDummy value, OutputStream outStream, Context context) + public void encode(TestDummy value, OutputStream outStream) throws CoderException, IOException { } @Override - public TestDummy decode(InputStream inStream, Context context) + public TestDummy decode(InputStream inStream) throws CoderException, IOException { return new TestDummy(); } @@ -1090,15 +1090,15 @@ public class ParDoTest implements Serializable { } @Override - public void encode(MyInteger value, OutputStream outStream, Context context) + public void encode(MyInteger value, OutputStream outStream) throws CoderException, IOException { - delegate.encode(value.getValue(), outStream, context); + delegate.encode(value.getValue(), outStream); } @Override - public MyInteger decode(InputStream inStream, Context context) throws CoderException, + public MyInteger decode(InputStream inStream) throws CoderException, IOException { - return new MyInteger(delegate.decode(inStream, context)); + return new MyInteger(delegate.decode(inStream)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 84f3d69..cdd03d9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -507,12 +507,23 @@ public class ViewTest implements Serializable { private static class NonDeterministicStringCoder extends AtomicCoder<String> { @Override + public void encode(String value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Coder.Context.NESTED); + } + + @Override public void encode(String value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { StringUtf8Coder.of().encode(value, outStream, context); } @Override + public String decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public String decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { return StringUtf8Coder.of().decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 489493a..a8cd35e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -277,10 +277,10 @@ public class DoFnInvokersTest { } @Override - public void encode(SomeRestriction value, OutputStream outStream, Context context) {} + public void encode(SomeRestriction value, OutputStream outStream) {} @Override - public SomeRestriction decode(InputStream inStream, Context context) { + public SomeRestriction decode(InputStream inStream) { return null; } } @@ -400,10 +400,10 @@ public class DoFnInvokersTest { @Override public void encode( - RestrictionWithDefaultTracker value, OutputStream outStream, Context context) {} + RestrictionWithDefaultTracker value, OutputStream outStream) {} @Override - public RestrictionWithDefaultTracker decode(InputStream inStream, Context context) { + public RestrictionWithDefaultTracker decode(InputStream inStream) { return null; } } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java index 314b969..9ae5d68 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java @@ -35,7 +35,7 @@ public class GlobalWindowTest { CountingOutputStream out = new CountingOutputStream(ByteStreams.nullOutputStream()); GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.OUTER); assertEquals(0, out.getCount()); - GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.NESTED); + GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out); assertEquals(0, out.getCount()); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java index 36f7028..894d8a9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java @@ -32,7 +32,6 @@ import java.util.Collections; import java.util.List; import java.util.Random; import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.Coder.Context; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Rule; import org.junit.Test; @@ -180,7 +179,7 @@ public class BufferedElementCountingOutputStreamTest { do { count = VarInt.decodeLong(is); for (int i = 0; i < count; ++i) { - values.add(ByteArrayCoder.of().decode(is, Context.NESTED)); + values.add(ByteArrayCoder.of().decode(is)); } } while(count > 0); @@ -198,7 +197,7 @@ public class BufferedElementCountingOutputStreamTest { for (byte[] value : values) { os.markElementStart(); - ByteArrayCoder.of().encode(value, os, Context.NESTED); + ByteArrayCoder.of().encode(value, os); } return os; } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java index 7230a8b..f36e5e1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java @@ -50,12 +50,12 @@ public class CoderUtilsTest { } @Override - public void encode(Integer value, OutputStream outStream, Context context) { + public void encode(Integer value, OutputStream outStream) { throw new RuntimeException("not expecting to be called"); } @Override - public Integer decode(InputStream inStream, Context context) { + public Integer decode(InputStream inStream) { throw new RuntimeException("not expecting to be called"); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java index 6ba1d4a..9a80730 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java @@ -89,12 +89,12 @@ public class SerializableUtilsTest { private final Object unserializableField = new Object(); @Override - public void encode(Object value, OutputStream outStream, Context context) + public void encode(Object value, OutputStream outStream) throws CoderException, IOException { } @Override - public Object decode(InputStream inStream, Context context) + public Object decode(InputStream inStream) throws CoderException, IOException { return unserializableField; } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java index 325c69d..73c7977 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java @@ -49,6 +49,12 @@ public class ByteStringCoder extends AtomicCoder<ByteString> { private ByteStringCoder() {} @Override + public void encode(ByteString value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(ByteString value, OutputStream outStream, Context context) throws IOException, CoderException { if (value == null) { @@ -63,6 +69,11 @@ public class ByteStringCoder extends AtomicCoder<ByteString> { } @Override + public ByteString decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public ByteString decode(InputStream inStream, Context context) throws IOException { if (context.isWholeStream) { return ByteString.readFrom(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java index 968a2fa..f73bf2b 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java @@ -168,6 +168,12 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> { } @Override + public void encode(T value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(T value, OutputStream outStream, Context context) throws IOException { if (value == null) { throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName()); @@ -180,6 +186,11 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> { } @Override + public T decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public T decode(InputStream inStream, Context context) throws IOException { if (context.isWholeStream) { return getParser().parseFrom(inStream, getExtensionRegistry()); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java index 18e0d95..7223e87 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java @@ -27,7 +27,6 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -110,7 +109,7 @@ public class BeamFnDataBufferingOutboundObserver<T> @Override public void accept(WindowedValue<T> t) throws IOException { - coder.encode(t, bufferedElements, Context.NESTED); + coder.encode(t, bufferedElements); counter += 1; if (bufferedElements.size() >= bufferLimit) { outboundObserver.onNext(convertBufferForTransmission().build()); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java index 24365d8..ac603bd 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java @@ -23,7 +23,6 @@ import java.util.function.Consumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.util.WindowedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +70,7 @@ public class BeamFnDataInboundObserver<T> implements Consumer<BeamFnApi.Elements InputStream inputStream = t.getData().newInput(); while (inputStream.available() > 0) { counter += 1; - WindowedValue<T> value = coder.decode(inputStream, Context.NESTED); + WindowedValue<T> value = coder.decode(inputStream); consumer.accept(value); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java index 7cbf8eb..c2b4542 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java @@ -33,7 +33,6 @@ import org.apache.beam.fn.harness.test.TestStreams; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.LengthPrefixCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.WindowedValue; @@ -135,7 +134,7 @@ public class BeamFnDataBufferingOutboundObserverTest { private static BeamFnApi.Elements messageWithData(byte[] ... datum) throws IOException { ByteString.Output output = ByteString.newOutput(); for (byte[] data : datum) { - CODER.encode(valueInGlobalWindow(data), output, Context.NESTED); + CODER.encode(valueInGlobalWindow(data), output); } return BeamFnApi.Elements.newBuilder() .addData(BeamFnApi.Elements.Data.newBuilder() http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java index c53f99d..54aba8b 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java @@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -108,7 +107,7 @@ public class BeamFnDataInboundObserverTest { .setName("Test")); ByteString.Output output = ByteString.newOutput(); for (String value : values) { - CODER.encode(valueInGlobalWindow(value), output, Context.NESTED); + CODER.encode(valueInGlobalWindow(value), output); } builder.setData(output.toByteString()); return builder.build(); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java index 7aefcfa..c2b62b7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java @@ -53,17 +53,18 @@ class ShardedKeyCoder<KeyT> } @Override - public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context) + public void encode(ShardedKey<KeyT> key, OutputStream outStream) throws IOException { - keyCoder.encode(key.getKey(), outStream, context.nested()); - shardNumberCoder.encode(key.getShardNumber(), outStream, context); + keyCoder.encode(key.getKey(), outStream); + shardNumberCoder.encode(key.getShardNumber(), outStream); } @Override - public ShardedKey<KeyT> decode(InputStream inStream, Context context) + public ShardedKey<KeyT> decode(InputStream inStream) throws IOException { return new ShardedKey<>( - keyCoder.decode(inStream, context.nested()), shardNumberCoder.decode(inStream, context)); + keyCoder.decode(inStream), + shardNumberCoder.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index 01bc558..f034a03 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -38,19 +38,19 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> { } @Override - public void encode(TableDestination value, OutputStream outStream, Context context) + public void encode(TableDestination value, OutputStream outStream) throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); } - tableSpecCoder.encode(value.getTableSpec(), outStream, context.nested()); - tableDescriptionCoder.encode(value.getTableDescription(), outStream, context); + tableSpecCoder.encode(value.getTableSpec(), outStream); + tableDescriptionCoder.encode(value.getTableDescription(), outStream); } @Override - public TableDestination decode(InputStream inStream, Context context) throws IOException { - String tableSpec = tableSpecCoder.decode(inStream, context.nested()); - String tableDescription = tableDescriptionCoder.decode(inStream, context); + public TableDestination decode(InputStream inStream) throws IOException { + String tableSpec = tableSpecCoder.decode(inStream); + String tableDescription = tableDescriptionCoder.decode(inStream); return new TableDestination(tableSpec, tableDescription); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java index 2b1988a..c4707da 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java @@ -38,20 +38,31 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> { } @Override + public void encode(TableRowInfo value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(TableRowInfo value, OutputStream outStream, Context context) throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); } - tableRowCoder.encode(value.tableRow, outStream, context.nested()); + tableRowCoder.encode(value.tableRow, outStream); idCoder.encode(value.uniqueId, outStream, context); } @Override + public TableRowInfo decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public TableRowInfo decode(InputStream inStream, Context context) throws IOException { return new TableRowInfo( - tableRowCoder.decode(inStream, context.nested()), + tableRowCoder.decode(inStream), idCoder.decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java index cfec991..e4b6f1f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java @@ -38,6 +38,12 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> { } @Override + public void encode(TableRow value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(TableRow value, OutputStream outStream, Context context) throws IOException { String strValue = MAPPER.writeValueAsString(value); @@ -45,6 +51,11 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> { } @Override + public TableRow decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public TableRow decode(InputStream inStream, Context context) throws IOException { String strValue = StringUtf8Coder.of().decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 890979b..f014039 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -101,21 +101,21 @@ class WriteBundlesToFiles<DestinationT> } @Override - public void encode(Result<DestinationT> value, OutputStream outStream, Context context) + public void encode(Result<DestinationT> value, OutputStream outStream) throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); } - stringCoder.encode(value.filename, outStream, context.nested()); - longCoder.encode(value.fileByteSize, outStream, context.nested()); - destinationCoder.encode(value.destination, outStream, context.nested()); + stringCoder.encode(value.filename, outStream); + longCoder.encode(value.fileByteSize, outStream); + destinationCoder.encode(value.destination, outStream); } @Override - public Result<DestinationT> decode(InputStream inStream, Context context) throws IOException { - String filename = stringCoder.decode(inStream, context.nested()); - long fileByteSize = longCoder.decode(inStream, context.nested()); - DestinationT destination = destinationCoder.decode(inStream, context.nested()); + public Result<DestinationT> decode(InputStream inStream) throws IOException { + String filename = stringCoder.decode(inStream); + long fileByteSize = longCoder.decode(inStream); + DestinationT destination = destinationCoder.decode(inStream); return new Result<>(filename, fileByteSize, destination); }
