[ 
https://issues.apache.org/jira/browse/BEAM-6627?focusedWorklogId=203721&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-203721
 ]

ASF GitHub Bot logged work on BEAM-6627:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/19 15:47
            Start Date: 25/Feb/19 15:47
    Worklog Time Spent: 10m 
      Work Description: lgajowy commented on pull request #7772: [BEAM-6627] Added Metrics API processing time reporting to TextIOIT
URL: https://github.com/apache/beam/pull/7772#discussion_r259886027
 
 

 ##########
 File path: sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
 ##########
 @@ -127,28 +140,49 @@ public void writeThenReadAll() {
 
     PipelineResult result = pipeline.run();
     result.waitUntilFinish();
-    publishGcsResults(result);
+    gatherAndPublishMetrics(result);
   }
 
-  private void publishGcsResults(PipelineResult result) {
+  private void gatherAndPublishMetrics(PipelineResult result) {
+    String uuid = UUID.randomUUID().toString();
+    Timestamp timestamp = Timestamp.now();
+    List<NamedTestResult> namedTestResults = readMetrics(result, uuid, timestamp);
+    publishToBigQuery(namedTestResults, bigQueryDataset, bigQueryTable);
+    ConsoleResultPublisher.publish(namedTestResults, uuid, timestamp.toString());
+  }
+
+  private List<NamedTestResult> readMetrics(
+      PipelineResult result, String uuid, Timestamp timestamp) {
+    List<NamedTestResult> results = new ArrayList<>();
+
+    MetricsReader reader = new MetricsReader(result, FILEIOIT_NAMESPACE);
+    long writeStartTime = reader.getStartTimeMetric("startTime");
+    long writeEndTime = reader.getEndTimeMetric("middleTime");
+    long readStartTime = reader.getStartTimeMetric("middleTime");
+    long readEndTime = reader.getEndTimeMetric("endTime");
+    double writeTime = (writeEndTime - writeStartTime) / 1000.0;
+    double readTime = (readEndTime - readStartTime) / 1000.0;
+    double copiesPerSec = calculateGcsMetric(result);
+
+    if (copiesPerSec > 0) {
+      results.add(
+          NamedTestResult.create(uuid, timestamp.toString(), "copies_per_sec", copiesPerSec));
+    }
+
+    results.add(NamedTestResult.create(uuid, timestamp.toString(), "read_time", readTime));
+    results.add(NamedTestResult.create(uuid, timestamp.toString(), "write_time", writeTime));
+
+    return results;
+  }
+
+  private double calculateGcsMetric(PipelineResult result) {
 
 Review comment:
   Interesting: if I read this correctly, we didn't have to import the 
`sdks-java-extensions-google-cloud-platform` dependency because it is a 
transitive dependency of the test-utils package. What if we refactor test utils 
and get rid of the extensions there? If we stick with one flag, we should 
import the `sdks-java-extensions-google-cloud-platform` dependency explicitly 
in the `file-based-io-tests` module.
   
   Looking at this in a broader, more future-proof way, I've begun to think 
it's better to stick with 2 separate flags: the first flag in the production 
code, which makes the IO/pipeline/whatever else actually produce the metric, 
and the other one in the test code, which explicitly says that the metric 
should be harvested. IMO it has the following benefits: 
   
    - we separate the concerns of producing and collecting the metrics: people 
reading test code/implementing new tests can find/define the latter option in 
the TestPipelineOptions interface, read its description and decide if it 
should be used, without digging into Beam implementation specifics and 
searching for the option in the whole codebase 
    - we don't have to import runner/IO specific dependencies every time a 
similar case occurs (like `sdks-java-extensions-google-cloud-platform`). This 
is especially important when we're running on runners other than Dataflow. I 
can imagine that tests run on Flink may not require Dataflow/GCS related 
dependencies
   
   The disadvantage is that we have 2 flags for each case like this, but I 
think such a solution will "scale" better in case we require more flags like 
this for other filesystems/runners (e.g. S3 copies/HDFS copies).
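   
   To make the two-flag idea concrete, here is a minimal, dependency-free 
sketch. All names in it (`TwoFlagSketch`, `emitCopyMetric`, 
`collectCopyMetric`, `produceCopiesPerSec`, `harvest`) are hypothetical and 
are not existing Beam options or APIs. It only illustrates that the 
production side decides whether the metric exists, while the test side 
decides whether to harvest it, without either side referencing the other's 
flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalDouble;

public class TwoFlagSketch {

  // Production side (hypothetical flag): the IO/filesystem emits the metric
  // only when its own option is enabled.
  static OptionalDouble produceCopiesPerSec(boolean emitCopyMetric, double measured) {
    return emitCopyMetric ? OptionalDouble.of(measured) : OptionalDouble.empty();
  }

  // Test side (hypothetical flag): the test harvests the metric only when its
  // own option is enabled and the metric was actually produced.
  static List<String> harvest(boolean collectCopyMetric, OptionalDouble copiesPerSec) {
    List<String> metricNames = new ArrayList<>();
    if (collectCopyMetric && copiesPerSec.isPresent() && copiesPerSec.getAsDouble() > 0) {
      metricNames.add("copies_per_sec");
    }
    // Read and write times are always reported, as in the diff above.
    metricNames.add("read_time");
    metricNames.add("write_time");
    return metricNames;
  }

  public static void main(String[] args) {
    // Both flags on: the copy metric is produced and harvested.
    System.out.println(harvest(true, produceCopiesPerSec(true, 12.5)));
    // Test flag off (e.g. a run on another runner): the metric is ignored.
    System.out.println(harvest(false, produceCopiesPerSec(true, 12.5)));
  }
}
```

   In this sketch the test-side method never inspects the production flag, so 
a test module would not need the production option's dependency just to 
decide whether to collect the metric.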
   
   wdyt? 

 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 203721)
    Time Spent: 5h 20m  (was: 5h 10m)

> Use Metrics API in IO performance tests
> ---------------------------------------
>
>                 Key: BEAM-6627
>                 URL: https://issues.apache.org/jira/browse/BEAM-6627
>             Project: Beam
>          Issue Type: Improvement
>          Components: testing
>            Reporter: Michal Walenia
>            Assignee: Michal Walenia
>            Priority: Minor
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
