pnowojski commented on a change in pull request #10375: [FLINK-14845][runtime]
Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r354413514
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -524,36 +534,61 @@ private BufferOrEvent transformToBufferOrEvent(
boolean moreAvailable,
InputChannel currentChannel) throws IOException,
InterruptedException {
if (buffer.isBuffer()) {
- return new BufferOrEvent(buffer,
currentChannel.getChannelIndex(), moreAvailable);
+ return transformBuffer(buffer, moreAvailable,
currentChannel);
+ } else {
+ return transformEvent(buffer, moreAvailable,
currentChannel);
}
- else {
- final AbstractEvent event;
- try {
- event = EventSerializer.fromBuffer(buffer,
getClass().getClassLoader());
- }
- finally {
- buffer.recycleBuffer();
- }
+ }
- if (event.getClass() == EndOfPartitionEvent.class) {
-
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
-
- if
(channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
- // Because of race condition between:
- // 1. releasing inputChannelsWithData
lock in this method and reaching this place
- // 2. empty data notification that
re-enqueues a channel
- // we can end up with moreAvailable
flag set to true, while we expect no more data.
- checkState(!moreAvailable ||
!pollNext().isPresent());
- moreAvailable = false;
- hasReceivedAllEndOfPartitionEvents =
true;
- markAvailable();
- }
+ private BufferOrEvent transformBuffer(Buffer buffer, boolean
moreAvailable, InputChannel currentChannel) {
+ return new BufferOrEvent(decompressBufferIfNeeded(buffer),
currentChannel.getChannelIndex(), moreAvailable);
+ }
+
+ private BufferOrEvent transformEvent(
+ Buffer buffer,
+ boolean moreAvailable,
+ InputChannel currentChannel) throws IOException,
InterruptedException {
+ final AbstractEvent event;
+ try {
+ event = EventSerializer.fromBuffer(buffer,
getClass().getClassLoader());
+ } finally {
+ buffer.recycleBuffer();
+ }
- currentChannel.releaseAllResources();
+ if (event.getClass() == EndOfPartitionEvent.class) {
+
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
+
+ if (channelsWithEndOfPartitionEvents.cardinality() ==
numberOfInputChannels) {
+ // Because of race condition between:
+ // 1. releasing inputChannelsWithData lock in
this method and reaching this place
+ // 2. empty data notification that re-enqueues
a channel
+ // we can end up with moreAvailable flag set to
true, while we expect no more data.
+ checkState(!moreAvailable ||
!pollNext().isPresent());
+ moreAvailable = false;
+ hasReceivedAllEndOfPartitionEvents = true;
+ markAvailable();
}
- return new BufferOrEvent(event,
currentChannel.getChannelIndex(), moreAvailable, buffer.getSize());
+ currentChannel.releaseAllResources();
+ }
+
+ return new BufferOrEvent(event,
currentChannel.getChannelIndex(), moreAvailable, buffer.getSize());
+ }
+
+ FileWriter fileWriter = null;
+ private Buffer decompressBufferIfNeeded(Buffer buffer) {
+// if (fileWriter == null) {
Review comment:
nit: please remove the commented-out code here (e.g. `// if (fileWriter == null) {`).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services