AHeise commented on a change in pull request #13501:
URL: https://github.com/apache/flink/pull/13501#discussion_r497014099
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
##########
@@ -131,8 +132,15 @@ public ResultPartition create(
bufferCompressor,
bufferPoolFactory);
- for (int i = 0; i < subpartitions.length; i++) {
- subpartitions[i] = new PipelinedSubpartition(i, pipelinedPartition);
+ if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
Review comment:
If we expect more partition types, I'd rather see a factory lambda
extracted and used.
```
// Requires: import java.util.function.BiFunction;
BiFunction<Integer, PipelinedResultPartition, PipelinedSubpartition> factory;
if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
    factory = PipelinedApproximateSubpartition::new;
} else {
    factory = PipelinedSubpartition::new;
}
// One loop for all partition types; only the constructor differs.
for (int i = 0; i < subpartitions.length; i++) {
    subpartitions[i] = factory.apply(i, pipelinedPartition);
}
```
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,138 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+ private boolean isPartialBuffer = false;
+
+ PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+ super(index, parent);
+ }
+
+ public void releaseView() {
+ readView = null;
+ isPartialBuffer = true;
+ isBlockedByCheckpoint = false;
+ sequenceNumber = 0;
+ }
+
+ @Override
+ public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+ synchronized (buffers) {
+ checkState(!isReleased);
+
+ // if the view is not released yet
+ if (readView != null) {
+ releaseView();
+ }
+
+ LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+ parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+ readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+ }
+
+ return readView;
+ }
+
+ @Nullable
+ @Override
+ BufferAndBacklog pollBuffer() {
+ synchronized (buffers) {
+ if (isBlockedByCheckpoint) {
+ return null;
+ }
+
+ Buffer buffer = null;
+
+ if (buffers.isEmpty()) {
+ flushRequested = false;
+ }
+
+ while (!buffers.isEmpty()) {
+ BufferConsumer bufferConsumer = buffers.peek();
+
+ // `isPartialBuffer` is set to true in the same Netty thread when ResultPartitionView is released
+ if (isPartialBuffer) {
+ BufferConsumer.PartialRecordCleanupResult result = bufferConsumer.skipPartialRecord();
+ buffer = result.getBuffer();
+ isPartialBuffer = !result.getCleaned();
+ } else {
+ buffer = bufferConsumer.build();
+ }
+
+ checkState(bufferConsumer.isFinished() || buffers.size() == 1,
+ "When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");
+
+ if (buffers.size() == 1) {
+ // turn off flushRequested flag if we drained all of the available data
+ flushRequested = false;
+ }
+
+ if (bufferConsumer.isFinished()) {
+ buffers.poll().close();
+ decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
+ }
+
+ if (buffer.readableBytes() > 0) {
+ break;
+ }
+
+ buffer.recycleBuffer();
+ buffer = null;
+
+ if (!bufferConsumer.isFinished()) {
+ break;
+ }
+ }
+
+ if (buffer == null) {
+ return null;
+ }
+
+ if (buffer.getDataType().isBlockingUpstream()) {
+ isBlockedByCheckpoint = true;
+ }
+
+ updateStatistics(buffer);
+ // Do not report last remaining buffer on buffers as available to read (assuming it's unfinished).
+ // It will be reported for reading either on flush or when the number of buffers in the queue
+ // will be 2 or more.
+ return new BufferAndBacklog(
+ buffer,
+ getBuffersInBacklog(),
+ isDataAvailableUnsafe() ? getNextBufferTypeUnsafe() : Buffer.DataType.NONE,
+ sequenceNumber++);
+ }
+ }
+
+ @Override
+ public String toString() {
+ final long numBuffers;
+ final long numBytes;
+ final boolean finished;
+ final boolean hasReadView;
+
+ synchronized (buffers) {
Review comment:
You'd usually assume that `toString` is called under a lock (also in the IDE).
(I'm not aware of any other `toString` this complicated.)
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -358,6 +361,12 @@ public long getBufferTimeout() {
return this.bufferTimeout;
}
+
+ public StreamExecutionEnvironment setGlobalDataExchangeMode(GlobalDataExchangeMode globalDataExchangeMode) {
Review comment:
I'd probably not expose `GlobalDataExchangeMode` directly, but rather go
with `enableApproximateSinkFailover`.
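A minimal sketch of the narrower API this comment suggests, assuming the environment keeps the exchange mode as a private field. `EnvironmentSketch` and `ExchangeModeSketch` are hypothetical stand-ins, not Flink's `StreamExecutionEnvironment` or `GlobalDataExchangeMode`; only the method name `enableApproximateSinkFailover` comes from the comment.

```java
/** Demo entry point; every type in this sketch is a hypothetical stand-in. */
class ApproximateFailoverDemo {
    public static void main(String[] args) {
        EnvironmentSketch env = new EnvironmentSketch();
        System.out.println(env.getExchangeMode()); // default mode
        env.enableApproximateSinkFailover();
        System.out.println(env.getExchangeMode()); // approximate mode
    }
}

/** Hypothetical internal enum standing in for the real exchange-mode type. */
enum ExchangeModeSketch {
    ALL_EDGES_PIPELINED,
    ALL_EDGES_PIPELINED_APPROXIMATE
}

/** Hypothetical, heavily simplified environment exposing only an opt-in switch. */
class EnvironmentSketch {

    // Internal detail: user code never names the enum directly.
    private ExchangeModeSketch exchangeMode = ExchangeModeSketch.ALL_EDGES_PIPELINED;

    /** Opts in to approximate sink failover; returns this for fluent chaining. */
    public EnvironmentSketch enableApproximateSinkFailover() {
        this.exchangeMode = ExchangeModeSketch.ALL_EDGES_PIPELINED_APPROXIMATE;
        return this;
    }

    /** Package-private, so only internal code (e.g. a graph generator) reads the mode. */
    ExchangeModeSketch getExchangeMode() {
        return exchangeMode;
    }
}
```

With a single opt-in method, the internal enum can change without breaking the public API; the trade-off is one public method per user-facing mode.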
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,138 @@
+ @Nullable
+ @Override
+ BufferAndBacklog pollBuffer() {
Review comment:
Instead of copying and adjusting the whole method, I'd like to see an
inner method extracted in the base class and overridden here.
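A minimal sketch of that refactoring (a template method), assuming a simplified subpartition whose buffers are plain `String`s and whose records end with `|`. `BaseSubpartitionSketch`, `ApproximateSubpartitionSketch`, and the hook `buildSlice` are hypothetical names, not Flink's API. The base class owns the queue and the polling loop; the subclass overrides only the step that turns one queued element into a slice, which is roughly where the diff's `skipPartialRecord()` branch would live.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Demo entry point; all types here are simplified, hypothetical stand-ins. */
class TemplateMethodDemo {
    public static void main(String[] args) {
        ApproximateSubpartitionSketch sub = new ApproximateSubpartitionSketch();
        sub.add("old-tail");      // rest of a record whose start was already consumed
        sub.add("old-end|new1|"); // end of that record, then a complete one
        sub.add("new2|");
        sub.releaseView();        // the next poll must skip the partial record
        System.out.println(sub.pollBuffer());
        System.out.println(sub.pollBuffer());
    }
}

/** Base class: owns the queue and the polling loop; its fields stay private. */
class BaseSubpartitionSketch {
    private final Deque<String> buffers = new ArrayDeque<>();

    final void add(String data) {
        synchronized (buffers) {
            buffers.add(data);
        }
    }

    /** Shared loop, written once: returns the first non-empty slice, or null. */
    final String pollBuffer() {
        synchronized (buffers) {
            while (!buffers.isEmpty()) {
                String slice = buildSlice(buffers.poll());
                if (!slice.isEmpty()) {
                    return slice;
                }
                // Empty slice: drop it and keep draining the queue.
            }
            return null;
        }
    }

    /** Extracted hook; the default passes the data through unchanged. */
    protected String buildSlice(String data) {
        return data;
    }
}

/** Subclass: overrides only the hook instead of copying pollBuffer(). */
class ApproximateSubpartitionSketch extends BaseSubpartitionSketch {
    // Mirrors isPartialBuffer in the diff; assumed to be touched only by the
    // thread that also calls pollBuffer().
    private boolean skipPartialRecord = false;

    void releaseView() {
        skipPartialRecord = true;
    }

    @Override
    protected String buildSlice(String data) {
        if (!skipPartialRecord) {
            return data;
        }
        // Hypothetical record format: records end with '|', so everything up to
        // and including the first '|' belongs to the partial record.
        int end = data.indexOf('|');
        if (end < 0) {
            return ""; // the whole slice is partial-record data
        }
        skipPartialRecord = false;
        return data.substring(end + 1);
    }
}
```

Running `main` prints `new1|` and then `new2|`. Because the subclass never touches the queue, the queue field can remain private in the base class.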
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,138 @@
+ @Nullable
+ @Override
+ BufferAndBacklog pollBuffer() {
Review comment:
That would also avoid some of the visibility changes of the fields in
the base class.
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,27 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+ PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+ super(parent, listener);
+ }
+
+ @Override
+ public void releaseAllResources() {
+ if (isReleased.compareAndSet(false, true)) {
+ // The view doesn't hold any resources and the parent cannot be restarted. Therefore,
+ // it's OK to notify about consumption as well.
+ checkState(parent instanceof PipelinedApproximateSubpartition);
+ ((PipelinedApproximateSubpartition) parent).releaseView();
Review comment:
Another option would be to replicate the field in this class with type
`PipelinedApproximateSubpartition` (ideally under a slightly different name,
like `approximateParent`).
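A minimal sketch of that option, using simplified stand-in classes; every name except `approximateParent` is hypothetical and not part of Flink. The subclass stores the same parent a second time with its precise type, so the `instanceof` check and cast from the diff are no longer needed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Demo entry point; all types here are simplified, hypothetical stand-ins. */
class TypedParentDemo {
    public static void main(String[] args) {
        ApproximateSubpartitionStub parent = new ApproximateSubpartitionStub();
        ApproximateViewSketch view = new ApproximateViewSketch(parent);
        view.releaseAllResources();
        view.releaseAllResources(); // no-op: the view is already released
        System.out.println(parent.releaseCount); // 1
    }
}

/** Stand-in for the general subpartition type the base view knows about. */
class SubpartitionStub {}

/** Stand-in for the approximate subpartition; counts releaseView() calls. */
class ApproximateSubpartitionStub extends SubpartitionStub {
    int releaseCount = 0;

    void releaseView() {
        releaseCount++;
    }
}

/** Stand-in for the base view: it only holds the general parent type. */
class BaseViewSketch {
    protected final SubpartitionStub parent;
    protected final AtomicBoolean isReleased = new AtomicBoolean(false);

    BaseViewSketch(SubpartitionStub parent) {
        this.parent = parent;
    }
}

class ApproximateViewSketch extends BaseViewSketch {
    // Same object as `parent`, kept with its precise type, so neither an
    // instanceof check nor a cast is needed.
    private final ApproximateSubpartitionStub approximateParent;

    ApproximateViewSketch(ApproximateSubpartitionStub parent) {
        super(parent);
        this.approximateParent = parent;
    }

    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            approximateParent.releaseView();
        }
    }
}
```

The cost is one redundant reference per view; in exchange, the constructor's parameter type already guarantees what the runtime check and cast re-verify.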
----------------------------------------------------------------
This is an automated message from the Apache Git Service.