Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/5105#discussion_r156407089
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
---
@@ -144,51 +136,12 @@ public void addInputGate(InputGate gate) {
inputs.add(gate);
}
- public <T> void addOutput(final Queue<Object> outputList, final
TypeSerializer<T> serializer) {
+ public <T> void addOutput(final Collection<Object> outputList, final
TypeSerializer<T> serializer) {
try {
- // The record-oriented writers wrap the buffer writer.
We mock it
- // to collect the returned buffers and deserialize the
content to
- // the output list
- BufferProvider mockBufferProvider =
mock(BufferProvider.class);
-
when(mockBufferProvider.requestBufferBlocking()).thenAnswer(new
Answer<Buffer>() {
-
- @Override
- public Buffer answer(InvocationOnMock
invocationOnMock) throws Throwable {
- return new Buffer(
-
MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
- mock(BufferRecycler.class));
- }
- });
-
- ResultPartitionWriter mockWriter =
mock(ResultPartitionWriter.class);
-
when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
-
when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
-
- final RecordDeserializer<DeserializationDelegate<T>>
recordDeserializer = new
AdaptiveSpanningRecordDeserializer<DeserializationDelegate<T>>();
- final NonReusingDeserializationDelegate<T> delegate =
new NonReusingDeserializationDelegate<T>(serializer);
-
- // Add records and events from the buffer to the output
list
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock
invocationOnMock) throws Throwable {
- Buffer buffer = (Buffer)
invocationOnMock.getArguments()[0];
-
addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
- return null;
- }
- }).when(mockWriter).writeBuffer(any(Buffer.class),
anyInt());
-
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock
invocationOnMock) throws Throwable {
- Buffer buffer = (Buffer)
invocationOnMock.getArguments()[0];
-
addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
--- End diff --
Since you moved the contents of `addBufferToOutputList()` into
`RecordOrEventCollectingResultPartitionWriter<T>`, this method is now unused
and can be removed.
---