KarmaGYZ commented on a change in pull request #13397:
URL: https://github.com/apache/flink/pull/13397#discussion_r489294355
##########
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##########
@@ -139,12 +142,12 @@ public static int getNewNodeId() {
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
/**
- * This weight indicates how much this transformation relies on managed memory, so that
- * transformation highly relies on managed memory would be able to acquire more managed
- * memory in runtime (linear association). Note that it only works in cases of UNKNOWN
- * resources.
+ * Each entry in this map represents a use case that this transformation needs managed memory for. The key of the
+ * entry indicates the use case, while the value is the use-case-specific weight for this transformation. Managed
+ * memory reserved for an OPERATOR scope use case will be shared by all the declaring transformations within a slot
+ * according to this weight. For SLOT scope use cases, the weights are ignored.
*/
- private int managedMemoryWeight = DEFAULT_MANAGED_MEMORY_WEIGHT;
+ private final Map<ManagedMemoryUseCase, Integer> managedMemoryUseCaseWeights = new HashMap<>();
Review comment:
Just trying to understand: is it illegal to put `ROCKSDB` or `PYTHON` in
this map in your design? If so, should we add a sanity check that rejects
such entries when someone tries to add them?
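A minimal sketch of what such a sanity check could look like. `SketchScope`, `SketchUseCase`, and `WeightedUseCases` are hypothetical stand-ins rather than Flink's actual classes, and treating `ROCKSDB` and `PYTHON` as SLOT scope is an assumption based on the Javadoc in the diff above.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not Flink's actual classes.
enum SketchScope { OPERATOR, SLOT }

enum SketchUseCase {
    BATCH_OP(SketchScope.OPERATOR),
    ROCKSDB(SketchScope.SLOT), // assumption: SLOT scope
    PYTHON(SketchScope.SLOT);  // assumption: SLOT scope

    final SketchScope scope;

    SketchUseCase(SketchScope scope) {
        this.scope = scope;
    }
}

class WeightedUseCases {
    private final Map<SketchUseCase, Integer> weights = new EnumMap<>(SketchUseCase.class);

    /** Accepts only OPERATOR scope use cases, since weights are ignored for SLOT scope. */
    void declareOperatorScopeUseCase(SketchUseCase useCase, int weight) {
        if (useCase.scope != SketchScope.OPERATOR) {
            throw new IllegalArgumentException(
                "Use case " + useCase + " is not OPERATOR scope, so a weight does not apply.");
        }
        if (weight <= 0) {
            throw new IllegalArgumentException("Weight must be positive, but was " + weight);
        }
        weights.put(useCase, weight);
    }

    Map<SketchUseCase, Integer> getWeights() {
        return weights;
    }
}
```

With a check like this, a caller that tries to declare a weight for a SLOT scope use case fails fast instead of silently having the weight ignored.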
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -849,26 +843,23 @@ private static void setManagedMemoryFractionForSlotSharingGroup(
}
private static void setManagedMemoryFractionForOperator(
- final ResourceSpec operatorResourceSpec,
final ResourceSpec groupResourceSpec,
final int operatorManagedMemoryWeight,
final int groupManagedMemoryWeight,
final StreamConfig operatorConfig) {
- final double managedMemoryFraction;
-
if (groupResourceSpec.equals(ResourceSpec.UNKNOWN)) {
- managedMemoryFraction = groupManagedMemoryWeight > 0
- ? getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight)
- : 0.0;
+ operatorConfig.setManagedMemoryFraction(
+ groupManagedMemoryWeight > 0 ?
+ getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight) :
+ 0.0);
} else {
- final long groupManagedMemoryBytes = groupResourceSpec.getManagedMemory().getBytes();
- managedMemoryFraction = groupManagedMemoryBytes > 0
- ? getFractionRoundedDown(operatorResourceSpec.getManagedMemory().getBytes(), groupManagedMemoryBytes)
- : 0.0;
+ // Supporting for fine grained resource specs is still under developing.
+ // This branch should not be executed in production. Not throwing exception for testing purpose.
+ LOG.error("Failed setting managed memory fractions. " +
+ " Operators may not be able to use managed memory properly." +
+ " Calculating managed memory fractions with fine grained resource spec is currently not supported.");
Review comment:
I agree that supporting fine-grained resource specs should not block this
work. However, we could create a ticket to track that work and reference it
in the code comment. We could also mark the comment with a `TODO` label.
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -849,26 +843,23 @@ private static void setManagedMemoryFractionForSlotSharingGroup(
}
private static void setManagedMemoryFractionForOperator(
- final ResourceSpec operatorResourceSpec,
final ResourceSpec groupResourceSpec,
final int operatorManagedMemoryWeight,
final int groupManagedMemoryWeight,
final StreamConfig operatorConfig) {
- final double managedMemoryFraction;
-
if (groupResourceSpec.equals(ResourceSpec.UNKNOWN)) {
- managedMemoryFraction = groupManagedMemoryWeight > 0
- ? getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight)
- : 0.0;
+ operatorConfig.setManagedMemoryFraction(
+ groupManagedMemoryWeight > 0 ?
+ getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight) :
+ 0.0);
} else {
- final long groupManagedMemoryBytes = groupResourceSpec.getManagedMemory().getBytes();
- managedMemoryFraction = groupManagedMemoryBytes > 0
- ? getFractionRoundedDown(operatorResourceSpec.getManagedMemory().getBytes(), groupManagedMemoryBytes)
- : 0.0;
+ // Supporting for fine grained resource specs is still under developing.
+ // This branch should not be executed in production. Not throwing exception for testing purpose.
+ LOG.error("Failed setting managed memory fractions. " +
+ " Operators may not be able to use managed memory properly." +
+ " Calculating managed memory fractions with fine grained resource spec is currently not supported.");
Review comment:
I know that, by design, we never enter this branch at the moment, but would
it make sense to throw an exception here instead of only logging an error?
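For illustration, a rough sketch of the throwing variant. `FractionSketch`, the boolean `groupResourcesUnknown` parameter, and the `fractionRoundedDown` helper are simplified stand-ins; in particular, the rounding scale of 4 is an assumption and not necessarily what `getFractionRoundedDown` uses.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical, simplified stand-in for the method in the diff above.
class FractionSketch {

    // Stand-in for getFractionRoundedDown; the scale of 4 is an assumption.
    static double fractionRoundedDown(long dividend, long divisor) {
        return BigDecimal.valueOf(dividend)
            .divide(BigDecimal.valueOf(divisor), 4, RoundingMode.DOWN)
            .doubleValue();
    }

    static double managedMemoryFraction(
            boolean groupResourcesUnknown, int operatorWeight, int groupWeight) {
        if (!groupResourcesUnknown) {
            // Fail fast instead of logging: fine-grained specs are not supported yet.
            throw new UnsupportedOperationException(
                "Calculating managed memory fractions with fine-grained resource specs"
                    + " is currently not supported.");
        }
        // A group without any declared weight gets no managed memory fraction.
        return groupWeight > 0 ? fractionRoundedDown(operatorWeight, groupWeight) : 0.0;
    }
}
```

The trade-off is that tests which currently exercise this branch would then need to expect the exception.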
##########
File path: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -401,6 +403,22 @@
.withDescription("Fraction of Total Flink Memory to be used as Managed Memory, if Managed Memory size is not"
+ " explicitly specified.");
+ /**
+ * Weights of managed memory consumers.
+ */
+ // Do not advertise this option until the feature is completed.
+ @Documentation.ExcludeFromDocumentation
+ public static final ConfigOption<Map<String, String>> MANAGED_MEMORY_CONSUMER_WEIGHTS =
+ key("taskmanager.memory.managed.consumer-weights")
+ .mapType()
+ .defaultValue(new HashMap<String, String>() {{
+ put("DATAPROC", "70");
+ put("PYTHON", "30");
Review comment:
It would be good to have an `enum` which holds all supported kinds.
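Something along these lines, as a sketch only. `ConsumerKind` and `ConsumerWeights` are hypothetical names; the two constants and the default weights simply mirror the map entries in the diff above.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical enum of supported consumer kinds; constants mirror the keys in the diff.
enum ConsumerKind { DATAPROC, PYTHON }

class ConsumerWeights {

    /** Raw default value, with keys derived from the enum instead of string literals. */
    static Map<String, String> defaultRawWeights() {
        Map<String, String> raw = new HashMap<>();
        raw.put(ConsumerKind.DATAPROC.name(), "70");
        raw.put(ConsumerKind.PYTHON.name(), "30");
        return raw;
    }

    /** Converts the raw string map into a typed map, rejecting unsupported kinds. */
    static Map<ConsumerKind, Integer> parse(Map<String, String> raw) {
        Map<ConsumerKind, Integer> parsed = new EnumMap<>(ConsumerKind.class);
        for (Map.Entry<String, String> entry : raw.entrySet()) {
            // valueOf throws IllegalArgumentException for names outside the enum.
            ConsumerKind kind = ConsumerKind.valueOf(entry.getKey());
            // parseInt throws NumberFormatException for non-numeric weights.
            parsed.put(kind, Integer.parseInt(entry.getValue()));
        }
        return parsed;
    }
}
```

That way a typo in a configured key is caught when the option is parsed rather than being silently ignored.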
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -49,6 +48,8 @@
@Internal
public class StreamNode implements Serializable {
+ public static final int DEFAULT_MANAGED_MEMORY_WEIGHT = 1;
Review comment:
Could be package-private.
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -177,11 +178,13 @@ private JobGraph createJobGraph() {
setSlotSharingAndCoLocation();
+ // For now, only consider managed memory for batch algorithms.
+ // TODO: extend managed memory fraction calculations w.r.t. various managed memory use cases.
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
- id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
+ id -> streamGraph.getStreamNode(id).getManagedMemoryUseCaseWeights().getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0));
Review comment:
We probably need a static variable for the default weight.
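Roughly what I have in mind, as a sketch. `DefaultWeightSketch` and `UNDECLARED_USE_CASE_WEIGHT` are hypothetical names, and plain `String` keys stand in for `ManagedMemoryUseCase`; the value 0 is the literal currently passed to `getOrDefault` in the diff.

```java
import java.util.Map;

// Hypothetical holder class; String keys stand in for ManagedMemoryUseCase.
class DefaultWeightSketch {

    // Named constant replacing the literal 0 passed to getOrDefault in the diff.
    static final int UNDECLARED_USE_CASE_WEIGHT = 0;

    /** Returns the declared weight, or the shared default if the use case is absent. */
    static int weightOf(Map<String, Integer> useCaseWeights, String useCase) {
        return useCaseWeights.getOrDefault(useCase, UNDECLARED_USE_CASE_WEIGHT);
    }
}
```

Tests can then assert against the constant rather than repeating the literal.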
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
##########
@@ -557,10 +557,11 @@ public void testSetManagedMemoryWeight() {
final StreamGraph streamGraph = env.getStreamGraph();
for (StreamNode streamNode : streamGraph.getStreamNodes()) {
- final int expectedWeight = streamNode.getOperatorName().contains("source")
- ? 123
- : StreamNode.DEFAULT_MANAGED_MEMORY_WEIGHT;
- assertEquals(expectedWeight, streamNode.getManagedMemoryWeight());
+ if (streamNode.getOperatorName().contains("source")) {
+ assertThat(streamNode.getManagedMemoryUseCaseWeights().get(ManagedMemoryUseCase.BATCH_OP), is(123));
+ } else {
+ assertThat(streamNode.getManagedMemoryUseCaseWeights().size(), is(0));
Review comment:
Then the expected value here should reference that static variable instead
of hard-coding a specific value.