yunfengzhou-hub commented on code in PR #7117:
URL: https://github.com/apache/paimon/pull/7117#discussion_r2785910945
##########
docs/content/maintenance/metrics.md:
##########
@@ -391,6 +391,12 @@ When using Flink to read and write, Paimon has implemented some key standard Fli
<td>Gauge</td>
<td>Time difference between reading the data file and file creation.</td>
</tr>
+ <tr>
+ <td>sourceParallelismUpperBound</td>
+ <td>Flink Source Enumerator</td>
+ <td>Gauge</td>
+ <td>Recommended upper bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the bucket number. For dynamic or postpone bucket tables (bucket < 0), this equals the flink max parallelism. Note: This is a recommendation, not a hard limit.</td>
Review Comment:
nit: equals -> equals to
##########
docs/content/maintenance/metrics.md:
##########
@@ -391,6 +391,12 @@ When using Flink to read and write, Paimon has implemented some key standard Fli
<td>Gauge</td>
<td>Time difference between reading the data file and file creation.</td>
</tr>
+ <tr>
+ <td>sourceParallelismUpperBound</td>
+ <td>Flink Source Enumerator</td>
+ <td>Gauge</td>
+ <td>Recommended upper bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the bucket number. For dynamic or postpone bucket tables (bucket < 0), this equals the flink max parallelism. Note: This is a recommendation, not a hard limit.</td>
Review Comment:
> For fixed bucket tables, this equals the bucket number. For dynamic or postpone bucket tables (bucket < 0), this equals the flink max parallelism.
We might not need to describe these details. They are internal implementation details rather than part of the API, and they may change once we can obtain a more accurate recommended parallelism for each bucket mode.
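To make the API-level contract concrete: a consumer only needs to read the gauge as an advisory cap, without depending on how the bound was derived. The following is a minimal sketch under that assumption; `AdvisoryBound` and `clampParallelism` are hypothetical names, not Paimon or Flink APIs.

```java
public class AdvisoryBound {

    /**
     * Hypothetical helper: caps a desired source parallelism by the value read from the
     * sourceParallelismUpperBound gauge. The bound is a recommendation, so a caller may ignore it.
     */
    static int clampParallelism(int desiredParallelism, int upperBound) {
        if (desiredParallelism < 1) {
            throw new IllegalArgumentException("desiredParallelism must be >= 1");
        }
        // A non-positive bound carries no usable recommendation; keep the desired value.
        if (upperBound < 1) {
            return desiredParallelism;
        }
        return Math.min(desiredParallelism, upperBound);
    }

    public static void main(String[] args) {
        // Per the docs text in this diff, a fixed-bucket table with 8 buckets reports 8.
        System.out.println(clampParallelism(20, 8)); // prints 8
        // A desired value already below the bound is kept unchanged.
        System.out.println(clampParallelism(4, 8)); // prints 4
    }
}
```

Because the consumer only sees a number, the derivation rule can change later without breaking callers, which is the point of keeping it out of the docs.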
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java:
##########
@@ -85,6 +94,12 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
}
StreamTableScan scan = readBuilder.newStreamScan();
if (metricGroup(context) != null) {
+ int bucketNum = CoreOptions.fromMap(options).bucket();
+ int sourceParallelismUpperBound = bucketNum < 0 ? MAX_PARALLELISM_OF_SOURCE : bucketNum;
+
+ context.metricGroup()
+ .gauge(SOURCE_PARALLELISM_UPPER_BOUND, () -> sourceParallelismUpperBound);
Review Comment:
Better to trigger this registration in the open() or init() method of the
enumerator or split assigner.
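A minimal, stdlib-only sketch of the suggested shape, where the gauge is registered from the enumerator's own lifecycle hook rather than from the source's factory method. `SimpleMetricGroup` and `SketchEnumerator` are hypothetical stand-ins (Flink's real enumerator lifecycle method is `SplitEnumerator#start()`), and the value of `MAX_PARALLELISM_OF_SOURCE` is a placeholder because the diff does not show the actual constant.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class EnumeratorGaugeSketch {

    /** Hypothetical stand-in for a Flink metric group; only gauge registration is modeled. */
    static class SimpleMetricGroup {
        final Map<String, Supplier<Integer>> gauges = new HashMap<>();

        void gauge(String name, Supplier<Integer> gauge) {
            gauges.put(name, gauge);
        }
    }

    static final String SOURCE_PARALLELISM_UPPER_BOUND = "sourceParallelismUpperBound";

    // Placeholder value: the real MAX_PARALLELISM_OF_SOURCE is not shown in the diff.
    static final int MAX_PARALLELISM_OF_SOURCE = 1 << 15;

    /** Hypothetical enumerator that owns the gauge registration. */
    static class SketchEnumerator {
        private final SimpleMetricGroup metricGroup;
        private final int bucketNum;

        SketchEnumerator(SimpleMetricGroup metricGroup, int bucketNum) {
            this.metricGroup = metricGroup;
            this.bucketNum = bucketNum;
        }

        /** Plays the role of start() in Flink's SplitEnumerator lifecycle. */
        void start() {
            // Same rule as the diff: bucket < 0 falls back to the max-parallelism constant.
            int upperBound = bucketNum < 0 ? MAX_PARALLELISM_OF_SOURCE : bucketNum;
            metricGroup.gauge(SOURCE_PARALLELISM_UPPER_BOUND, () -> upperBound);
        }
    }

    public static void main(String[] args) {
        SimpleMetricGroup group = new SimpleMetricGroup();
        new SketchEnumerator(group, 4).start();
        System.out.println(group.gauges.get(SOURCE_PARALLELISM_UPPER_BOUND).get()); // prints 4
    }
}
```

One reading of the suggestion is that registration then happens wherever the enumerator is created or restored, without duplicating it in the source's factory methods.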
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java:
##########
@@ -46,6 +46,15 @@ public class ContinuousFileStoreSource extends FlinkSource {
protected final Map<String, String> options;
protected final boolean unordered;
+ /**
+ * Metric name for source scaling max parallelism. This metric provides a recommended upper
+ * bound of parallelism for auto-scaling systems.
+ */
+ public static final String SOURCE_PARALLELISM_UPPER_BOUND = "sourceParallelismUpperBound";
+
+ /** refer org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM. */
Review Comment:
nit: refer -> refer to
--
This is an automated message from the Apache Git Service.