[ https://issues.apache.org/jira/browse/FLINK-5059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15719998#comment-15719998 ]
ASF GitHub Bot commented on FLINK-5059:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2805#discussion_r90777958
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
@@ -131,35 +132,30 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
     }
     public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-        for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-            RecordSerializer<T> serializer = serializers[targetChannel];
-
-            synchronized (serializer) {
-                Buffer buffer = serializer.getCurrentBuffer();
-                if (buffer != null) {
-                    writeAndClearBuffer(buffer, targetChannel, serializer);
-                } else if (serializer.hasData()) {
-                    throw new IllegalStateException("No buffer, but serializer has buffered data.");
-                }
-
-                targetPartition.writeEvent(event, targetChannel);
-            }
-        }
-    }
+        final Buffer eventBuffer = EventSerializer.toBuffer(event);
+        try {
+            for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+                RecordSerializer<T> serializer = serializers[targetChannel];
-    public void sendEndOfSuperstep() throws IOException, InterruptedException {
-        for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-            RecordSerializer<T> serializer = serializers[targetChannel];
+                synchronized (serializer) {
+                    Buffer buffer = serializer.getCurrentBuffer();
+                    if (buffer != null) {
+                        writeAndClearBuffer(buffer, targetChannel, serializer);
+                    } else if (serializer.hasData()) {
+                        // sanity check
+                        throw new IllegalStateException("No buffer, but serializer has buffered data.");
+                    }
-            synchronized (serializer) {
-                Buffer buffer = serializer.getCurrentBuffer();
-                if (buffer != null) {
-                    writeAndClearBuffer(buffer, targetChannel, serializer);
+                    // retain the buffer so that it can be recycled by each channel of targetPartition
+                    eventBuffer.retain();
--- End diff --
It would be good to add a dedicated test to `RecordWriterTest` that ensures the
reference-counting logic works.
```java
@Test
public void testBroadcastEventBufferReferenceCounting() throws Exception {
    Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);

    // Partial mocking of static method: make broadcastEvent() use our buffer
    PowerMockito.stub(PowerMockito.method(EventSerializer.class, "toBuffer")).toReturn(buffer);

    @SuppressWarnings("unchecked")
    ArrayDeque<BufferOrEvent>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };

    ResultPartitionWriter partition = createCollectingPartitionWriter(queues, new TestInfiniteBufferProvider());
    RecordWriter<?> writer = new RecordWriter<>(partition);

    writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);

    // Verify the event was added to all queues
    assertEquals(1, queues[0].size());
    assertEquals(1, queues[1].size());

    // The shared event buffer must be recycled once all references to it are released
    assertTrue(buffer.isRecycled());
}
```
For this to work, you have to adjust `createCollectingPartitionWriter` so that
it correctly recycles event buffers, and replace the class-level
`@PrepareForTest` annotation with
`@PrepareForTest({ResultPartitionWriter.class, EventSerializer.class})`.
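The reference-counting contract that such a test checks can be sketched in isolation. This is a minimal, self-contained model under stated assumptions, not Flink's `Buffer` API: `RefCountedBuffer`, `Channel` and `broadcast` are hypothetical stand-ins. The writer holds one reference from creation, retains one extra reference per channel (mirroring `eventBuffer.retain()` in the diff), each channel recycles its reference once it has consumed the buffer, and the writer drops its own reference in a `finally` block.

```java
import java.util.ArrayDeque;
import java.util.List;

public class BroadcastRefCountSketch {

    /** Hypothetical stand-in for a reference-counted network buffer. */
    static final class RefCountedBuffer {
        private int refCount = 1; // the creator holds the first reference
        private boolean recycled = false;

        void retain() {
            if (recycled) {
                throw new IllegalStateException("buffer already recycled");
            }
            refCount++;
        }

        void recycle() {
            if (recycled) {
                throw new IllegalStateException("buffer recycled twice");
            }
            if (--refCount == 0) {
                recycled = true; // a real pool would get the memory back here
            }
        }

        boolean isRecycled() {
            return recycled;
        }
    }

    /** Hypothetical channel: queues the buffer and recycles it once consumed. */
    static final class Channel {
        final ArrayDeque<RefCountedBuffer> queue = new ArrayDeque<>();

        void add(RefCountedBuffer buffer) {
            queue.add(buffer);
        }

        void consumeAll() {
            while (!queue.isEmpty()) {
                queue.poll().recycle(); // release this channel's reference
            }
        }
    }

    /** Shares one buffer across all channels, one reference per channel. */
    static void broadcast(RefCountedBuffer eventBuffer, List<Channel> channels) {
        try {
            for (Channel channel : channels) {
                eventBuffer.retain(); // reference owned by this channel
                channel.add(eventBuffer);
            }
        } finally {
            eventBuffer.recycle(); // drop the creator's reference
        }
    }

    public static void main(String[] args) {
        RefCountedBuffer buffer = new RefCountedBuffer();
        List<Channel> channels = List.of(new Channel(), new Channel());

        broadcast(buffer, channels);
        System.out.println(channels.get(0).queue.size() + " " + channels.get(1).queue.size()); // 1 1
        System.out.println(buffer.isRecycled()); // false: channels still hold references

        channels.forEach(Channel::consumeAll);
        System.out.println(buffer.isRecycled()); // true
    }
}
```

The key property is that the buffer is released exactly when the last holder lets go, never earlier (which would corrupt queued data) and never twice.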
---
Not changed in this PR, but for this to work correctly we rely on
`ResultPartition` recycling the buffer if the `add` call fails. It might make
sense to add a dedicated test (to `ResultPartitionTest` or `RecordWriterTest`)
that ensures this actually happens, to guard against future behaviour changes
in `ResultPartition`. Arguably better behaviour would be for the `RecordWriter`
to recycle the buffer itself if an exception occurs. This should be addressed
in a different PR, though.
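The alternative mentioned above, letting the writer rather than the partition clean up after a failed `add`, could look roughly like the following. Again this is a hypothetical, self-contained model rather than Flink code, and it assumes a contract where a channel takes ownership of the reference only if `add` returns normally. On an exception the writer gives back the reference it just handed over, its `finally` block drops its own reference, and the buffer is not leaked regardless of how the partition behaves.

```java
import java.util.ArrayList;
import java.util.List;

public class RecycleOnFailureSketch {

    /** Hypothetical reference-counted buffer (creator holds the first reference). */
    static final class RefCountedBuffer {
        private int refCount = 1;

        void retain() {
            if (refCount == 0) {
                throw new IllegalStateException("buffer already recycled");
            }
            refCount++;
        }

        void recycle() {
            if (refCount == 0) {
                throw new IllegalStateException("buffer recycled twice");
            }
            refCount--;
        }

        boolean isRecycled() {
            return refCount == 0;
        }

        int refCount() {
            return refCount;
        }
    }

    /** Hypothetical channel contract: takes ownership only if add() returns normally. */
    interface Channel {
        void add(RefCountedBuffer buffer);
    }

    static void broadcast(RefCountedBuffer eventBuffer, List<Channel> channels) {
        try {
            for (Channel channel : channels) {
                eventBuffer.retain(); // reference intended for this channel
                try {
                    channel.add(eventBuffer);
                } catch (RuntimeException e) {
                    // add() failed, so the channel never took ownership:
                    // the writer gives that reference back itself.
                    eventBuffer.recycle();
                    throw e;
                }
            }
        } finally {
            eventBuffer.recycle(); // drop the creator's reference
        }
    }

    public static void main(String[] args) {
        List<RefCountedBuffer> accepted = new ArrayList<>();
        Channel ok = accepted::add;
        Channel failing = b -> { throw new IllegalStateException("partition released"); };

        RefCountedBuffer buffer = new RefCountedBuffer();
        try {
            broadcast(buffer, List.of(ok, failing));
        } catch (IllegalStateException e) {
            System.out.println("broadcast failed: " + e.getMessage());
        }
        // Only the channel that accepted the buffer still holds a reference.
        System.out.println(buffer.refCount()); // 1
        accepted.forEach(RefCountedBuffer::recycle);
        System.out.println(buffer.isRecycled()); // true
    }
}
```

With this split of responsibilities, a test only has to check the writer, and a later change in how the partition handles failed adds cannot silently introduce a leak or a double recycle.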
> only serialise events once in RecordWriter#broadcastEvent
> ---------------------------------------------------------
>
> Key: FLINK-5059
> URL: https://issues.apache.org/jira/browse/FLINK-5059
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Currently,
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#broadcastEvent
> serialises the event once per target channel. Instead, it could serialise the
> event only once and reuse the serialised form for every channel, thus saving
> resources.
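To make the saving concrete: with N target channels, the per-channel approach runs the serialiser N times, while the proposed approach runs it once and hands the same bytes to every channel. The sketch below uses a hypothetical counting serialiser in plain Java, not Flink's `EventSerializer`. For simplicity it shares the bytes through read-only `ByteBuffer` views; the PR itself instead shares a single reference-counted `Buffer`, retained once per channel.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SerializeOnceSketch {

    /** Hypothetical event serialiser that counts how often it is invoked. */
    static final class CountingSerializer {
        int invocations = 0;

        ByteBuffer toBuffer(String event) {
            invocations++;
            return ByteBuffer.wrap(event.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Old behaviour: serialise the event once per target channel. */
    static List<ByteBuffer> perChannel(CountingSerializer s, String event, int numChannels) {
        List<ByteBuffer> out = new ArrayList<>();
        for (int ch = 0; ch < numChannels; ch++) {
            out.add(s.toBuffer(event));
        }
        return out;
    }

    /** Proposed behaviour: serialise once and share the result. */
    static List<ByteBuffer> once(CountingSerializer s, String event, int numChannels) {
        ByteBuffer serialized = s.toBuffer(event);
        List<ByteBuffer> out = new ArrayList<>();
        for (int ch = 0; ch < numChannels; ch++) {
            // asReadOnlyBuffer() shares the bytes, gives each channel its own
            // position/limit, and prevents one channel from modifying the data
            out.add(serialized.asReadOnlyBuffer());
        }
        return out;
    }

    public static void main(String[] args) {
        CountingSerializer a = new CountingSerializer();
        CountingSerializer b = new CountingSerializer();
        List<ByteBuffer> x = perChannel(a, "EndOfPartition", 4);
        List<ByteBuffer> y = once(b, "EndOfPartition", 4);
        System.out.println(a.invocations + " vs " + b.invocations); // 4 vs 1
        System.out.println(x.equals(y)); // true: same bytes on every channel
    }
}
```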
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)