Repository: beam
Updated Branches:
refs/heads/master 23731fe7a -> 3a09ed575
Remove explicit use of nested contexts.
find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\), *context.nested..[)]/\1)/'
find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\), *nestedContext[)]/\1)/'
find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\), *Context.NESTED[)]/\1)/'
find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\), *[^ ]*.Context.NESTED[)]/\1)/'
Added back explicit context in CoGbkResult.java due to compile error.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27e9a060
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27e9a060
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27e9a060
Branch: refs/heads/master
Commit: 27e9a060ed593b2b53b88481591f14b1a274c61b
Parents: 23731fe
Author: Robert Bradshaw <[email protected]>
Authored: Fri May 5 16:20:37 2017 -0700
Committer: Luke Cwik <[email protected]>
Committed: Mon May 8 20:17:54 2017 -0700
----------------------------------------------------------------------
.../UnboundedReadFromBoundedSource.java | 4 +--
.../core/ElementAndRestrictionCoder.java | 4 +--
.../beam/runners/core/KeyedWorkItemCoder.java | 8 +++---
.../beam/runners/core/TimerInternals.java | 12 ++++-----
.../translation/types/CoderTypeSerializer.java | 4 +--
.../streaming/SingletonKeyedWorkItemCoder.java | 4 +--
.../state/FlinkKeyGroupStateInternals.java | 8 +++---
.../runners/dataflow/BatchViewOverrides.java | 4 +--
.../runners/dataflow/internal/IsmFormat.java | 24 ++++++++---------
.../spark/aggregators/NamedAggregators.java | 4 +--
.../apache/beam/sdk/coders/BigDecimalCoder.java | 4 +--
.../beam/sdk/coders/IterableLikeCoder.java | 8 +++---
.../org/apache/beam/sdk/coders/KvCoder.java | 4 +--
.../org/apache/beam/sdk/coders/MapCoder.java | 12 ++++-----
.../org/apache/beam/sdk/io/FileBasedSink.java | 4 +--
.../sdk/transforms/ApproximateQuantiles.java | 20 +++++++-------
.../apache/beam/sdk/transforms/CombineFns.java | 4 +--
.../org/apache/beam/sdk/transforms/Mean.java | 4 +--
.../beam/sdk/transforms/join/CoGbkResult.java | 2 +-
.../transforms/windowing/IntervalWindow.java | 4 +--
.../org/apache/beam/sdk/util/WindowedValue.java | 10 +++----
.../beam/sdk/values/TimestampedValue.java | 4 +--
.../beam/sdk/values/ValueInSingleWindow.java | 12 ++++-----
.../beam/sdk/values/ValueWithRecordId.java | 4 +--
.../beam/sdk/coders/SerializableCoderTest.java | 28 ++++++++++----------
.../apache/beam/sdk/transforms/CombineTest.java | 4 +--
.../apache/beam/sdk/transforms/CreateTest.java | 4 +--
.../transforms/windowing/GlobalWindowTest.java | 2 +-
...BufferedElementCountingOutputStreamTest.java | 5 ++--
.../BeamFnDataBufferingOutboundObserver.java | 2 +-
.../harness/data/BeamFnDataInboundObserver.java | 2 +-
...BeamFnDataBufferingOutboundObserverTest.java | 2 +-
.../data/BeamFnDataInboundObserverTest.java | 2 +-
.../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 11 ++++----
.../io/gcp/bigquery/TableDestinationCoder.java | 10 +++----
.../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 4 +--
.../io/gcp/bigquery/WriteBundlesToFiles.java | 12 ++++-----
.../PubsubMessageWithAttributesCoder.java | 4 +--
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 16 +++++------
.../io/gcp/pubsub/PubsubUnboundedSource.java | 2 +-
.../apache/beam/sdk/io/xml/JAXBCoderTest.java | 8 +++---
41 files changed, 145 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 1424b8b..ae28e3a 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -223,7 +223,7 @@ public class UnboundedReadFromBoundedSource<T> extends
PTransform<PBegin, PColle
@Override
public void encode(Checkpoint<T> value, OutputStream outStream, Context
context)
throws CoderException, IOException {
- elemsCoder.encode(value.residualElements, outStream, context.nested());
+ elemsCoder.encode(value.residualElements, outStream);
sourceCoder.encode(value.residualSource, outStream, context);
}
@@ -232,7 +232,7 @@ public class UnboundedReadFromBoundedSource<T> extends
PTransform<PBegin, PColle
public Checkpoint<T> decode(InputStream inStream, Context context)
throws CoderException, IOException {
return new Checkpoint<>(
- elemsCoder.decode(inStream, context.nested()),
+ elemsCoder.decode(inStream),
sourceCoder.decode(inStream, context));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 83c4e62..5ddd865 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -55,14 +55,14 @@ public class ElementAndRestrictionCoder<ElementT,
RestrictionT>
if (value == null) {
throw new CoderException("cannot encode a null ElementAndRestriction");
}
- elementCoder.encode(value.element(), outStream, context.nested());
+ elementCoder.encode(value.element(), outStream);
restrictionCoder.encode(value.restriction(), outStream, context);
}
@Override
public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream
inStream, Context context)
throws IOException {
- ElementT key = elementCoder.decode(inStream, context.nested());
+ ElementT key = elementCoder.decode(inStream);
RestrictionT value = restrictionCoder.decode(inStream, context);
return ElementAndRestriction.of(key, value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index e1872b5..ac8a34c 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -72,8 +72,8 @@ public class KeyedWorkItemCoder<K, ElemT> extends
StructuredCoder<KeyedWorkItem<
public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream,
Coder.Context context)
throws CoderException, IOException {
Coder.Context nestedContext = context.nested();
- keyCoder.encode(value.key(), outStream, nestedContext);
- timersCoder.encode(value.timersIterable(), outStream, nestedContext);
+ keyCoder.encode(value.key(), outStream);
+ timersCoder.encode(value.timersIterable(), outStream);
elemsCoder.encode(value.elementsIterable(), outStream, context);
}
@@ -81,8 +81,8 @@ public class KeyedWorkItemCoder<K, ElemT> extends
StructuredCoder<KeyedWorkItem<
public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context
context)
throws CoderException, IOException {
Coder.Context nestedContext = context.nested();
- K key = keyCoder.decode(inStream, nestedContext);
- Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
+ K key = keyCoder.decode(inStream);
+ Iterable<TimerData> timers = timersCoder.decode(inStream);
Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream,
context);
return KeyedWorkItems.workItem(key, timers, elems);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 888c11f..3607fdd 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -241,9 +241,9 @@ public interface TimerInternals {
public void encode(TimerData timer, OutputStream outStream, Context
context)
throws CoderException, IOException {
Context nestedContext = context.nested();
- STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext);
- STRING_CODER.encode(timer.getNamespace().stringKey(), outStream,
nestedContext);
- INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext);
+ STRING_CODER.encode(timer.getTimerId(), outStream);
+ STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
+ INSTANT_CODER.encode(timer.getTimestamp(), outStream);
STRING_CODER.encode(timer.getDomain().name(), outStream, context);
}
@@ -251,10 +251,10 @@ public interface TimerInternals {
public TimerData decode(InputStream inStream, Context context)
throws CoderException, IOException {
Context nestedContext = context.nested();
- String timerId = STRING_CODER.decode(inStream, nestedContext);
+ String timerId = STRING_CODER.decode(inStream);
StateNamespace namespace =
- StateNamespaces.fromString(STRING_CODER.decode(inStream,
nestedContext), windowCoder);
- Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
+ StateNamespaces.fromString(STRING_CODER.decode(inStream),
windowCoder);
+ Instant timestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream,
context));
return TimerData.of(timerId, namespace, timestamp, domain);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index e210ed9..e003119 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -77,14 +77,14 @@ public class CoderTypeSerializer<T> extends
TypeSerializer<T> {
@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException
{
DataOutputViewWrapper outputWrapper = new
DataOutputViewWrapper(dataOutputView);
- coder.encode(t, outputWrapper, Coder.Context.NESTED);
+ coder.encode(t, outputWrapper);
}
@Override
public T deserialize(DataInputView dataInputView) throws IOException {
try {
DataInputViewWrapper inputWrapper = new
DataInputViewWrapper(dataInputView);
- return coder.decode(inputWrapper, Coder.Context.NESTED);
+ return coder.decode(inputWrapper);
} catch (CoderException e) {
Throwable cause = e.getCause();
if (cause instanceof EOFException) {
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index f218693..d7bae7e 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -70,14 +70,14 @@ public class SingletonKeyedWorkItemCoder<K, ElemT>
OutputStream outStream,
Context context)
throws CoderException, IOException {
- keyCoder.encode(value.key(), outStream, context.nested());
+ keyCoder.encode(value.key(), outStream);
valueCoder.encode(value.value, outStream, context);
}
@Override
public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context
context)
throws CoderException, IOException {
- K key = keyCoder.decode(inStream, context.nested());
+ K key = keyCoder.decode(inStream);
WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
return new SingletonKeyedWorkItem<>(key, value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index d6af4f9..8d437d5 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -430,8 +430,8 @@ public class FlinkKeyGroupStateInternals<K> implements
StateInternals {
Map<String, ?> map = entry.getValue().f1;
out.writeInt(map.size());
for (Map.Entry<String, ?> entry1 : map.entrySet()) {
- StringUtf8Coder.of().encode(entry1.getKey(), out, Context.NESTED);
- coder.encode(entry1.getValue(), out, Context.NESTED);
+ StringUtf8Coder.of().encode(entry1.getKey(), out);
+ coder.encode(entry1.getValue(), out);
}
}
}
@@ -463,8 +463,8 @@ public class FlinkKeyGroupStateInternals<K> implements
StateInternals {
Map<String, Object> map = (Map<String, Object>) tuple2.f1;
int mapSize = in.readInt();
for (int j = 0; j < mapSize; j++) {
- String namespace = StringUtf8Coder.of().decode(in, Context.NESTED);
- Object value = coder.decode(in, Context.NESTED);
+ String namespace = StringUtf8Coder.of().decode(in);
+ Object value = coder.decode(in);
map.put(namespace, value);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ecd0365..0e60fa0 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -1353,7 +1353,7 @@ class BatchViewOverrides {
@Override
public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream,
Coder.Context context) throws CoderException, IOException {
- transformCoder.encode(value.transform, outStream, context.nested());
+ transformCoder.encode(value.transform, outStream);
originalMapCoder.encode(value.originalMap, outStream, context);
}
@@ -1361,7 +1361,7 @@ class BatchViewOverrides {
public TransformedMap<K, V1, V2> decode(
InputStream inStream, Coder.Context context) throws CoderException,
IOException {
return new TransformedMap<>(
- transformCoder.decode(inStream, context.nested()),
+ transformCoder.decode(inStream),
originalMapCoder.decode(inStream, context));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 00e0c54..0f0cd4d 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -239,12 +239,12 @@ public class IsmFormat {
keyComponentCoders.size(), value.getKeyComponents()));
}
for (int i = 0; i < keyComponentCoders.size(); ++i) {
- getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream,
context.nested());
+ getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream);
}
if (isMetadataKey(value.getKeyComponents())) {
- ByteArrayCoder.of().encode(value.getMetadata(), outStream,
context.nested());
+ ByteArrayCoder.of().encode(value.getMetadata(), outStream);
} else {
- valueCoder.encode(value.getValue(), outStream, context.nested());
+ valueCoder.encode(value.getValue(), outStream);
}
}
@@ -253,13 +253,13 @@ public class IsmFormat {
throws CoderException, IOException {
List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
for (Coder<?> keyCoder : keyComponentCoders) {
- keyComponents.add(keyCoder.decode(inStream, context.nested()));
+ keyComponents.add(keyCoder.decode(inStream));
}
if (isMetadataKey(keyComponents)) {
return IsmRecord.<V>meta(
- keyComponents, ByteArrayCoder.of().decode(inStream,
context.nested()));
+ keyComponents, ByteArrayCoder.of().decode(inStream));
} else {
- return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream,
context.nested()));
+ return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream));
}
}
@@ -499,7 +499,7 @@ public class IsmFormat {
outStream.write(0);
} else {
outStream.write(1);
- keyCoder.encode(value, outStream, context.nested());
+ keyCoder.encode(value, outStream);
}
}
@@ -510,7 +510,7 @@ public class IsmFormat {
if (marker == 0) {
return (K) getMetadataKey();
} else if (marker == 1) {
- return keyCoder.decode(inStream, context.nested());
+ return keyCoder.decode(inStream);
} else {
throw new CoderException(String.format("Expected marker but got %s.",
marker));
}
@@ -626,8 +626,8 @@ public class IsmFormat {
checkState(value.getIndexOffset() >= 0,
"%s attempting to be written without index offset.",
value);
- VarIntCoder.of().encode(value.getId(), outStream, context.nested());
- VarLongCoder.of().encode(value.getBlockOffset(), outStream,
context.nested());
+ VarIntCoder.of().encode(value.getId(), outStream);
+ VarLongCoder.of().encode(value.getBlockOffset(), outStream);
VarLongCoder.of().encode(value.getIndexOffset(), outStream, context);
}
@@ -635,8 +635,8 @@ public class IsmFormat {
public IsmShard decode(
InputStream inStream, Coder.Context context) throws CoderException,
IOException {
return IsmShard.of(
- VarIntCoder.of().decode(inStream, context.nested()),
- VarLongCoder.of().decode(inStream, context.nested()),
+ VarIntCoder.of().decode(inStream),
+ VarLongCoder.of().decode(inStream),
VarLongCoder.of().decode(inStream, context));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index c836ca5..27f2ec8 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -207,7 +207,7 @@ public class NamedAggregators implements Serializable {
oos.writeObject(inCoder);
try {
combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
- .encode(state, oos, Coder.Context.NESTED);
+ .encode(state, oos);
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Could not determine coder for
accumulator", e);
}
@@ -220,7 +220,7 @@ public class NamedAggregators implements Serializable {
inCoder = (Coder<InputT>) ois.readObject();
try {
state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
- .decode(ois, Coder.Context.NESTED);
+ .decode(ois);
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Could not determine coder for
accumulator", e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index 97559a9..e2166cf 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -51,14 +51,14 @@ public class BigDecimalCoder extends
AtomicCoder<BigDecimal> {
public void encode(BigDecimal value, OutputStream outStream, Context context)
throws IOException, CoderException {
checkNotNull(value, String.format("cannot encode a null %s",
BigDecimal.class.getSimpleName()));
- VAR_INT_CODER.encode(value.scale(), outStream, context.nested());
+ VAR_INT_CODER.encode(value.scale(), outStream);
BIG_INT_CODER.encode(value.unscaledValue(), outStream, context);
}
@Override
public BigDecimal decode(InputStream inStream, Context context)
throws IOException, CoderException {
- int scale = VAR_INT_CODER.decode(inStream, context.nested());
+ int scale = VAR_INT_CODER.decode(inStream);
BigInteger bigInteger = BIG_INT_CODER.decode(inStream, context);
return new BigDecimal(bigInteger, scale);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 9994b3f..59d5424 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -97,7 +97,7 @@ public abstract class IterableLikeCoder<T, IterableT extends
Iterable<T>>
Collection<T> collection = (Collection<T>) iterable;
dataOutStream.writeInt(collection.size());
for (T elem : collection) {
- elementCoder.encode(elem, dataOutStream, nestedContext);
+ elementCoder.encode(elem, dataOutStream);
}
} else {
// We don't know the size without traversing it so use a fixed size
buffer
@@ -108,7 +108,7 @@ public abstract class IterableLikeCoder<T, IterableT
extends Iterable<T>>
new BufferedElementCountingOutputStream(dataOutStream);
for (T elem : iterable) {
countingOutputStream.markElementStart();
- elementCoder.encode(elem, countingOutputStream, nestedContext);
+ elementCoder.encode(elem, countingOutputStream);
}
countingOutputStream.finish();
}
@@ -125,7 +125,7 @@ public abstract class IterableLikeCoder<T, IterableT
extends Iterable<T>>
if (size >= 0) {
List<T> elements = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- elements.add(elementCoder.decode(dataInStream, nestedContext));
+ elements.add(elementCoder.decode(dataInStream));
}
return decodeToIterable(elements);
}
@@ -134,7 +134,7 @@ public abstract class IterableLikeCoder<T, IterableT
extends Iterable<T>>
// each block of elements.
long count = VarInt.decodeLong(dataInStream);
while (count > 0L) {
- elements.add(elementCoder.decode(dataInStream, nestedContext));
+ elements.add(elementCoder.decode(dataInStream));
--count;
if (count == 0L) {
count = VarInt.decodeLong(dataInStream);
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 1df4460..0bb53ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -63,14 +63,14 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K,
V>> {
if (kv == null) {
throw new CoderException("cannot encode a null KV");
}
- keyCoder.encode(kv.getKey(), outStream, context.nested());
+ keyCoder.encode(kv.getKey(), outStream);
valueCoder.encode(kv.getValue(), outStream, context);
}
@Override
public KV<K, V> decode(InputStream inStream, Context context)
throws IOException, CoderException {
- K key = keyCoder.decode(inStream, context.nested());
+ K key = keyCoder.decode(inStream);
V value = valueCoder.decode(inStream, context);
return KV.of(key, value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index 7df9ca9..f20eb93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -89,12 +89,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K,
V>> {
Iterator<Entry<K, V>> iterator = map.entrySet().iterator();
Entry<K, V> entry = iterator.next();
while (iterator.hasNext()) {
- keyCoder.encode(entry.getKey(), outStream, context.nested());
- valueCoder.encode(entry.getValue(), outStream, context.nested());
+ keyCoder.encode(entry.getKey(), outStream);
+ valueCoder.encode(entry.getValue(), outStream);
entry = iterator.next();
}
- keyCoder.encode(entry.getKey(), outStream, context.nested());
+ keyCoder.encode(entry.getKey(), outStream);
valueCoder.encode(entry.getValue(), outStream, context);
// no flush needed as DataOutputStream does not buffer
}
@@ -110,12 +110,12 @@ public class MapCoder<K, V> extends
StructuredCoder<Map<K, V>> {
Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
for (int i = 0; i < size - 1; ++i) {
- K key = keyCoder.decode(inStream, context.nested());
- V value = valueCoder.decode(inStream, context.nested());
+ K key = keyCoder.decode(inStream);
+ V value = valueCoder.decode(inStream);
retval.put(key, value);
}
- K key = keyCoder.decode(inStream, context.nested());
+ K key = keyCoder.decode(inStream);
V value = valueCoder.decode(inStream, context);
retval.put(key, value);
return retval;
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 20fab9b..d8a98cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -952,7 +952,7 @@ public abstract class FileBasedSink<T> implements
Serializable, HasDisplayData {
if (value == null) {
throw new CoderException("cannot encode a null value");
}
- stringCoder.encode(value.getFilename().toString(), outStream,
context.nested());
+ stringCoder.encode(value.getFilename().toString(), outStream);
if (value.getDestinationFilename() == null) {
stringCoder.encode(null, outStream, context);
} else {
@@ -963,7 +963,7 @@ public abstract class FileBasedSink<T> implements
Serializable, HasDisplayData {
@Override
public FileResult decode(InputStream inStream, Context context)
throws IOException {
- String filename = stringCoder.decode(inStream, context.nested());
+ String filename = stringCoder.decode(inStream);
assert filename != null; // fixes a compiler warning
@Nullable String destinationFilename = stringCoder.decode(inStream,
context);
return new FileResult(
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 37d5a55..348cc5f 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -682,10 +682,10 @@ public class ApproximateQuantiles {
QuantileState<T, ComparatorT> state, OutputStream outStream,
Coder.Context context)
throws CoderException, IOException {
Coder.Context nestedContext = context.nested();
- intCoder.encode(state.numQuantiles, outStream, nestedContext);
- intCoder.encode(state.bufferSize, outStream, nestedContext);
- elementCoder.encode(state.min, outStream, nestedContext);
- elementCoder.encode(state.max, outStream, nestedContext);
+ intCoder.encode(state.numQuantiles, outStream);
+ intCoder.encode(state.bufferSize, outStream);
+ elementCoder.encode(state.min, outStream);
+ elementCoder.encode(state.max, outStream);
elementListCoder.encode(
state.unbufferedElements, outStream, nestedContext);
BigEndianIntegerCoder.of().encode(
@@ -699,14 +699,14 @@ public class ApproximateQuantiles {
public QuantileState<T, ComparatorT> decode(InputStream inStream,
Coder.Context context)
throws CoderException, IOException {
Coder.Context nestedContext = context.nested();
- int numQuantiles = intCoder.decode(inStream, nestedContext);
- int bufferSize = intCoder.decode(inStream, nestedContext);
- T min = elementCoder.decode(inStream, nestedContext);
- T max = elementCoder.decode(inStream, nestedContext);
+ int numQuantiles = intCoder.decode(inStream);
+ int bufferSize = intCoder.decode(inStream);
+ T min = elementCoder.decode(inStream);
+ T max = elementCoder.decode(inStream);
List<T> unbufferedElements =
- elementListCoder.decode(inStream, nestedContext);
+ elementListCoder.decode(inStream);
int numBuffers =
- BigEndianIntegerCoder.of().decode(inStream, nestedContext);
+ BigEndianIntegerCoder.of().decode(inStream);
List<QuantileBuffer<T>> buffers = new ArrayList<>(numBuffers);
for (int i = 0; i < numBuffers; i++) {
buffers.add(decodeBuffer(inStream, nestedContext));
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index 0515ed5..c45df04 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -543,7 +543,7 @@ public class CombineFns {
int lastIndex = codersCount - 1;
Context nestedContext = context.nested();
for (int i = 0; i < lastIndex; ++i) {
- coders.get(i).encode(value[i], outStream, nestedContext);
+ coders.get(i).encode(value[i], outStream);
}
coders.get(lastIndex).encode(value[lastIndex], outStream, context);
}
@@ -558,7 +558,7 @@ public class CombineFns {
int lastIndex = codersCount - 1;
Context nestedContext = context.nested();
for (int i = 0; i < lastIndex; ++i) {
- ret[i] = coders.get(i).decode(inStream, nestedContext);
+ ret[i] = coders.get(i).decode(inStream);
}
ret[lastIndex] = coders.get(lastIndex).decode(inStream, context);
return ret;
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index a309954..a46a21f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -187,7 +187,7 @@ public class Mean {
@Override
public void encode(CountSum<NumT> value, OutputStream outStream,
Coder.Context context)
throws CoderException, IOException {
- LONG_CODER.encode(value.count, outStream, context.nested());
+ LONG_CODER.encode(value.count, outStream);
DOUBLE_CODER.encode(value.sum, outStream, context);
}
@@ -195,7 +195,7 @@ public class Mean {
public CountSum<NumT> decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
return new CountSum<>(
- LONG_CODER.decode(inStream, context.nested()),
+ LONG_CODER.decode(inStream),
DOUBLE_CODER.decode(inStream, context));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index e9a3571..bd669ef 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -253,7 +253,7 @@ public class CoGbkResult {
}
int lastIndex = schema.size() - 1;
for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
- tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested());
+ tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream);
}
tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream,
context);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index 46ece09..cb5a7cf 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -181,14 +181,14 @@ public class IntervalWindow extends BoundedWindow
@Override
public void encode(IntervalWindow window, OutputStream outStream, Context
context)
throws IOException, CoderException {
- instantCoder.encode(window.end, outStream, context.nested());
+ instantCoder.encode(window.end, outStream);
durationCoder.encode(new Duration(window.start, window.end), outStream,
context);
}
@Override
public IntervalWindow decode(InputStream inStream, Context context)
throws IOException, CoderException {
- Instant end = instantCoder.decode(inStream, context.nested());
+ Instant end = instantCoder.decode(inStream);
ReadableDuration duration = durationCoder.decode(inStream, context);
return new IntervalWindow(end.minus(duration), end);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 1b7e335..e3e61cf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -643,8 +643,8 @@ public abstract class WindowedValue<T> {
Context nestedContext = context.nested();
InstantCoder.of().encode(
windowedElem.getTimestamp(), outStream, nestedContext);
- windowsCoder.encode(windowedElem.getWindows(), outStream, nestedContext);
- PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
+ windowsCoder.encode(windowedElem.getWindows(), outStream);
+ PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream);
valueCoder.encode(windowedElem.getValue(), outStream, context);
}
@@ -652,10 +652,10 @@ public abstract class WindowedValue<T> {
public WindowedValue<T> decode(InputStream inStream, Context context)
throws CoderException, IOException {
Context nestedContext = context.nested();
- Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
+ Instant timestamp = InstantCoder.of().decode(inStream);
Collection<? extends BoundedWindow> windows =
- windowsCoder.decode(inStream, nestedContext);
- PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+ windowsCoder.decode(inStream);
+ PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream);
T value = valueCoder.decode(inStream, context);
return WindowedValue.of(value, timestamp, windows, pane);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index c172885..89747a7 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -104,7 +104,7 @@ public class TimestampedValue<V> {
OutputStream outStream,
Context context)
throws IOException {
- valueCoder.encode(windowedElem.getValue(), outStream, context.nested());
+ valueCoder.encode(windowedElem.getValue(), outStream);
InstantCoder.of().encode(
windowedElem.getTimestamp(), outStream, context);
}
@@ -112,7 +112,7 @@ public class TimestampedValue<V> {
@Override
public TimestampedValue<T> decode(InputStream inStream, Context context)
throws IOException {
- T value = valueCoder.decode(inStream, context.nested());
+ T value = valueCoder.decode(inStream);
Instant timestamp = InstantCoder.of().decode(inStream, context);
return TimestampedValue.of(value, timestamp);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index 3ecbaa2..e8a2dfd 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -78,18 +78,18 @@ public abstract class ValueInSingleWindow<T> {
public void encode(ValueInSingleWindow<T> windowedElem, OutputStream
outStream, Context context)
throws IOException {
Context nestedContext = context.nested();
- InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext);
- windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext);
- PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
+ InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
+ windowCoder.encode(windowedElem.getWindow(), outStream);
+ PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream);
valueCoder.encode(windowedElem.getValue(), outStream, context);
}
@Override
public ValueInSingleWindow<T> decode(InputStream inStream, Context
context) throws IOException {
Context nestedContext = context.nested();
- Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
- BoundedWindow window = windowCoder.decode(inStream, nestedContext);
- PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+ Instant timestamp = InstantCoder.of().decode(inStream);
+ BoundedWindow window = windowCoder.decode(inStream);
+ PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
T value = valueCoder.decode(inStream, context);
return new AutoValue_ValueInSingleWindow<>(value, timestamp, window,
pane);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
index 3f057e1..f06317b 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -103,7 +103,7 @@ public class ValueWithRecordId<ValueT> {
@Override
public void encode(ValueWithRecordId<ValueT> value, OutputStream
outStream, Context context)
throws IOException {
- valueCoder.encode(value.value, outStream, context.nested());
+ valueCoder.encode(value.value, outStream);
idCoder.encode(value.id, outStream, context);
}
@@ -111,7 +111,7 @@ public class ValueWithRecordId<ValueT> {
public ValueWithRecordId<ValueT> decode(InputStream inStream, Context
context)
throws IOException {
return new ValueWithRecordId<ValueT>(
- valueCoder.decode(inStream, context.nested()),
+ valueCoder.decode(inStream),
idCoder.decode(inStream, context));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index d97eea6..adb6652 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -182,15 +182,15 @@ public class SerializableCoderTest implements
Serializable {
// Encode both strings into NESTED form.
byte[] nestedEncoding;
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
- coder.encode(source, os, Coder.Context.NESTED);
- coder.encode(source2, os, Coder.Context.NESTED);
+ coder.encode(source, os);
+ coder.encode(source2, os);
nestedEncoding = os.toByteArray();
}
// Decode from NESTED form.
try (ByteArrayInputStream is = new ByteArrayInputStream(nestedEncoding)) {
- assertEquals(source, coder.decode(is, Coder.Context.NESTED));
- assertEquals(source2, coder.decode(is, Coder.Context.NESTED));
+ assertEquals(source, coder.decode(is));
+ assertEquals(source2, coder.decode(is));
assertEquals(0, is.available());
}
}
@@ -207,20 +207,20 @@ public class SerializableCoderTest implements
Serializable {
Coder<String> coder = SerializableCoder.of(String.class);
byte[] encodedBytes;
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
- coder.encode(null, os, Coder.Context.NESTED);
- coder.encode("TestValue", os, Coder.Context.NESTED);
- coder.encode(null, os, Coder.Context.NESTED);
- coder.encode("TestValue2", os, Coder.Context.NESTED);
- coder.encode(null, os, Coder.Context.NESTED);
+ coder.encode(null, os);
+ coder.encode("TestValue", os);
+ coder.encode(null, os);
+ coder.encode("TestValue2", os);
+ coder.encode(null, os);
encodedBytes = os.toByteArray();
}
try (ByteArrayInputStream is = new ByteArrayInputStream(encodedBytes)) {
- assertNull(coder.decode(is, Coder.Context.NESTED));
- assertEquals("TestValue", coder.decode(is, Coder.Context.NESTED));
- assertNull(coder.decode(is, Coder.Context.NESTED));
- assertEquals("TestValue2", coder.decode(is, Coder.Context.NESTED));
- assertNull(coder.decode(is, Coder.Context.NESTED));
+ assertNull(coder.decode(is));
+ assertEquals("TestValue", coder.decode(is));
+ assertNull(coder.decode(is));
+ assertEquals("TestValue2", coder.decode(is));
+ assertNull(coder.decode(is));
assertEquals(0, is.available());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index a70af94..e4b016b 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -878,14 +878,14 @@ public class CombineTest implements Serializable {
@Override
public void encode(CountSum value, OutputStream outStream,
Context context) throws CoderException, IOException {
- LONG_CODER.encode(value.count, outStream, context.nested());
+ LONG_CODER.encode(value.count, outStream);
DOUBLE_CODER.encode(value.sum, outStream, context);
}
@Override
public CountSum decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
- long count = LONG_CODER.decode(inStream, context.nested());
+ long count = LONG_CODER.decode(inStream);
double sum = DOUBLE_CODER.decode(inStream, context);
return new CountSum(count, sum);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index a458812..7e8a1dd 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -210,14 +210,14 @@ public class CreateTest {
OutputStream outStream,
org.apache.beam.sdk.coders.Coder.Context context)
throws CoderException, IOException {
- stringCoder.encode(value.myString, outStream, context.nested());
+ stringCoder.encode(value.myString, outStream);
}
@Override
public UnserializableRecord decode(
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context
context)
throws CoderException, IOException {
- return new UnserializableRecord(stringCoder.decode(inStream, context.nested()));
+ return new UnserializableRecord(stringCoder.decode(inStream));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
index 314b969..9ae5d68 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
@@ -35,7 +35,7 @@ public class GlobalWindowTest {
CountingOutputStream out = new
CountingOutputStream(ByteStreams.nullOutputStream());
GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out,
Context.OUTER);
assertEquals(0, out.getCount());
- GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.NESTED);
+ GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out);
assertEquals(0, out.getCount());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
index 36f7028..894d8a9 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
@@ -32,7 +32,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder.Context;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Rule;
import org.junit.Test;
@@ -180,7 +179,7 @@ public class BufferedElementCountingOutputStreamTest {
do {
count = VarInt.decodeLong(is);
for (int i = 0; i < count; ++i) {
- values.add(ByteArrayCoder.of().decode(is, Context.NESTED));
+ values.add(ByteArrayCoder.of().decode(is));
}
} while(count > 0);
@@ -198,7 +197,7 @@ public class BufferedElementCountingOutputStreamTest {
for (byte[] value : values) {
os.markElementStart();
- ByteArrayCoder.of().encode(value, os, Context.NESTED);
+ ByteArrayCoder.of().encode(value, os);
}
return os;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
----------------------------------------------------------------------
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
index 18e0d95..37745be 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
@@ -110,7 +110,7 @@ public class BeamFnDataBufferingOutboundObserver<T>
@Override
public void accept(WindowedValue<T> t) throws IOException {
- coder.encode(t, bufferedElements, Context.NESTED);
+ coder.encode(t, bufferedElements);
counter += 1;
if (bufferedElements.size() >= bufferLimit) {
outboundObserver.onNext(convertBufferForTransmission().build());
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
----------------------------------------------------------------------
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
index 24365d8..ece87d2 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
@@ -71,7 +71,7 @@ public class BeamFnDataInboundObserver<T> implements
Consumer<BeamFnApi.Elements
InputStream inputStream = t.getData().newInput();
while (inputStream.available() > 0) {
counter += 1;
- WindowedValue<T> value = coder.decode(inputStream, Context.NESTED);
+ WindowedValue<T> value = coder.decode(inputStream);
consumer.accept(value);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
index 7cbf8eb..3f6ece7 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -135,7 +135,7 @@ public class BeamFnDataBufferingOutboundObserverTest {
private static BeamFnApi.Elements messageWithData(byte[] ... datum) throws
IOException {
ByteString.Output output = ByteString.newOutput();
for (byte[] data : datum) {
- CODER.encode(valueInGlobalWindow(data), output, Context.NESTED);
+ CODER.encode(valueInGlobalWindow(data), output);
}
return BeamFnApi.Elements.newBuilder()
.addData(BeamFnApi.Elements.Data.newBuilder()
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
----------------------------------------------------------------------
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
index c53f99d..4b0bf0c 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
@@ -108,7 +108,7 @@ public class BeamFnDataInboundObserverTest {
.setName("Test"));
ByteString.Output output = ByteString.newOutput();
for (String value : values) {
- CODER.encode(valueInGlobalWindow(value), output, Context.NESTED);
+ CODER.encode(valueInGlobalWindow(value), output);
}
builder.setData(output.toByteString());
return builder.build();
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
index 7aefcfa..c2b62b7 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -53,17 +53,18 @@ class ShardedKeyCoder<KeyT>
}
@Override
- public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context)
+ public void encode(ShardedKey<KeyT> key, OutputStream outStream)
throws IOException {
- keyCoder.encode(key.getKey(), outStream, context.nested());
- shardNumberCoder.encode(key.getShardNumber(), outStream, context);
+ keyCoder.encode(key.getKey(), outStream);
+ shardNumberCoder.encode(key.getShardNumber(), outStream);
}
@Override
- public ShardedKey<KeyT> decode(InputStream inStream, Context context)
+ public ShardedKey<KeyT> decode(InputStream inStream)
throws IOException {
return new ShardedKey<>(
- keyCoder.decode(inStream, context.nested()), shardNumberCoder.decode(inStream, context));
+ keyCoder.decode(inStream),
+ shardNumberCoder.decode(inStream));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 01bc558..33b9f77 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -43,14 +43,14 @@ public class TableDestinationCoder extends
AtomicCoder<TableDestination> {
if (value == null) {
throw new CoderException("cannot encode a null value");
}
- tableSpecCoder.encode(value.getTableSpec(), outStream, context.nested());
- tableDescriptionCoder.encode(value.getTableDescription(), outStream, context);
+ tableSpecCoder.encode(value.getTableSpec(), outStream);
+ tableDescriptionCoder.encode(value.getTableDescription(), outStream);
}
@Override
- public TableDestination decode(InputStream inStream, Context context) throws IOException {
- String tableSpec = tableSpecCoder.decode(inStream, context.nested());
- String tableDescription = tableDescriptionCoder.decode(inStream, context);
+ public TableDestination decode(InputStream inStream) throws IOException {
+ String tableSpec = tableSpecCoder.decode(inStream);
+ String tableDescription = tableDescriptionCoder.decode(inStream);
return new TableDestination(tableSpec, tableDescription);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 2b1988a..8ae75c5 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -43,7 +43,7 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
if (value == null) {
throw new CoderException("cannot encode a null value");
}
- tableRowCoder.encode(value.tableRow, outStream, context.nested());
+ tableRowCoder.encode(value.tableRow, outStream);
idCoder.encode(value.uniqueId, outStream, context);
}
@@ -51,7 +51,7 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
public TableRowInfo decode(InputStream inStream, Context context)
throws IOException {
return new TableRowInfo(
- tableRowCoder.decode(inStream, context.nested()),
+ tableRowCoder.decode(inStream),
idCoder.decode(inStream, context));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 890979b..9e83271 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -106,16 +106,16 @@ class WriteBundlesToFiles<DestinationT>
if (value == null) {
throw new CoderException("cannot encode a null value");
}
- stringCoder.encode(value.filename, outStream, context.nested());
- longCoder.encode(value.fileByteSize, outStream, context.nested());
- destinationCoder.encode(value.destination, outStream, context.nested());
+ stringCoder.encode(value.filename, outStream);
+ longCoder.encode(value.fileByteSize, outStream);
+ destinationCoder.encode(value.destination, outStream);
}
@Override
public Result<DestinationT> decode(InputStream inStream, Context context)
throws IOException {
- String filename = stringCoder.decode(inStream, context.nested());
- long fileByteSize = longCoder.decode(inStream, context.nested());
- DestinationT destination = destinationCoder.decode(inStream, context.nested());
+ String filename = stringCoder.decode(inStream);
+ long fileByteSize = longCoder.decode(inStream);
+ DestinationT destination = destinationCoder.decode(inStream);
return new Result<>(filename, fileByteSize, destination);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index e061edc..5907c9e 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -47,13 +47,13 @@ public class PubsubMessageWithAttributesCoder extends
CustomCoder<PubsubMessage>
public void encode(PubsubMessage value, OutputStream outStream, Context
context)
throws IOException {
- PAYLOAD_CODER.encode(value.getPayload(), outStream, context.nested());
+ PAYLOAD_CODER.encode(value.getPayload(), outStream);
ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context);
}
@Override
public PubsubMessage decode(InputStream inStream, Context context) throws
IOException {
- byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested());
+ byte[] payload = PAYLOAD_CODER.decode(inStream);
Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream,
context);
return new PubsubMessage(payload, attributes);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 9f04a6c..ae320c7 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -110,19 +110,19 @@ public class PubsubUnboundedSink extends
PTransform<PCollection<PubsubMessage>,
public void encode(
OutgoingMessage value, OutputStream outStream, Context context)
throws CoderException, IOException {
- ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested());
- ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested());
- BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested());
- RECORD_ID_CODER.encode(value.recordId, outStream, context.nested());
+ ByteArrayCoder.of().encode(value.elementBytes, outStream);
+ ATTRIBUTES_CODER.encode(value.attributes, outStream);
+ BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream);
+ RECORD_ID_CODER.encode(value.recordId, outStream);
}
@Override
public OutgoingMessage decode(
InputStream inStream, Context context) throws CoderException, IOException {
- byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested());
- Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context.nested());
- long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested());
- @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context.nested());
+ byte[] elementBytes = ByteArrayCoder.of().decode(inStream);
+ Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
+ long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
+ @Nullable String recordId = RECORD_ID_CODER.decode(inStream);
return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index c16b8fb..e53976e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -380,7 +380,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
@Override
public PubsubCheckpoint decode(InputStream inStream, Context context)
throws IOException {
- String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested());
+ String path = SUBSCRIPTION_PATH_CODER.decode(inStream);
List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
return new PubsubCheckpoint(path, null, null, notYetReadIds);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
index 2b4503a..5386a61 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -181,8 +181,8 @@ public class JAXBCoderTest {
public void encode(TestType value, OutputStream outStream, Context context)
throws CoderException, IOException {
Context nestedContext = context.nested();
- VarIntCoder.of().encode(3, outStream, nestedContext);
- jaxbCoder.encode(value, outStream, nestedContext);
+ VarIntCoder.of().encode(3, outStream);
+ jaxbCoder.encode(value, outStream);
VarLongCoder.of().encode(22L, outStream, context);
}
@@ -190,8 +190,8 @@ public class JAXBCoderTest {
public TestType decode(InputStream inStream, Context context)
throws CoderException, IOException {
Context nestedContext = context.nested();
- VarIntCoder.of().decode(inStream, nestedContext);
- TestType result = jaxbCoder.decode(inStream, nestedContext);
+ VarIntCoder.of().decode(inStream);
+ TestType result = jaxbCoder.decode(inStream);
VarLongCoder.of().decode(inStream, context);
return result;
}