[
https://issues.apache.org/jira/browse/BEAM-4283?focusedWorklogId=112374&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112374
]
ASF GitHub Bot logged work on BEAM-4283:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Jun/18 16:06
Start Date: 15/Jun/18 16:06
Worklog Time Spent: 10m
Work Description: chamikaramj closed pull request #5464: [BEAM-4283]
Write Nexmark execution times to bigquery
URL: https://github.com/apache/beam/pull/5464
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 29fb8924f22..19aef00094d 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1452,7 +1452,10 @@ static String getExtractDestinationUri(String
extractDestinationDir) {
}
@VisibleForTesting
- Write<T> withTestServices(BigQueryServices testServices) {
+ /**
+ * This method is for test usage only
+ */
+ public Write<T> withTestServices(BigQueryServices testServices) {
checkArgument(testServices != null, "testServices can not be null");
return toBuilder().setBigQueryServices(testServices).build();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 1295cc0fe2c..c4e5462306f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -32,10 +32,12 @@
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.values.ValueInSingleWindow;
/** An interface for real, mock, or fake implementations of Cloud BigQuery
services. */
-interface BigQueryServices extends Serializable {
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public interface BigQueryServices extends Serializable {
/**
* Returns a real, mock, or fake {@link JobService}.
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index 0a384e74e17..d581a925f6c 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -23,6 +23,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.ListCoder;
@@ -30,16 +31,17 @@
/**
 * A fake implementation of BigQuery's query service.
*/
-class FakeBigQueryServices implements BigQueryServices {
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class FakeBigQueryServices implements BigQueryServices {
private JobService jobService;
private FakeDatasetService datasetService;
- FakeBigQueryServices withJobService(JobService jobService) {
+ public FakeBigQueryServices withJobService(JobService jobService) {
this.jobService = jobService;
return this;
}
- FakeBigQueryServices withDatasetService(FakeDatasetService datasetService) {
+ public FakeBigQueryServices withDatasetService(FakeDatasetService
datasetService) {
this.datasetService = datasetService;
return this;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 3526ed5ddd0..50d4b7af06d 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -39,6 +39,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -46,7 +47,8 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
/** A fake dataset service that can be serialized, for use in
testReadFromTable. */
-class FakeDatasetService implements DatasetService, Serializable {
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class FakeDatasetService implements DatasetService, Serializable {
// Table information must be static, as each ParDo will get a separate
instance of
// FakeDatasetServices, and they must all modify the same storage.
static com.google.common.collect.Table<String, String, Map<String,
TableContainer>>
@@ -76,7 +78,7 @@ public Table getTable(TableReference tableRef)
}
}
- List<TableRow> getAllRows(String projectId, String datasetId, String tableId)
+ public List<TableRow> getAllRows(String projectId, String datasetId, String
tableId)
throws InterruptedException, IOException {
synchronized (tables) {
return getTableContainer(projectId, datasetId, tableId).getRows();
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 46c03f44f97..6f1e715389b 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -62,6 +62,7 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.io.FileSystems;
@@ -78,7 +79,8 @@
/**
* A fake implementation of BigQuery's job service.
*/
-class FakeJobService implements JobService, Serializable {
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class FakeJobService implements JobService, Serializable {
private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
// Whenever a job is started, the first 2 calls to GetJob will report the
job as pending,
// the next 2 will return the job as running, and only then will the job
report as done.
@@ -103,7 +105,7 @@
private static com.google.common.collect.Table<String, String, JobStatistics>
dryRunQueryResults;
- FakeJobService() {
+ public FakeJobService() {
this.datasetService = new FakeDatasetService();
}
diff --git a/sdks/java/nexmark/build.gradle b/sdks/java/nexmark/build.gradle
index 703495007f1..6dc6d87d179 100644
--- a/sdks/java/nexmark/build.gradle
+++ b/sdks/java/nexmark/build.gradle
@@ -63,8 +63,11 @@ dependencies {
shadow library.java.slf4j_jdk14
provided library.java.junit
provided library.java.hamcrest_core
+ shadow project(path: ":beam-sdks-java-io-google-cloud-platform",
configuration: "shadow")
+ shadowTest project(path: ":beam-sdks-java-io-google-cloud-platform",
configuration: "shadowTest")
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
+
testCompileOnly library.java.findbugs_annotations
gradleRun project(path: project.path, configuration: "shadow")
diff --git
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index 1cada92e0ec..ceed04774c1 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -17,7 +17,14 @@
*/
package org.apache.beam.sdk.nexmark;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -27,10 +34,24 @@
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -74,22 +95,93 @@ void runAll(OptionT options, NexmarkLauncher
nexmarkLauncher) throws IOException
appendPerf(options.getPerfFilename(), configuration, perf);
actual.put(configuration, perf);
// Summarize what we've run so far.
- saveSummary(null, configurations, actual, baseline, start);
+ saveSummary(null, configurations, actual, baseline, start, options);
}
}
+ if (options.getExportSummaryToBigQuery()){
+ savePerfsToBigQuery(options, actual, null);
+ }
} finally {
if (options.getMonitorJobs()) {
// Report overall performance.
- saveSummary(options.getSummaryFilename(), configurations, actual,
baseline, start);
+ saveSummary(options.getSummaryFilename(), configurations, actual,
baseline, start, options);
saveJavascript(options.getJavascriptFilename(), configurations,
actual, baseline, start);
}
}
-
if (!successful) {
throw new RuntimeException("Execution was not successful");
}
}
+ @VisibleForTesting
+ static void savePerfsToBigQuery(
+ NexmarkOptions options,
+ Map<NexmarkConfiguration, NexmarkPerf> perfs,
+ @Nullable BigQueryServices testBigQueryServices) {
+ Pipeline pipeline = Pipeline.create(options);
+ PCollection<KV<NexmarkConfiguration, NexmarkPerf>> perfsPCollection =
+ pipeline.apply(
+ Create.of(perfs)
+ .withCoder(
+ KvCoder.of(
+ SerializableCoder.of(NexmarkConfiguration.class),
+ new CustomCoder<NexmarkPerf>() {
+
+ @Override
+ public void encode(NexmarkPerf value, OutputStream
outStream)
+ throws CoderException, IOException {
+ StringUtf8Coder.of().encode(value.toString(),
outStream);
+ }
+
+ @Override
+ public NexmarkPerf decode(InputStream inStream)
+ throws CoderException, IOException {
+ String perf =
StringUtf8Coder.of().decode(inStream);
+ return NexmarkPerf.fromString(perf);
+ }
+ })));
+
+ TableSchema tableSchema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("Runtime(sec)").setType("FLOAT"),
+ new
TableFieldSchema().setName("Events(/sec)").setType("FLOAT"),
+ new TableFieldSchema()
+ .setName("Size of the result collection")
+ .setType("INTEGER")));
+
+ String tableSpec =
+ NexmarkUtils.tableSpec(options, "{query}", 0L, null);
+ SerializableFunction<
+ ValueInSingleWindow<KV<NexmarkConfiguration, NexmarkPerf>>,
TableDestination>
+ tableFunction =
+ input -> new TableDestination(
+ tableSpec.replace("{query}",
String.valueOf(input.getValue().getKey().query)),
+ "perfkit queries");
+ SerializableFunction<KV<NexmarkConfiguration, NexmarkPerf>, TableRow>
rowFunction =
+ input -> {
+ NexmarkPerf nexmarkPerf = input.getValue();
+ TableRow row = new TableRow()
+ .set("Runtime(sec)", nexmarkPerf.runtimeSec)
+ .set("Events(/sec)", nexmarkPerf.eventsPerSec)
+ .set("Size of the result collection", nexmarkPerf.numResults);
+ return row;
+ };
+ BigQueryIO.Write io =
+ BigQueryIO.<KV<NexmarkConfiguration, NexmarkPerf>>write()
+ .to(tableFunction)
+ .withSchema(tableSchema)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+ .withFormatFunction(rowFunction);
+ if (testBigQueryServices != null){
+ io = io.withTestServices(testBigQueryServices);
+ }
+ perfsPCollection.apply("savePerfsToBigQuery", io);
+ pipeline.run();
+ }
+
/**
* Append the pair of {@code configuration} and {@code perf} to perf file.
*/
@@ -146,13 +238,15 @@ private void appendPerf(
private static final String LINE =
"==========================================================================================";
- /**
- * Print summary of {@code actual} vs (if non-null) {@code baseline}.
- */
+ /** Print summary of {@code actual} vs (if non-null) {@code baseline}. */
private static void saveSummary(
@Nullable String summaryFilename,
- Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration,
NexmarkPerf> actual,
- @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant
start) {
+ Iterable<NexmarkConfiguration> configurations,
+ Map<NexmarkConfiguration, NexmarkPerf> actual,
+ @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline,
+ Instant start,
+ NexmarkOptions options) {
+
List<String> lines = new ArrayList<>();
lines.add("");
diff --git
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
index 904fcd5cbcb..5f834cf6405 100644
---
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
+++
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
@@ -47,6 +47,13 @@
@JsonProperty
public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL;
+ /**
+ * If false, the summary is only output to the console. If true the summary
is output to the
+ * console and its content is written to BigQuery tables according to {@link
+ * NexmarkOptions#getResourceNameMode()}.
+ */
+ @JsonProperty public boolean exportSummaryToBigQuery = false;
+
/**
* Control whether pub/sub publishing is done in a stand-alone pipeline or
is integrated
* into the overall query pipeline.
@@ -253,6 +260,9 @@ public void overrideFromOptions(NexmarkOptions options) {
if (options.getSinkType() != null) {
sinkType = options.getSinkType();
}
+ if (options.getExportSummaryToBigQuery() != null) {
+ exportSummaryToBigQuery = options.getExportSummaryToBigQuery();
+ }
if (options.getPubSubMode() != null) {
pubSubMode = options.getPubSubMode();
}
@@ -367,6 +377,7 @@ public NexmarkConfiguration copy() {
result.query = query;
result.sourceType = sourceType;
result.sinkType = sinkType;
+ result.exportSummaryToBigQuery = exportSummaryToBigQuery;
result.pubSubMode = pubSubMode;
result.numEvents = numEvents;
result.numEventGenerators = numEventGenerators;
@@ -420,6 +431,9 @@ public String toShortString() {
if (sinkType != DEFAULT.sinkType) {
sb.append(String.format("; sinkType:%s", sinkType));
}
+ if (exportSummaryToBigQuery != DEFAULT.exportSummaryToBigQuery) {
+ sb.append(String.format("; exportSummaryToBigQuery:%s",
exportSummaryToBigQuery));
+ }
if (pubSubMode != DEFAULT.pubSubMode) {
sb.append(String.format("; pubSubMode:%s", pubSubMode));
}
@@ -553,6 +567,7 @@ public int hashCode() {
query,
sourceType,
sinkType,
+ exportSummaryToBigQuery,
pubSubMode,
numEvents,
numEventGenerators,
@@ -695,6 +710,9 @@ public boolean equals(Object obj) {
if (sinkType != other.sinkType) {
return false;
}
+ if (exportSummaryToBigQuery != other.exportSummaryToBigQuery) {
+ return false;
+ }
if (sourceType != other.sourceType) {
return false;
}
diff --git
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index ebc87054936..2725b1909b4 100644
---
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -20,7 +20,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.COMBINED;
-import static
org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.SUBSCRIBE_ONLY;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
@@ -686,6 +685,14 @@ private String shortTopic(long now) {
return String.format("%s_%s_source", baseTopic, queryName);
case QUERY_AND_SALT:
return String.format("%s_%s_%d_source", baseTopic, queryName, now);
+ case QUERY_RUNNER_AND_MODE:
+ return String.format(
+ "%s_%s_%s_%s_source",
+ baseTopic,
+ queryName,
+ options.getRunner().getSimpleName(),
+ options.isStreaming());
+
}
throw new RuntimeException("Unrecognized enum " +
options.getResourceNameMode());
}
@@ -705,6 +712,13 @@ private String shortSubscription(long now) {
return String.format("%s_%s_source", baseSubscription, queryName);
case QUERY_AND_SALT:
return String.format("%s_%s_%d_source", baseSubscription, queryName,
now);
+ case QUERY_RUNNER_AND_MODE:
+ return String.format(
+ "%s_%s_%s_%s_source",
+ baseSubscription,
+ queryName,
+ options.getRunner().getSimpleName(),
+ options.isStreaming());
}
throw new RuntimeException("Unrecognized enum " +
options.getResourceNameMode());
}
@@ -724,31 +738,17 @@ private String textFilename(long now) {
return String.format("%s/nexmark_%s.txt", baseFilename, queryName);
case QUERY_AND_SALT:
return String.format("%s/nexmark_%s_%d.txt", baseFilename, queryName,
now);
+ case QUERY_RUNNER_AND_MODE:
+ return String.format(
+ "%s/nexmark_%s_%s_%s",
+ baseFilename,
+ queryName,
+ options.getRunner().getSimpleName(),
+ options.isStreaming());
}
throw new RuntimeException("Unrecognized enum " +
options.getResourceNameMode());
}
- /**
- * Return a BigQuery table spec.
- */
- private String tableSpec(long now, String version) {
- String baseTableName = options.getBigQueryTable();
- if (Strings.isNullOrEmpty(baseTableName)) {
- throw new RuntimeException("Missing --bigQueryTable");
- }
- switch (options.getResourceNameMode()) {
- case VERBATIM:
- return String.format("%s:nexmark.%s_%s",
- options.getProject(), baseTableName, version);
- case QUERY:
- return String.format("%s:nexmark.%s_%s_%s",
- options.getProject(), baseTableName, queryName,
version);
- case QUERY_AND_SALT:
- return String.format("%s:nexmark.%s_%s_%s_%d",
- options.getProject(), baseTableName, queryName,
version, now);
- }
- throw new RuntimeException("Unrecognized enum " +
options.getResourceNameMode());
- }
/**
* Return a directory for logs.
@@ -765,6 +765,14 @@ private String logsDir(long now) {
return String.format("%s/logs_%s", baseFilename, queryName);
case QUERY_AND_SALT:
return String.format("%s/logs_%s_%d", baseFilename, queryName, now);
+ case QUERY_RUNNER_AND_MODE:
+ return String.format(
+ "%s/logs_%s_%s_%s",
+ baseFilename,
+ queryName,
+ options.getRunner().getSimpleName(),
+ options.isStreaming());
+
}
throw new RuntimeException("Unrecognized enum " +
options.getResourceNameMode());
}
@@ -980,7 +988,7 @@ public void processElement(ProcessContext c) {
private void sinkResultsToBigQuery(
PCollection<String> formattedResults, long now,
String version) {
- String tableSpec = tableSpec(now, version);
+ String tableSpec = NexmarkUtils.tableSpec(options, queryName, now,
version);
TableSchema tableSchema =
new TableSchema().setFields(ImmutableList.of(
new TableFieldSchema().setName("result").setType("STRING"),
diff --git
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index 592d1aa9c06..58ab594bbbd 100644
---
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -61,6 +61,12 @@
void setSinkType(NexmarkUtils.SinkType sinkType);
+ @Description("Shall we export the summary to BigQuery.")
+ @Default.Boolean(false)
+ Boolean getExportSummaryToBigQuery();
+
+ void setExportSummaryToBigQuery(Boolean exportSummaryToBigQuery);
+
@Description("Which mode to run in when source is PUBSUB.")
@Nullable
NexmarkUtils.PubSubMode getPubSubMode();
@@ -100,6 +106,12 @@
void setBigQueryTable(String bigQueryTable);
+ @Description("BigQuery dataset")
+ @Default.String("nexmark")
+ String getBigQueryDataset();
+
+ void setBigQueryDataset(String bigQueryDataset);
+
@Description("Approximate number of events to generate. "
+ "Zero for effectively unlimited in streaming mode.")
@Nullable
diff --git
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index 5d89dbde439..28e713a20ea 100644
---
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.nexmark;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.io.InputStream;
@@ -185,7 +186,56 @@
/** Names are suffixed with the query being run. */
QUERY,
/** Names are suffixed with the query being run and a random number. */
- QUERY_AND_SALT
+ QUERY_AND_SALT,
+ /** Names are suffixed with the query being run, the runner being used and the mode
(streaming/batch). */
+ QUERY_RUNNER_AND_MODE
+ }
+
+ /** Return a BigQuery table spec. */
+ static String tableSpec(NexmarkOptions options, String queryName, long now,
String version) {
+ String baseTableName = options.getBigQueryTable();
+ if (Strings.isNullOrEmpty(baseTableName)) {
+ throw new RuntimeException("Missing --bigQueryTable");
+ }
+ switch (options.getResourceNameMode()) {
+ case VERBATIM:
+ return String.format(
+ "%s:%s.%s_%s",
+ options.getProject(), options.getBigQueryDataset(), baseTableName,
version);
+ case QUERY:
+ return String.format(
+ "%s:%s.%s_%s_%s",
+ options.getProject(), options.getBigQueryDataset(), baseTableName,
queryName, version);
+ case QUERY_AND_SALT:
+ return String.format(
+ "%s:%s.%s_%s_%s_%d",
+ options.getProject(),
+ options.getBigQueryDataset(),
+ baseTableName,
+ queryName,
+ version,
+ now);
+ case QUERY_RUNNER_AND_MODE:
+ return (version != null)
+ ? String.format(
+ "%s:%s.%s_%s_%s_%s_%s",
+ options.getProject(),
+ options.getBigQueryDataset(),
+ baseTableName,
+ queryName,
+ options.getRunner().getSimpleName(),
+ options.isStreaming(),
+ version)
+ : String.format(
+ "%s:%s.%s_%s_%s_%s",
+ options.getProject(),
+ options.getBigQueryDataset(),
+ baseTableName,
+ queryName,
+ options.getRunner().getSimpleName(),
+ options.isStreaming());
+ }
+ throw new RuntimeException("Unrecognized enum " +
options.getResourceNameMode());
}
/**
diff --git
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
new file mode 100644
index 00000000000..6fc5dabb50e
--- /dev/null
+++
b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.bigquery.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.FakeJobService;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Test class for BigQuery sinks. */
+public class PerfsToBigQueryTest {
+
+ private static final int QUERY = 1;
+ private NexmarkOptions options;
+ private FakeDatasetService fakeDatasetService = new FakeDatasetService();
+ private FakeJobService fakeJobService = new FakeJobService();
+ private FakeBigQueryServices fakeBqServices =
+ new FakeBigQueryServices()
+ .withDatasetService(fakeDatasetService)
+ .withJobService(fakeJobService);
+ @Rule
+ public transient TemporaryFolder testFolder = new TemporaryFolder();
+
+
+ @Before
+ public void before() throws IOException, InterruptedException {
+ options = PipelineOptionsFactory.create().as(NexmarkOptions.class);
+ options.setBigQueryTable("nexmark");
+ options.setBigQueryDataset("nexmark");
+ options.setRunner(DirectRunner.class);
+ options.setStreaming(true);
+ options.setProject("nexmark-test");
+ options.setTempLocation(testFolder.getRoot().getAbsolutePath());
+
options.setResourceNameMode(NexmarkUtils.ResourceNameMode.QUERY_RUNNER_AND_MODE);
+ FakeDatasetService.setUp();
+ // BigQueryIO.clearCreatedTables();
+ fakeDatasetService.createDataset(
+ options.getProject(), options.getBigQueryDataset(), "", "", null);
+ }
+
+ @Test
+ public void testSavePerfsToBigQuery() throws IOException,
InterruptedException {
+ NexmarkConfiguration nexmarkConfiguration1 = new NexmarkConfiguration();
+ nexmarkConfiguration1.query = QUERY;
+ // just for the 2 configurations to be different to have different keys
+ nexmarkConfiguration1.cpuDelayMs = 100L;
+ NexmarkPerf nexmarkPerf1 = new NexmarkPerf();
+ nexmarkPerf1.numResults = 1000L;
+ nexmarkPerf1.eventsPerSec = 0.5F;
+ nexmarkPerf1.runtimeSec = 0.325F;
+
+ NexmarkConfiguration nexmarkConfiguration2 = new NexmarkConfiguration();
+ nexmarkConfiguration2.query = QUERY;
+ // just for the 2 configurations to be different to have different keys
+ nexmarkConfiguration1.cpuDelayMs = 200L;
+ NexmarkPerf nexmarkPerf2 = new NexmarkPerf();
+ nexmarkPerf2.numResults = 1001L;
+ nexmarkPerf2.eventsPerSec = 1.5F;
+ nexmarkPerf2.runtimeSec = 1.325F;
+
+ // simulate 2 runs of the same query just to check that rows are appended
correctly.
+ HashMap<NexmarkConfiguration, NexmarkPerf> perfs = new HashMap<>(2);
+ perfs.put(nexmarkConfiguration1, nexmarkPerf1);
+ perfs.put(nexmarkConfiguration2, nexmarkPerf2);
+ Main.savePerfsToBigQuery(options, perfs, fakeBqServices);
+
+ String tableSpec = NexmarkUtils.tableSpec(options, String.valueOf(QUERY),
0L, null);
+ List<TableRow> actualRows =
+ fakeDatasetService.getAllRows(
+ options.getProject(),
+ options.getBigQueryDataset(),
+ BigQueryHelpers.parseTableSpec(tableSpec).getTableId());
+ assertEquals("Wrong number of rows inserted", 2, actualRows.size());
+ List<TableRow> expectedRows = new ArrayList<>();
+ TableRow row1 = new TableRow()
+ .set("Runtime(sec)", nexmarkPerf1.runtimeSec).set("Events(/sec)",
nexmarkPerf1.eventsPerSec)
+ // when read using TableRowJsonCoder the row field is boxed into an
Integer, cast it to int
+ // to force boxing into Integer in the expectedRows.
+ .set("Size of the result collection", (int) nexmarkPerf1.numResults);
+ expectedRows.add(row1);
+ TableRow row2 = new TableRow()
+ .set("Runtime(sec)", nexmarkPerf2.runtimeSec).set("Events(/sec)",
nexmarkPerf2.eventsPerSec)
+ // when read using TableRowJsonCoder the row field is boxed into an
Integer, cast it to int
+ // to force boxing into Integer in the
expectedRows.
+ expectedRows.add(row2);
+ assertThat(actualRows, containsInAnyOrder(Iterables.toArray(expectedRows,
TableRow.class)));
+
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 112374)
Time Spent: 8h 10m (was: 8h)
> Export nexmark execution times to bigQuery
> ------------------------------------------
>
> Key: BEAM-4283
> URL: https://issues.apache.org/jira/browse/BEAM-4283
> Project: Beam
> Issue Type: Sub-task
> Components: examples-nexmark
> Reporter: Etienne Chauchot
> Assignee: Etienne Chauchot
> Priority: Major
> Time Spent: 8h 10m
> Remaining Estimate: 0h
>
> Nexmark only outputs the results collection to bigQuery and prints in the
> console the execution times. To supervise Nexmark execution times, we need to
> store them as well per runner/query/mode
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)