This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
     new 68a1db1e3 [CELEBORN-2005][FOLLOWUP] Introduce ShuffleMetricGroup for numBytesIn, numBytesOut, numRecordsOut, numBytesInPerSecond, numBytesOutPerSecond, numRecordsOutPerSecond metrics
68a1db1e3 is described below

commit 68a1db1e3b037b8890d551d034dd73af3525f84b
Author: SteNicholas <programg...@163.com>
AuthorDate: Fri May 30 14:54:28 2025 +0800

[CELEBORN-2005][FOLLOWUP] Introduce ShuffleMetricGroup for numBytesIn, numBytesOut, numRecordsOut, numBytesInPerSecond, numBytesOutPerSecond, numRecordsOutPerSecond metrics

### What changes were proposed in this pull request?

Introduce `ShuffleMetricGroup` for `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics.

Follow up #3272.

### Why are the changes needed?

The `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond` and `numRecordsOutPerSecond` metrics should expose the shuffle id as a scope variable, which `ShuffleMetricGroup` is introduced to support. Meanwhile, #3272 prints many identical warnings like the following, which should be avoided:

```
2025-05-28 10:48:54,433 WARN [flink-akka.actor.default-dispatcher-18] org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numRecordsOut'. Metric will not be reported.[11.66.62.202, taskmanager, antc4flink3980005426-taskmanager-3-70, antc4flink3980005426, [vertex-2]HashJoin(joinType=[LeftOuterJoin], where=[(f0 = f00)], select=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60], build=[right]) -> S [...]
```

### Does this PR introduce _any_ user-facing change?

Introduce the `celeborn.client.flink.metrics.scope.shuffle` config option to define the scope format string that is applied to all metrics scoped to a shuffle:

- Variables:
  - Shuffle: `<task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>, <shuffle_id>`.
- Metrics:

Scope | Metrics | Description | Type
-- | -- | -- | --
Shuffle | numBytesIn | The total number of bytes this shuffle has read. | Counter
Shuffle | numBytesOut | The total number of bytes this shuffle has written. | Counter
Shuffle | numRecordsOut | The total number of records this shuffle has written. | Counter
Shuffle | numBytesInPerSecond | The number of bytes this shuffle reads per second. | Meter
Shuffle | numBytesOutPerSecond | The number of bytes this shuffle writes per second. | Meter
Shuffle | numRecordsOutPerSecond | The number of records this shuffle writes per second. | Meter

### How was this patch tested?

Manual test.

Closes #3296 from SteNicholas/CELEBORN-2005.

Authored-by: SteNicholas <programg...@163.com>
Signed-off-by: Weijie Guo <res...@163.com>
---
 .../flink/AbstractRemoteShuffleInputGate.java | 8 +-
 .../AbstractRemoteShuffleInputGateFactory.java | 13 ++-
 ...bstractRemoteShuffleResultPartitionFactory.java | 29 +++---
 .../plugin/flink/RemoteShuffleEnvironment.java | 35 ++++++-
 .../flink/RemoteShuffleInputGateDelegation.java | 20 ++--
 .../plugin/flink/RemoteShuffleOutputGate.java | 11 +-
 .../metrics/dump/ShuffleQueryScopeInfo.java | 64 ++++++++++++
 .../metrics/groups}/ShuffleIOMetricGroup.java | 21 ++--
 .../runtime/metrics/groups/ShuffleMetricGroup.java | 114 +++++++++++++++++++++
 .../runtime/metrics/scope/ShuffleScopeFormat.java | 78 ++++++++++++++
 .../flink/RemoteShuffleOutputGateSuiteJ.java | 10 +-
 .../plugin/flink/RemoteShuffleInputGate.java | 8 +-
 .../flink/RemoteShuffleInputGateFactory.java | 8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java | 11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java | 10 +-
 .../plugin/flink/RemoteShuffleInputGate.java | 8 +-
 .../flink/RemoteShuffleInputGateFactory.java | 8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java | 11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java | 10 +-
 .../plugin/flink/RemoteShuffleInputGate.java | 8 +-
 .../flink/RemoteShuffleInputGateFactory.java | 8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java | 11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java | 10 +-
 .../plugin/flink/RemoteShuffleInputGate.java | 8 +-
 .../flink/RemoteShuffleInputGateFactory.java | 8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java | 11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java | 10 +-
 .../plugin/flink/RemoteShuffleInputGate.java | 8 +-
 .../flink/RemoteShuffleInputGateFactory.java | 8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java | 11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java | 10 +-
 .../plugin/flink/RemoteShuffleInputGate.java | 8 +-
 .../flink/RemoteShuffleInputGateFactory.java | 8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java | 11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java | 10 +-
 .../org/apache/celeborn/common/CelebornConf.scala | 11 ++
 docs/configuration/client.md | 1 +
 37 files changed, 529 insertions(+), 108 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
index d04497536..9f3b8b9a9 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
@@ -20,6 +20,7 @@ package org.apache.celeborn.plugin.flink;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import
org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.SupplierWithException; @@ -51,7 +53,8 @@ public abstract class AbstractRemoteShuffleInputGate extends IndexedInputGate { InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, BufferDecompressor bufferDecompressor, - int numConcurrentReading) { + int numConcurrentReading, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { Tuple2<Integer, Integer> indexRange = getConsumedSubpartitionIndexRange(gateDescriptor); inputGateDelegation = new RemoteShuffleInputGateDelegation( @@ -64,7 +67,8 @@ public abstract class AbstractRemoteShuffleInputGate extends IndexedInputGate { numConcurrentReading, availabilityHelper, indexRange.f0, - indexRange.f1); + indexRange.f1, + shuffleIOMetricGroups); } /** Setup gate and build network connections. 
*/ diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java index 0c9e2e8bb..22e39ffdf 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java @@ -19,12 +19,14 @@ package org.apache.celeborn.plugin.flink; import java.io.IOException; +import java.util.Map; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; import org.slf4j.Logger; @@ -81,7 +83,10 @@ public abstract class AbstractRemoteShuffleInputGateFactory { /** Create RemoteShuffleInputGate from {@link InputGateDeploymentDescriptor}. 
*/ public IndexedInputGate create( - ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd) { + ShuffleIOOwnerContext ownerContext, + int gateIndex, + InputGateDeploymentDescriptor igdd, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { LOG.info( "Create input gate -- number of buffers per input gate={}, " + "number of concurrent readings={}.", @@ -96,7 +101,8 @@ public abstract class AbstractRemoteShuffleInputGateFactory { gateIndex, igdd, bufferPoolFactory, - celebornConf.shuffleCompressionCodec().name()); + celebornConf.shuffleCompressionCodec().name(), + shuffleIOMetricGroups); } protected abstract IndexedInputGate createInputGate( @@ -104,7 +110,8 @@ public abstract class AbstractRemoteShuffleInputGateFactory { int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, - String compressionCodec); + String compressionCodec, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroupMap); private SupplierWithException<BufferPool, IOException> createBufferPoolFactory( BufferPoolFactory bufferPoolFactory, int numBuffers, boolean supportFloatingBuffers) { diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java index 929939e47..55a7d09e1 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import 
org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.util.function.SupplierWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,10 +89,11 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { } public ResultPartition create( - ShuffleIOOwnerContext ownerContext, + String taskNameWithSubtaskAndId, int partitionIndex, ResultPartitionDeploymentDescriptor desc, - CelebornConf celebornConf) { + CelebornConf celebornConf, + ShuffleIOMetricGroup shuffleIOMetricGroup) { LOG.info( "Create result partition -- number of buffers per result partition={}, " + "number of subpartitions={}.", @@ -100,7 +101,7 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { desc.getNumberOfSubpartitions()); return create( - ownerContext, + taskNameWithSubtaskAndId, partitionIndex, desc.getShuffleDescriptor().getResultPartitionID(), desc.getPartitionType(), @@ -109,11 +110,12 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { createBufferPoolFactory(), (RemoteShuffleDescriptor) desc.getShuffleDescriptor(), celebornConf, - desc.getTotalNumberOfPartitions()); + desc.getTotalNumberOfPartitions(), + shuffleIOMetricGroup); } public ResultPartition create( - ShuffleIOOwnerContext ownerContext, + String taskNameWithSubtaskAndId, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -122,10 +124,11 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories, RemoteShuffleDescriptor shuffleDescriptor, CelebornConf celebornConf, - int numMappers) { + int numMappers, + ShuffleIOMetricGroup shuffleIOMetricGroup) { ResultPartition partition = createRemoteShuffleResultPartitionInternal( - ownerContext, + taskNameWithSubtaskAndId, partitionIndex, id, type, @@ 
-135,13 +138,14 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { celebornConf, numMappers, getBufferCompressor(), - shuffleDescriptor); - LOG.debug("{}: Initialized {}", ownerContext.getOwnerName(), this); + shuffleDescriptor, + shuffleIOMetricGroup); + LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this); return partition; } abstract ResultPartition createRemoteShuffleResultPartitionInternal( - ShuffleIOOwnerContext ownerContext, + String taskNameWithSubtaskAndId, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -151,7 +155,8 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory { CelebornConf celebornConf, int numMappers, BufferCompressor bufferCompressor, - RemoteShuffleDescriptor rsd); + RemoteShuffleDescriptor rsd, + ShuffleIOMetricGroup shuffleIOMetricGroup); /** * Used to create 2 buffer pools -- sorting buffer pool (7/8), transportation buffer pool (1/8). diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java index dc197ae3a..e64b1202e 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java @@ -22,11 +22,13 @@ import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull; import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT; import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT; +import static org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup.createShuffleIOMetricGroup; import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -45,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.FlinkRuntimeException; @@ -52,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.util.JavaUtils; import org.apache.celeborn.plugin.flink.netty.NettyShuffleEnvironmentWrapper; /** @@ -89,6 +93,9 @@ public class RemoteShuffleEnvironment private final ConcurrentHashMap.KeySetView<IntermediateResultPartitionID, Boolean> nettyResultPartitionIds = ConcurrentHashMap.newKeySet(); + private final Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups = + JavaUtils.newConcurrentHashMap(); + /** * @param networkBufferPool Network buffer pool for shuffle read and shuffle write. * @param resultPartitionManager A trivial {@link ResultPartitionManager}. 
@@ -113,6 +120,9 @@ public class RemoteShuffleEnvironment public void close() { LOG.info("Close RemoteShuffleEnvironment."); synchronized (lock) { + nettyResultIds.clear(); + nettyResultPartitionIds.clear(); + shuffleIOMetricGroups.clear(); try { networkBufferPool.destroyAllBufferPools(); } catch (Throwable t) { @@ -190,8 +200,19 @@ public class RemoteShuffleEnvironment CelebornConf conf) { if (resultPartitionDeploymentDescriptor.getShuffleDescriptor() instanceof RemoteShuffleDescriptor) { + int shuffleId = + ((RemoteShuffleDescriptor) resultPartitionDeploymentDescriptor.getShuffleDescriptor()) + .getShuffleResource() + .getMapPartitionShuffleDescriptor() + .getShuffleId(); + ; return resultPartitionFactory.create( - ownerContext, index, resultPartitionDeploymentDescriptor, conf); + ownerContext.getOwnerName(), + index, + resultPartitionDeploymentDescriptor, + conf, + shuffleIOMetricGroups.computeIfAbsent( + shuffleId, k -> createShuffleIOMetricGroup(ownerContext, shuffleId, conf))); } else { nettyResultIds.add(resultPartitionDeploymentDescriptor.getResultId()); nettyResultPartitionIds.add(resultPartitionDeploymentDescriptor.getPartitionId()); @@ -229,7 +250,12 @@ public class RemoteShuffleEnvironment InputGateDeploymentDescriptor igdd = inputGateDescriptors.get(gateIndex); IndexedInputGate inputGate = createInputGateInternal( - ownerContext, producerStateProvider, gateIndex, igdd, inputChannelMetrics); + ownerContext, + producerStateProvider, + gateIndex, + igdd, + inputChannelMetrics, + shuffleIOMetricGroups); inputGates[gateIndex] = inputGate; } return Arrays.asList(inputGates); @@ -241,12 +267,13 @@ public class RemoteShuffleEnvironment PartitionProducerStateProvider producerStateProvider, int gateIndex, InputGateDeploymentDescriptor igdd, - InputChannelMetrics inputChannelMetrics) { + InputChannelMetrics inputChannelMetrics, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { return nettyResultIds.contains(igdd.getConsumedResultId()) ? 
shuffleEnvironmentWrapper .nettyInputGateFactory() .create(ownerContext, gateIndex, igdd, producerStateProvider, inputChannelMetrics) - : inputGateFactory.create(ownerContext, gateIndex, igdd); + : inputGateFactory.create(ownerContext, gateIndex, igdd, shuffleIOMetricGroups); } @VisibleForTesting diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java index fe871d68c..99f144c1c 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java @@ -19,6 +19,7 @@ package org.apache.celeborn.plugin.flink; import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; +import static org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup.createShuffleIOMetricGroup; import java.io.IOException; import java.util.*; @@ -28,7 +29,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Pair; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.event.AbstractEvent; @@ -43,6 +43,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; @@ -58,7 +59,6 @@ import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.plugin.flink.buffer.BufferPacker; import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool; import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl; -import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup; import org.apache.celeborn.plugin.flink.utils.BufferUtils; public class RemoteShuffleInputGateDelegation { @@ -133,7 +133,7 @@ public class RemoteShuffleInputGateDelegation { private int endSubIndex; private boolean partitionConnectionExceptionEnabled; - private final MetricGroup taskIOMetricGroup; + private final ShuffleIOMetricGroup shuffleIOMetricGroup; public RemoteShuffleInputGateDelegation( CelebornConf celebornConf, @@ -145,9 +145,9 @@ public class RemoteShuffleInputGateDelegation { int numConcurrentReading, AvailabilityProvider.AvailabilityHelper availabilityHelper, int startSubIndex, - int endSubIndex) { + int endSubIndex, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { this.taskName = ownerContext.getOwnerName(); - this.taskIOMetricGroup = ownerContext.getParentGroup(); this.gateIndex = gateIndex; this.gateDescriptor = gateDescriptor; this.bufferPoolFactory = bufferPoolFactory; @@ -161,6 +161,14 @@ public class RemoteShuffleInputGateDelegation { RemoteShuffleDescriptor remoteShuffleDescriptor = (RemoteShuffleDescriptor) gateDescriptor.getShuffleDescriptors()[0]; RemoteShuffleResource shuffleResource = remoteShuffleDescriptor.getShuffleResource(); + int shuffleId = + remoteShuffleDescriptor + .getShuffleResource() + .getMapPartitionShuffleDescriptor() + .getShuffleId(); + this.shuffleIOMetricGroup = + shuffleIOMetricGroups.computeIfAbsent( + shuffleId, k -> createShuffleIOMetricGroup(ownerContext, shuffleId, celebornConf)); try { String appUniqueId = @@ -204,8 +212,6 @@ public class RemoteShuffleInputGateDelegation { RemoteShuffleDescriptor 
remoteDescriptor = (RemoteShuffleDescriptor) descriptor.getRight(); ShuffleResourceDescriptor shuffleDescriptor = remoteDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor(); - ShuffleIOMetricGroup shuffleIOMetricGroup = - new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleDescriptor.getShuffleId()); LOG.debug("create shuffle reader for descriptor {}", shuffleDescriptor); diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java index fe25492fd..2630617a1 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java @@ -21,10 +21,10 @@ import java.io.IOException; import java.util.Optional; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.util.function.SupplierWithException; import org.slf4j.Logger; @@ -37,7 +37,6 @@ import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.plugin.flink.buffer.BufferHeader; import org.apache.celeborn.plugin.flink.buffer.BufferPacker; import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl; -import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup; import org.apache.celeborn.plugin.flink.utils.BufferUtils; import org.apache.celeborn.plugin.flink.utils.Utils; @@ -99,7 +98,7 @@ public class RemoteShuffleOutputGate { SupplierWithException<BufferPool, IOException> 
bufferPoolFactory, CelebornConf celebornConf, int numMappers, - MetricGroup taskIOMetricGroup) { + ShuffleIOMetricGroup shuffleIOMetricGroup) { this( shuffleDesc, numSubs, @@ -107,7 +106,7 @@ public class RemoteShuffleOutputGate { bufferPoolFactory, celebornConf, numMappers, - taskIOMetricGroup, + shuffleIOMetricGroup, null); } @@ -118,7 +117,7 @@ public class RemoteShuffleOutputGate { SupplierWithException<BufferPool, IOException> bufferPoolFactory, CelebornConf celebornConf, int numMappers, - MetricGroup taskIOMetricGroup, + ShuffleIOMetricGroup shuffleIOMetricGroup, FlinkShuffleClientImpl flinkShuffleClient) { this.shuffleDesc = shuffleDesc; this.numSubs = numSubs; @@ -141,7 +140,7 @@ public class RemoteShuffleOutputGate { shuffleDesc.getShuffleResource().getLifecycleManagerTimestamp(); this.flinkShuffleClient = flinkShuffleClient == null ? getShuffleClient() : flinkShuffleClient; this.maxReviveTimes = celebornConf.clientPushMaxReviveTimes(); - this.shuffleIOMetricGroup = new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleId); + this.shuffleIOMetricGroup = shuffleIOMetricGroup; } /** Initialize transportation gate. */ diff --git a/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/dump/ShuffleQueryScopeInfo.java b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/dump/ShuffleQueryScopeInfo.java new file mode 100644 index 000000000..3c5e39d57 --- /dev/null +++ b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/dump/ShuffleQueryScopeInfo.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.dump; + +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo.TaskQueryScopeInfo; + +/** + * Container for the shuffle scope. Stores the id of the job/vertex, the subtask index and the id of + * the shuffle. + */ +public class ShuffleQueryScopeInfo extends TaskQueryScopeInfo { + + public final int shuffleId; + + public static final byte INFO_CATEGORY_SHUFFLE = 6; + + public ShuffleQueryScopeInfo( + String jobID, String vertexID, int subtaskIndex, int attemptNumber, int shuffleId) { + this(jobID, vertexID, subtaskIndex, attemptNumber, shuffleId, ""); + } + + public ShuffleQueryScopeInfo( + String jobID, + String vertexID, + int subtaskIndex, + int attemptNumber, + int shuffleId, + String scope) { + super(jobID, vertexID, subtaskIndex, attemptNumber, scope); + this.shuffleId = shuffleId; + } + + @Override + public ShuffleQueryScopeInfo copy(String additionalScope) { + return new ShuffleQueryScopeInfo( + this.jobID, + this.vertexID, + this.subtaskIndex, + this.attemptNumber, + this.shuffleId, + concatScopes(additionalScope)); + } + + @Override + public byte getCategory() { + return INFO_CATEGORY_SHUFFLE; + } +} diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleIOMetricGroup.java similarity index 82% rename from client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java rename to 
client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleIOMetricGroup.java index 16a9f4cf5..6690f8813 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java +++ b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleIOMetricGroup.java @@ -16,9 +16,7 @@ * limitations under the License. */ -package org.apache.celeborn.plugin.flink.metric; - -import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup; +package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; @@ -26,7 +24,9 @@ import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.metrics.MetricNames; -import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; + +import org.apache.celeborn.common.CelebornConf; /** * Metric group that contains shareable pre-defined IO-related metrics. 
The metrics registration is @@ -42,8 +42,8 @@ public class ShuffleIOMetricGroup extends ProxyMetricGroup<MetricGroup> { private final Meter numBytesOutRate; private final Meter numRecordsOutRate; - public ShuffleIOMetricGroup(MetricGroup taskIOMetricGroup, int shuffleId) { - super(createShuffleIOOwnerMetricGroup(taskIOMetricGroup).addGroup(shuffleId)); + public ShuffleIOMetricGroup(ShuffleMetricGroup parent) { + super(parent); this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN, new SimpleCounter()); this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT, new SimpleCounter()); this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SimpleCounter()); @@ -80,4 +80,13 @@ public class ShuffleIOMetricGroup extends ProxyMetricGroup<MetricGroup> { public Meter getNumRecordsOutRate() { return numRecordsOutRate; } + + public static ShuffleIOMetricGroup createShuffleIOMetricGroup( + ShuffleIOOwnerContext ownerContext, int shuffleId, CelebornConf celebornConf) { + return new ShuffleMetricGroup( + ownerContext.getParentGroup(), + shuffleId, + celebornConf.clientFlinkMetricsScopeNamingShuffle()) + .getIOMetricGroup(); + } } diff --git a/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleMetricGroup.java b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleMetricGroup.java new file mode 100644 index 000000000..d2aa30151 --- /dev/null +++ b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleMetricGroup.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import static org.apache.flink.runtime.metrics.scope.ShuffleScopeFormat.SCOPE_SHUFFLE_ID; +import static org.apache.flink.runtime.metrics.scope.ShuffleScopeFormat.fromConfig; +import static org.apache.flink.util.Preconditions.checkNotNull; + +import java.util.Collections; +import java.util.Map; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.dump.ShuffleQueryScopeInfo; + +/** Special {@link MetricGroup} representing a Flink runtime Shuffle. */ +public class ShuffleMetricGroup extends ComponentMetricGroup<TaskMetricGroup> { + + /** The shuffle id uniquely identifying the executed shuffle represented by this metrics group. 
*/ + private final int shuffleId; + + private final ShuffleIOMetricGroup ioMetrics; + + // ------------------------------------------------------------------------ + + public ShuffleMetricGroup(MetricGroup taskIOMetricGroup, int shuffleId, String scopeFormat) { + this(((TaskIOMetricGroup) taskIOMetricGroup).parentMetricGroup, shuffleId, scopeFormat); + } + + public ShuffleMetricGroup(TaskMetricGroup parent, int shuffleId, String scopeFormat) { + super( + parent.registry, + fromConfig(scopeFormat, parent.registry.getScopeFormats().getTaskFormat()) + .formatScope(checkNotNull(parent), shuffleId), + parent); + this.shuffleId = shuffleId; + this.ioMetrics = new ShuffleIOMetricGroup(this); + } + + // ------------------------------------------------------------------------ + // properties + // ------------------------------------------------------------------------ + + public final TaskMetricGroup parent() { + return parent; + } + + public int shuffleId() { + return shuffleId; + } + + /** + * Returns the {@link ShuffleIOMetricGroup} for this shuffle. + * + * @return {@link ShuffleIOMetricGroup} for this shuffle. 
+   */
+  public ShuffleIOMetricGroup getIOMetricGroup() {
+    return ioMetrics;
+  }
+
+  @Override
+  protected ShuffleQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+    return new ShuffleQueryScopeInfo(
+        this.parent.parent.jobId.toString(),
+        this.parent.vertexId.toString(),
+        this.parent.subtaskIndex,
+        this.parent.attemptNumber(),
+        this.shuffleId);
+  }
+
+  // ------------------------------------------------------------------------
+  // cleanup
+  // ------------------------------------------------------------------------
+
+  @Override
+  public void close() {
+    super.close();
+  }
+
+  // ------------------------------------------------------------------------
+  // Component Metric Group Specifics
+  // ------------------------------------------------------------------------
+
+  @Override
+  protected void putVariables(Map<String, String> variables) {
+    variables.put(SCOPE_SHUFFLE_ID, String.valueOf(shuffleId));
+  }
+
+  @Override
+  protected Iterable<? extends ComponentMetricGroup<?>> subComponents() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  protected String getGroupName(CharacterFilter filter) {
+    return "shuffle";
+  }
+}
diff --git a/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/scope/ShuffleScopeFormat.java b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/scope/ShuffleScopeFormat.java
new file mode 100644
index 000000000..a81a3a2e7
--- /dev/null
+++ b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/scope/ShuffleScopeFormat.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+
+/** The scope format for the {@link ShuffleMetricGroup}. */
+public class ShuffleScopeFormat extends ScopeFormat {
+
+  public static final String SCOPE_SHUFFLE_ID = asVariable("shuffle_id");
+
+  public ShuffleScopeFormat(String format, TaskScopeFormat parentFormat) {
+    super(
+        format,
+        parentFormat,
+        new String[] {
+          SCOPE_HOST,
+          SCOPE_TASKMANAGER_ID,
+          SCOPE_JOB_ID,
+          SCOPE_JOB_NAME,
+          SCOPE_TASK_VERTEX_ID,
+          SCOPE_TASK_ATTEMPT_ID,
+          SCOPE_TASK_NAME,
+          SCOPE_TASK_SUBTASK_INDEX,
+          SCOPE_TASK_ATTEMPT_NUM,
+          SCOPE_SHUFFLE_ID
+        });
+  }
+
+  public String[] formatScope(TaskMetricGroup parent, int shuffleId) {
+    final String[] template = copyTemplate();
+    final String[] values = {
+      parent.parent().parent().hostname(),
+      parent.parent().parent().taskManagerId(),
+      valueOrNull(parent.parent().jobId()),
+      valueOrNull(parent.parent().jobName()),
+      valueOrNull(parent.vertexId()),
+      valueOrNull(parent.executionId()),
+      valueOrNull(parent.taskName()),
+      String.valueOf(parent.subtaskIndex()),
+      String.valueOf(parent.attemptNumber()),
+      valueOrNull(shuffleId)
+    };
+    return bindVariables(template, values);
+  }
+
+  // ------------------------------------------------------------------------
+  // Parsing from Config
+  // ------------------------------------------------------------------------
+
+  /**
+   * Creates the scope format as defined in the given configuration.
+   *
+   * @param format The scope naming of the format.
+   * @param parent The parent {@link TaskScopeFormat} of the format.
+   * @return The {@link ShuffleScopeFormat} parsed from the configuration.
+   */
+  public static ShuffleScopeFormat fromConfig(String format, TaskScopeFormat parent) {
+    return new ShuffleScopeFormat(format, parent);
+  }
+}
diff --git a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
index cc3cf9761..3210125e1 100644
--- a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
+++ b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
@@ -25,11 +25,13 @@ import java.util.Optional;
 import java.util.Random;

 import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,7 +56,11 @@ public class RemoteShuffleOutputGateSuiteJ {
             () -> bufferPool,
             new CelebornConf(),
             10,
-            new UnregisteredMetricsGroup(),
+            new ShuffleIOMetricGroup(
+                new ShuffleMetricGroup(
+                    UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                    1,
+                    CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())),
             mock(FlinkShuffleClientImpl.class));
     NetworkBufferPool networkBufferPool = new
NetworkBufferPool(10, BUFFER_SIZE); bufferPool = networkBufferPool.createBufferPool(10, 10); diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index 32b22fa1e..6e663fd68 100644 --- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.flink.api.java.tuple.Tuple2; @@ -46,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.throughput.ThroughputCalculator; @@ -65,7 +67,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, BufferDecompressor bufferDecompressor, - int numConcurrentReading) { + int numConcurrentReading, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { super( celebornConf, ownerContext, @@ -73,7 +76,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { gateDescriptor, bufferPoolFactory, bufferDecompressor, - numConcurrentReading); + numConcurrentReading, + 
shuffleIOMetricGroups); } @Override diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index faf733bef..b19c77238 100644 --- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -19,11 +19,13 @@ package org.apache.celeborn.plugin.flink; import java.io.IOException; +import java.util.Map; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; @@ -43,7 +45,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, - String compressionCodec) { + String compressionCodec, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec); return new RemoteShuffleInputGate( @@ -53,6 +56,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat igdd, bufferPoolFactory, bufferDecompressor, - numConcurrentReading); + numConcurrentReading, + shuffleIOMetricGroups); } } diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java 
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java index 363366b46..d49827ed9 100644 --- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java +++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -48,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory @Override ResultPartition createRemoteShuffleResultPartitionInternal( - ShuffleIOOwnerContext ownerContext, + String taskNameWithSubtaskAndId, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -58,9 +58,10 @@ public class RemoteShuffleResultPartitionFactory CelebornConf celebornConf, int numMappers, BufferCompressor bufferCompressor, - RemoteShuffleDescriptor rsd) { + RemoteShuffleDescriptor rsd, + ShuffleIOMetricGroup shuffleIOMetricGroup) { return new RemoteShuffleResultPartition( - ownerContext.getOwnerName(), + taskNameWithSubtaskAndId, partitionIndex, id, type, @@ -77,7 +78,7 @@ public class RemoteShuffleResultPartitionFactory bufferPoolFactories.get(1), celebornConf, numMappers, - ownerContext.getParentGroup())); + shuffleIOMetricGroup)); } protected BufferCompressor getBufferCompressor() { diff --git a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java 
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index 22c4764f2..7e1946e76 100644 --- a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -44,7 +44,6 @@ import java.util.stream.IntStream; import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -56,6 +55,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.function.SupplierWithException; import org.junit.After; import org.junit.Before; @@ -453,7 +455,11 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferPoolFactory, celebornConf, numMappers, - new UnregisteredMetricsGroup()); + new ShuffleIOMetricGroup( + new ShuffleMetricGroup( + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), + 1, + CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString()))); isSetup = false; isFinished = false; isClosed = false; diff --git 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index 56169b6d6..d340a74e6 100644 --- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.flink.api.java.tuple.Tuple2; @@ -46,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.throughput.ThroughputCalculator; @@ -65,7 +67,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, BufferDecompressor bufferDecompressor, - int numConcurrentReading) { + int numConcurrentReading, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { super( celebornConf, ownerContext, @@ -73,7 +76,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { gateDescriptor, bufferPoolFactory, bufferDecompressor, - numConcurrentReading); + numConcurrentReading, + shuffleIOMetricGroups); } @Override diff --git 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index f4102ec01..38c5292bf 100644 --- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -19,11 +19,13 @@ package org.apache.celeborn.plugin.flink; import java.io.IOException; +import java.util.Map; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; @@ -44,7 +46,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, - String compressionCodec) { + String compressionCodec, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec); return new RemoteShuffleInputGate( @@ -54,6 +57,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat igdd, bufferPoolFactory, bufferDecompressor, - numConcurrentReading); + numConcurrentReading, + shuffleIOMetricGroups); } } diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java index 
363366b46..d49827ed9 100644 --- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java +++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -48,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory @Override ResultPartition createRemoteShuffleResultPartitionInternal( - ShuffleIOOwnerContext ownerContext, + String taskNameWithSubtaskAndId, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -58,9 +58,10 @@ public class RemoteShuffleResultPartitionFactory CelebornConf celebornConf, int numMappers, BufferCompressor bufferCompressor, - RemoteShuffleDescriptor rsd) { + RemoteShuffleDescriptor rsd, + ShuffleIOMetricGroup shuffleIOMetricGroup) { return new RemoteShuffleResultPartition( - ownerContext.getOwnerName(), + taskNameWithSubtaskAndId, partitionIndex, id, type, @@ -77,7 +78,7 @@ public class RemoteShuffleResultPartitionFactory bufferPoolFactories.get(1), celebornConf, numMappers, - ownerContext.getParentGroup())); + shuffleIOMetricGroup)); } protected BufferCompressor getBufferCompressor() { diff --git a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index 22c4764f2..7e1946e76 100644 --- 
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -44,7 +44,6 @@ import java.util.stream.IntStream; import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -56,6 +55,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.function.SupplierWithException; import org.junit.After; import org.junit.Before; @@ -453,7 +455,11 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferPoolFactory, celebornConf, numMappers, - new UnregisteredMetricsGroup()); + new ShuffleIOMetricGroup( + new ShuffleMetricGroup( + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), + 1, + CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString()))); isSetup = false; isFinished = false; isClosed = false; diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index 0f93e6975..bfba41530 100644 --- 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.flink.api.java.tuple.Tuple2; @@ -46,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.throughput.ThroughputCalculator; @@ -65,7 +67,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { InputGateDeploymentDescriptor gateDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, BufferDecompressor bufferDecompressor, - int numConcurrentReading) { + int numConcurrentReading, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { super( celebornConf, ownerContext, @@ -73,7 +76,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate { gateDescriptor, bufferPoolFactory, bufferDecompressor, - numConcurrentReading); + numConcurrentReading, + shuffleIOMetricGroups); } @Override diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java index f4102ec01..38c5292bf 100644 --- 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java +++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java @@ -19,11 +19,13 @@ package org.apache.celeborn.plugin.flink; import java.io.IOException; +import java.util.Map; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.util.function.SupplierWithException; @@ -44,7 +46,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat int gateIndex, InputGateDeploymentDescriptor igdd, SupplierWithException<BufferPool, IOException> bufferPoolFactory, - String compressionCodec) { + String compressionCodec, + Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) { BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec); return new RemoteShuffleInputGate( @@ -54,6 +57,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat igdd, bufferPoolFactory, bufferDecompressor, - numConcurrentReading); + numConcurrentReading, + shuffleIOMetricGroups); } } diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java index 363366b46..d49827ed9 100644 --- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java +++ 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; import org.apache.flink.util.function.SupplierWithException; import org.apache.celeborn.common.CelebornConf; @@ -48,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory @Override ResultPartition createRemoteShuffleResultPartitionInternal( - ShuffleIOOwnerContext ownerContext, + String taskNameWithSubtaskAndId, int partitionIndex, ResultPartitionID id, ResultPartitionType type, @@ -58,9 +58,10 @@ public class RemoteShuffleResultPartitionFactory CelebornConf celebornConf, int numMappers, BufferCompressor bufferCompressor, - RemoteShuffleDescriptor rsd) { + RemoteShuffleDescriptor rsd, + ShuffleIOMetricGroup shuffleIOMetricGroup) { return new RemoteShuffleResultPartition( - ownerContext.getOwnerName(), + taskNameWithSubtaskAndId, partitionIndex, id, type, @@ -77,7 +78,7 @@ public class RemoteShuffleResultPartitionFactory bufferPoolFactories.get(1), celebornConf, numMappers, - ownerContext.getParentGroup())); + shuffleIOMetricGroup)); } protected BufferCompressor getBufferCompressor() { diff --git a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index 22c4764f2..7e1946e76 100644 --- a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -44,7 +44,6 @@ import java.util.stream.IntStream; import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -56,6 +55,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.function.SupplierWithException; import org.junit.After; import org.junit.Before; @@ -453,7 +455,11 @@ public class RemoteShuffleResultPartitionSuiteJ { bufferPoolFactory, celebornConf, numMappers, - new UnregisteredMetricsGroup()); + new ShuffleIOMetricGroup( + new ShuffleMetricGroup( + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), + 1, + CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString()))); isSetup = false; isFinished = false; isClosed = false; diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java index 6d513265e..c89f54489 100644 --- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java +++ 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
@@ -66,7 +68,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     super(
         celebornConf,
         ownerContext,
@@ -74,7 +77,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
         gateDescriptor,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 
   @Override
diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index f4102ec01..38c5292bf 100644
--- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -19,11 +19,13 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -44,7 +46,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec) {
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, compressionCodec);
     return new RemoteShuffleInputGate(
@@ -54,6 +57,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
         igdd,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 }
diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 363366b46..d49827ed9 100644
--- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -48,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -58,9 +58,10 @@ public class RemoteShuffleResultPartitionFactory
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd) {
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return new RemoteShuffleResultPartition(
-        ownerContext.getOwnerName(),
+        taskNameWithSubtaskAndId,
         partitionIndex,
         id,
         type,
@@ -77,7 +78,7 @@ public class RemoteShuffleResultPartitionFactory
             bufferPoolFactories.get(1),
             celebornConf,
             numMappers,
-            ownerContext.getParentGroup()));
+            shuffleIOMetricGroup));
   }
 
   protected BufferCompressor getBufferCompressor() {
diff --git a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 3cc25bcbc..40a703db9 100644
--- a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,7 +44,6 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -56,6 +55,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.function.SupplierWithException;
 import org.junit.After;
 import org.junit.Before;
@@ -455,7 +457,11 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferPoolFactory,
           celebornConf,
           numMappers,
-          new UnregisteredMetricsGroup());
+          new ShuffleIOMetricGroup(
+              new ShuffleMetricGroup(
+                  UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                  1,
+                  CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())));
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 6d513265e..c89f54489 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
@@ -66,7 +68,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     super(
         celebornConf,
         ownerContext,
@@ -74,7 +77,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
         gateDescriptor,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 
   @Override
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index bb4abbc70..a5f3781d7 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -19,12 +19,14 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -45,7 +47,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec) {
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, CompressionCodec.valueOf(compressionCodec));
     return new RemoteShuffleInputGate(
@@ -55,6 +58,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
         igdd,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 }
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 69b9a71bc..691c70655 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -49,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -59,9 +59,10 @@ public class RemoteShuffleResultPartitionFactory
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd) {
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return new RemoteShuffleResultPartition(
-        ownerContext.getOwnerName(),
+        taskNameWithSubtaskAndId,
         partitionIndex,
         id,
         type,
@@ -78,7 +79,7 @@ public class RemoteShuffleResultPartitionFactory
             bufferPoolFactories.get(1),
             celebornConf,
             numMappers,
-            ownerContext.getParentGroup()));
+            shuffleIOMetricGroup));
   }
 
   @Override
diff --git a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 0ceabcca8..d86792e07 100644
--- a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -45,7 +45,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -57,6 +56,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.function.SupplierWithException;
 import org.junit.After;
 import org.junit.Before;
@@ -458,7 +460,11 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferPoolFactory,
           celebornConf,
           numMappers,
-          new UnregisteredMetricsGroup());
+          new ShuffleIOMetricGroup(
+              new ShuffleMetricGroup(
+                  UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                  1,
+                  CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())));
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 7434b8cff..bd60bbe2b 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
@@ -66,7 +68,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     super(
         celebornConf,
         ownerContext,
@@ -74,7 +77,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
         gateDescriptor,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 
   @Override
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index bb4abbc70..a5f3781d7 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -19,12 +19,14 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -45,7 +47,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec) {
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, CompressionCodec.valueOf(compressionCodec));
     return new RemoteShuffleInputGate(
@@ -55,6 +58,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
         igdd,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 }
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 69b9a71bc..691c70655 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -49,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -59,9 +59,10 @@ public class RemoteShuffleResultPartitionFactory
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd) {
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return new RemoteShuffleResultPartition(
-        ownerContext.getOwnerName(),
+        taskNameWithSubtaskAndId,
         partitionIndex,
         id,
         type,
@@ -78,7 +79,7 @@ public class RemoteShuffleResultPartitionFactory
             bufferPoolFactories.get(1),
             celebornConf,
             numMappers,
-            ownerContext.getParentGroup()));
+            shuffleIOMetricGroup));
   }
 
   @Override
diff --git a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 0ceabcca8..d86792e07 100644
--- a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -45,7 +45,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -57,6 +56,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.function.SupplierWithException;
 import org.junit.After;
 import org.junit.Before;
@@ -458,7 +460,11 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferPoolFactory,
           celebornConf,
           numMappers,
-          new UnregisteredMetricsGroup());
+          new ShuffleIOMetricGroup(
+              new ShuffleMetricGroup(
+                  UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                  1,
+                  CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())));
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 6140bd713..a70381bbd 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1410,6 +1410,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def clientFlinkResultPartitionSupportFloatingBuffer: Boolean =
     get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
   def clientFlinkDataCompressionEnabled: Boolean = get(CLIENT_DATA_COMPRESSION_ENABLED)
+  def clientFlinkMetricsScopeNamingShuffle: String =
+    get(CLIENT_METRICS_SCOPE_NAMING_SHUFFLE)
   def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
   def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
@@ -5840,6 +5842,15 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val CLIENT_METRICS_SCOPE_NAMING_SHUFFLE: ConfigEntry[String] =
+    buildConf("celeborn.client.flink.metrics.scope.shuffle")
+      .categories("client")
+      .version("0.6.0")
+      .doc("Defines the scope format string that is applied to all metrics scoped to a shuffle. Only effective when a identifier-based reporter is configured")
+      .stringConf
+      .createWithDefault(
+        "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>.<shuffle_id>")
+
   val CLIENT_MR_PUSH_DATA_MAX: ConfigEntry[Long] =
     buildConf("celeborn.client.mr.pushData.max")
       .categories("client")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 6a41b129a..5c4cbf006 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -40,6 +40,7 @@ license: |
 | celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false | Max concurrent reading channels for a input gate. | 0.3.0 | remote-shuffle.job.concurrent-readings-per-gate |
 | celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for a input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate |
 | celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | Whether to support floating buffer in Flink input gates. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-input-gate |
+| celeborn.client.flink.metrics.scope.shuffle | <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>.<shuffle_id> | false | Defines the scope format string that is applied to all metrics scoped to a shuffle. Only effective when a identifier-based reporter is configured | 0.6.0 |  |
 | celeborn.client.flink.partitionConnectionException.enabled | false | false | If enabled, `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` would be thrown when RemoteBufferStreamReader finds that the current exception is about connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data. | 0.6.0 |  |
 | celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition |
 | celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate |
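
For readers unfamiliar with scope format strings, the following is a minimal sketch of how the default value of `celeborn.client.flink.metrics.scope.shuffle` could resolve into a metric identifier prefix. It is only an illustration: `ScopeFormatSketch` and `expand` are hypothetical names that are not part of Celeborn or Flink, the variable values are made up, and the real resolution is done by Flink's metric group machinery rather than by this regex substitution.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration only; not part of Celeborn or Flink. */
public class ScopeFormatSketch {
  // Matches tokens such as <host> or <shuffle_id>.
  private static final Pattern VARIABLE = Pattern.compile("<[^<>.]+>");

  /** Replaces every known <name> token; tokens without a value are left untouched. */
  public static String expand(String format, Map<String, String> variables) {
    Matcher matcher = VARIABLE.matcher(format);
    StringBuffer out = new StringBuffer();
    while (matcher.find()) {
      String token = matcher.group();
      String value = variables.getOrDefault(token, token);
      matcher.appendReplacement(out, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    // Default scope format taken from the config option added in this commit.
    String format =
        "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>.<shuffle_id>";
    // Made-up values, for illustration only.
    Map<String, String> variables = new LinkedHashMap<>();
    variables.put("<host>", "worker-1");
    variables.put("<tm_id>", "tm-0");
    variables.put("<job_name>", "wordcount");
    variables.put("<task_name>", "HashJoin");
    variables.put("<subtask_index>", "3");
    variables.put("<shuffle_id>", "7");
    // Prints: worker-1.taskmanager.tm-0.wordcount.HashJoin.3.7
    System.out.println(expand(format, variables));
  }
}
```

Because `<shuffle_id>` is part of the scope, two shuffles of the same task resolve to different prefixes, which is presumably why metrics such as `numRecordsOut` no longer trigger the name collision warning quoted in the commit message.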