AHeise commented on a change in pull request #15972:
URL: https://github.com/apache/flink/pull/15972#discussion_r664270238
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
##########
@@ -18,12 +18,22 @@
package org.apache.flink.connector.base.source.reader;
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+
import javax.annotation.Nullable;
import java.util.Set;
/** An interface for the elements passed from the fetchers to the source reader. */
public interface RecordsWithSplitIds<E> {
+ /**
+ * Returns the timestamp of the last fetch. Will be used to automatically set {@link
+ * SourceMetricGroup#addLastFetchTimeGauge(Gauge)}.
+ */
+ @Nullable
+ default Long lastFetchTime() {
Review comment:
Not really. I just used `null` here because all other methods in this interface do.
To be honest, in Java 8 I don't see much difference between properly annotated (`@Nullable`) methods and `Optional`. Java 9 added `Optional#or`, `#ifPresentOrElse`, and `#stream`, which make `Optional` much more composable, so the trade-off looks different there.
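To make the comparison concrete, here is a minimal JDK-only sketch of the two accessor styles as a Java 8 caller would use them. `FetchedBatch`, `lastFetchTimeOptional`, and the `NullableVsOptional` helpers are hypothetical names, not Flink API; the `@Nullable` contract is stated in Javadoc to avoid pulling in the JSR-305 annotation.

```java
import java.util.Optional;

/** Hypothetical batch type that mirrors the default method from the diff; not Flink's API. */
interface FetchedBatch {
    /** Returns the fetch time in epoch millis, or null if the source does not track it. */
    default Long lastFetchTime() {
        return null;
    }

    /** The Optional-returning alternative discussed in this thread (hypothetical). */
    default Optional<Long> lastFetchTimeOptional() {
        return Optional.ofNullable(lastFetchTime());
    }
}

class NullableVsOptional {
    // Java 8 caller of the nullable accessor: explicit null check.
    static long fetchTimeOrDefault(FetchedBatch batch, long fallback) {
        Long fetchTime = batch.lastFetchTime();
        return fetchTime != null ? fetchTime : fallback;
    }

    // Java 8 caller of the Optional accessor: same logic, different syntax.
    static long fetchTimeOrDefaultOptional(FetchedBatch batch, long fallback) {
        return batch.lastFetchTimeOptional().orElse(fallback);
    }
}
```

Under these assumptions both callers are one-liners of similar length, which is the Java 8 point made above.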
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceMetricGroup.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+import org.apache.flink.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.util.clock.Clock;
+
+/** Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. */
+@Internal
+public class InternalSourceMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
+ implements SourceMetricGroup {
+
+ public static final long ACTIVE = Long.MAX_VALUE;
+ private final Clock clock;
+ private final Counter numRecordsInErrors;
+ // only if source emits at least one watermark
+ private SettableGauge<Long> watermarkGauge;
+ // only if records with event timestamp are emitted
+ private SettableGauge<Long> eventTimeGauge;
+ private long idleStartTime = ACTIVE;
+
+ public InternalSourceMetricGroup(OperatorMetricGroup parentMetricGroup, Clock clock) {
+ super(parentMetricGroup);
+ numRecordsInErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_IN_ERRORS);
+ this.clock = clock;
+ parentMetricGroup.gauge(
+ MetricNames.SOURCE_IDLE_TIME_GAUGE,
+ () -> isIdling() ? 0 : this.clock.absoluteTimeMillis() - idleStartTime);
+ }
+
+ private boolean isIdling() {
+ return idleStartTime == ACTIVE;
Review comment:
Good question. It's defined in FLIP-33, but FLIP-33 predates a proper idle-time metric.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceMetricGroup.java
##########
@@ -0,0 +1,133 @@
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+import org.apache.flink.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.util.clock.Clock;
+
+/** Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. */
+@Internal
+public class InternalSourceMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
+ implements SourceMetricGroup {
+
+ public static final long ACTIVE = Long.MAX_VALUE;
+ private final Clock clock;
+ private final Counter numRecordsInErrors;
+ // only if source emits at least one watermark
+ private SettableGauge<Long> watermarkGauge;
+ // only if records with event timestamp are emitted
+ private SettableGauge<Long> eventTimeGauge;
+ private long idleStartTime = ACTIVE;
+
+ public InternalSourceMetricGroup(OperatorMetricGroup parentMetricGroup, Clock clock) {
+ super(parentMetricGroup);
+ numRecordsInErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_IN_ERRORS);
+ this.clock = clock;
+ parentMetricGroup.gauge(
+ MetricNames.SOURCE_IDLE_TIME_GAUGE,
+ () -> isIdling() ? 0 : this.clock.absoluteTimeMillis() - idleStartTime);
Review comment:
Yes, you are right, the cases are swapped. Actually, the factored-out `isIdling` was also inverted (it returns `true` when `idleStartTime == ACTIVE`, i.e. while the source is active), so I fixed that as well.
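A minimal sketch of the corrected logic, assuming `isIdling()` should be true once an idle start time has been recorded. `IdleTimeTracker` is a hypothetical stand-in for the idle-time part of `InternalSourceMetricGroup`; a `LongSupplier` replaces Flink's `Clock`, and both `idlingStopped()` and the guard against resetting the start time on repeated polls are assumptions, not taken from the diff.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the idle-time logic, with isIdling() no longer inverted. */
class IdleTimeTracker {
    static final long ACTIVE = Long.MAX_VALUE;

    private final LongSupplier clock;
    private long idleStartTime = ACTIVE;

    IdleTimeTracker(LongSupplier clock) {
        this.clock = clock;
    }

    // Idling means a start time has been recorded, i.e. the sentinel was replaced.
    boolean isIdling() {
        return idleStartTime != ACTIVE;
    }

    void idlingStarted() {
        // Assumption: keep the first start time so repeated NOTHING_AVAILABLE polls don't reset it.
        if (!isIdling()) {
            idleStartTime = clock.getAsLong();
        }
    }

    // Assumption: called when the next record is emitted.
    void idlingStopped() {
        idleStartTime = ACTIVE;
    }

    // Value the idle-time gauge would report: elapsed idle millis, or 0 while active.
    long idleTime() {
        return isIdling() ? clock.getAsLong() - idleStartTime : 0;
    }
}
```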
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -291,13 +300,23 @@ public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
// short circuit the common case (every invocation except the first)
if (currentMainOutput != null) {
- return sourceReader.pollNext(currentMainOutput);
+ return pollNext();
}
// this creates a batch or streaming output based on the runtime mode
- currentMainOutput = eventTimeLogic.createMainOutput(output);
+ currentMainOutput =
+ eventTimeLogic.createMainOutput(
+ new MetricTrackingOutput<>(output, sourceMetricGroup));
lastInvokedOutput = output;
- return sourceReader.pollNext(currentMainOutput);
+ return pollNext();
+ }
+
+ private InputStatus pollNext() throws Exception {
+ InputStatus inputStatus = sourceReader.pollNext(currentMainOutput);
+ if (inputStatus == InputStatus.NOTHING_AVAILABLE) {
+ sourceMetricGroup.idlingStarted();
+ }
+ return inputStatus;
Review comment:
Yes. To save some time, I didn't want to add tests until the design is approved. The proper PR on top of the FLIP will need roughly one test per metric.
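For a sense of what one of those per-metric tests might check, here is a JDK-only sketch of the `pollNext()` helper above, driven by a scripted reader. `Status` mirrors only a subset of Flink's `InputStatus`, and `ScriptedReader`, `RecordingSourceMetrics`, and `PollNextUnderTest` are hypothetical test doubles, not classes from the PR.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Hypothetical copy of a subset of Flink's InputStatus values. */
enum Status {
    MORE_AVAILABLE,
    NOTHING_AVAILABLE,
    END_OF_INPUT
}

/** Test double that replays a fixed script of statuses. */
class ScriptedReader {
    private final Deque<Status> script;

    ScriptedReader(Status... statuses) {
        this.script = new ArrayDeque<>(Arrays.asList(statuses));
    }

    Status pollNext() {
        // Once the script is exhausted, report end of input.
        return script.isEmpty() ? Status.END_OF_INPUT : script.poll();
    }
}

/** Records calls instead of updating real metrics. */
class RecordingSourceMetrics {
    int idlingStartedCalls;

    void idlingStarted() {
        idlingStartedCalls++;
    }
}

/** Same shape as the pollNext() helper in the diff: poll, then flag idleness. */
class PollNextUnderTest {
    private final ScriptedReader reader;
    private final RecordingSourceMetrics metrics;

    PollNextUnderTest(ScriptedReader reader, RecordingSourceMetrics metrics) {
        this.reader = reader;
        this.metrics = metrics;
    }

    Status pollNext() {
        Status status = reader.pollNext();
        if (status == Status.NOTHING_AVAILABLE) {
            metrics.idlingStarted();
        }
        return status;
    }
}
```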
##########
File path:
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SettableGauge.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/** A gauge that returns the last set value. */
+public class SettableGauge<T> implements Gauge<T> {
+ private T value;
+
+ public SettableGauge(T initialValue) {
+ this.value = initialValue;
Review comment:
I used `Objects.requireNonNull`, as `Preconditions` is not available in this module.
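A minimal sketch of the constructor with the null check described above, plus a hypothetical `setValue` method. The local `Gauge` interface only stands in for `org.apache.flink.metrics.Gauge` (which exposes `T getValue()`) so that the snippet compiles on its own.

```java
import java.util.Objects;

/** Stand-in for org.apache.flink.metrics.Gauge. */
interface Gauge<T> {
    T getValue();
}

/** A gauge that returns the last set value; null values are rejected up front. */
class SettableGauge<T> implements Gauge<T> {
    private T value;

    SettableGauge(T initialValue) {
        // Objects.requireNonNull is plain JDK, so it works where Flink's Preconditions is unavailable.
        this.value = Objects.requireNonNull(initialValue, "initialValue");
    }

    // Hypothetical setter; the diff excerpt only shows the constructor.
    void setValue(T value) {
        this.value = Objects.requireNonNull(value, "value");
    }

    @Override
    public T getValue() {
        return value;
    }
}
```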
##########
File path:
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceMetricGroup.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Pre-defined metrics for sources.
+ *
+ * <p>All metrics can only be accessed in the main operator thread.
+ */
+@NotThreadSafe
+public interface SourceMetricGroup extends OperatorMetricGroup {
+ /** The total number of records that failed to be consumed, processed, or emitted. */
+ Counter getNumRecordsInErrorsCounter();
+
+ /**
+ * Adds an optional gauge for last fetch time. Source readers can use this gauge to indicate the
+ * timestamp in milliseconds that Flink used to fetch a record.
+ *
+ * <p>The timestamp will be used to calculate the currentFetchEventTimeLag metric <code>
+ * currentFetchEventTimeLag = FetchTime - EventTime</code>.
+ *
+ * <p>Note that this time must strictly reflect the time of the last polled record. For sources
+ * that retrieve batches from the external system, the best way is to attach the timestamp to
+ * the batch and return the time of that batch. For multi-threaded sources, the timestamp should
+ * be embedded into the hand-over data structure.
+ *
+ * @see SettableGauge SettableGauge to continuously update the value.
+ */
+ Gauge<Long> addLastFetchTimeGauge(Gauge<Long> lastFetchTimeGauge);
Review comment:
This argument holds more for `addPendingBytesGauge` and `addPendingRecordsGauge`.
Here, this gauge supplies half of the information needed to calculate `currentFetchEventTimeLag = FetchTime - EventTime`. So while I think it's okay to have `addLastFetchTimeGauge`, we can discuss whether the `addPendingXGauge` methods make sense. Alternatively, should we add the metric name constants to the interface instead?
Another related question: do we want to return the gauges here? I don't really see the benefit, but the same can be said about `MetricGroup#gauge`.
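To illustrate the "half of the information" point: a hypothetical helper that combines a reader-supplied last-fetch-time gauge with the event time of the last emitted record to derive `currentFetchEventTimeLag = FetchTime - EventTime`. `FetchLagTracker`, `recordEmitted`, and the use of `Supplier<Long>` in place of `Gauge<Long>` are assumptions for the sketch, not the PR's implementation.

```java
import java.util.function.Supplier;

/** Hypothetical: derives currentFetchEventTimeLag from a fetch-time gauge and emitted event times. */
class FetchLagTracker {
    private static final long NO_EVENT_TIME = Long.MIN_VALUE;

    private final Supplier<Long> lastFetchTimeGauge; // stands in for Gauge<Long>
    private long lastEventTime = NO_EVENT_TIME;

    FetchLagTracker(Supplier<Long> lastFetchTimeGauge) {
        this.lastFetchTimeGauge = lastFetchTimeGauge;
    }

    // Called for every emitted record that carries an event timestamp (the other half).
    void recordEmitted(long eventTime) {
        lastEventTime = eventTime;
    }

    // Lag in millis, or null while either half of the information is missing.
    Long currentFetchEventTimeLag() {
        Long fetchTime = lastFetchTimeGauge.get();
        if (fetchTime == null || lastEventTime == NO_EVENT_TIME) {
            return null;
        }
        return fetchTime - lastEventTime;
    }
}
```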
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceMetricGroup.java
##########
@@ -0,0 +1,133 @@
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SourceMetricGroup;
+import org.apache.flink.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.util.clock.Clock;
+
+/** Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. */
+@Internal
+public class InternalSourceMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
Review comment:
Okay, I will do so for the non-draft PR.
##########
File path:
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceMetricGroup.java
##########
@@ -0,0 +1,71 @@
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SettableGauge;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Pre-defined metrics for sources.
+ *
+ * <p>All metrics can only be accessed in the main operator thread.
+ */
+@NotThreadSafe
+public interface SourceMetricGroup extends OperatorMetricGroup {
+ /** The total number of records that failed to be consumed, processed, or emitted. */
+ Counter getNumRecordsInErrorsCounter();
+
+ /**
+ * Adds an optional gauge for last fetch time. Source readers can use this gauge to indicate the
+ * timestamp in milliseconds that Flink used to fetch a record.
+ *
+ * <p>The timestamp will be used to calculate the currentFetchEventTimeLag metric <code>
+ * currentFetchEventTimeLag = FetchTime - EventTime</code>.
+ *
+ * <p>Note that this time must strictly reflect the time of the last polled record. For sources
+ * that retrieve batches from the external system, the best way is to attach the timestamp to
+ * the batch and return the time of that batch. For multi-threaded sources, the timestamp should
+ * be embedded into the hand-over data structure.
+ *
+ * @see SettableGauge SettableGauge to continuously update the value.
+ */
+ Gauge<Long> addLastFetchTimeGauge(Gauge<Long> lastFetchTimeGauge);
Review comment:
I just realized that I should have used generics to capture the specific gauge type. I added a fixup for that.
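A minimal sketch of what such a generic signature could look like, assuming the form `<G extends Gauge<Long>> G addLastFetchTimeGauge(G gauge)`; `SketchSourceMetricGroup` and the local `Gauge`/`SettableGauge` types are hypothetical stand-ins, not the fixup itself. Returning `G` lets a caller keep its `SettableGauge` reference without a cast.

```java
/** Stand-in for org.apache.flink.metrics.Gauge. */
interface Gauge<T> {
    T getValue();
}

/** Minimal settable gauge, similar to the one in this PR. */
class SettableGauge<T> implements Gauge<T> {
    private T value;

    SettableGauge(T initialValue) {
        this.value = initialValue;
    }

    void setValue(T value) {
        this.value = value;
    }

    @Override
    public T getValue() {
        return value;
    }
}

/** Hypothetical, trimmed-down metric group showing only the generic registration method. */
class SketchSourceMetricGroup {
    private Gauge<Long> lastFetchTimeGauge;

    // Returning G instead of Gauge<Long> preserves the caller's concrete gauge type.
    <G extends Gauge<Long>> G addLastFetchTimeGauge(G gauge) {
        this.lastFetchTimeGauge = gauge;
        return gauge;
    }

    // Reads the registered gauge, or null if none was added.
    Long lastFetchTime() {
        return lastFetchTimeGauge == null ? null : lastFetchTimeGauge.getValue();
    }
}
```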
--
This is an automated message from the Apache Git Service.