[
https://issues.apache.org/jira/browse/BEAM-4420?focusedWorklogId=277583&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-277583
]
ASF GitHub Bot logged work on BEAM-4420:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Jul/19 16:49
Start Date: 16/Jul/19 16:49
Worklog Time Spent: 10m
Work Description: aromanenko-dev commented on pull request #9073:
[BEAM-4420] KafkaIOIT metrics collection
URL: https://github.com/apache/beam/pull/9073#discussion_r304012071
##########
File path:
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
##########
@@ -79,22 +95,53 @@ public static void setup() throws IOException {
public void testKafkaIOReadsAndWritesCorrectly() throws IOException {
writePipeline
.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
+ .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, "write_time")))
.apply("Write to Kafka", writeToKafka());
- writePipeline.run().waitUntilFinish();
-
PCollection<String> hashcode =
readPipeline
.apply("Read from Kafka", readFromKafka())
+ .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, "read_time")))
.apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
.apply("Calculate hashcode", Combine.globally(new HashingFn()).withoutDefaults());
PAssert.thatSingleton(hashcode).isEqualTo(EXPECTED_HASHCODE);
+ PipelineResult writeResult = writePipeline.run();
+ writeResult.waitUntilFinish();
+
PipelineResult readResult = readPipeline.run();
PipelineResult.State readState =
readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
cancelIfNotTerminal(readResult, readState);
+
+ Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
+ IOITMetrics.publish(
+ testId, timestamp, options.getBigQueryDataset(), options.getBigQueryTable(), metrics);
+ }
+
+ private Set<NamedTestResult> readMetrics(PipelineResult writeResult, PipelineResult readResult) {
+ Function<MetricsReader, NamedTestResult> writeTimeSupplier =
+ reader -> {
+ long start = reader.getStartTimeMetric("write_time");
+ long end = reader.getEndTimeMetric("write_time");
+ return NamedTestResult.create(testId, timestamp, "write_time", (end - start) / 1e3);
+ };
+
+ Function<MetricsReader, NamedTestResult> readTimeSupplier =
+ reader -> {
+ long start = reader.getStartTimeMetric("read_time");
Review comment:
Please extract "read_time" into a constant and use that constant throughout the test instead.
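The reviewer's suggestion might look roughly like this: the metric names become constants shared by the `TimeMonitor` steps and the metric readers, and the duplicated start/end arithmetic moves into one helper. `TimeMetricReader`, `KafkaIOITMetricNames`, `elapsedSeconds`, and `FixedTimeMetricReader` are hypothetical names used only for illustration. The interface simply mirrors the `getStartTimeMetric`/`getEndTimeMetric` calls seen in the diff and is not Beam's `MetricsReader`. The sketch also assumes the timestamps are in milliseconds, as the division by `1e3` suggests.

```java
import java.util.Map;

// Hypothetical stand-in for the start/end accessors used in the diff
// (getStartTimeMetric / getEndTimeMetric); NOT Beam's actual MetricsReader.
interface TimeMetricReader {
  long getStartTimeMetric(String name);

  long getEndTimeMetric(String name);
}

final class KafkaIOITMetricNames {
  // Metric names are defined once, so the ParDo that records a metric and
  // the code that reads it back cannot drift apart.
  static final String WRITE_TIME = "write_time";
  static final String READ_TIME = "read_time";

  private KafkaIOITMetricNames() {}

  /** Time between the first and last recorded timestamps, in seconds. */
  static double elapsedSeconds(TimeMetricReader reader, String metricName) {
    long start = reader.getStartTimeMetric(metricName);
    long end = reader.getEndTimeMetric(metricName);
    // Assumption: timestamps are milliseconds, so divide by 1e3 for seconds.
    return (end - start) / 1e3;
  }
}

// Map-backed reader for illustration: each metric name maps to {start, end}.
final class FixedTimeMetricReader implements TimeMetricReader {
  private final Map<String, long[]> startEnd;

  FixedTimeMetricReader(Map<String, long[]> startEnd) {
    this.startEnd = startEnd;
  }

  @Override
  public long getStartTimeMetric(String name) {
    return startEnd.get(name)[0];
  }

  @Override
  public long getEndTimeMetric(String name) {
    return startEnd.get(name)[1];
  }
}
```

In the test, `new TimeMonitor<>(NAMESPACE, READ_TIME)` and the read-time supplier would then refer to the same constant, and the write-time and read-time suppliers would differ only in the name they pass.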
Issue Time Tracking
-------------------
Worklog Id: (was: 277583)
Time Spent: 2h 50m (was: 2h 40m)
> Add KafkaIO Integration Tests
> -----------------------------
>
> Key: BEAM-4420
> URL: https://issues.apache.org/jira/browse/BEAM-4420
> Project: Beam
> Issue Type: Test
> Components: io-java-kafka, testing
> Reporter: Ismaël Mejía
> Assignee: Lukasz Gajowy
> Priority: Minor
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> It is a good idea to have ITs for KafkaIO.
> There are two possible issues:
> 1. The tests should probably invert the pattern to be readThenWrite given
> that Unbounded IOs block on Read and ...
> 2. Until we have a way to do PAsserts on Unbounded sources we can rely on
> withMaxNumRecords to ensure this test ends.
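Point 2 depends on capping the read. `KafkaIO.Read#withMaxNumRecords` limits the otherwise unbounded Kafka read to a fixed number of records, so the read pipeline can finish. The plain-Java sketch below is only an analogy for that idea. It uses an infinite `Stream.iterate(...)` in place of a Kafka consumer and `limit(n)` in place of `withMaxNumRecords`. The `BoundedReadSketch` class and its `readAtMost` method are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BoundedReadSketch {
  private BoundedReadSketch() {}

  /** Consumes at most maxNumRecords records from an endless, ordered source. */
  static List<String> readAtMost(long maxNumRecords) {
    // Stream.iterate never ends on its own, much like an unbounded source.
    return Stream.iterate(0L, offset -> offset + 1)
        // limit() plays the role of withMaxNumRecords: it makes the read terminate.
        .limit(maxNumRecords)
        .map(offset -> "record-" + offset)
        .collect(Collectors.toList());
  }
}
```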