GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/2090

    Expose several new Metrics

    This PR exposes several new metrics in Flink.
    
    The names, scope and respective performance impact (outside of the actual metric itself) are listed below:
    
    Metric | Scope | Performance Impact
    -------|-------|-------------------
    currentLowWatermark | Task | None
    lastCheckpointSize | Task | StreamTaskStateList.getStateSize() for every checkpoint
    numBytes(In/Out)(Local/Remote) | Task | Buffer.getSize() for every buffer
    numRecords(In/Out) | Operator | Streaming: None / Batch: some additional null checks
    numSplitsProcessed | Operator | None
    
    The currentLowWatermark metric simply exposes the last emitted Watermark in the StreamInputProcessor using a Gauge.
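
    To illustrate the pattern, here is a minimal Java sketch, assuming a stand-in `Gauge` interface with a single `getValue()` method (the same shape as Flink's metrics Gauge); `WatermarkTracker`, `emitWatermark` and `currentLowWatermarkGauge` are hypothetical names, not the actual StreamInputProcessor code. The processing path only writes a field and the gauge reads it when a reporter asks, which is consistent with the "None" impact listed in the table.

```java
// Stand-in for a metrics gauge: the value is pulled by a reporter on demand.
interface Gauge<T> {
    T getValue();
}

// Hypothetical holder for the last watermark emitted by an input processor.
class WatermarkTracker {
    // volatile: written by the task thread, read by a metric reporter thread.
    // The Long.MIN_VALUE initial value is an assumption of this sketch.
    private volatile long lastEmittedWatermark = Long.MIN_VALUE;

    // Called wherever the processor forwards a watermark downstream.
    void emitWatermark(long timestamp) {
        lastEmittedWatermark = timestamp;
        // ... forward the watermark to the operator here ...
    }

    // The field is only read when the gauge is queried.
    Gauge<Long> currentLowWatermarkGauge() {
        return () -> lastEmittedWatermark;
    }
}
```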
    
    The lastCheckpointSize metric is gathered in the StreamTask.performCheckpoint method. For every checkpoint, the size is computed and stored in a field, which is exposed with a Gauge.
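
    A minimal sketch of this bookkeeping, assuming a stand-in `StateHandle` interface whose `getStateSize()` plays the role of StreamTaskStateList.getStateSize(); `CheckpointingTask` and its methods are hypothetical. Computing the size once per checkpoint is the only added cost, and each checkpoint overwrites the previous value rather than accumulating.

```java
import java.util.List;

// Stand-in for a metrics gauge.
interface Gauge<T> {
    T getValue();
}

// Stand-in for a snapshotted piece of operator state that knows its size in bytes.
interface StateHandle {
    long getStateSize();
}

// Hypothetical task that records the size of its most recent checkpoint.
class CheckpointingTask {
    // Written by the task thread, read by a metric reporter thread.
    private volatile long lastCheckpointSize = 0L;

    // Sketch of the checkpoint path: sum the state sizes and store the result.
    void performCheckpoint(List<StateHandle> snapshottedStates) {
        long size = 0L;
        for (StateHandle state : snapshottedStates) {
            size += state.getStateSize();
        }
        lastCheckpointSize = size; // replaces the value of the previous checkpoint
        // ... acknowledge the checkpoint here ...
    }

    Gauge<Long> lastCheckpointSizeGauge() {
        return () -> lastCheckpointSize;
    }
}
```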
    
    The numSplitsProcessed metric is a simple counter in the DataSourceTask/FileSourceFunction.
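
    A minimal sketch of such a counter, assuming a stand-in `SimpleCounter` with the inc()/getCount() shape of a metrics Counter; `SplitReadingSource` is hypothetical, and counting a split once it has been fully read is an assumption of this sketch (the description only says it is a simple counter).

```java
import java.util.List;

// Minimal counter mirroring the inc()/getCount() shape of a metrics Counter.
class SimpleCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Hypothetical source that reads a list of input splits one after another.
class SplitReadingSource {
    private final SimpleCounter numSplitsProcessed = new SimpleCounter();

    void run(List<List<String>> splits, List<String> out) {
        for (List<String> split : splits) {
            // Read every record of the current split ...
            for (String record : split) {
                out.add(record);
            }
            // ... and count the split once it has been fully consumed.
            numSplitsProcessed.inc();
        }
    }

    SimpleCounter getNumSplitsProcessed() { return numSplitsProcessed; }
}
```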
    
    The numBytes(In/Out)(Local/Remote) metric touches the second-largest number of files. It replaces the previous numBytes(In/Out) metric, which was exposed in the Deserializers. It is now gathered directly in the InputChannels, which add the size of every read buffer to a Counter. It is thus also more accurate than the previous version, since that one actually measured how many bytes were deserialized.
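
    A minimal sketch of counting bytes at the channel level, assuming stand-in `Buffer` and `ByteCounter` classes; `CountingInputChannel` is hypothetical and not Flink's InputChannel. Each buffer costs one getSize() call. Sharing one counter among all local channels and another among all remote channels is an assumption about how the Local/Remote split could be wired.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal counter that accumulates byte counts.
class ByteCounter {
    private long count;
    void inc(long n) { count += n; }
    long getCount() { return count; }
}

// Stand-in for a network buffer; only its size matters here.
class Buffer {
    private final int size;
    Buffer(int size) { this.size = size; }
    int getSize() { return size; }
}

// Hypothetical input channel: every buffer handed to the task is counted in
// bytes, so the metric reflects what was read, not what was deserialized.
class CountingInputChannel {
    private final Queue<Buffer> received = new ArrayDeque<>();
    private final ByteCounter numBytesIn; // shared by all channels of one kind

    CountingInputChannel(ByteCounter numBytesIn) { this.numBytesIn = numBytesIn; }

    // Called when a buffer arrives from the producer.
    void onBuffer(Buffer buffer) { received.add(buffer); }

    // Returns the next buffer, or null if none is available.
    Buffer getNextBuffer() {
        Buffer next = received.poll();
        if (next != null) {
            numBytesIn.inc(next.getSize()); // one getSize() call per buffer
        }
        return next;
    }
}
```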
    
    The numRecords(In/Out) metric now measures per operator instead of per task, including chained operators. The information is no longer gathered in the deserializers but in the operator itself. This was generally accomplished by wrapping input and output in Counting(Output/Collector/Iterator/...) classes. Below are more specific details on how this was implemented for Streaming and Batch.
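
    The counting wrappers are plain decorators. A minimal sketch of the collector variant, assuming a stand-in `Collector` interface with collect()/close() (the same shape as Flink's `org.apache.flink.util.Collector`); `CountingCollector` here is an illustration, not the class from the PR, and `ListCollector` is a hypothetical sink used for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a collector interface.
interface Collector<T> {
    void collect(T record);
    void close();
}

// Minimal counter.
class SimpleCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Decorator: counts every record, then hands it to the wrapped collector.
class CountingCollector<T> implements Collector<T> {
    private final Collector<T> delegate;
    private final SimpleCounter numRecordsOut;

    CountingCollector(Collector<T> delegate, SimpleCounter numRecordsOut) {
        this.delegate = delegate;
        this.numRecordsOut = numRecordsOut;
    }

    @Override
    public void collect(T record) {
        numRecordsOut.inc();
        delegate.collect(record);
    }

    @Override
    public void close() {
        delegate.close();
    }
}

// Example downstream collector that buffers records in a list.
class ListCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();

    @Override
    public void collect(T record) { records.add(record); }

    @Override
    public void close() { }
}
```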
    
    ### Streaming
    * numRecordsIn is measured in the StreamInputProcessor or the ChainingOutput class.
    * numRecordsOut is measured by wrapping the output in a CountingOutput, set in the AbstractStreamOperator.
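
    A minimal sketch of how the two streaming counters could fit together in an operator chain, assuming stand-in `Output` and `OneInputOperator` interfaces; `CountingOutput` and `ChainingOutput` below are simplified illustrations, not Flink's classes. A chained operator receives records directly from the ChainingOutput rather than from an input processor, which is why that output is where its numRecordsIn can be counted.

```java
// Minimal counter.
class SimpleCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Stand-in for an operator output.
interface Output<T> {
    void collect(T record);
}

// Stand-in for a one-input operator.
interface OneInputOperator<IN> {
    void processElement(IN record);
}

// numRecordsOut: wraps the operator's output once, during operator setup.
class CountingOutput<T> implements Output<T> {
    private final Output<T> delegate;
    private final SimpleCounter numRecordsOut;

    CountingOutput(Output<T> delegate, SimpleCounter numRecordsOut) {
        this.delegate = delegate;
        this.numRecordsOut = numRecordsOut;
    }

    @Override
    public void collect(T record) {
        numRecordsOut.inc();
        delegate.collect(record);
    }
}

// numRecordsIn of a chained operator: records are pushed straight into the
// next operator, so its input is counted here.
class ChainingOutput<T> implements Output<T> {
    private final OneInputOperator<T> next;
    private final SimpleCounter nextNumRecordsIn;

    ChainingOutput(OneInputOperator<T> next, SimpleCounter nextNumRecordsIn) {
        this.next = next;
        this.nextNumRecordsIn = nextNumRecordsIn;
    }

    @Override
    public void collect(T record) {
        nextNumRecordsIn.inc();
        next.processElement(record);
    }
}
```

    With a chain A → B, every record A emits is counted once as A's numRecordsOut and once as B's numRecordsIn, while B's own CountingOutput only counts what B actually emits.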
    
    The StreamSource class was slightly modified by adding a second run() method without a collector argument. Through this method, the StreamSource now uses the same output that is set in the AbstractStreamOperator. The old method was kept in place so that existing tests did not have to change.
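
    A minimal sketch of the overload, assuming stand-in `Output` and `SourceFunction` interfaces; `StreamSourceSketch` and its `setup()`/`run()` signatures are hypothetical simplifications of the StreamSource change described above. The new variant simply delegates to the old one with the output configured during setup, so records from the source pass through the same (possibly counting) output as any other operator.

```java
// Stand-in for an operator output.
interface Output<T> {
    void collect(T record);
}

// Stand-in for a user source function that emits through a given output.
interface SourceFunction<T> {
    void run(Output<T> ctx);
}

// Hypothetical, simplified source operator.
class StreamSourceSketch<T> {
    private final SourceFunction<T> userFunction;
    private Output<T> output; // set during setup(), e.g. a counting output

    StreamSourceSketch(SourceFunction<T> userFunction) {
        this.userFunction = userFunction;
    }

    void setup(Output<T> output) {
        this.output = output;
    }

    // New variant: reuse the output configured in setup().
    void run() {
        run(output);
    }

    // Old variant, kept so existing callers (such as tests) still compile.
    void run(Output<T> collector) {
        userFunction.run(collector);
    }
}
```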
    
    The StreamIterationTail class needed some extra love as well, since the contained RecordPusher did not follow the StreamOperator lifecycle: setup() was never called on this operator, which led to an NPE because the MetricGroup was never set. In order to pass an Output object into setup(), the RecordPusher's internal logic was moved into a new IterationTailOutput class. This slightly changes the exception behaviour: InterruptedExceptions are no longer simply forwarded, but wrapped in a RuntimeException instead.
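
    A minimal sketch of the changed exception behaviour, assuming the tail output feeds records back to the iteration head through a `java.util.concurrent.BlockingQueue`; `IterationTailOutput` here is an illustration, not the class from the PR. Because `collect()` cannot declare the checked InterruptedException, the exception is wrapped in a RuntimeException. Restoring the thread's interrupt flag before rethrowing is a choice of this sketch, not something the description mentions.

```java
import java.util.concurrent.BlockingQueue;

// Stand-in for an operator output; collect() declares no checked exceptions.
interface Output<T> {
    void collect(T record);
}

// Illustrative output that pushes records back to the iteration head.
class IterationTailOutput<T> implements Output<T> {
    private final BlockingQueue<T> feedbackQueue;

    IterationTailOutput(BlockingQueue<T> feedbackQueue) {
        this.feedbackQueue = feedbackQueue;
    }

    @Override
    public void collect(T record) {
        try {
            feedbackQueue.put(record); // may block until the head takes records
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers (choice of this sketch).
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while pushing a record to the iteration head.", e);
        }
    }
}
```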
    
    ### Batch
    * numRecordsIn is measured in each Driver class separately. Where an iterator is passed into the UDF, the underlying MutableObjectIterator is wrapped in a CountingMutableObjectIterator. This should ensure that no input record is counted multiple times.
    * numRecordsOut is measured by wrapping the collector in a CountingCollector, set in each Driver's prepare() or run() method.
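
    A minimal sketch of the counting iterator, assuming a stand-in `MutableObjectIterator` interface whose `next(reuse)` and `next()` return null once the input is exhausted (the same shape as Flink's `org.apache.flink.util.MutableObjectIterator`); `CountingMutableObjectIterator` and `ListBackedIterator` here are illustrations. A record is counted at the single point where it leaves the underlying iterator, which is one way to avoid counting it twice; the `!= null` check is the kind of extra cost the table lists for batch.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

// Stand-in for a mutable-object iterator: returns null when exhausted.
interface MutableObjectIterator<E> {
    E next(E reuse) throws IOException;
    E next() throws IOException;
}

// Minimal counter.
class SimpleCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Counts each record once, when it is handed out by the wrapped iterator.
class CountingMutableObjectIterator<E> implements MutableObjectIterator<E> {
    private final MutableObjectIterator<E> delegate;
    private final SimpleCounter numRecordsIn;

    CountingMutableObjectIterator(MutableObjectIterator<E> delegate, SimpleCounter numRecordsIn) {
        this.delegate = delegate;
        this.numRecordsIn = numRecordsIn;
    }

    @Override
    public E next(E reuse) throws IOException {
        E next = delegate.next(reuse);
        if (next != null) { // end of input is signalled by null
            numRecordsIn.inc();
        }
        return next;
    }

    @Override
    public E next() throws IOException {
        E next = delegate.next();
        if (next != null) {
            numRecordsIn.inc();
        }
        return next;
    }
}

// Example input: adapts a list to the iterator interface (ignores reuse).
class ListBackedIterator<E> implements MutableObjectIterator<E> {
    private final Iterator<E> it;

    ListBackedIterator(List<E> list) { this.it = list.iterator(); }

    @Override
    public E next(E reuse) { return next(); }

    @Override
    public E next() { return it.hasNext() ? it.next() : null; }
}
```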
    
    Note that the following drivers were not changed: (Join/CoGroup)WithSolutionSet(First/Second)Driver.
    I couldn't figure out a way to avoid counting records multiple times, specifically those residing in the HashTable, so I ignored them.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink metrics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2090
    
----
commit e585c47c303b0c4feee097f36315be464da19a8e
Author: zentol <[email protected]>
Date:   2016-06-01T09:24:40Z

    task: lowWatermark

commit 2c7b18b1476d548382bf6e5a49d496508a4fc9ae
Author: zentol <[email protected]>
Date:   2016-06-03T09:32:28Z

    task: checkpointSize

commit b04e1dde2ecc101e6a45bcbc83921f8a4d9c3c1b
Author: zentol <[email protected]>
Date:   2016-06-08T09:20:14Z

    task: numBytes(In/Out)(Local/Remote)

commit d131bfa3a1f95d22900bba131b6c300ee881e5c7
Author: zentol <[email protected]>
Date:   2016-06-07T18:20:10Z

    operator: numSplitsProcessed

commit ffbba601fab4542b30c299c3650a18b4b2612df2
Author: zentol <[email protected]>
Date:   2016-06-10T11:24:49Z

    operator: numRecords(In/Out) Basics

commit 9428ba9291380f56c0486c124163e3ad58ccfae8
Author: zentol <[email protected]>
Date:   2016-06-08T09:25:30Z

    operator: streaming: numRecords(In/Out)

commit 85e81e42147930af255c1991726c290281b9d82e
Author: zentol <[email protected]>
Date:   2016-06-10T11:25:18Z

    operator: batch: unchained: numRecords(In/Out)

commit ae0ba804cc664d8044749257ab6303aadaeb853e
Author: zentol <[email protected]>
Date:   2016-06-08T12:03:19Z

    operator: batch: chaining: numRecords(In/Out)

----

