This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 11ca1a785 [CELEBORN-2005] Introduce numBytesIn, numBytesOut,
numBytesInPerSecond, numBytesOutPerSecond metrics for
RemoteShuffleServiceFactory
11ca1a785 is described below
commit 11ca1a7858a503e52b304e0d67ec0ee05a2b8b77
Author: SteNicholas <[email protected]>
AuthorDate: Fri May 23 16:25:40 2025 +0800
[CELEBORN-2005] Introduce numBytesIn, numBytesOut, numBytesInPerSecond,
numBytesOutPerSecond metrics for RemoteShuffleServiceFactory
### What changes were proposed in this pull request?
Introduce `numBytesIn`, `numBytesOut`, `numRecordsOut`,
`numBytesInPerSecond`, `numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics
for `RemoteShuffleServiceFactory`.
| Scope | Infix | Metrics | Description | Type |
| -- | -- | -- | -- | -- |
| Task | Shuffle.Remote.[ShuffleId] | numBytesIn | The total number of bytes this shuffle has read. | Counter |
| Task | Shuffle.Remote.[ShuffleId] | numBytesOut | The total number of bytes this shuffle has written. | Counter |
| Task | Shuffle.Remote.[ShuffleId] | numRecordsOut | The total number of records this shuffle has written. | Counter |
| Task | Shuffle.Remote.[ShuffleId] | numBytesInPerSecond | The number of bytes this shuffle reads per second. | Meter |
| Task | Shuffle.Remote.[ShuffleId] | numBytesOutPerSecond | The number of bytes this shuffle writes per second. | Meter |
| Task | Shuffle.Remote.[ShuffleId] | numRecordsOutPerSecond | The number of records this shuffle writes per second. | Meter |
Note:
- The `numBytesIn` and `numBytesOut` metrics count the total number of bytes
of both records and events.
- The `numRecordsOut` metric counts records only; events are not included.
### Why are the changes needed?
There are currently no metrics covering shuffle read operations or shuffle
write operations for the Flink shuffle. This change therefore introduces the
`numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`,
`numBytesOutPerSecond` and `numRecordsOutPerSecond` metrics for
`RemoteShuffleServiceFactory`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `RemoteShuffleOutputGateSuiteJ#testSimpleWriteData`
- `RemoteShuffleResultPartitionSuiteJ`
Closes #3272 from SteNicholas/CELEBORN-2005.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../flink/AbstractRemoteShuffleInputGate.java | 5 +-
.../AbstractRemoteShuffleInputGateFactory.java | 7 +-
...bstractRemoteShuffleResultPartitionFactory.java | 20 +++---
.../plugin/flink/RemoteShuffleEnvironment.java | 4 +-
.../flink/RemoteShuffleInputGateDelegation.java | 22 ++++--
.../plugin/flink/RemoteShuffleOutputGate.java | 46 +++++++++---
.../RemoteShuffleResultPartitionDelegation.java | 8 +++
.../plugin/flink/metric/ShuffleIOMetricGroup.java | 83 ++++++++++++++++++++++
.../flink/RemoteShuffleOutputGateSuiteJ.java | 51 +++++++++++--
.../plugin/flink/RemoteShuffleInputGate.java | 5 +-
.../flink/RemoteShuffleInputGateFactory.java | 7 +-
.../flink/RemoteShuffleResultPartitionFactory.java | 8 ++-
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 52 +++++++++-----
.../plugin/flink/RemoteShuffleInputGate.java | 5 +-
.../flink/RemoteShuffleInputGateFactory.java | 7 +-
.../flink/RemoteShuffleResultPartitionFactory.java | 8 ++-
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++-
.../plugin/flink/RemoteShuffleInputGate.java | 5 +-
.../flink/RemoteShuffleInputGateFactory.java | 7 +-
.../flink/RemoteShuffleResultPartitionFactory.java | 8 ++-
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++-
.../plugin/flink/RemoteShuffleInputGate.java | 5 +-
.../flink/RemoteShuffleInputGateFactory.java | 7 +-
.../flink/RemoteShuffleResultPartitionFactory.java | 8 ++-
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++-
.../plugin/flink/RemoteShuffleInputGate.java | 5 +-
.../flink/RemoteShuffleInputGateFactory.java | 7 +-
.../flink/RemoteShuffleResultPartitionFactory.java | 8 ++-
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++-
.../plugin/flink/RemoteShuffleInputGate.java | 5 +-
.../flink/RemoteShuffleInputGateFactory.java | 7 +-
.../flink/RemoteShuffleResultPartitionFactory.java | 8 ++-
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 16 ++++-
33 files changed, 386 insertions(+), 112 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
index a7330a2cb..d04497536 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.SupplierWithException;
@@ -45,7 +46,7 @@ public abstract class AbstractRemoteShuffleInputGate extends
IndexedInputGate {
public AbstractRemoteShuffleInputGate(
CelebornConf celebornConf,
- String taskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -55,7 +56,7 @@ public abstract class AbstractRemoteShuffleInputGate extends
IndexedInputGate {
inputGateDelegation =
new RemoteShuffleInputGateDelegation(
celebornConf,
- taskName,
+ ownerContext,
gateIndex,
gateDescriptor,
bufferPoolFactory,
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
index 9568ac546..0c9e2e8bb 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +81,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
/** Create RemoteShuffleInputGate from {@link
InputGateDeploymentDescriptor}. */
public IndexedInputGate create(
- String owningTaskName, int gateIndex, InputGateDeploymentDescriptor
igdd) {
+ ShuffleIOOwnerContext ownerContext, int gateIndex,
InputGateDeploymentDescriptor igdd) {
LOG.info(
"Create input gate -- number of buffers per input gate={}, "
+ "number of concurrent readings={}.",
@@ -91,7 +92,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
createBufferPoolFactory(networkBufferPool, numBuffersPerGate,
supportFloatingBuffers);
return createInputGate(
- owningTaskName,
+ ownerContext,
gateIndex,
igdd,
bufferPoolFactory,
@@ -99,7 +100,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
}
protected abstract IndexedInputGate createInputGate(
- String owningTaskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor igdd,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
index dfde34787..929939e47 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
@@ -30,7 +30,7 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,7 +89,7 @@ public abstract class
AbstractRemoteShuffleResultPartitionFactory {
}
public ResultPartition create(
- String taskNameWithSubtaskAndId,
+ ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionDeploymentDescriptor desc,
CelebornConf celebornConf) {
@@ -100,32 +100,32 @@ public abstract class
AbstractRemoteShuffleResultPartitionFactory {
desc.getNumberOfSubpartitions());
return create(
- taskNameWithSubtaskAndId,
+ ownerContext,
partitionIndex,
desc.getShuffleDescriptor().getResultPartitionID(),
desc.getPartitionType(),
desc.getNumberOfSubpartitions(),
desc.getMaxParallelism(),
createBufferPoolFactory(),
- desc.getShuffleDescriptor(),
+ (RemoteShuffleDescriptor) desc.getShuffleDescriptor(),
celebornConf,
desc.getTotalNumberOfPartitions());
}
public ResultPartition create(
- String taskNameWithSubtaskAndId,
+ ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
int numSubpartitions,
int maxParallelism,
List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories,
- ShuffleDescriptor shuffleDescriptor,
+ RemoteShuffleDescriptor shuffleDescriptor,
CelebornConf celebornConf,
int numMappers) {
ResultPartition partition =
createRemoteShuffleResultPartitionInternal(
- taskNameWithSubtaskAndId,
+ ownerContext,
partitionIndex,
id,
type,
@@ -135,13 +135,13 @@ public abstract class
AbstractRemoteShuffleResultPartitionFactory {
celebornConf,
numMappers,
getBufferCompressor(),
- (RemoteShuffleDescriptor) shuffleDescriptor);
- LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
+ shuffleDescriptor);
+ LOG.debug("{}: Initialized {}", ownerContext.getOwnerName(), this);
return partition;
}
abstract ResultPartition createRemoteShuffleResultPartitionInternal(
- String taskNameWithSubtaskAndId,
+ ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
index 8851cc812..dc197ae3a 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
@@ -191,7 +191,7 @@ public class RemoteShuffleEnvironment
if (resultPartitionDeploymentDescriptor.getShuffleDescriptor()
instanceof RemoteShuffleDescriptor) {
return resultPartitionFactory.create(
- ownerContext.getOwnerName(), index,
resultPartitionDeploymentDescriptor, conf);
+ ownerContext, index, resultPartitionDeploymentDescriptor, conf);
} else {
nettyResultIds.add(resultPartitionDeploymentDescriptor.getResultId());
nettyResultPartitionIds.add(resultPartitionDeploymentDescriptor.getPartitionId());
@@ -246,7 +246,7 @@ public class RemoteShuffleEnvironment
? shuffleEnvironmentWrapper
.nettyInputGateFactory()
.create(ownerContext, gateIndex, igdd, producerStateProvider,
inputChannelMetrics)
- : inputGateFactory.create(ownerContext.getOwnerName(), gateIndex,
igdd);
+ : inputGateFactory.create(ownerContext, gateIndex, igdd);
}
@VisibleForTesting
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index 3a0d9f077..fe871d68c 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.event.AbstractEvent;
@@ -43,6 +44,7 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.SupplierWithException;
@@ -56,6 +58,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool;
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
+import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
public class RemoteShuffleInputGateDelegation {
@@ -130,9 +133,11 @@ public class RemoteShuffleInputGateDelegation {
private int endSubIndex;
private boolean partitionConnectionExceptionEnabled;
+ private final MetricGroup taskIOMetricGroup;
+
public RemoteShuffleInputGateDelegation(
CelebornConf celebornConf,
- String taskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -141,7 +146,8 @@ public class RemoteShuffleInputGateDelegation {
AvailabilityProvider.AvailabilityHelper availabilityHelper,
int startSubIndex,
int endSubIndex) {
- this.taskName = taskName;
+ this.taskName = ownerContext.getOwnerName();
+ this.taskIOMetricGroup = ownerContext.getParentGroup();
this.gateIndex = gateIndex;
this.gateDescriptor = gateDescriptor;
this.bufferPoolFactory = bufferPoolFactory;
@@ -198,6 +204,8 @@ public class RemoteShuffleInputGateDelegation {
RemoteShuffleDescriptor remoteDescriptor = (RemoteShuffleDescriptor)
descriptor.getRight();
ShuffleResourceDescriptor shuffleDescriptor =
remoteDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+ ShuffleIOMetricGroup shuffleIOMetricGroup =
+ new ShuffleIOMetricGroup(taskIOMetricGroup,
shuffleDescriptor.getShuffleId());
LOG.debug("create shuffle reader for descriptor {}", shuffleDescriptor);
@@ -208,7 +216,7 @@ public class RemoteShuffleInputGateDelegation {
startSubIndex,
endSubIndex,
transferBufferPool,
- getDataListener(descriptor.getLeft()),
+ getDataListener(descriptor.getLeft(), shuffleIOMetricGroup),
getFailureListener(remoteDescriptor.getResultPartitionID()));
bufferReaders.add(reader);
@@ -235,13 +243,14 @@ public class RemoteShuffleInputGateDelegation {
.collect(Collectors.toList());
}
- private Consumer<ByteBuf> getDataListener(int channelIdx) {
+ private Consumer<ByteBuf> getDataListener(
+ int channelIdx, ShuffleIOMetricGroup shuffleIOMetricGroup) {
return byteBuf -> {
Queue<Buffer> unpackedBuffers = null;
try {
unpackedBuffers = BufferPacker.unpack(byteBuf);
while (!unpackedBuffers.isEmpty()) {
- onBuffer(unpackedBuffers.poll(), channelIdx);
+ onBuffer(unpackedBuffers.poll(), channelIdx, shuffleIOMetricGroup);
}
} catch (Throwable throwable) {
synchronized (lock) {
@@ -279,7 +288,7 @@ public class RemoteShuffleInputGateDelegation {
};
}
- private void onBuffer(Buffer buffer, int channelIdx) {
+ private void onBuffer(Buffer buffer, int channelIdx, ShuffleIOMetricGroup
shuffleIOMetricGroup) {
synchronized (lock) {
if (closed || cause != null) {
buffer.recycleBuffer();
@@ -293,6 +302,7 @@ public class RemoteShuffleInputGateDelegation {
checkState(channelInfo.getInputChannelIdx() == channelIdx, "Illegal
channel index.");
LOG.debug("ReceivedBuffers is adding buffer {} on {}", buffer,
channelInfo);
receivedBuffers.add(Pair.of(buffer, channelInfo));
+ shuffleIOMetricGroup.getNumBytesIn().inc(buffer.getSize());
needRecycle = false;
if (wasEmpty) {
availabilityHelper.getUnavailableToResetAvailable().complete(null);
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
index dc131f971..fe25492fd 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -36,6 +37,7 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.plugin.flink.buffer.BufferHeader;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
+import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
@@ -83,6 +85,7 @@ public class RemoteShuffleOutputGate {
private boolean isRegisterShuffle = false;
private int maxReviveTimes;
private boolean hasSentHandshake = false;
+ protected final ShuffleIOMetricGroup shuffleIOMetricGroup;
/**
* @param shuffleDesc Describes shuffle meta and shuffle worker address.
@@ -95,8 +98,28 @@ public class RemoteShuffleOutputGate {
int bufferSize,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
CelebornConf celebornConf,
- int numMappers) {
+ int numMappers,
+ MetricGroup taskIOMetricGroup) {
+ this(
+ shuffleDesc,
+ numSubs,
+ bufferSize,
+ bufferPoolFactory,
+ celebornConf,
+ numMappers,
+ taskIOMetricGroup,
+ null);
+ }
+ public RemoteShuffleOutputGate(
+ RemoteShuffleDescriptor shuffleDesc,
+ int numSubs,
+ int bufferSize,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+ CelebornConf celebornConf,
+ int numMappers,
+ MetricGroup taskIOMetricGroup,
+ FlinkShuffleClientImpl flinkShuffleClient) {
this.shuffleDesc = shuffleDesc;
this.numSubs = numSubs;
this.bufferPoolFactory = bufferPoolFactory;
@@ -116,8 +139,9 @@ public class RemoteShuffleOutputGate {
this.lifecycleManagerPort =
shuffleDesc.getShuffleResource().getLifecycleManagerPort();
this.lifecycleManagerTimestamp =
shuffleDesc.getShuffleResource().getLifecycleManagerTimestamp();
- this.flinkShuffleClient = getShuffleClient();
+ this.flinkShuffleClient = flinkShuffleClient == null ? getShuffleClient()
: flinkShuffleClient;
this.maxReviveTimes = celebornConf.clientPushMaxReviveTimes();
+ this.shuffleIOMetricGroup = new ShuffleIOMetricGroup(taskIOMetricGroup,
shuffleId);
}
/** Initialize transportation gate. */
@@ -210,14 +234,16 @@ public class RemoteShuffleOutputGate {
/** Writes a piece of data to a subpartition. */
public void write(ByteBuf byteBuf, BufferHeader bufferHeader) {
try {
- flinkShuffleClient.pushDataToLocation(
- shuffleId,
- mapId,
- attemptId,
- bufferHeader.getSubPartitionId(),
- io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
- partitionLocation,
- () -> byteBuf.release());
+ int bytesWritten =
+ flinkShuffleClient.pushDataToLocation(
+ shuffleId,
+ mapId,
+ attemptId,
+ bufferHeader.getSubPartitionId(),
+ io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
+ partitionLocation,
+ byteBuf::release);
+ shuffleIOMetricGroup.getNumBytesOut().inc(bytesWritten);
} catch (IOException e) {
Utils.rethrowAsRuntimeException(e);
}
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
index 1bd4285ee..d2e255968 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
@@ -108,6 +108,7 @@ public class RemoteShuffleResultPartitionDelegation {
DataBuffer dataBuffer = isBroadcast ? getBroadcastDataBuffer() :
getUnicastDataBuffer();
if (dataBuffer.append(record, targetSubpartition, dataType)) {
+ incNumRecordsOut(dataType);
return;
}
@@ -117,6 +118,7 @@ public class RemoteShuffleResultPartitionDelegation {
dataBuffer.finish();
dataBuffer.release();
writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
+ incNumRecordsOut(dataType);
return;
}
flushDataBuffer(dataBuffer, isBroadcast);
@@ -127,6 +129,12 @@ public class RemoteShuffleResultPartitionDelegation {
emit(record, targetSubpartition, dataType, isBroadcast);
}
+ private void incNumRecordsOut(Buffer.DataType dataType) {
+ if (dataType.isBuffer()) {
+ outputGate.shuffleIOMetricGroup.getNumRecordsOut().inc();
+ }
+ }
+
@VisibleForTesting
public DataBuffer getUnicastDataBuffer() throws IOException {
flushBroadcastDataBuffer();
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java
new file mode 100644
index 000000000..16a9f4cf5
--- /dev/null
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.metric;
+
+import static
org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+/**
+ * Metric group that contains shareable pre-defined IO-related metrics. The
metrics registration is
+ * forwarded to the parent shuffle metric group.
+ */
+public class ShuffleIOMetricGroup extends ProxyMetricGroup<MetricGroup> {
+
+ private final Counter numBytesIn;
+ private final Counter numBytesOut;
+ private final Counter numRecordsOut;
+
+ private final Meter numBytesInRate;
+ private final Meter numBytesOutRate;
+ private final Meter numRecordsOutRate;
+
+ public ShuffleIOMetricGroup(MetricGroup taskIOMetricGroup, int shuffleId) {
+
super(createShuffleIOOwnerMetricGroup(taskIOMetricGroup).addGroup(shuffleId));
+ this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN, new
SimpleCounter());
+ this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT, new
SimpleCounter());
+ this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new
SimpleCounter());
+ this.numBytesInRate = meter(MetricNames.IO_NUM_BYTES_IN_RATE, new
MeterView(numBytesIn));
+ this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new
MeterView(numBytesOut));
+ this.numRecordsOutRate =
+ meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new
MeterView(numRecordsOut));
+ }
+
+ //
============================================================================================
+ // Getters
+ //
============================================================================================
+
+ public Counter getNumBytesIn() {
+ return numBytesIn;
+ }
+
+ public Counter getNumBytesOut() {
+ return numBytesOut;
+ }
+
+ public Counter getNumRecordsOut() {
+ return numRecordsOut;
+ }
+
+ public Meter getNumBytesInRate() {
+ return numBytesInRate;
+ }
+
+ public Meter getNumBytesOutRate() {
+ return numBytesOutRate;
+ }
+
+ public Meter getNumRecordsOutRate() {
+ return numRecordsOutRate;
+ }
+}
diff --git
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
index 33c31b2e4..cc3cf9761 100644
---
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
+++
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
@@ -22,39 +22,52 @@ import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.Optional;
+import java.util.Random;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
public class RemoteShuffleOutputGateSuiteJ {
- private final RemoteShuffleOutputGate remoteShuffleOutputGate =
- mock(RemoteShuffleOutputGate.class);
- private final FlinkShuffleClientImpl shuffleClient =
mock(FlinkShuffleClientImpl.class);
private static final int BUFFER_SIZE = 20;
private BufferPool bufferPool;
+ private RemoteShuffleOutputGate remoteShuffleOutputGate;
@Before
public void setup() throws IOException {
- remoteShuffleOutputGate.flinkShuffleClient = shuffleClient;
+ remoteShuffleOutputGate =
+ new RemoteShuffleOutputGate(
+ shuffleDescriptor(),
+ 2,
+ BUFFER_SIZE,
+ () -> bufferPool,
+ new CelebornConf(),
+ 10,
+ new UnregisteredMetricsGroup(),
+ mock(FlinkShuffleClientImpl.class));
NetworkBufferPool networkBufferPool = new NetworkBufferPool(10,
BUFFER_SIZE);
bufferPool = networkBufferPool.createBufferPool(10, 10);
}
@Test
- public void TestSimpleWriteData() throws IOException, InterruptedException {
+ public void testSimpleWriteData() throws IOException, InterruptedException {
PartitionLocation partitionLocation =
new PartitionLocation(
1, 0, "localhost", 123, 245, 789, 238,
PartitionLocation.Mode.PRIMARY);
- when(shuffleClient.registerMapPartitionTask(anyInt(), anyInt(), anyInt(),
anyInt(), anyInt()))
+ when(remoteShuffleOutputGate.flinkShuffleClient.registerMapPartitionTask(
+ anyInt(), anyInt(), anyInt(), anyInt(), anyInt()))
.thenAnswer(t -> partitionLocation);
when(remoteShuffleOutputGate.flinkShuffleClient.pushDataHandShake(
anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), any()))
@@ -67,7 +80,19 @@ public class RemoteShuffleOutputGateSuiteJ {
.thenAnswer(t -> Optional.empty());
remoteShuffleOutputGate.regionStart(false);
- remoteShuffleOutputGate.write(bufferPool.requestBuffer(), 0);
+ Buffer buffer = bufferPool.requestBuffer();
+ buffer.asByteBuf().writeByte(10);
+ remoteShuffleOutputGate.write(buffer, 0);
+
+ when(remoteShuffleOutputGate.flinkShuffleClient.pushDataToLocation(
+ anyInt(), anyInt(), anyInt(), anyInt(), any(), any(), any()))
+ .thenReturn(buffer.getSize());
+ remoteShuffleOutputGate.write(buffer, 1);
+ Assert.assertEquals(
+ buffer.getSize(),
remoteShuffleOutputGate.shuffleIOMetricGroup.getNumBytesOut().getCount());
+ Assert.assertEquals(
+ buffer.getSize(),
+
remoteShuffleOutputGate.shuffleIOMetricGroup.getNumBytesOutRate().getCount());
doNothing()
.when(remoteShuffleOutputGate.flinkShuffleClient)
@@ -97,4 +122,16 @@ public class RemoteShuffleOutputGateSuiteJ {
Assert.assertEquals(0, byteBuf.refCnt());
Assert.assertEquals(0, celebornByteBuf.refCnt());
}
+
+ private RemoteShuffleDescriptor shuffleDescriptor() {
+ byte[] bytes = new byte[16];
+ new Random().nextBytes(bytes);
+ return new RemoteShuffleDescriptor(
+ new JobID(bytes).toString(),
+ new JobID(bytes),
+ new JobID(bytes).toString(),
+ new ResultPartitionID(),
+ new RemoteShuffleResource(
+ "1", 1, System.currentTimeMillis(), new
ShuffleResourceDescriptor(1, 1, 1, 0)));
+ }
}
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 348fb3556..32b22fa1e 100644
---
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -46,6 +46,7 @@ import
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.CloseableIterator;
@@ -59,7 +60,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
public RemoteShuffleInputGate(
CelebornConf celebornConf,
- String taskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -67,7 +68,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
int numConcurrentReading) {
super(
celebornConf,
- taskName,
+ ownerContext,
gateIndex,
gateDescriptor,
bufferPoolFactory,
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 54737f03a..faf733bef 100644
---
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -24,6 +24,7 @@ import
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -38,7 +39,7 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
@Override
protected RemoteShuffleInputGate createInputGate(
- String owningTaskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor igdd,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -46,8 +47,8 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize, compressionCodec);
return new RemoteShuffleInputGate(
- this.celebornConf,
- owningTaskName,
+ celebornConf,
+ ownerContext,
gateIndex,
igdd,
bufferPoolFactory,
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 547d3fa08..363366b46 100644
---
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
@Override
ResultPartition createRemoteShuffleResultPartitionInternal(
- String taskNameWithSubtaskAndId,
+ ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
@@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory
BufferCompressor bufferCompressor,
RemoteShuffleDescriptor rsd) {
return new RemoteShuffleResultPartition(
- taskNameWithSubtaskAndId,
+ ownerContext.getOwnerName(),
partitionIndex,
id,
type,
@@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory
networkBufferSize,
bufferPoolFactories.get(1),
celebornConf,
- numMappers));
+ numMappers,
+ ownerContext.getParentGroup()));
}
protected BufferCompressor getBufferCompressor() {
diff --git
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 14c2a7b59..22c4764f2 100644
---
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -68,9 +69,9 @@ import org.apache.celeborn.plugin.flink.utils.BufferUtils;
public class RemoteShuffleResultPartitionSuiteJ {
private final int networkBufferSize = 32 * 1024;
- private BufferCompressor bufferCompressor = new
BufferCompressor(networkBufferSize, "lz4");
- private RemoteShuffleOutputGate remoteShuffleOutputGate =
mock(RemoteShuffleOutputGate.class);
- private final String compressCodec = "LZ4";
+ private final BufferCompressor bufferCompressor = new
BufferCompressor(networkBufferSize, "lz4");
+ private final RemoteShuffleOutputGate remoteShuffleOutputGate =
+ mock(RemoteShuffleOutputGate.class);
private final CelebornConf conf = new CelebornConf();
BufferDecompressor bufferDecompressor = new
BufferDecompressor(networkBufferSize, "LZ4");
@@ -80,7 +81,7 @@ public class RemoteShuffleResultPartitionSuiteJ {
private NetworkBufferPool globalBufferPool;
- private BufferPool dataBufferPool;
+ private BufferPool sortBufferPool;
private BufferPool nettyBufferPool;
@@ -99,8 +100,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
outputGate.release();
}
- if (dataBufferPool != null) {
- dataBufferPool.lazyDestroy();
+ if (sortBufferPool != null) {
+ sortBufferPool.lazyDestroy();
}
if (nettyBufferPool != null) {
nettyBufferPool.lazyDestroy();
@@ -173,7 +174,9 @@ public class RemoteShuffleResultPartitionSuiteJ {
random.nextBytes(dataWritten);
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.emitRecord(recordWritten, 0);
- assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -206,7 +209,9 @@ public class RemoteShuffleResultPartitionSuiteJ {
random.nextBytes(dataWritten);
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.broadcastRecord(recordWritten);
- assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -245,20 +250,26 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
- assertEquals(3, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(2,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(2,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
- assertEquals(2, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(3,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(3,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flush(0);
- assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
- assertEquals(3, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(5,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(5,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flushAll();
- assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
partitionWriter.finish();
partitionWriter.close();
@@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten,
numBytesWritten);
}
}
+ assertEquals(numRecords,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(numRecords,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
assertTrue(outputGate.isFinished());
@@ -357,14 +370,14 @@ public class RemoteShuffleResultPartitionSuiteJ {
private void initResultPartitionWriter(
int numSubpartitions,
- int dataBufferPoolSize,
+ int sortBufferPoolSize,
int nettyBufferPoolSize,
boolean compressionEnabled,
CelebornConf conf,
int numMappers)
throws Exception {
- dataBufferPool = globalBufferPool.createBufferPool(dataBufferPoolSize,
dataBufferPoolSize);
+ sortBufferPool = globalBufferPool.createBufferPool(sortBufferPoolSize,
sortBufferPoolSize);
nettyBufferPool = globalBufferPool.createBufferPool(nettyBufferPoolSize,
nettyBufferPoolSize);
outputGate =
@@ -384,7 +397,7 @@ public class RemoteShuffleResultPartitionSuiteJ {
bufferSize,
new ResultPartitionManager(),
bufferCompressor,
- () -> dataBufferPool,
+ () -> sortBufferPool,
outputGate);
} else {
partitionWriter =
@@ -398,7 +411,7 @@ public class RemoteShuffleResultPartitionSuiteJ {
bufferSize,
new ResultPartitionManager(),
null,
- () -> dataBufferPool,
+ () -> sortBufferPool,
outputGate);
}
}
@@ -439,7 +452,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
bufferSize,
bufferPoolFactory,
celebornConf,
- numMappers);
+ numMappers,
+ new UnregisteredMetricsGroup());
isSetup = false;
isFinished = false;
isClosed = false;
@@ -551,7 +565,7 @@ public class RemoteShuffleResultPartitionSuiteJ {
}
}
- private RemoteShuffleDescriptor getShuffleDescriptor() throws Exception {
+ private RemoteShuffleDescriptor getShuffleDescriptor() {
Random random = new Random();
byte[] bytes = new byte[16];
random.nextBytes(bytes);
diff --git
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index a8fdf4774..56169b6d6 100644
---
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -46,6 +46,7 @@ import
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.CloseableIterator;
@@ -59,7 +60,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
public RemoteShuffleInputGate(
CelebornConf celebornConf,
- String taskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -67,7 +68,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
int numConcurrentReading) {
super(
celebornConf,
- taskName,
+ ownerContext,
gateIndex,
gateDescriptor,
bufferPoolFactory,
diff --git
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index ef8ea9655..f4102ec01 100644
---
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -24,6 +24,7 @@ import
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -39,7 +40,7 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
// For testing.
@Override
protected RemoteShuffleInputGate createInputGate(
- String owningTaskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor igdd,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -47,8 +48,8 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize, compressionCodec);
return new RemoteShuffleInputGate(
- this.celebornConf,
- owningTaskName,
+ celebornConf,
+ ownerContext,
gateIndex,
igdd,
bufferPoolFactory,
diff --git
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 547d3fa08..363366b46 100644
---
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
@Override
ResultPartition createRemoteShuffleResultPartitionInternal(
- String taskNameWithSubtaskAndId,
+ ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
@@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory
BufferCompressor bufferCompressor,
RemoteShuffleDescriptor rsd) {
return new RemoteShuffleResultPartition(
- taskNameWithSubtaskAndId,
+ ownerContext.getOwnerName(),
partitionIndex,
id,
type,
@@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory
networkBufferSize,
bufferPoolFactories.get(1),
celebornConf,
- numMappers));
+ numMappers,
+ ownerContext.getParentGroup()));
}
protected BufferCompressor getBufferCompressor() {
diff --git
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index c02e1ad8b..22c4764f2 100644
---
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -174,6 +175,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.emitRecord(recordWritten, 0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -207,6 +210,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.broadcastRecord(recordWritten);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -246,9 +251,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(2,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(2,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(3,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(3,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flush(0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -256,6 +265,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(5,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(5,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flushAll();
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten,
numBytesWritten);
}
}
+ assertEquals(numRecords,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(numRecords,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
assertTrue(outputGate.isFinished());
@@ -439,7 +452,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
bufferSize,
bufferPoolFactory,
celebornConf,
- numMappers);
+ numMappers,
+ new UnregisteredMetricsGroup());
isSetup = false;
isFinished = false;
isClosed = false;
diff --git
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 90041fa0e..0f93e6975 100644
---
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -46,6 +46,7 @@ import
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.CloseableIterator;
@@ -59,7 +60,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
public RemoteShuffleInputGate(
CelebornConf celebornConf,
- String taskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -67,7 +68,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
int numConcurrentReading) {
super(
celebornConf,
- taskName,
+ ownerContext,
gateIndex,
gateDescriptor,
bufferPoolFactory,
diff --git
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index ef8ea9655..f4102ec01 100644
---
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -24,6 +24,7 @@ import
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -39,7 +40,7 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
// For testing.
@Override
protected RemoteShuffleInputGate createInputGate(
- String owningTaskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor igdd,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -47,8 +48,8 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize, compressionCodec);
return new RemoteShuffleInputGate(
- this.celebornConf,
- owningTaskName,
+ celebornConf,
+ ownerContext,
gateIndex,
igdd,
bufferPoolFactory,
diff --git
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 547d3fa08..363366b46 100644
---
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
@Override
ResultPartition createRemoteShuffleResultPartitionInternal(
- String taskNameWithSubtaskAndId,
+ ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
@@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory
BufferCompressor bufferCompressor,
RemoteShuffleDescriptor rsd) {
return new RemoteShuffleResultPartition(
- taskNameWithSubtaskAndId,
+ ownerContext.getOwnerName(),
partitionIndex,
id,
type,
@@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory
networkBufferSize,
bufferPoolFactories.get(1),
celebornConf,
- numMappers));
+ numMappers,
+ ownerContext.getParentGroup()));
}
protected BufferCompressor getBufferCompressor() {
diff --git
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index c02e1ad8b..22c4764f2 100644
---
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -174,6 +175,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.emitRecord(recordWritten, 0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -207,6 +210,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.broadcastRecord(recordWritten);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -246,9 +251,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(2,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(2,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(3,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(3,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flush(0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -256,6 +265,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(5,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(5,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flushAll();
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten,
numBytesWritten);
}
}
+ assertEquals(numRecords,
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(numRecords,
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
assertTrue(outputGate.isFinished());
@@ -439,7 +452,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
bufferSize,
bufferPoolFactory,
celebornConf,
- numMappers);
+ numMappers,
+ new UnregisteredMetricsGroup());
isSetup = false;
isFinished = false;
isClosed = false;
diff --git
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index dfe2eeca8..6d513265e 100644
---
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -47,6 +47,7 @@ import
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.CloseableIterator;
@@ -60,7 +61,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
public RemoteShuffleInputGate(
CelebornConf celebornConf,
- String taskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -68,7 +69,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
int numConcurrentReading) {
super(
celebornConf,
- taskName,
+ ownerContext,
gateIndex,
gateDescriptor,
bufferPoolFactory,
diff --git
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index ef8ea9655..f4102ec01 100644
---
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -24,6 +24,7 @@ import
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -39,7 +40,7 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
// For testing.
@Override
protected RemoteShuffleInputGate createInputGate(
- String owningTaskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor igdd,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -47,8 +48,8 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize, compressionCodec);
return new RemoteShuffleInputGate(
- this.celebornConf,
- owningTaskName,
+ celebornConf,
+ ownerContext,
gateIndex,
igdd,
bufferPoolFactory,
diff --git
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 547d3fa08..363366b46 100644
---
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
@Override
ResultPartition createRemoteShuffleResultPartitionInternal(
- String taskNameWithSubtaskAndId,
+ ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
@@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory
BufferCompressor bufferCompressor,
RemoteShuffleDescriptor rsd) {
return new RemoteShuffleResultPartition(
- taskNameWithSubtaskAndId,
+ ownerContext.getOwnerName(),
partitionIndex,
id,
type,
@@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory
networkBufferSize,
bufferPoolFactories.get(1),
celebornConf,
- numMappers));
+ numMappers,
+ ownerContext.getParentGroup()));
}
protected BufferCompressor getBufferCompressor() {
diff --git a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index fece38532..3cc25bcbc 100644
--- a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -174,6 +175,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.emitRecord(recordWritten, 0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -207,6 +210,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.broadcastRecord(recordWritten);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -246,9 +251,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flush(0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -256,6 +265,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flushAll();
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten,
numBytesWritten);
}
}
+ assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
assertTrue(outputGate.isFinished());
@@ -441,7 +454,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
bufferSize,
bufferPoolFactory,
celebornConf,
- numMappers);
+ numMappers,
+ new UnregisteredMetricsGroup());
isSetup = false;
isFinished = false;
isClosed = false;
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index dfe2eeca8..6d513265e 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.CloseableIterator;
@@ -60,7 +61,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
public RemoteShuffleInputGate(
CelebornConf celebornConf,
- String taskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -68,7 +69,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
int numConcurrentReading) {
super(
celebornConf,
- taskName,
+ ownerContext,
gateIndex,
gateDescriptor,
bufferPoolFactory,
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 6f1fd803e..bb4abbc70 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -40,7 +41,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
// For testing.
@Override
protected RemoteShuffleInputGate createInputGate(
- String owningTaskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor igdd,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -48,8 +49,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize,
CompressionCodec.valueOf(compressionCodec));
return new RemoteShuffleInputGate(
- this.celebornConf,
- owningTaskName,
+ celebornConf,
+ ownerContext,
gateIndex,
igdd,
bufferPoolFactory,
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index a9742cc86..69b9a71bc 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -48,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory
@Override
ResultPartition createRemoteShuffleResultPartitionInternal(
- String taskNameWithSubtaskAndId,
+ ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
@@ -60,7 +61,7 @@ public class RemoteShuffleResultPartitionFactory
BufferCompressor bufferCompressor,
RemoteShuffleDescriptor rsd) {
return new RemoteShuffleResultPartition(
- taskNameWithSubtaskAndId,
+ ownerContext.getOwnerName(),
partitionIndex,
id,
type,
@@ -76,7 +77,8 @@ public class RemoteShuffleResultPartitionFactory
networkBufferSize,
bufferPoolFactories.get(1),
celebornConf,
- numMappers));
+ numMappers,
+ ownerContext.getParentGroup()));
}
@Override
diff --git a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 99f502d87..0ceabcca8 100644
--- a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -177,6 +178,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.emitRecord(recordWritten, 0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -210,6 +213,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.broadcastRecord(recordWritten);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -249,9 +254,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flush(0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -259,6 +268,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flushAll();
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -310,6 +321,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten,
numBytesWritten);
}
}
+ assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
assertTrue(outputGate.isFinished());
@@ -444,7 +457,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
bufferSize,
bufferPoolFactory,
celebornConf,
- numMappers);
+ numMappers,
+ new UnregisteredMetricsGroup());
isSetup = false;
isFinished = false;
isClosed = false;
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 85443db36..7434b8cff 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.CloseableIterator;
@@ -60,7 +61,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
public RemoteShuffleInputGate(
CelebornConf celebornConf,
- String taskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -68,7 +69,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
int numConcurrentReading) {
super(
celebornConf,
- taskName,
+ ownerContext,
gateIndex,
gateDescriptor,
bufferPoolFactory,
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 6f1fd803e..bb4abbc70 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -40,7 +41,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
// For testing.
@Override
protected RemoteShuffleInputGate createInputGate(
- String owningTaskName,
+ ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor igdd,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -48,8 +49,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
BufferDecompressor bufferDecompressor =
new BufferDecompressor(networkBufferSize,
CompressionCodec.valueOf(compressionCodec));
return new RemoteShuffleInputGate(
- this.celebornConf,
- owningTaskName,
+ celebornConf,
+ ownerContext,
gateIndex,
igdd,
bufferPoolFactory,
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index a9742cc86..69b9a71bc 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.common.CelebornConf;
@@ -48,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory
@Override
ResultPartition createRemoteShuffleResultPartitionInternal(
- String taskNameWithSubtaskAndId,
+ ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
@@ -60,7 +61,7 @@ public class RemoteShuffleResultPartitionFactory
BufferCompressor bufferCompressor,
RemoteShuffleDescriptor rsd) {
return new RemoteShuffleResultPartition(
- taskNameWithSubtaskAndId,
+ ownerContext.getOwnerName(),
partitionIndex,
id,
type,
@@ -76,7 +77,8 @@ public class RemoteShuffleResultPartitionFactory
networkBufferSize,
bufferPoolFactories.get(1),
celebornConf,
- numMappers));
+ numMappers,
+ ownerContext.getParentGroup()));
}
@Override
diff --git a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 99f502d87..0ceabcca8 100644
--- a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -177,6 +178,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.emitRecord(recordWritten, 0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -210,6 +213,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
partitionWriter.broadcastRecord(recordWritten);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
partitionWriter.close();
@@ -249,9 +254,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(2, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(3, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flush(0);
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -259,6 +268,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+ assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(5, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.flushAll();
assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -310,6 +321,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten,
numBytesWritten);
}
}
+ assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+ assertEquals(numRecords, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
partitionWriter.finish();
assertTrue(outputGate.isFinished());
@@ -444,7 +457,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
bufferSize,
bufferPoolFactory,
celebornConf,
- numMappers);
+ numMappers,
+ new UnregisteredMetricsGroup());
isSetup = false;
isFinished = false;
isClosed = false;