pnowojski commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r354413514
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##########
 @@ -524,36 +534,61 @@ private BufferOrEvent transformToBufferOrEvent(
                        boolean moreAvailable,
                        InputChannel currentChannel) throws IOException, 
InterruptedException {
                if (buffer.isBuffer()) {
-                       return new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable);
+                       return transformBuffer(buffer, moreAvailable, 
currentChannel);
+               } else {
+                       return transformEvent(buffer, moreAvailable, 
currentChannel);
                }
-               else {
-                       final AbstractEvent event;
-                       try {
-                               event = EventSerializer.fromBuffer(buffer, 
getClass().getClassLoader());
-                       }
-                       finally {
-                               buffer.recycleBuffer();
-                       }
+       }
 
-                       if (event.getClass() == EndOfPartitionEvent.class) {
-                               
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
-
-                               if 
(channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
-                                       // Because of race condition between:
-                                       // 1. releasing inputChannelsWithData 
lock in this method and reaching this place
-                                       // 2. empty data notification that 
re-enqueues a channel
-                                       // we can end up with moreAvailable 
flag set to true, while we expect no more data.
-                                       checkState(!moreAvailable || 
!pollNext().isPresent());
-                                       moreAvailable = false;
-                                       hasReceivedAllEndOfPartitionEvents = 
true;
-                                       markAvailable();
-                               }
+       private BufferOrEvent transformBuffer(Buffer buffer, boolean 
moreAvailable, InputChannel currentChannel) {
+               return new BufferOrEvent(decompressBufferIfNeeded(buffer), 
currentChannel.getChannelIndex(), moreAvailable);
+       }
+
+       private BufferOrEvent transformEvent(
+                       Buffer buffer,
+                       boolean moreAvailable,
+                       InputChannel currentChannel) throws IOException, 
InterruptedException {
+               final AbstractEvent event;
+               try {
+                       event = EventSerializer.fromBuffer(buffer, 
getClass().getClassLoader());
+               } finally {
+                       buffer.recycleBuffer();
+               }
 
-                               currentChannel.releaseAllResources();
+               if (event.getClass() == EndOfPartitionEvent.class) {
+                       
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
+
+                       if (channelsWithEndOfPartitionEvents.cardinality() == 
numberOfInputChannels) {
+                               // Because of race condition between:
+                               // 1. releasing inputChannelsWithData lock in 
this method and reaching this place
+                               // 2. empty data notification that re-enqueues 
a channel
+                               // we can end up with moreAvailable flag set to 
true, while we expect no more data.
+                               checkState(!moreAvailable || 
!pollNext().isPresent());
+                               moreAvailable = false;
+                               hasReceivedAllEndOfPartitionEvents = true;
+                               markAvailable();
                        }
 
-                       return new BufferOrEvent(event, 
currentChannel.getChannelIndex(), moreAvailable, buffer.getSize());
+                       currentChannel.releaseAllResources();
+               }
+
+               return new BufferOrEvent(event, 
currentChannel.getChannelIndex(), moreAvailable, buffer.getSize());
+       }
+
+       FileWriter fileWriter = null;
+       private Buffer decompressBufferIfNeeded(Buffer buffer) {
+//             if (fileWriter == null) {
 
 Review comment:
   nit: remove the commented-out code?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to