[
https://issues.apache.org/jira/browse/BEAM-5984?focusedWorklogId=171590&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171590
]
ASF GitHub Bot logged work on BEAM-5984:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Dec/18 13:52
Start Date: 03/Dec/18 13:52
Worklog Time Spent: 10m
Work Description: lgajowy closed pull request #7090: [BEAM-5984] Enable
publishing load test results to Big Query
URL: https://github.com/apache/beam/pull/7090
This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a pull request from a
fork once it has been merged, the diff is reproduced below for the sake
of provenance:
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ConsoleResultPublisher.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ConsoleResultPublisher.java
new file mode 100644
index 000000000000..c9eba52ca86a
--- /dev/null
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ConsoleResultPublisher.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.loadtests;
+
+/** Writes {@link LoadTestResult} to console. */
+public class ConsoleResultPublisher {
+
+ static void publish(LoadTestResult result) {
+ System.out.println(String.format("Total bytes: %s",
result.getTotalBytesCount()));
+ System.out.println(String.format("Total time (millis): %s",
result.getRuntime()));
+ }
+}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
index 0f936d11583c..76ded71d4de9 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.loadtests;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Optional;
import org.apache.beam.sdk.Pipeline;
@@ -25,8 +27,8 @@
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsPublisher;
import org.apache.beam.sdk.loadtests.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -62,18 +64,51 @@
/** The load test pipeline implementation. */
abstract void loadTest() throws IOException;
- /** Runs the load test. */
+ /**
+ * Runs the load test, collects and publishes test results to various data
store and/or console.
+ */
public PipelineResult run() throws IOException {
+ long testStartTime = System.currentTimeMillis();
+
loadTest();
PipelineResult result = pipeline.run();
result.waitUntilFinish();
- MetricsPublisher.toConsole(result, metricsNamespace);
+ LoadTestResult testResult = LoadTestResult.create(result,
metricsNamespace, testStartTime);
+
+ ConsoleResultPublisher.publish(testResult);
+ if (options.getPublishToBigQuery()) {
+ publishResultToBigQuery(testResult);
+ }
return result;
}
+ private void publishResultToBigQuery(LoadTestResult testResult) {
+ String dataset = options.getBigQueryDataset();
+ String table = options.getBigQueryTable();
+ checkBigQueryOptions(dataset, table);
+
+ ImmutableMap<String, String> schema =
+ ImmutableMap.<String, String>builder()
+ .put("timestamp", "timestamp")
+ .put("runtime", "float")
+ .put("total_bytes_count", "integer")
+ .build();
+
+ BigQueryResultsPublisher.create(dataset, schema).publish(testResult,
table);
+ }
+
+ private static void checkBigQueryOptions(String dataset, String table) {
+ Preconditions.checkArgument(
+ dataset != null,
+ "Please specify --bigQueryDataset option if you want to publish to
BigQuery");
+
+ Preconditions.checkArgument(
+ table != null, "Please specify --bigQueryTable option if you want to
publish to BigQuery");
+ }
+
<T extends SyntheticOptions> T fromJsonString(String json, Class<T> type)
throws IOException {
ObjectMapper mapper = new ObjectMapper();
T result = mapper.readValue(json, type);
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
index 5654666b82a1..3eaff3161346 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.loadtests;
import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -37,6 +38,22 @@
void setStepOptions(String stepOptions);
+ @Description("Whether the results should be published to BigQuery database")
+ @Default.Boolean(false)
+ Boolean getPublishToBigQuery();
+
+ void setPublishToBigQuery(Boolean publishToBigQuery);
+
+ @Description("BigQuery dataset name")
+ String getBigQueryDataset();
+
+ void setBigQueryDataset(String dataset);
+
+ @Description("BigQuery table name")
+ String getBigQueryTable();
+
+ void setBigQueryTable(String tableName);
+
static <T extends LoadTestOptions> T readFromArgs(String[] args, Class<T>
optionsClass) {
return
PipelineOptionsFactory.fromArgs(args).withValidation().as(optionsClass);
}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestResult.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestResult.java
new file mode 100644
index 000000000000..b09ce882617f
--- /dev/null
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestResult.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.loadtests;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.testutils.TestResult;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
+
+/** POJO that represents load test results. */
+public class LoadTestResult implements TestResult {
+
+ private final Long timestamp;
+
+ private final Long runtime;
+
+ private final Long totalBytesCount;
+
+ private LoadTestResult(Long timestamp, Long runtime, Long totalBytesCount) {
+ this.timestamp = timestamp;
+ this.runtime = runtime;
+ this.totalBytesCount = totalBytesCount;
+ }
+
+ /** Constructs {@link LoadTestResult} from {@link PipelineResult}. */
+ static LoadTestResult create(PipelineResult result, String namespace, long
now) {
+ MetricsReader reader = new MetricsReader(result, namespace);
+
+ return new LoadTestResult(
+ now,
+ reader.getEndTimeMetric("runtime") -
reader.getStartTimeMetric("runtime"),
+ reader.getCounterMetric("totalBytes.count"));
+ }
+
+ public Long getRuntime() {
+ return runtime;
+ }
+
+ public Long getTotalBytesCount() {
+ return totalBytesCount;
+ }
+
+ @Override
+ public Map<String, Object> toMap() {
+ return ImmutableMap.<String, Object>builder()
+ .put("timestamp", timestamp)
+ .put("runtime", runtime)
+ .put("totalBytesCount", totalBytesCount)
+ .build();
+ }
+}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
index 26a3ef5e3529..170af31f589a 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
@@ -19,6 +19,7 @@
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
@@ -26,8 +27,8 @@
* Monitor that records the number of bytes flowing through a PCollection.
*
* <p>To use: apply a monitor in a desired place in the pipeline. This will
capture how many bytes
- * flew through this DoFn which then can be collected and written out using
{@link
- * MetricsPublisher}.
+ * flew through this DoFn. Such information can be then collected and written
out and queried using
+ * {@link MetricsReader}.
*/
public class ByteMonitor extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> {
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
deleted file mode 100644
index 365b95babca2..000000000000
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.loadtests.metrics;
-
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.testutils.metrics.MetricsReader;
-
-/** Provides ways to publish metrics gathered during test invocation. */
-public class MetricsPublisher {
-
- /**
- * This prints out metrics results to console. It will work only if metrics
with appropriate
- * (conventional) names are present to be collected in {@link PipelineResult}
- *
- * <p>See {@link org.apache.beam.sdk.loadtests.GroupByKeyLoadTest} for hints
on how to use it.
- */
- public static void toConsole(PipelineResult result, String namespace) {
- MetricsReader resultMetrics = new MetricsReader(result, namespace);
-
- long totalBytes = resultMetrics.getCounterMetric("totalBytes.count");
- long startTime = resultMetrics.getStartTimeMetric("runtime");
- long endTime = resultMetrics.getEndTimeMetric("runtime");
-
- System.out.println(String.format("Total bytes: %s", totalBytes));
- System.out.println(String.format("Total time (millis): %s", endTime -
startTime));
- }
-}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
index 128162da41c6..19b4f959c58e 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
@@ -19,6 +19,7 @@
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
@@ -26,8 +27,8 @@
* Monitor that records processing time distribution in the pipeline.
*
* <p>To use: apply a monitor directly after each source and sink transform.
This will capture a
- * distribution of element processing timestamps, which can be collected and
written out using
- * {@link MetricsPublisher}.
+ * distribution of element processing timestamps, which can be collected and
queried using {@link
+ * MetricsReader}.
*/
public class TimeMonitor<K, V> extends DoFn<KV<K, V>, KV<K, V>> {
diff --git
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index 8e6dbc9f4fac..af801ca555ad 100644
---
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -40,7 +40,7 @@
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testutils.publishing.BigQueryClient;
+import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -142,8 +142,19 @@ void runAll(String[] args) throws IOException {
}
if (options.getExportSummaryToBigQuery()) {
- BigQueryClient publisher =
BigQueryClient.create(options.getBigQueryDataset());
- savePerfsToBigQuery(publisher, options, actual, start);
+ ImmutableMap<String, String> schema =
+ ImmutableMap.<String, String>builder()
+ .put("timestamp", "timestamp")
+ .put("runtimeSec", "float")
+ .put("eventsPerSec", "float")
+ .put("numResults", "integer")
+ .build();
+
+ savePerfsToBigQuery(
+ BigQueryResultsPublisher.create(options.getBigQueryDataset(),
schema),
+ options,
+ actual,
+ start);
}
} finally {
if (options.getMonitorJobs()) {
@@ -161,7 +172,7 @@ void runAll(String[] args) throws IOException {
@VisibleForTesting
static void savePerfsToBigQuery(
- BigQueryClient bigQueryClient,
+ BigQueryResultsPublisher publisher,
NexmarkOptions options,
Map<NexmarkConfiguration, NexmarkPerf> perfs,
Instant start) {
@@ -172,25 +183,7 @@ static void savePerfsToBigQuery(
options.getQueryLanguage(),
entry.getKey().query.getNumberOrName());
String tableName = NexmarkUtils.tableName(options, queryName, 0L, null);
- ImmutableMap<String, String> schema =
- ImmutableMap.<String, String>builder()
- .put("timestamp", "timestamp")
- .put("runtimeSec", "float")
- .put("eventsPerSec", "float")
- .put("numResults", "integer")
- .build();
- bigQueryClient.createTableIfNotExists(tableName, schema);
-
- // convert millis to seconds (it's a BigQuery's requirement).
- Map<String, Object> record =
- ImmutableMap.<String, Object>builder()
- .put("timestamp", start.getMillis() / 1000)
- .put("runtimeSec", entry.getValue().runtimeSec)
- .put("eventsPerSec", entry.getValue().eventsPerSec)
- .put("numResults", entry.getValue().numResults)
- .build();
-
- bigQueryClient.insertRow(record, tableName);
+ publisher.publish(entry.getValue(), tableName, start.getMillis());
}
}
diff --git
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
index a6a8430fe472..bea6767407a0 100644
---
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
+++
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
@@ -19,12 +19,16 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.testutils.TestResult;
/** Summary of performance for a particular run of a configuration. */
-public class NexmarkPerf {
+public class NexmarkPerf implements TestResult {
+
/**
* A sample of the number of events and number of results (if known)
generated at a particular
* time.
@@ -152,4 +156,9 @@ public boolean anyActivity(NexmarkPerf that) {
}
return false;
}
+
+ @Override
+ public Map<String, Object> toMap() {
+ return NexmarkUtils.MAPPER.convertValue(this, new
TypeReference<Map<String, Object>>() {});
+ }
}
diff --git
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
index d3849e6ea353..4b86c29ce839 100644
---
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
+++
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
@@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.nexmark;
-import static org.junit.Assert.assertTrue;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertThat;
-import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testutils.fakes.FakeBigQueryClient;
+import org.apache.beam.sdk.testutils.TestResult;
+import org.apache.beam.sdk.testutils.fakes.FakeBigQueryResultsPublisher;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
@@ -34,9 +34,10 @@
public class PerfsToBigQueryTest {
private static final NexmarkQueryName QUERY =
NexmarkQueryName.CURRENCY_CONVERSION;
+
private NexmarkOptions options;
- private FakeBigQueryClient bigQueryClient;
+ private FakeBigQueryResultsPublisher publisher;
@Before
public void before() {
@@ -48,7 +49,7 @@ public void before() {
options.setProject("nexmark-test");
options.setResourceNameMode(NexmarkUtils.ResourceNameMode.QUERY_RUNNER_AND_MODE);
- bigQueryClient = new FakeBigQueryClient(options.getBigQueryDataset());
+ publisher = new FakeBigQueryResultsPublisher();
}
@Test
@@ -77,42 +78,11 @@ public void testSavePerfsToBigQuery() {
perfs.put(nexmarkConfiguration2, nexmarkPerf2);
long startTimestampMilliseconds = 1454284800000L;
- Main.savePerfsToBigQuery(
- bigQueryClient, options, perfs, new
Instant(startTimestampMilliseconds));
+ Main.savePerfsToBigQuery(publisher, options, perfs, new
Instant(startTimestampMilliseconds));
String tableName = NexmarkUtils.tableName(options,
QUERY.getNumberOrName(), 0L, null);
- List<Map<String, ?>> rows = bigQueryClient.getRows(tableName);
-
- // savePerfsToBigQuery converts millis to seconds (it's a BigQuery's
requirement).
- assertContains(nexmarkRecord(nexmarkPerf1, startTimestampMilliseconds /
1000), rows);
- assertContains(nexmarkRecord(nexmarkPerf2, startTimestampMilliseconds /
1000), rows);
- }
-
- private Map<String, Object> nexmarkRecord(NexmarkPerf nexmarkPerf, long
startTimestampSeconds) {
- return ImmutableMap.<String, Object>builder()
- .put("timestamp", startTimestampSeconds)
- .put("runtimeSec", nexmarkPerf.runtimeSec)
- .put("eventsPerSec", nexmarkPerf.eventsPerSec)
- .put("numResults", nexmarkPerf.numResults)
- .build();
- }
-
- private void assertContains(Map<String, ?> expectedRecord, List<Map<String,
?>> actualRecords) {
- assertTrue(
- String.format("Record not found: %s", expectedRecord),
- actualRecords
- .stream()
- .anyMatch(actualRecord -> recordEquals(actualRecord,
expectedRecord)));
- }
-
- private boolean recordEquals(Map<String, ?> expected, Map<String, ?> actual)
{
- if (expected == null || actual == null) {
- return false;
- }
+ List<TestResult> rows = publisher.getRecords(tableName);
- return expected.get("timestamp").equals(actual.get("timestamp"))
- && expected.get("runtimeSec").equals(actual.get("runtimeSec"))
- && expected.get("eventsPerSec").equals(actual.get("eventsPerSec"))
- && expected.get("numResults").equals(actual.get("numResults"));
+ assertThat(rows, hasItems(nexmarkPerf1, nexmarkPerf2));
}
}
diff --git
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/TestResult.java
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/TestResult.java
new file mode 100644
index 000000000000..485e9f867161
--- /dev/null
+++
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/TestResult.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils;
+
+import java.util.Map;
+
+/**
+ * Represents ANY test result (POJO). Implementations of this interface are to
be used by generic
+ * mechanisms for publishing, parsing etc.
+ */
+public interface TestResult {
+
+ /** Transforms the TestResult POJO to map of field names and values. */
+ Map<String, Object> toMap();
+}
diff --git
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/package-info.java
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/package-info.java
new file mode 100644
index 000000000000..4aa51e97c40c
--- /dev/null
+++
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Testing utilities for whole Java SDK. */
+package org.apache.beam.sdk.testutils;
diff --git
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
index 5a47afe18e49..42fd23d77108 100644
---
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
+++
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
@@ -98,6 +98,18 @@ private void createTable(TableId tableId, Schema schema) {
client.create(tableInfo, FIELD_OPTIONS);
}
+ /**
+ * Inserts one row to BigQuery table. Creates table using the given schema
if the table does not
+ * exist.
+ *
+ * @see #insertRow(Map, String)
+ * @see #createTableIfNotExists(String, Map) for more details.
+ */
+ public void insertRow(Map<String, ?> row, Map<String, String> schema, String
table) {
+ createTableIfNotExists(table, schema);
+ insertRow(row, table);
+ }
+
/**
* Inserts one row to BigQuery table.
*
diff --git
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java
new file mode 100644
index 000000000000..e7203e3b92bd
--- /dev/null
+++
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.testutils.TestResult;
+
+/** Publishes {@link TestResult}. */
+public class BigQueryResultsPublisher {
+
+ private BigQueryClient client;
+
+ private Map<String, String> schema;
+
+ protected BigQueryResultsPublisher(BigQueryClient client, Map<String,
String> schema) {
+ this.client = client;
+ this.schema = schema;
+ }
+
+ public static BigQueryResultsPublisher create(String dataset, Map<String,
String> schema) {
+ return new BigQueryResultsPublisher(BigQueryClient.create(dataset),
schema);
+ }
+
+ public void publish(TestResult result, String tableName, long nowInMillis) {
+ Map<String, Object> row = getRowOfSchema(result);
+
+ // BigQuery requires seconds so we have to divide here
+ row.put("timestamp", nowInMillis / 1000);
+ client.insertRow(row, schema, tableName);
+ }
+
+ public void publish(TestResult result, String tableName) {
+ client.insertRow(getRowOfSchema(result), schema, tableName);
+ }
+
+ private Map<String, Object> getRowOfSchema(TestResult result) {
+ return result
+ .toMap()
+ .entrySet()
+ .stream()
+ .filter(element -> schema.containsKey(element.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+}
diff --git
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryClient.java
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryClient.java
index 313958840cab..49a2ad46f322 100644
---
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryClient.java
+++
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryClient.java
@@ -32,8 +32,8 @@
private Map<String, List<Map<String, ?>>> rowsPerTable;
- public FakeBigQueryClient(String dataset) {
- super(null, null, dataset);
+ public FakeBigQueryClient() {
+ super(null, null, null);
rowsPerTable = new HashMap<>();
}
diff --git
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryResultsPublisher.java
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryResultsPublisher.java
new file mode 100644
index 000000000000..e8adfb900f38
--- /dev/null
+++
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryResultsPublisher.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.fakes;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.testutils.TestResult;
+import org.apache.beam.sdk.testutils.publishing.BigQueryResultsPublisher;
+
+/** A fake implementation of {@link BigQueryResultsPublisher} for testing
purposes only. */
+public class FakeBigQueryResultsPublisher extends BigQueryResultsPublisher {
+
+ private Map<String, List<TestResult>> recordsPerTable;
+
+ public FakeBigQueryResultsPublisher() {
+ super(null, null);
+ this.recordsPerTable = new HashMap<>();
+ }
+
+ @Override
+ public void publish(TestResult result, String tableName, long nowInMillis) {
+ List<TestResult> results = recordsPerTable.get(tableName);
+
+ if (results == null) {
+ results = new ArrayList<>();
+ }
+ results.add(result);
+
+ recordsPerTable.put(tableName, results);
+ }
+
+ public List<TestResult> getRecords(String table) {
+ return recordsPerTable.get(table);
+ }
+}
diff --git
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisherTest.java
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisherTest.java
new file mode 100644
index 000000000000..d5543da8d255
--- /dev/null
+++
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisherTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.sdk.testutils.TestResult;
+import org.apache.beam.sdk.testutils.fakes.FakeBigQueryClient;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BigQueryResultsPublisher}. */
+@RunWith(JUnit4.class)
+public class BigQueryResultsPublisherTest {
+
+ private static final String TABLE_NAME = "table";
+
+ private BigQueryResultsPublisher publisher;
+
+ private FakeBigQueryClient bigQueryClient;
+
+ @Before
+ public void setUp() {
+ ImmutableMap<String, String> schema =
+ ImmutableMap.<String, String>builder()
+ .put("timestamp", "timestamp")
+ .put("field1", "string")
+ .build();
+
+ this.bigQueryClient = new FakeBigQueryClient();
+ this.publisher = new BigQueryResultsPublisher(bigQueryClient, schema);
+ }
+
+ @Test
+ public void testPublishRowWithTimestampField() {
+ long now = 1000L;
+
+ publisher.publish(new SampleTestResult("a", "b"), TABLE_NAME, now);
+
+ Map<String, ?> rowInTable = bigQueryClient.getRows(TABLE_NAME).get(0);
+ assertEquals(2, rowInTable.entrySet().size());
+ assertEquals(1L, rowInTable.get("timestamp"));
+ assertEquals("a", rowInTable.get("field1"));
+ }
+
+ @Test
+ public void testPublishRowWithoutTimestamp() {
+ publisher.publish(new SampleTestResult("a", "b"), TABLE_NAME);
+
+ Map<String, ?> rowInTable = bigQueryClient.getRows(TABLE_NAME).get(0);
+ assertEquals(1, rowInTable.entrySet().size());
+ assertEquals("a", rowInTable.get("field1"));
+ }
+
+ @Test
+ public void testRowDoesntContainFieldsNotSpecifiedInSchema() {
+ publisher.publish(new SampleTestResult("a", "b"), TABLE_NAME);
+
+ Map<String, ?> rowInTable = bigQueryClient.getRows(TABLE_NAME).get(0);
+ assertNull(rowInTable.get("field2"));
+ }
+
+ private static class SampleTestResult implements TestResult {
+
+ private String field1;
+
+ private String field2;
+
+ SampleTestResult(String field1, String field2) {
+ this.field1 = field1;
+ this.field2 = field2;
+ }
+
+ @Override
+ public Map<String, Object> toMap() {
+ return ImmutableMap.<String, Object>builder()
+ .put("field1", field1)
+ .put("field2", field2)
+ .build();
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 171590)
Time Spent: 2h 50m (was: 2h 40m)
> Publish metrics from load tests to BigQuery database
> ----------------------------------------------------
>
> Key: BEAM-5984
> URL: https://issues.apache.org/jira/browse/BEAM-5984
> Project: Beam
> Issue Type: Sub-task
> Components: testing
> Reporter: Lukasz Gajowy
> Assignee: Lukasz Gajowy
> Priority: Major
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> Collect metrics in BigQuery so that they can be read and plotted using the
> PerfKitExplorer tool or any similar tool.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)