AHeise commented on a change in pull request #13501:
URL: https://github.com/apache/flink/pull/13501#discussion_r497014099



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
##########
@@ -131,8 +132,15 @@ public ResultPartition create(
                                bufferCompressor,
                                bufferPoolFactory);
 
-                       for (int i = 0; i < subpartitions.length; i++) {
-                       subpartitions[i] = new PipelinedSubpartition(i, pipelinedPartition);
+                       if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {

Review comment:
       If we expect more partition types, I'd rather see a factory lambda extracted and used.
   
   ```
			BiFunction<Integer, PipelinedResultPartition, PipelinedSubpartition> factory;
			if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
				factory = PipelinedApproximateSubpartition::new;
			} else {
				factory = PipelinedSubpartition::new;
			}

			for (int i = 0; i < subpartitions.length; i++) {
				subpartitions[i] = factory.apply(i, pipelinedPartition);
			}
   ```
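
       (Both subpartition constructors shown in this PR take `(int index, ResultPartition parent)`, so the constructor references above fit this `BiFunction`, assuming `PipelinedResultPartition` is a subtype of `ResultPartition`.)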
   

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,138 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+       private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+       private boolean isPartialBuffer = false;
+
+       PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+               super(index, parent);
+       }
+
+       public void releaseView() {
+               readView = null;
+               isPartialBuffer = true;
+               isBlockedByCheckpoint = false;
+               sequenceNumber = 0;
+       }
+
+       @Override
+       public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+               synchronized (buffers) {
+                       checkState(!isReleased);
+
+                       // if the view is not released yet
+                       if (readView != null) {
+                               releaseView();
+                       }
+
+                       LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+                               parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+                       readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+               }
+
+               return readView;
+       }
+
+       @Nullable
+       @Override
+       BufferAndBacklog pollBuffer() {
+               synchronized (buffers) {
+                       if (isBlockedByCheckpoint) {
+                               return null;
+                       }
+
+                       Buffer buffer = null;
+
+                       if (buffers.isEmpty()) {
+                               flushRequested = false;
+                       }
+
+                       while (!buffers.isEmpty()) {
+                               BufferConsumer bufferConsumer = buffers.peek();
+
+                               // `isPartialBuffer` is set to true in the same Netty thread when ResultPartitionView is released
+                               if (isPartialBuffer) {
+                                       BufferConsumer.PartialRecordCleanupResult result = bufferConsumer.skipPartialRecord();
+                                       buffer = result.getBuffer();
+                                       isPartialBuffer = !result.getCleaned();
+                               } else {
+                                       buffer = bufferConsumer.build();
+                               }
+
+                               checkState(bufferConsumer.isFinished() || buffers.size() == 1,
+                                       "When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");
+
+                               if (buffers.size() == 1) {
+                                       // turn off flushRequested flag if we drained all of the available data
+                                       flushRequested = false;
+                               }
+
+                               if (bufferConsumer.isFinished()) {
+                                       buffers.poll().close();
+                                       decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
+                               }
+
+                               if (buffer.readableBytes() > 0) {
+                                       break;
+                               }
+
+                               buffer.recycleBuffer();
+                               buffer = null;
+
+                               if (!bufferConsumer.isFinished()) {
+                                       break;
+                               }
+                       }
+
+                       if (buffer == null) {
+                               return null;
+                       }
+
+                       if (buffer.getDataType().isBlockingUpstream()) {
+                               isBlockedByCheckpoint = true;
+                       }
+
+                       updateStatistics(buffer);
+                       // Do not report last remaining buffer on buffers as available to read (assuming it's unfinished).
+                       // It will be reported for reading either on flush or when the number of buffers in the queue
+                       // will be 2 or more.
+                       return new BufferAndBacklog(
+                               buffer,
+                               getBuffersInBacklog(),
+                               isDataAvailableUnsafe() ? getNextBufferTypeUnsafe() : Buffer.DataType.NONE,
+                               sequenceNumber++);
+               }
+       }
+
+       @Override
+       public String toString() {
+               final long numBuffers;
+               final long numBytes;
+               final boolean finished;
+               final boolean hasReadView;
+
+               synchronized (buffers) {

Review comment:
       You'd usually assume that `toString` is called under lock (also in the IDE). (I'm not aware of such a complicated `toString`.)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -358,6 +361,12 @@ public long getBufferTimeout() {
                return this.bufferTimeout;
        }
 
+
+       public StreamExecutionEnvironment setGlobalDataExchangeMode(GlobalDataExchangeMode globalDataExchangeMode) {

Review comment:
       I'd probably not expose `GlobalDataExchangeMode` directly, but rather go with `enableApproximateSinkFailover`.
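
       For illustration, the narrower API could look roughly like this; the field and the enum constant used in the body are placeholders to show the shape of the method, not existing Flink API:

   ```
	/**
	 * Sketch only: a feature-specific switch instead of exposing GlobalDataExchangeMode.
	 */
	public StreamExecutionEnvironment enableApproximateSinkFailover() {
		// placeholder field and constant name
		this.globalDataExchangeMode = GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE;
		return this;
	}
   ```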

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,138 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+       private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+       private boolean isPartialBuffer = false;
+
+       PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+               super(index, parent);
+       }
+
+       public void releaseView() {
+               readView = null;
+               isPartialBuffer = true;
+               isBlockedByCheckpoint = false;
+               sequenceNumber = 0;
+       }
+
+       @Override
+       public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+               synchronized (buffers) {
+                       checkState(!isReleased);
+
+                       // if the view is not released yet
+                       if (readView != null) {
+                               releaseView();
+                       }
+
+                       LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+                               parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+                       readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+               }
+
+               return readView;
+       }
+
+       @Nullable
+       @Override
+       BufferAndBacklog pollBuffer() {

Review comment:
       Instead of copying and adjusting the whole method, I'd like to see an inner method extracted in the base class and overridden here.
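
       For example (the hook name and the exact split are only illustrative and would have to fit the existing `PipelinedSubpartition` code): the base class keeps `pollBuffer()` and delegates the one divergent step to a hook, so this class only overrides the hook:

   ```
	// In PipelinedSubpartition (base class), invoked from within pollBuffer():
	Buffer buildSliceOfBuffer(BufferConsumer bufferConsumer) {
		return bufferConsumer.build();
	}

	// In PipelinedApproximateSubpartition, the only piece that differs:
	@Override
	Buffer buildSliceOfBuffer(BufferConsumer bufferConsumer) {
		if (isPartialBuffer) {
			// skip the rest of a record that was partially consumed before the view was released
			BufferConsumer.PartialRecordCleanupResult result = bufferConsumer.skipPartialRecord();
			isPartialBuffer = !result.getCleaned();
			return result.getBuffer();
		}
		return bufferConsumer.build();
	}
   ```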

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,138 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+       private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+       private boolean isPartialBuffer = false;
+
+       PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+               super(index, parent);
+       }
+
+       public void releaseView() {
+               readView = null;
+               isPartialBuffer = true;
+               isBlockedByCheckpoint = false;
+               sequenceNumber = 0;
+       }
+
+       @Override
+       public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+               synchronized (buffers) {
+                       checkState(!isReleased);
+
+                       // if the view is not released yet
+                       if (readView != null) {
+                               releaseView();
+                       }
+
+                       LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+                               parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+                       readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+               }
+
+               return readView;
+       }
+
+       @Nullable
+       @Override
+       BufferAndBacklog pollBuffer() {

Review comment:
       That would also avoid some of the visibility changes of the fields in the base class.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,27 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+       PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+               super(parent, listener);
+       }
+
+       @Override
+       public void releaseAllResources() {
+               if (isReleased.compareAndSet(false, true)) {
+                       // The view doesn't hold any resources and the parent cannot be restarted. Therefore,
+                       // it's OK to notify about consumption as well.
+                       checkState(parent instanceof PipelinedApproximateSubpartition);
+                       ((PipelinedApproximateSubpartition) parent).releaseView();

Review comment:
       It might also be an option to replicate the field in this class, typed as `PipelinedApproximateSubpartition` (a slightly different name such as `approximateParent` would also be good).
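
       i.e. roughly along these lines (sketch only, keeping everything else unchanged):

   ```
	public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {

		// additional, more specifically typed reference to the parent, so no cast is needed
		private final PipelinedApproximateSubpartition approximateParent;

		PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
			super(parent, listener);
			this.approximateParent = parent;
		}

		@Override
		public void releaseAllResources() {
			if (isReleased.compareAndSet(false, true)) {
				// the view doesn't hold any resources and the parent cannot be restarted
				approximateParent.releaseView();
			}
		}
	}
   ```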




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

