GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/2090
Expose several new Metrics
This PR exposes several new metrics in Flink.
The names, scopes and respective performance impacts (beyond the cost of the
metric itself) are listed below:
Metric | Scope | Performance Impact
-------|-------|-------------------
currentLowWatermark | Task | None
lastCheckpointSize | Task | StreamTaskStateList.getStateSize() for every checkpoint
numBytes(In/Out)(Local/Remote) | Task | Buffer.getSize() for every buffer
numRecords(In/Out) | Operator | Streaming: None / Batch: some additional null checks
numSplitsProcessed | Operator | None
The currentLowWatermark metric simply exposes the last emitted Watermark in
the StreamInputProcessor using a Gauge.
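For illustration, a minimal sketch of the gauge pattern. It assumes a simplified `Gauge<T>` interface with the single `getValue()` method of Flink's `org.apache.flink.metrics.Gauge`; `WatermarkGaugeSketch` and `emitWatermark` are hypothetical names, not code from this PR. The hot path only writes one field, and the value is read only when a reporter polls the gauge, which is consistent with the "None" impact in the table.

```java
/** Hypothetical sketch: exposing the last emitted watermark through a gauge. */
class WatermarkGaugeSketch {

    /** Simplified stand-in for Flink's Gauge: a metric whose value is read on demand. */
    interface Gauge<T> {
        T getValue();
    }

    // Written by the task thread, read by a reporter thread, hence volatile.
    private volatile long currentLowWatermark = Long.MIN_VALUE;

    /** The gauge does no work until it is read. */
    final Gauge<Long> lowWatermarkGauge = () -> currentLowWatermark;

    /** Hypothetical hook, called wherever the input processor emits a watermark. */
    void emitWatermark(long timestamp) {
        currentLowWatermark = timestamp;
        // ... forward the watermark downstream ...
    }

    public static void main(String[] args) {
        WatermarkGaugeSketch processor = new WatermarkGaugeSketch();
        processor.emitWatermark(1000L);
        processor.emitWatermark(2000L);
        System.out.println(processor.lowWatermarkGauge.getValue()); // prints 2000
    }
}
```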
The lastCheckpointSize metric is gathered in the
StreamTask.performCheckpoint method. For every checkpoint, the size is
computed and stored in a field, which is exposed with a Gauge.
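A similar sketch for the checkpoint size, under the same simplified `Gauge<T>` assumption. `CheckpointSizeSketch` is hypothetical, and the list of per-operator state sizes is an assumed stand-in for the value that `StreamTaskStateList.getStateSize()` provides. Since the field is overwritten on every checkpoint, the gauge always reports the most recent one.

```java
import java.util.List;

/** Hypothetical sketch: tracking the size of the most recent checkpoint. */
class CheckpointSizeSketch {

    /** Simplified stand-in for Flink's Gauge. */
    interface Gauge<T> {
        T getValue();
    }

    private volatile long lastCheckpointSize = 0L;

    final Gauge<Long> lastCheckpointSizeGauge = () -> lastCheckpointSize;

    /**
     * Hypothetical stand-in for StreamTask.performCheckpoint. Each list entry is the
     * state size of one chained operator (assumed input, not the real signature).
     */
    void performCheckpoint(List<Long> operatorStateSizes) {
        long size = 0L;
        for (long s : operatorStateSizes) {
            size += s;
        }
        // Computed once per checkpoint; the previous value is simply overwritten.
        lastCheckpointSize = size;
    }

    public static void main(String[] args) {
        CheckpointSizeSketch task = new CheckpointSizeSketch();
        task.performCheckpoint(List.of(100L, 250L));
        System.out.println(task.lastCheckpointSizeGauge.getValue()); // prints 350
    }
}
```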
The numSplitsProcessed metric is a simple counter in the
DataSourceTask/FileSourceFunction.
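A minimal sketch of such a counter, assuming a `SimpleCounter` class as a stand-in for Flink's `org.apache.flink.metrics.Counter` and modelling each split as a list of records; `SplitCounterSketch` and `run` are hypothetical. Whether the real counter is incremented before or after a split is read is not stated in the PR; here it is incremented afterwards.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch: counting processed input splits in a source. */
class SplitCounterSketch {

    /** Simplified stand-in for Flink's Counter. */
    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    final SimpleCounter numSplitsProcessed = new SimpleCounter();

    /** Reads all splits; each split is counted once after it has been fully read. */
    long run(Iterator<List<String>> splits) {
        long records = 0;
        while (splits.hasNext()) {
            List<String> split = splits.next();
            records += split.size(); // stand-in for reading and emitting every record
            numSplitsProcessed.inc();
        }
        return records;
    }

    public static void main(String[] args) {
        SplitCounterSketch source = new SplitCounterSketch();
        long records = source.run(List.of(List.of("a", "b"), List.of("c")).iterator());
        System.out.println(records + " records, " + source.numSplitsProcessed.getCount() + " splits");
        // prints 3 records, 2 splits
    }
}
```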
The numBytes(In/Out)(Local/Remote) metric touches the second-largest number
of files. It replaces the previous numBytes(In/Out) metric, which was exposed
in the deserializers. It is now gathered directly in the InputChannels, which
add the size of every read buffer to a Counter. It is thus also more accurate
than the previous version, which actually measured how many bytes were
deserialized.
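The sketch below illustrates counting at the channel level. `Buffer`, `InputChannel`, `LocalInputChannel`, `RemoteInputChannel`, `SimpleCounter` and the `enqueue`/`getNextBuffer` methods are simplified, hypothetical stand-ins for Flink's network classes, not their real APIs, and only the input side is shown. Each channel type adds `Buffer.getSize()` to its own counter when a buffer is read, so bytes are counted before any deserialization and split by local versus remote transport.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch: counting consumed bytes in the input channels. */
class ChannelByteCounterSketch {

    static final class SimpleCounter {
        private long count;
        void inc(long n) { count += n; }
        long getCount() { return count; }
    }

    /** Stand-in for a network buffer; only its size matters here. */
    static final class Buffer {
        private final int size;
        Buffer(int size) { this.size = size; }
        int getSize() { return size; }
    }

    abstract static class InputChannel {
        private final Queue<Buffer> pending = new ArrayDeque<>();
        private final SimpleCounter numBytesIn;

        InputChannel(SimpleCounter numBytesIn) { this.numBytesIn = numBytesIn; }

        void enqueue(Buffer buffer) { pending.add(buffer); }

        /** Counts each buffer at its full size when it is read, before deserialization. */
        Buffer getNextBuffer() {
            Buffer buffer = pending.poll();
            if (buffer != null) {
                numBytesIn.inc(buffer.getSize());
            }
            return buffer;
        }
    }

    /** Reads from a producer in the same TaskManager. */
    static final class LocalInputChannel extends InputChannel {
        LocalInputChannel(SimpleCounter numBytesInLocal) { super(numBytesInLocal); }
    }

    /** Reads from a producer over the network. */
    static final class RemoteInputChannel extends InputChannel {
        RemoteInputChannel(SimpleCounter numBytesInRemote) { super(numBytesInRemote); }
    }

    public static void main(String[] args) {
        SimpleCounter numBytesInLocal = new SimpleCounter();
        SimpleCounter numBytesInRemote = new SimpleCounter();
        InputChannel local = new LocalInputChannel(numBytesInLocal);
        InputChannel remote = new RemoteInputChannel(numBytesInRemote);
        local.enqueue(new Buffer(512));
        remote.enqueue(new Buffer(1024));
        remote.enqueue(new Buffer(256));
        while (local.getNextBuffer() != null) { /* drain */ }
        while (remote.getNextBuffer() != null) { /* drain */ }
        System.out.println(numBytesInLocal.getCount() + " " + numBytesInRemote.getCount()); // prints 512 1280
    }
}
```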
The numRecords(In/Out) metric was extended to measure per operator instead
of per task. This includes chained operators. The information is no longer
gathered in the deserializers but in the operator itself. This was generally
accomplished by wrapping input and output in Counting(Output/Collector/Iterator/...) classes.
Below are more specific details for both Streaming and Batch as to how these
were implemented.
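The wrapping approach is essentially the decorator pattern. A minimal sketch, assuming a simplified `Collector<T>` interface modelled on `org.apache.flink.util.Collector` (`collect` and `close`) and a hypothetical `SimpleCounter`; this `CountingCollector` illustrates the idea and is not the class from this PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the decorator idea behind the Counting* wrappers. */
class CountingCollectorSketch {

    /** Simplified stand-in for org.apache.flink.util.Collector. */
    interface Collector<T> {
        void collect(T record);

        void close();
    }

    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    /** Counts each record, then forwards it unchanged to the wrapped collector. */
    static final class CountingCollector<T> implements Collector<T> {
        private final Collector<T> delegate;
        private final SimpleCounter numRecordsOut;

        CountingCollector(Collector<T> delegate, SimpleCounter numRecordsOut) {
            this.delegate = delegate;
            this.numRecordsOut = numRecordsOut;
        }

        @Override
        public void collect(T record) {
            numRecordsOut.inc();
            delegate.collect(record);
        }

        @Override
        public void close() {
            delegate.close();
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        SimpleCounter numRecordsOut = new SimpleCounter();
        Collector<String> out = new CountingCollector<>(new Collector<String>() {
            @Override
            public void collect(String record) {
                sink.add(record);
            }

            @Override
            public void close() {
            }
        }, numRecordsOut);

        // The UDF only sees `out` and is unaware that it is being counted.
        out.collect("a");
        out.collect("b");
        System.out.println(numRecordsOut.getCount() + " " + sink); // prints 2 [a, b]
    }
}
```

Because the wrapper is installed once, the user function needs no changes, and the per-record cost is a single counter increment.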
### Streaming
* numRecordsIn is measured in the StreamInputProcessor or the
ChainingOutput class.
* numRecordsOut is measured by wrapping the output in a CountingOutput, set
in the AbstractStreamOperator.
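To show how chained operators can each report their own counts, here is a sketch with hypothetical `Operator`, `chainingOutput` and `deliverFromInput` helpers standing in for the operator, ChainingOutput and StreamInputProcessor respectively; `Output<T>` is a simplified interface, not Flink's. The head operator's input is counted where records arrive, each chained operator's input is counted in the chaining output, and every operator's output is counted by a wrapper installed in `setup()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical sketch: per-operator record counts in an operator chain. */
class ChainedCountingSketch {

    interface Output<T> {
        void collect(T record);
    }

    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    static final class Operator {
        final SimpleCounter numRecordsIn = new SimpleCounter();
        final SimpleCounter numRecordsOut = new SimpleCounter();
        private final BiConsumer<String, Output<String>> udf;
        private Output<String> output;

        Operator(BiConsumer<String, Output<String>> udf) { this.udf = udf; }

        /** Like CountingOutput: wraps the raw output so every emitted record is counted. */
        void setup(Output<String> rawOutput) {
            this.output = record -> {
                numRecordsOut.inc();
                rawOutput.collect(record);
            };
        }

        void processElement(String record) { udf.accept(record, output); }
    }

    /** Like ChainingOutput: counts records as input of the next operator, then calls it. */
    static Output<String> chainingOutput(Operator next) {
        return record -> {
            next.numRecordsIn.inc();
            next.processElement(record);
        };
    }

    /** Like the input processor: counts records arriving for the head of the chain. */
    static void deliverFromInput(Operator head, String record) {
        head.numRecordsIn.inc();
        head.processElement(record);
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        Operator upper = new Operator((r, out) -> out.collect(r.toUpperCase()));
        Operator dropB = new Operator((r, out) -> {
            if (!r.equals("B")) {
                out.collect(r);
            }
        });
        dropB.setup(sink::add);
        upper.setup(chainingOutput(dropB));
        for (String rec : List.of("a", "b", "c")) {
            deliverFromInput(upper, rec);
        }
        System.out.println(dropB.numRecordsIn.getCount() + " " + dropB.numRecordsOut.getCount() + " " + sink);
        // prints 3 2 [A, C]
    }
}
```

In the demo the second operator drops one record, so its numRecordsIn (3) and numRecordsOut (2) differ, even though both operators run in the same task.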
The StreamSource class was slightly modified by adding a second run() method
without a collector argument. With this method, the StreamSource uses the
same output that is set in the AbstractStreamOperator. The old method was kept
in place so as not to change tests.
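A sketch of why the extra overload matters, assuming a simplified `SourceFunction<T>` that receives an `Output<T>` directly (Flink's real source functions receive a `SourceContext`). `StreamSourceSketch` and its members are hypothetical. Only records emitted through the output wrapped in `setup()` are counted, so the old overload that takes an explicit collector bypasses the counter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a source operator with an extra run() overload. */
class StreamSourceSketch {

    interface Output<T> {
        void collect(T record);
    }

    /** Simplified stand-in for a user source function. */
    interface SourceFunction<T> {
        void run(Output<T> out);
    }

    static final class StreamSource<T> {
        private final SourceFunction<T> userFunction;
        private Output<T> output;
        long numRecordsOut;

        StreamSource(SourceFunction<T> userFunction) {
            this.userFunction = userFunction;
        }

        /** Mirrors setup(): the operator's output is wrapped so emitted records are counted. */
        void setup(Output<T> rawOutput) {
            this.output = record -> {
                numRecordsOut++;
                rawOutput.collect(record);
            };
        }

        /** New overload: emits through the counting output configured in setup(). */
        void run() {
            run(output);
        }

        /** Old overload, kept for tests that pass their own collector; not counted. */
        void run(Output<T> collector) {
            userFunction.run(collector);
        }
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        StreamSource<Integer> source = new StreamSource<>(out -> {
            for (int i = 0; i < 3; i++) {
                out.collect(i);
            }
        });
        source.setup(sink::add);
        source.run();
        System.out.println(source.numRecordsOut + " " + sink); // prints 3 [0, 1, 2]
    }
}
```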
The StreamIterationTail class needed some extra love as well, since the
contained RecordPusher did not follow the StreamOperator lifecycle: setup()
was never called on this operator, leading to an NPE since the MetricGroup
was never set. In order to be able to pass an Output object into setup(), the
RecordPusher's internal logic was moved into a new IterationTailOutput class.
This slightly modified the exception behaviour; InterruptedExceptions are no
longer simply forwarded, but wrapped in a RuntimeException instead.
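A sketch of the changed exception behaviour, assuming that the tail hands records to the iteration head through a `BlockingQueue` and that `Output<T>.collect()` declares no checked exceptions; `IterationTailOutput` here is a hypothetical reconstruction, not the PR's class. Since `collect()` cannot throw the checked `InterruptedException` from `BlockingQueue.put()`, that is presumably why it is wrapped in a `RuntimeException`. Re-setting the interrupt flag is a common Java idiom added in this sketch, not something the PR text describes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch: an output that feeds records back to the iteration head. */
class IterationTailSketch {

    /** Simplified stand-in for an operator output; collect() declares no checked exceptions. */
    interface Output<T> {
        void collect(T record);
    }

    static final class IterationTailOutput<T> implements Output<T> {
        private final BlockingQueue<T> feedbackQueue;

        IterationTailOutput(BlockingQueue<T> feedbackQueue) {
            this.feedbackQueue = feedbackQueue;
        }

        @Override
        public void collect(T record) {
            try {
                // Blocks while the queue is full; throws if the thread is interrupted.
                feedbackQueue.put(record);
            } catch (InterruptedException e) {
                // Idiom added in this sketch: keep the interrupt visible to callers.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        Output<String> out = new IterationTailOutput<>(queue);
        out.collect("x");
        System.out.println(queue.size()); // prints 1

        Thread.currentThread().interrupt();
        try {
            out.collect("y");
        } catch (RuntimeException e) {
            System.out.println(e.getCause() instanceof InterruptedException); // prints true
        } finally {
            Thread.interrupted(); // clear the flag restored by collect()
        }
    }
}
```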
### Batch
* numRecordsIn is measured in each Driver class separately. In cases where
an iterator is passed into the UDF, the underlying MutableObjectIterator was
wrapped in a CountingMutableObjectIterator. This should ensure that no input
record is counted multiple times.
* numRecordsOut is measured by wrapping the collector in a
CountingCollector, set in each Driver's prepare() or run() method.
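A sketch of both batch wrappers, assuming a simplified `MutableObjectIterator<E>` with the two `next` methods of `org.apache.flink.util.MutableObjectIterator` (returning `null` at end of input) and a simplified `Collector<T>`. `MapDriver`, `fromIterator` and `SimpleCounter` are hypothetical. Wrapping the iterator once in `prepare()` means each record is counted exactly once, when it is pulled from the underlying iterator; in this sketch, the `null` check that skips the end-of-input marker is one plausible source of the "additional null checks" listed in the table.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch: counting input and output records in a batch driver. */
class BatchCountingSketch {

    /** Simplified stand-in for Flink's MutableObjectIterator: next() returns null when exhausted. */
    interface MutableObjectIterator<E> {
        E next(E reuse) throws IOException;
        E next() throws IOException;
    }

    /** Simplified stand-in for Flink's Collector (close() omitted). */
    interface Collector<T> {
        void collect(T record);
    }

    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    /** Counts every non-null record handed out by the wrapped iterator. */
    static final class CountingMutableObjectIterator<E> implements MutableObjectIterator<E> {
        private final MutableObjectIterator<E> delegate;
        private final SimpleCounter numRecordsIn;

        CountingMutableObjectIterator(MutableObjectIterator<E> delegate, SimpleCounter numRecordsIn) {
            this.delegate = delegate;
            this.numRecordsIn = numRecordsIn;
        }

        @Override
        public E next(E reuse) throws IOException { return count(delegate.next(reuse)); }

        @Override
        public E next() throws IOException { return count(delegate.next()); }

        // null signals end of input and must not be counted.
        private E count(E record) {
            if (record != null) {
                numRecordsIn.inc();
            }
            return record;
        }
    }

    /** Hypothetical driver that upper-cases strings. */
    static final class MapDriver {
        final SimpleCounter numRecordsIn = new SimpleCounter();
        final SimpleCounter numRecordsOut = new SimpleCounter();
        private MutableObjectIterator<String> input;
        private Collector<String> output;

        /** Wraps input and output once, so run() needs no extra bookkeeping. */
        void prepare(MutableObjectIterator<String> rawInput, Collector<String> rawOutput) {
            input = new CountingMutableObjectIterator<>(rawInput, numRecordsIn);
            output = record -> { // plays the role of CountingCollector
                numRecordsOut.inc();
                rawOutput.collect(record);
            };
        }

        void run() throws IOException {
            String record;
            while ((record = input.next()) != null) {
                output.collect(record.toUpperCase());
            }
        }
    }

    /** Adapts a java.util.Iterator for the demo; returns null once exhausted. */
    static <E> MutableObjectIterator<E> fromIterator(Iterator<E> it) {
        return new MutableObjectIterator<E>() {
            @Override
            public E next(E reuse) { return next(); }

            @Override
            public E next() { return it.hasNext() ? it.next() : null; }
        };
    }

    public static void main(String[] args) throws IOException {
        MapDriver driver = new MapDriver();
        List<String> sink = new ArrayList<>();
        driver.prepare(fromIterator(List.of("a", "b", "c").iterator()), sink::add);
        driver.run();
        System.out.println(driver.numRecordsIn.getCount() + " " + driver.numRecordsOut.getCount() + " " + sink);
        // prints 3 3 [A, B, C]
    }
}
```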
Note that the following drivers were not changed:
(Join/CoGroup)WithSolutionSet(First/Second)Driver.
I couldn't figure out a way to avoid counting records multiple times,
specifically those residing in the HashTable, so I left these drivers out.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink metrics
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2090.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2090
----
commit e585c47c303b0c4feee097f36315be464da19a8e
Author: zentol <[email protected]>
Date: 2016-06-01T09:24:40Z
task: lowWatermark
commit 2c7b18b1476d548382bf6e5a49d496508a4fc9ae
Author: zentol <[email protected]>
Date: 2016-06-03T09:32:28Z
task: checkpointSize
commit b04e1dde2ecc101e6a45bcbc83921f8a4d9c3c1b
Author: zentol <[email protected]>
Date: 2016-06-08T09:20:14Z
task: numBytes(In/Out)(Local/Remote)
commit d131bfa3a1f95d22900bba131b6c300ee881e5c7
Author: zentol <[email protected]>
Date: 2016-06-07T18:20:10Z
operator: numSplitsProcessed
commit ffbba601fab4542b30c299c3650a18b4b2612df2
Author: zentol <[email protected]>
Date: 2016-06-10T11:24:49Z
operator: numRecords(In/Out) Basics
commit 9428ba9291380f56c0486c124163e3ad58ccfae8
Author: zentol <[email protected]>
Date: 2016-06-08T09:25:30Z
operator: streaming: numRecords(In/Out)
commit 85e81e42147930af255c1991726c290281b9d82e
Author: zentol <[email protected]>
Date: 2016-06-10T11:25:18Z
operator: batch: unchained: numRecords(In/Out)
commit ae0ba804cc664d8044749257ab6303aadaeb853e
Author: zentol <[email protected]>
Date: 2016-06-08T12:03:19Z
operator: batch: chaining: numRecords(In/Out)
----