zentol commented on a change in pull request #15972:
URL: https://github.com/apache/flink/pull/15972#discussion_r664011654



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
##########
@@ -29,6 +33,17 @@
  * The interface for a source reader which is responsible for reading the records from the source
  * splits assigned by {@link SplitEnumerator}.
  *
+ * <p>Implementations should can provide the following metrics:

Review comment:
       ```suggestion
    * <p>Implementations should provide the following metrics:
   ```

##########
File path: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceMetricGroup.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Pre-defined metrics for sources.
+ *
+ * <p>All metrics can only be accessed in the main operator thread.
+ */
+@NotThreadSafe
+public interface SourceMetricGroup extends OperatorMetricGroup {
+    /** The total number of record that failed to consume, process, or emit. */
+    Counter getNumRecordsInErrorsCounter();
+
+    /**
+     * Adds an optional gauge for last fetch time. Source readers can use this gauge to indicate the
+     * timestamp in milliseconds that Flink used to fetch a record.
+     *
+     * <p>The timestamp will be used to calculate the currentFetchEventTimeLag metric <code>
+     * currentFetchEventTimeLag = FetchTime - EventTime</code>.
+     *
+     * <p>Note that this time must strictly reflect the time of the last polled record. For sources
+     * that retrieve batches from the external system, the best way is to attach the timestamp to
+     * the batch and return the time of that batch. For multi-threaded sources, the timestamp should
+     * be embedded into the hand-over data structure.
+     *
+     * @see SettableGauge SettableGauge to continuously update the value.
+     */
+    Gauge<Long> addLastFetchTimeGauge(Gauge<Long> lastFetchTimeGauge);

Review comment:
       hmm :/ This method just exists to keep the names in sync? I'm wondering 
if we really want to extend the API for that, when some static utility could do 
it as well.
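A minimal sketch of the static-utility alternative, assuming the only goal is to keep the metric name in one place. `Gauge` and `MetricGroup` below are simplified stand-ins for the Flink interfaces (only a generic `gauge(String, G)` registration method is modelled), and `SourceMetricUtils`, `registerLastFetchTimeGauge`, `MapMetricGroup` and the `"lastFetchTime"` name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for org.apache.flink.metrics.Gauge.
interface Gauge<T> {
    T getValue();
}

// Simplified stand-in for org.apache.flink.metrics.MetricGroup: only the
// generic gauge registration method is modelled.
interface MetricGroup {
    <T, G extends Gauge<T>> G gauge(String name, G gauge);
}

// Toy group that remembers registered gauges by name.
final class MapMetricGroup implements MetricGroup {
    final Map<String, Gauge<?>> gauges = new HashMap<>();

    @Override
    public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
        gauges.put(name, gauge);
        return gauge;
    }
}

// Hypothetical static utility: the metric name lives in one place, and no
// new method is needed on the public SourceMetricGroup interface.
final class SourceMetricUtils {
    // Hypothetical metric name, for illustration only.
    static final String LAST_FETCH_TIME = "lastFetchTime";

    private SourceMetricUtils() {}

    static <G extends Gauge<Long>> G registerLastFetchTimeGauge(MetricGroup group, G gauge) {
        return group.gauge(LAST_FETCH_TIME, gauge);
    }
}
```

A connector would then call `SourceMetricUtils.registerLastFetchTimeGauge(group, gauge)` instead of a dedicated `SourceMetricGroup` method, so the public interface stays unchanged.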

##########
File path: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SettableGauge.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/** A gauge that returns the last set value. */
+public class SettableGauge<T> implements Gauge<T> {
+    private T value;
+
+    public SettableGauge(T initialValue) {
+        this.value = initialValue;

Review comment:
       checkNotNull
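A sketch of what the null check might look like, assuming `checkNotNull` refers to a precondition helper that throws on `null`. `java.util.Objects.requireNonNull` stands in for it so the example compiles on its own, `Gauge` is a simplified stand-in for the Flink interface, and the `setValue` method (not visible in the hunk) is assumed.

```java
import java.util.Objects;

// Simplified stand-in for org.apache.flink.metrics.Gauge.
interface Gauge<T> {
    T getValue();
}

/** A gauge that returns the last set value; null values are rejected. */
class SettableGauge<T> implements Gauge<T> {
    private T value;

    public SettableGauge(T initialValue) {
        // Objects.requireNonNull stands in for a checkNotNull precondition helper.
        this.value = Objects.requireNonNull(initialValue, "initialValue must not be null");
    }

    // Assumed setter; it is not visible in the quoted hunk.
    public void setValue(T value) {
        this.value = Objects.requireNonNull(value, "value must not be null");
    }

    @Override
    public T getValue() {
        return value;
    }
}
```

Rejecting `null` in the constructor means `getValue()` never hands `null` to a reporter, provided the setter applies the same check.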

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceMetricGroup.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+import org.apache.flink.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.util.clock.Clock;
+
+/** Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. */
+@Internal
+public class InternalSourceMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>

Review comment:
       This class seems to be quite overloaded; I'd move all the metric 
handling into a separate class.
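One way the split could look, as a sketch only: a hypothetical `SourceFetchMetrics` class owns the bookkeeping for `currentFetchEventTimeLag = FetchTime - EventTime`, and a hypothetical `SlimSourceMetricGroup` only registers and delegates. `Gauge` is a stand-in for the Flink interface, and the `-1` "undefined" sentinel is an assumption.

```java
// Simplified stand-in for org.apache.flink.metrics.Gauge.
interface Gauge<T> {
    T getValue();
}

// Hypothetical helper that owns the source metric bookkeeping.
final class SourceFetchMetrics {
    // Assumed sentinel for "not known yet".
    static final long UNDEFINED = -1L;

    private Gauge<Long> lastFetchTimeGauge = () -> UNDEFINED;
    private long lastEventTime = UNDEFINED;

    void setLastFetchTimeGauge(Gauge<Long> gauge) {
        this.lastFetchTimeGauge = gauge;
    }

    // Called with the event time of each emitted record.
    void recordEmitted(long eventTimeMillis) {
        this.lastEventTime = eventTimeMillis;
    }

    // currentFetchEventTimeLag = FetchTime - EventTime, or UNDEFINED if either side is unknown.
    long currentFetchEventTimeLag() {
        Long fetchTime = lastFetchTimeGauge.getValue();
        if (fetchTime == null || fetchTime == UNDEFINED || lastEventTime == UNDEFINED) {
            return UNDEFINED;
        }
        return fetchTime - lastEventTime;
    }
}

// Hypothetical slimmed-down group: it only registers and delegates.
final class SlimSourceMetricGroup {
    private final SourceFetchMetrics metrics = new SourceFetchMetrics();

    Gauge<Long> addLastFetchTimeGauge(Gauge<Long> gauge) {
        metrics.setLastFetchTimeGauge(gauge);
        return gauge;
    }

    Gauge<Long> currentFetchEventTimeLagGauge() {
        return metrics::currentFetchEventTimeLag;
    }

    SourceFetchMetrics metrics() {
        return metrics;
    }
}
```

Keeping the arithmetic in its own class lets it be unit-tested without constructing a metric group hierarchy.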

##########
File path: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceMetricGroup.java
##########
+    Gauge<Long> addLastFetchTimeGauge(Gauge<Long> lastFetchTimeGauge);

Review comment:
       `MetricGroup#gauge` behaves like that (it returns the gauge that was passed in) for consistency with all other metrics methods.
   This allows you to do neat stuff like `CustomCounter x = metricGroup.counter("foo", new CustomCounter());` which can be useful.
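The pattern being described, sketched with simplified stand-ins: the registration method is generic in the metric type and returns the very object it was given, so the caller keeps its concrete type without a cast. `Counter`, `MetricGroup`, `CustomCounter` and `NoOpRegistrationGroup` below are illustrative stand-ins, not the Flink classes; Flink's `MetricGroup#counter` is generic in the same way (`<C extends Counter> C counter(String name, C counter)`).

```java
// Simplified stand-in for org.apache.flink.metrics.Counter.
interface Counter {
    void inc();

    long getCount();
}

// Simplified stand-in for org.apache.flink.metrics.MetricGroup.
interface MetricGroup {
    // Generic in the counter type: the argument is returned with its concrete type.
    <C extends Counter> C counter(String name, C counter);
}

// Example custom counter with an extra method that is not on Counter.
final class CustomCounter implements Counter {
    private long count;

    @Override
    public void inc() {
        count++;
    }

    @Override
    public long getCount() {
        return count;
    }

    void reset() {
        count = 0;
    }
}

// Toy group; a real one would register the counter under the given name.
final class NoOpRegistrationGroup implements MetricGroup {
    @Override
    public <C extends Counter> C counter(String name, C counter) {
        return counter;
    }
}
```

With this shape, `CustomCounter x = group.counter("foo", new CustomCounter());` compiles and `x.reset()` remains available without a cast.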

##########
File path: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceMetricGroup.java
##########
+    Gauge<Long> addLastFetchTimeGauge(Gauge<Long> lastFetchTimeGauge);

Review comment:
       Whether it is useful to return a gauge depends on the implementation. Usually, the code registering a gauge does not interact with it directly, but this is, for example, not the case for the SettableGauge you're adding.
   If this method is close to the MetricGroup interface, then imo it should behave similarly; if it isn't, then it wouldn't be world-ending to return `void`; it could also be interesting to return the object the method was called on, as builders do.
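A sketch of the case where the registering code does interact with the gauge, assuming the reader attaches the fetch time to each batch as the Javadoc recommends. It uses the builder-style variant (the group returns itself), which works here because the reader keeps its own `SettableGauge` reference anyway. `Gauge` and `SettableGauge` are stand-ins, `FluentSourceMetricGroup`, `FetchedBatch` and `BatchingReader` are hypothetical, and computing the lag inside the reader is only for illustration; per the Javadoc, the timestamp is meant to feed the `currentFetchEventTimeLag` metric.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.apache.flink.metrics.Gauge.
interface Gauge<T> {
    T getValue();
}

// Stand-in mirroring the SettableGauge from this PR.
final class SettableGauge<T> implements Gauge<T> {
    private T value;

    SettableGauge(T initialValue) {
        this.value = initialValue;
    }

    void setValue(T value) {
        this.value = value;
    }

    @Override
    public T getValue() {
        return value;
    }
}

// Hypothetical builder-style group: returns itself instead of the gauge.
final class FluentSourceMetricGroup {
    final List<Gauge<Long>> lastFetchTimeGauges = new ArrayList<>();

    FluentSourceMetricGroup addLastFetchTimeGauge(Gauge<Long> gauge) {
        lastFetchTimeGauges.add(gauge);
        return this;
    }
}

// Hypothetical batch that carries the time at which it was fetched.
final class FetchedBatch {
    final long fetchTimeMillis;
    final long[] eventTimesMillis;

    FetchedBatch(long fetchTimeMillis, long... eventTimesMillis) {
        this.fetchTimeMillis = fetchTimeMillis;
        this.eventTimesMillis = eventTimesMillis.clone();
    }
}

// Hypothetical reader: it keeps its own reference because it updates the gauge per batch.
final class BatchingReader {
    private final SettableGauge<Long> lastFetchTime = new SettableGauge<>(-1L);

    BatchingReader(FluentSourceMetricGroup group) {
        group.addLastFetchTimeGauge(lastFetchTime);
    }

    // Emits a batch and, for illustration, returns FetchTime - EventTime of its last record.
    long emit(FetchedBatch batch) {
        if (batch.eventTimesMillis.length == 0) {
            throw new IllegalArgumentException("empty batch");
        }
        // The fetch time of the batch, not the current wall-clock time, is reported.
        lastFetchTime.setValue(batch.fetchTimeMillis);
        long lastEventTime = batch.eventTimesMillis[batch.eventTimesMillis.length - 1];
        return lastFetchTime.getValue() - lastEventTime;
    }
}
```

Because the reader already holds the `SettableGauge`, the return value of the registration method is irrelevant to it; a generic return type matters more for callers that create the gauge inline.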




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
