xishuaidelin commented on code in PR #27560:
URL: https://github.com/apache/flink/pull/27560#discussion_r2922853942


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncPredictFunction.java:
##########
@@ -30,22 +33,170 @@
  * A wrapper class of {@link AsyncTableFunction} for asynchronous model 
inference.
  *
  * <p>The output type of this table function is fixed as {@link RowData}.
+ *
+ * <p>This class provides built-in metrics for monitoring model inference 
performance, including:
+ *
+ * <ul>
+ *   <li>inference_requests: Total number of inference requests

Review Comment:
   Please keep consistent with the metric name. Actual names registered have no 
inference prefix.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncPredictFunction.java:
##########
@@ -30,22 +33,170 @@
  * A wrapper class of {@link AsyncTableFunction} for asynchronous model 
inference.
  *
  * <p>The output type of this table function is fixed as {@link RowData}.
+ *
+ * <p>This class provides built-in metrics for monitoring model inference 
performance, including:
+ *
+ * <ul>
+ *   <li>inference_requests: Total number of inference requests
+ *   <li>inference_requests_success: Number of successful inference requests
+ *   <li>inference_requests_failure: Number of failed inference requests
+ *   <li>inference_latency: Histogram of inference latency in milliseconds
+ *   <li>inference_rows_output: Total number of output rows from inference
+ * </ul>
  */
 @PublicEvolving
 public abstract class AsyncPredictFunction extends AsyncTableFunction<RowData> 
{
 
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    /** Counter for total inference requests. */
+    private transient Counter inferenceRequests;
+
+    /** Counter for successful inference requests. */
+    private transient Counter inferenceRequestsSuccess;
+
+    /** Counter for failed inference requests. */
+    private transient Counter inferenceRequestsFailure;
+
+    /** Histogram for inference latency in milliseconds. */
+    private transient Histogram inferenceLatency;
+
+    /** Counter for total output rows from inference. */
+    private transient Counter inferenceRowsOutput;
+
+    // ------------------------------------------------------------------------
+    // Lifecycle Methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        initializeMetrics(context);
+    }
+
+    /**
+     * Initialize metrics for monitoring model inference performance.
+     *
+     * <p>This method creates a metric group named "model_inference" and 
registers the following
+     * metrics:
+     *
+     * <ul>
+     *   <li>requests: Counter for total inference requests
+     *   <li>requests_success: Counter for successful inference requests
+     *   <li>requests_failure: Counter for failed inference requests
+     *   <li>latency: Histogram for inference latency (milliseconds)
+     *   <li>rows_output: Counter for total output rows
+     * </ul>
+     *
+     * <p>Subclasses can override {@link #createLatencyHistogram(MetricGroup)} 
to provide a custom
+     * histogram implementation.
+     *
+     * @param context The function context
+     */
+    protected void initializeMetrics(FunctionContext context) {
+        MetricGroup metricGroup = 
context.getMetricGroup().addGroup("model_inference");
+
+        inferenceRequests = metricGroup.counter("requests");
+        inferenceRequestsSuccess = metricGroup.counter("requests_success");
+        inferenceRequestsFailure = metricGroup.counter("requests_failure");
+        inferenceLatency = createLatencyHistogram(metricGroup);
+        inferenceRowsOutput = metricGroup.counter("rows_output");
+    }
+
+    /**
+     * Create a histogram for tracking inference latency. Subclasses can 
override this method to
+     * provide a custom histogram implementation.
+     *
+     * @param metricGroup The metric group to register the histogram
+     * @return A Histogram instance, or null to disable latency tracking
+     */
+    protected Histogram createLatencyHistogram(MetricGroup metricGroup) {
+        // Default: no histogram (return null)
+        // Subclasses can override to provide implementation
+        return null;
+    }
+
+    // ------------------------------------------------------------------------
+    // Prediction Methods
+    // ------------------------------------------------------------------------
+
     /**
      * Asynchronously predict result based on input row.
      *
+     * <p>Subclasses must implement this method to perform the actual 
inference logic.
+     *
      * @param inputRow - A {@link RowData} that wraps input for predict 
function.
      * @return A collection of all predicted results.
      */
     public abstract CompletableFuture<Collection<RowData>> 
asyncPredict(RowData inputRow);
 
-    /** Invokes {@link #asyncPredict} and chains futures. */
+    /**
+     * Wrapper method that tracks metrics around asyncPredict calls.
+     *
+     * <p>This method automatically tracks inference metrics including request 
count,
+     * success/failure count, latency, and output row count.
+     *
+     * @param inputRow The input row data
+     * @return A CompletableFuture containing the prediction results
+     */
+    protected final CompletableFuture<Collection<RowData>> 
asyncPredictWithMetrics(
+            RowData inputRow) {
+
+        // Increment request counter
+        if (inferenceRequests != null) {
+            inferenceRequests.inc();
+        }
+
+        // Record start time

Review Comment:
   Please remove these redundant comments and similar ones elsewhere.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncPredictFunction.java:
##########
@@ -30,22 +33,170 @@
  * A wrapper class of {@link AsyncTableFunction} for asynchronous model 
inference.
  *
  * <p>The output type of this table function is fixed as {@link RowData}.
+ *
+ * <p>This class provides built-in metrics for monitoring model inference 
performance, including:
+ *
+ * <ul>
+ *   <li>inference_requests: Total number of inference requests
+ *   <li>inference_requests_success: Number of successful inference requests
+ *   <li>inference_requests_failure: Number of failed inference requests
+ *   <li>inference_latency: Histogram of inference latency in milliseconds
+ *   <li>inference_rows_output: Total number of output rows from inference
+ * </ul>
  */
 @PublicEvolving
 public abstract class AsyncPredictFunction extends AsyncTableFunction<RowData> 
{
 
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    /** Counter for total inference requests. */
+    private transient Counter inferenceRequests;

Review Comment:
   These metric is replicate in the AsyncPredictFunction and PredictFunction. 
Could we abstract a PredictFunctionMetrics? Then just combine the logic in 
seperate funcion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to