fapaul commented on a change in pull request #15972:
URL: https://github.com/apache/flink/pull/15972#discussion_r663972500



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceMetricGroup.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+import org.apache.flink.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.util.clock.Clock;
+
+/** Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. */
+@Internal
+public class InternalSourceMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
+        implements SourceMetricGroup {
+
+    public static final long ACTIVE = Long.MAX_VALUE;
+    private final Clock clock;
+    private final Counter numRecordsInErrors;
+    // only if source emits at least one watermark
+    private SettableGauge<Long> watermarkGauge;
+    // only if records with event timestamp are emitted
+    private SettableGauge<Long> eventTimeGauge;
+    private long idleStartTime = ACTIVE;
+
+    public InternalSourceMetricGroup(OperatorMetricGroup parentMetricGroup, Clock clock) {
+        super(parentMetricGroup);
+        numRecordsInErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_IN_ERRORS);
+        this.clock = clock;
+        parentMetricGroup.gauge(
+                MetricNames.SOURCE_IDLE_TIME_GAUGE,
+                () -> isIdling() ? 0 : this.clock.absoluteTimeMillis() - idleStartTime);
+    }
+
+    private boolean isIdling() {
+        return idleStartTime == ACTIVE;

Review comment:
       Does the source-specific idle value really add much compared to the 
overall `idleTimeMs`?

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
##########
@@ -29,6 +33,17 @@
  * The interface for a source reader which is responsible for reading the records from the source
  * splits assigned by {@link SplitEnumerator}.
  *
+ * <p>Implementations should can provide the following metrics:

Review comment:
       ```suggestion
        * <p>Implementations can provide the following metrics:
       ```
       ?

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
##########
@@ -18,12 +18,22 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+
 import javax.annotation.Nullable;
 
 import java.util.Set;
 
 /** An interface for the elements passed from the fetchers to the source reader. */
 public interface RecordsWithSplitIds<E> {
+    /**
+     * Returns the timestamp of the last fetch. Will be used to automatically set {@link
+     * SourceMetricGroup#addLastFetchTimeGauge(Gauge)}.
+     */
+    @Nullable
+    default Long lastFetchTime() {

Review comment:
       Is there any guidance on when to return `null` by default versus using 
`Optional`?
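
To illustrate the trade-off, here is a rough sketch of both variants side by side. `FetchResult`, `lastFetchTimeOptional()` and `FetchResultDemo` are hypothetical names used only for this comparison; only `lastFetchTime()` comes from the diff above, and this is not a proposal for the actual interface.

```java
import java.util.Optional;

/** Hypothetical interface, reduced to the one method under discussion. */
interface FetchResult {
    /** Variant A: nullable return with a null default, as in the diff above. */
    default Long lastFetchTime() {
        return null;
    }

    /** Variant B (hypothetical): the absent case is visible in the signature. */
    default Optional<Long> lastFetchTimeOptional() {
        return Optional.ofNullable(lastFetchTime());
    }
}

final class FetchResultDemo {
    /** With variant A the caller has to remember the null check. */
    static long orZeroNullable(FetchResult result) {
        Long time = result.lastFetchTime();
        return time != null ? time : 0L;
    }

    /** With variant B the fallback is part of the call chain. */
    static long orZeroOptional(FetchResult result) {
        return result.lastFetchTimeOptional().orElse(0L);
    }
}
```

Both helpers give the same result for the same input; the difference is whether the compiler or only the `@Nullable` annotation reminds the caller of the missing value. `Optional` adds an allocation per call, which may matter on a per-record path.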

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -291,13 +300,23 @@ public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
 
         // short circuit the common case (every invocation except the first)
         if (currentMainOutput != null) {
-            return sourceReader.pollNext(currentMainOutput);
+            return pollNext();
         }
 
         // this creates a batch or streaming output based on the runtime mode
-        currentMainOutput = eventTimeLogic.createMainOutput(output);
+        currentMainOutput =
+                eventTimeLogic.createMainOutput(
+                        new MetricTrackingOutput<>(output, sourceMetricGroup));
         lastInvokedOutput = output;
-        return sourceReader.pollNext(currentMainOutput);
+        return pollNext();
+    }
+
+    private InputStatus pollNext() throws Exception {
+        InputStatus inputStatus = sourceReader.pollNext(currentMainOutput);
+        if (inputStatus == InputStatus.NOTHING_AVAILABLE) {
+            sourceMetricGroup.idlingStarted();
+        }
+        return inputStatus;

Review comment:
       Does this logic require a test?
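
If it does, something along these lines might be enough. This is only a sketch with hypothetical, reduced stand-ins (`Status`, `Reader`, `RecordingMetricGroup`, `PollingOperator`) rather than the real `SourceOperator`, `SourceReader` and `InputStatus`; it just pins down that `idlingStarted()` is called exactly when the reader reports nothing available.

```java
/** Hypothetical, reduced stand-in for the input status enum. */
enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

/** Hypothetical, reduced stand-in for the source reader. */
interface Reader {
    Status pollNext();
}

/** Test double that counts how often idling was reported. */
final class RecordingMetricGroup {
    int idlingStartedCalls = 0;

    void idlingStarted() {
        idlingStartedCalls++;
    }
}

/** Mirrors the shape of the new private pollNext() in the diff above. */
final class PollingOperator {
    private final Reader reader;
    private final RecordingMetricGroup metrics;

    PollingOperator(Reader reader, RecordingMetricGroup metrics) {
        this.reader = reader;
        this.metrics = metrics;
    }

    Status pollNext() {
        Status status = reader.pollNext();
        if (status == Status.NOTHING_AVAILABLE) {
            metrics.idlingStarted();
        }
        return status;
    }
}
```

A test would then poll once with a reader returning `NOTHING_AVAILABLE` and once with `MORE_AVAILABLE`, and assert on `idlingStartedCalls` after each call.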

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceMetricGroup.java
##########
@@ -0,0 +1,133 @@
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+import org.apache.flink.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.util.clock.Clock;
+
+/** Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. */
+@Internal
+public class InternalSourceMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
+        implements SourceMetricGroup {
+
+    public static final long ACTIVE = Long.MAX_VALUE;
+    private final Clock clock;
+    private final Counter numRecordsInErrors;
+    // only if source emits at least one watermark
+    private SettableGauge<Long> watermarkGauge;
+    // only if records with event timestamp are emitted
+    private SettableGauge<Long> eventTimeGauge;
+    private long idleStartTime = ACTIVE;
+
+    public InternalSourceMetricGroup(OperatorMetricGroup parentMetricGroup, Clock clock) {
+        super(parentMetricGroup);
+        numRecordsInErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_IN_ERRORS);
+        this.clock = clock;
+        parentMetricGroup.gauge(
+                MetricNames.SOURCE_IDLE_TIME_GAUGE,
+                () -> isIdling() ? 0 : this.clock.absoluteTimeMillis() - idleStartTime);

Review comment:
       Something looks off here: when `isIdling()` is true, the metric returns 0, 
although I'd expect an increasing number until the next record arrives.
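
To make the expectation concrete, here is a minimal sketch of the semantics I would expect from the gauge. It is not the PR's code: `IdleTimeTracker`, `recordEmitted()`, `getIdleTimeMillis()` and the `LongSupplier` clock are hypothetical stand-ins, and only `ACTIVE`, `idleStartTime`, `isIdling()` and `idlingStarted()` mirror names from the diff.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the idle-time bookkeeping; not Flink's actual class. */
final class IdleTimeTracker {
    /** Sentinel meaning "the source is currently active, i.e. not idle". */
    static final long ACTIVE = Long.MAX_VALUE;

    /** Supplies the current time in milliseconds (assumption: replaces Clock). */
    private final LongSupplier clock;

    private long idleStartTime = ACTIVE;

    IdleTimeTracker(LongSupplier clock) {
        this.clock = clock;
    }

    /** Marks the start of an idle period; repeated calls keep the first start time. */
    void idlingStarted() {
        if (idleStartTime == ACTIVE) {
            idleStartTime = clock.getAsLong();
        }
    }

    /** Hypothetical hook: ends the idle period, e.g. when a record is emitted. */
    void recordEmitted() {
        idleStartTime = ACTIVE;
    }

    /** True only while an idle period is in progress. */
    boolean isIdling() {
        return idleStartTime != ACTIVE;
    }

    /** Gauge value: 0 while active, otherwise the time elapsed since idling began. */
    long getIdleTimeMillis() {
        return isIdling() ? clock.getAsLong() - idleStartTime : 0L;
    }
}
```

With this shape, the value grows while the source is idle and drops back to 0 once a record arrives. In the diff, `isIdling()` compares with `==` against `ACTIVE`, so either the comparison or the ternary branches appear to be inverted.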



