This is an automated email from the ASF dual-hosted git repository.
shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8b66b60201 [GLUTEN-9525][FLINK]Support tps metric for source task (#10023)
8b66b60201 is described below
commit 8b66b60201c80c4e1c90d6ec121722dc7b8fb919
Author: kevinyhzou <[email protected]>
AuthorDate: Sat Oct 11 09:19:46 2025 +0800
[GLUTEN-9525][FLINK]Support tps metric for source task (#10023)
* support tps metric for source task
---
.../table/runtime/metrics/SourceTaskMetrics.java | 63 ++++++++++++++++++++++
.../operators/GlutenVectorSourceFunction.java | 4 ++
2 files changed, 67 insertions(+)
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java
new file mode 100644
index 0000000000..5bbbb028b6
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.metrics;
+
+import io.github.zhztheplayer.velox4j.query.SerialTask;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Mirrors the raw input statistics of a Velox TableScan plan node into the Flink source
+ * operator's {@code numRecordsOut} / {@code numBytesOut} counters.
+ *
+ * <p>The Velox values are treated as running totals: each update moves the Flink counters to
+ * the reported absolute values. Metric collection is best effort and never fails the task.
+ */
+public class SourceTaskMetrics {
+
+  /** Key in the plan-node stats JSON that holds the operator type. */
+  private static final String KEY_OPERATOR_TYPE = "operatorType";
+  /** Operator type whose stats are mirrored into the source metrics. */
+  private static final String SOURCE_OPERATOR_NAME = "TableScan";
+  private static final String KEY_INPUT_ROWS = "rawInputRows";
+  private static final String KEY_INPUT_BYTES = "rawInputBytes";
+  /** Minimum time between two stats collections: 2 seconds, in nanoseconds. */
+  private static final long METRIC_UPDATE_INTERVAL_NANOS = 2_000_000_000L;
+
+  private final Counter sourceNumRecordsOut;
+  private final Counter sourceNumBytesOut;
+  // Monotonic clock, so wall-clock adjustments cannot stall or burst the throttling.
+  private long lastUpdateNanos = System.nanoTime();
+
+  /**
+   * Binds to the numRecordsOut / numBytesOut counters of the given operator metric group.
+   *
+   * @param metricGroup metric group of the source operator
+   */
+  public SourceTaskMetrics(OperatorMetricGroup metricGroup) {
+    sourceNumRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+    sourceNumBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+  }
+
+  /**
+   * Copies the raw input rows/bytes reported for plan node {@code planId} into the Flink output
+   * counters when that node is a TableScan.
+   *
+   * <p>Throttled: returns immediately unless at least 2 seconds have passed since the previous
+   * attempt. Failed attempts are throttled as well, so a failing stats call is not repeated on
+   * every invocation.
+   *
+   * @param task the Velox task to collect stats from
+   * @param planId id of the plan node whose stats are read
+   * @return true if stats were collected, false if throttled or collection failed
+   */
+  public boolean updateMetrics(SerialTask task, String planId) {
+    long now = System.nanoTime();
+    if (now - lastUpdateNanos < METRIC_UPDATE_INTERVAL_NANOS) {
+      return false;
+    }
+    lastUpdateNanos = now;
+    try {
+      ObjectNode planStats = task.collectStats().planStats(planId);
+      if (planStats == null) {
+        return false;
+      }
+      JsonNode operatorType = planStats.get(KEY_OPERATOR_TYPE);
+      if (operatorType != null && SOURCE_OPERATOR_NAME.equals(operatorType.asText())) {
+        // asLong (not asInt): running byte/row totals can exceed Integer.MAX_VALUE.
+        JsonNode rows = planStats.get(KEY_INPUT_ROWS);
+        if (rows != null) {
+          setCount(sourceNumRecordsOut, rows.asLong());
+        }
+        JsonNode bytes = planStats.get(KEY_INPUT_BYTES);
+        if (bytes != null) {
+          setCount(sourceNumBytesOut, bytes.asLong());
+        }
+      }
+    } catch (Exception ignored) {
+      // Best effort: a metrics failure must never break the running source task.
+      return false;
+    }
+    return true;
+  }
+
+  /** Moves {@code counter} to the absolute value {@code target} using a delta increment. */
+  private static void setCount(Counter counter, long target) {
+    counter.inc(target - counter.getCount());
+  }
+}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index 399211ec1b..472bd0bfed 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -17,6 +17,7 @@
package org.apache.gluten.table.runtime.operators;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
+import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics;
import io.github.zhztheplayer.velox4j.Velox4j;
import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
@@ -66,6 +67,7 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction<State
private BufferAllocator allocator;
private MemoryManager memoryManager;
private SerialTask task;
+ private SourceTaskMetrics taskMetrics;
public GlutenVectorSourceFunction(
StatefulPlanNode planNode,
@@ -108,6 +110,7 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction<State
task.addSplit(id, split);
task.noMoreSplits(id);
}
+ taskMetrics = new SourceTaskMetrics(getRuntimeContext().getMetricGroup());
}
@Override
@@ -128,6 +131,7 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction<State
LOG.info("Velox task finished");
break;
}
+ taskMetrics.updateMetrics(task, id);
}
task.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]