Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2805#discussion_r90777958
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -131,35 +132,30 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
 	}
 	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = serializers[targetChannel];
-
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					writeAndClearBuffer(buffer, targetChannel, serializer);
-				} else if (serializer.hasData()) {
-					throw new IllegalStateException("No buffer, but serializer has buffered data.");
-				}
-
-				targetPartition.writeEvent(event, targetChannel);
-			}
-		}
-	}
+		final Buffer eventBuffer = EventSerializer.toBuffer(event);
+		try {
+			for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+				RecordSerializer<T> serializer = serializers[targetChannel];
-	public void sendEndOfSuperstep() throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = serializers[targetChannel];
+				synchronized (serializer) {
+					Buffer buffer = serializer.getCurrentBuffer();
+					if (buffer != null) {
+						writeAndClearBuffer(buffer, targetChannel, serializer);
+					} else if (serializer.hasData()) {
+						// sanity check
+						throw new IllegalStateException("No buffer, but serializer has buffered data.");
+					}
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					writeAndClearBuffer(buffer, targetChannel, serializer);
+					// retain the buffer so that it can be recycled by each channel of targetPartition
+					eventBuffer.retain();
--- End diff ---
It would be good to add a special RecordWriterTest that ensures that the
reference counting logic works. For example:
```java
@Test
public void testBroadcastEventBufferReferenceCounting() throws Exception {
	Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);

	// Partial mocking of the static factory method so that the test can
	// track the reference count of the exact buffer used by broadcastEvent.
	PowerMockito.stub(PowerMockito.method(EventSerializer.class, "toBuffer")).toReturn(buffer);

	@SuppressWarnings("unchecked")
	ArrayDeque<BufferOrEvent>[] queues = new ArrayDeque[] {
		new ArrayDeque<BufferOrEvent>(), new ArrayDeque<BufferOrEvent>() };

	ResultPartitionWriter partition =
		createCollectingPartitionWriter(queues, new TestInfiniteBufferProvider());
	RecordWriter<?> writer = new RecordWriter<>(partition);

	writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);

	// Verify that the event was added to all queues...
	assertEquals(1, queues[0].size());
	assertEquals(1, queues[1].size());

	// ...and that all references to the event buffer have been released again.
	assertTrue(buffer.isRecycled());
}
```
You will have to adjust `createCollectingPartitionWriter` to correctly
recycle event buffers, and replace the `@PrepareForTest` class annotation with
`@PrepareForTest({ResultPartitionWriter.class, EventSerializer.class})`.
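One way the adjusted helper could look is sketched below. This is only a sketch:
it assumes the existing Mockito-based `createCollectingPartitionWriter` in
`RecordWriterTest` that intercepts `writeBuffer(Buffer, int)`; the exact method
names and signatures in the current test may differ. The key change is that event
buffers are deserialized and recycled by the mock, mirroring what the real
`ResultPartition` does, so that `Buffer#isRecycled()` can be asserted in the test
above.
```java
private ResultPartitionWriter createCollectingPartitionWriter(
		final Queue<BufferOrEvent>[] queues,
		BufferProvider bufferProvider) throws IOException {

	int numChannels = queues.length;

	ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
	when(partitionWriter.getBufferProvider()).thenReturn(bufferProvider);
	when(partitionWriter.getNumberOfOutputChannels()).thenReturn(numChannels);

	doAnswer(new Answer<Void>() {
		@Override
		public Void answer(InvocationOnMock invocation) throws Throwable {
			Buffer buffer = (Buffer) invocation.getArguments()[0];
			int targetChannel = (Integer) invocation.getArguments()[1];

			if (buffer.isBuffer()) {
				// regular record buffer: hand it over as is
				queues[targetChannel].add(new BufferOrEvent(buffer, targetChannel));
			} else {
				// event buffer: deserialize the event and recycle this channel's
				// reference, mirroring what the real ResultPartition does
				AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
				buffer.recycle();
				queues[targetChannel].add(new BufferOrEvent(event, targetChannel));
			}
			return null;
		}
	}).when(partitionWriter).writeBuffer(any(Buffer.class), anyInt());

	return partitionWriter;
}
```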
---
Not changed in this PR, but to work correctly this relies on the
`ResultPartition` recycling the buffer if the `add` call fails. It might make
sense to add a dedicated test (to `ResultPartitionTest` or `RecordWriterTest`)
that ensures this actually happens, to guard against future behaviour changes
in `ResultPartition`. An arguably better behaviour would be to let the
`RecordWriter` recycle the buffer itself if an exception occurs. That should be
addressed in a separate PR though.
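To illustrate that alternative (purely hypothetical, not part of this PR): the
`RecordWriter` could drop the per-channel reference itself when handing the
event buffer to the partition fails, so correctness no longer depends on
`ResultPartition` recycling on a failed `add`. The helper name
`writeEventToChannel` and the `writeBuffer` call are assumptions based on the
surrounding diff.
```java
// Hypothetical helper inside RecordWriter, sketching the alternative behaviour:
// release this channel's reference to the event buffer if the write fails.
private void writeEventToChannel(Buffer eventBuffer, int targetChannel)
		throws IOException, InterruptedException {

	// one reference per channel; normally released again downstream
	eventBuffer.retain();
	try {
		targetPartition.writeBuffer(eventBuffer, targetChannel);
	} catch (Throwable t) {
		// drop this channel's reference before propagating the failure
		eventBuffer.recycle();
		throw t;
	}
}
```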