[
https://issues.apache.org/jira/browse/FLINK-5059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15728924#comment-15728924
]
ASF GitHub Bot commented on FLINK-5059:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/2805#discussion_r91306180
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
---
@@ -131,35 +132,30 @@ private void sendToTarget(T record, int
targetChannel) throws IOException, Inter
}
public void broadcastEvent(AbstractEvent event) throws IOException,
InterruptedException {
- for (int targetChannel = 0; targetChannel < numChannels;
targetChannel++) {
- RecordSerializer<T> serializer =
serializers[targetChannel];
-
- synchronized (serializer) {
- Buffer buffer = serializer.getCurrentBuffer();
- if (buffer != null) {
- writeAndClearBuffer(buffer,
targetChannel, serializer);
- } else if (serializer.hasData()) {
- throw new IllegalStateException("No
buffer, but serializer has buffered data.");
- }
-
- targetPartition.writeEvent(event,
targetChannel);
- }
- }
- }
+ final Buffer eventBuffer = EventSerializer.toBuffer(event);
+ try {
+ for (int targetChannel = 0; targetChannel <
numChannels; targetChannel++) {
+ RecordSerializer<T> serializer =
serializers[targetChannel];
- public void sendEndOfSuperstep() throws IOException,
InterruptedException {
- for (int targetChannel = 0; targetChannel < numChannels;
targetChannel++) {
- RecordSerializer<T> serializer =
serializers[targetChannel];
+ synchronized (serializer) {
+ Buffer buffer =
serializer.getCurrentBuffer();
+ if (buffer != null) {
+ writeAndClearBuffer(buffer,
targetChannel, serializer);
+ } else if (serializer.hasData()) {
+ // sanity check
+ throw new
IllegalStateException("No buffer, but serializer has buffered data.");
+ }
- synchronized (serializer) {
- Buffer buffer = serializer.getCurrentBuffer();
- if (buffer != null) {
- writeAndClearBuffer(buffer,
targetChannel, serializer);
+ // retain the buffer so that it can be
recycled by each channel of targetPartition
+ eventBuffer.retain();
--- End diff --
done, thanks
I opened [FLINK-5277] for the ResultPartition#add test:
https://issues.apache.org/jira/browse/FLINK-5277
> only serialise events once in RecordWriter#broadcastEvent
> ---------------------------------------------------------
>
> Key: FLINK-5059
> URL: https://issues.apache.org/jira/browse/FLINK-5059
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Currently,
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#broadcastEvent
> serialises the event once per target channel. Instead, it could serialise the
> event only once and use the serialised form for every channel and thus save
> resources.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)