[ https://issues.apache.org/jira/browse/BEAM-4283?focusedWorklogId=112374&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112374 ]
ASF GitHub Bot logged work on BEAM-4283: ---------------------------------------- Author: ASF GitHub Bot Created on: 15/Jun/18 16:06 Start Date: 15/Jun/18 16:06 Worklog Time Spent: 10m Work Description: chamikaramj closed pull request #5464: [BEAM-4283] Write Nexmark execution times to bigquery URL: https://github.com/apache/beam/pull/5464 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 29fb8924f22..19aef00094d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1452,7 +1452,10 @@ static String getExtractDestinationUri(String extractDestinationDir) { } @VisibleForTesting - Write<T> withTestServices(BigQueryServices testServices) { + /** + * This method is for test usage only + */ + public Write<T> withTestServices(BigQueryServices testServices) { checkArgument(testServices != null, "testServices can not be null"); return toBuilder().setBigQueryServices(testServices).build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 1295cc0fe2c..c4e5462306f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ 
-32,10 +32,12 @@ import java.io.Serializable; import java.util.List; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.values.ValueInSingleWindow; /** An interface for real, mock, or fake implementations of Cloud BigQuery services. */ -interface BigQueryServices extends Serializable { +@Experimental(Experimental.Kind.SOURCE_SINK) +public interface BigQueryServices extends Serializable { /** * Returns a real, mock, or fake {@link JobService}. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java index 0a384e74e17..d581a925f6c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.ListCoder; @@ -30,16 +31,17 @@ /** * A fake implementation of BigQuery's query service.. 
*/ -class FakeBigQueryServices implements BigQueryServices { +@Experimental(Experimental.Kind.SOURCE_SINK) +public class FakeBigQueryServices implements BigQueryServices { private JobService jobService; private FakeDatasetService datasetService; - FakeBigQueryServices withJobService(JobService jobService) { + public FakeBigQueryServices withJobService(JobService jobService) { this.jobService = jobService; return this; } - FakeBigQueryServices withDatasetService(FakeDatasetService datasetService) { + public FakeBigQueryServices withDatasetService(FakeDatasetService datasetService) { this.datasetService = datasetService; return this; } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java index 3526ed5ddd0..50d4b7af06d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java @@ -39,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Pattern; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -46,7 +47,8 @@ import org.apache.beam.sdk.values.ValueInSingleWindow; /** A fake dataset service that can be serialized, for use in testReadFromTable. 
*/ -class FakeDatasetService implements DatasetService, Serializable { +@Experimental(Experimental.Kind.SOURCE_SINK) +public class FakeDatasetService implements DatasetService, Serializable { // Table information must be static, as each ParDo will get a separate instance of // FakeDatasetServices, and they must all modify the same storage. static com.google.common.collect.Table<String, String, Map<String, TableContainer>> @@ -76,7 +78,7 @@ public Table getTable(TableReference tableRef) } } - List<TableRow> getAllRows(String projectId, String datasetId, String tableId) + public List<TableRow> getAllRows(String projectId, String datasetId, String tableId) throws InterruptedException, IOException { synchronized (tables) { return getTableContainer(projectId, datasetId, tableId).getRows(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index 46c03f44f97..6f1e715389b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -62,6 +62,7 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.io.FileSystems; @@ -78,7 +79,8 @@ /** * A fake implementation of BigQuery's job service. 
*/ -class FakeJobService implements JobService, Serializable { +@Experimental(Experimental.Kind.SOURCE_SINK) +public class FakeJobService implements JobService, Serializable { private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory(); // Whenever a job is started, the first 2 calls to GetJob will report the job as pending, // the next 2 will return the job as running, and only then will the job report as done. @@ -103,7 +105,7 @@ private static com.google.common.collect.Table<String, String, JobStatistics> dryRunQueryResults; - FakeJobService() { + public FakeJobService() { this.datasetService = new FakeDatasetService(); } diff --git a/sdks/java/nexmark/build.gradle b/sdks/java/nexmark/build.gradle index 703495007f1..6dc6d87d179 100644 --- a/sdks/java/nexmark/build.gradle +++ b/sdks/java/nexmark/build.gradle @@ -63,8 +63,11 @@ dependencies { shadow library.java.slf4j_jdk14 provided library.java.junit provided library.java.hamcrest_core + shadow project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadow") + shadowTest project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadowTest") testCompile library.java.hamcrest_core testCompile library.java.hamcrest_library + testCompileOnly library.java.findbugs_annotations gradleRun project(path: project.path, configuration: "shadow") diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java index 1cada92e0ec..ceed04774c1 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java @@ -17,7 +17,14 @@ */ package org.apache.beam.sdk.nexmark; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.annotations.VisibleForTesting; 
+import com.google.common.collect.ImmutableList; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; @@ -27,10 +34,24 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.nexmark.model.Auction; import org.apache.beam.sdk.nexmark.model.Bid; import org.apache.beam.sdk.nexmark.model.Person; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.Duration; import org.joda.time.Instant; @@ -74,22 +95,93 @@ void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) throws IOException appendPerf(options.getPerfFilename(), configuration, perf); actual.put(configuration, perf); // Summarize what we've run so far. - saveSummary(null, configurations, actual, baseline, start); + saveSummary(null, configurations, actual, baseline, start, options); } } + if (options.getExportSummaryToBigQuery()){ + savePerfsToBigQuery(options, actual, null); + } } finally { if (options.getMonitorJobs()) { // Report overall performance. 
- saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start); + saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start, options); saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start); } } - if (!successful) { throw new RuntimeException("Execution was not successful"); } } + @VisibleForTesting + static void savePerfsToBigQuery( + NexmarkOptions options, + Map<NexmarkConfiguration, NexmarkPerf> perfs, + @Nullable BigQueryServices testBigQueryServices) { + Pipeline pipeline = Pipeline.create(options); + PCollection<KV<NexmarkConfiguration, NexmarkPerf>> perfsPCollection = + pipeline.apply( + Create.of(perfs) + .withCoder( + KvCoder.of( + SerializableCoder.of(NexmarkConfiguration.class), + new CustomCoder<NexmarkPerf>() { + + @Override + public void encode(NexmarkPerf value, OutputStream outStream) + throws CoderException, IOException { + StringUtf8Coder.of().encode(value.toString(), outStream); + } + + @Override + public NexmarkPerf decode(InputStream inStream) + throws CoderException, IOException { + String perf = StringUtf8Coder.of().decode(inStream); + return NexmarkPerf.fromString(perf); + } + }))); + + TableSchema tableSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("Runtime(sec)").setType("FLOAT"), + new TableFieldSchema().setName("Events(/sec)").setType("FLOAT"), + new TableFieldSchema() + .setName("Size of the result collection") + .setType("INTEGER"))); + + String tableSpec = + NexmarkUtils.tableSpec(options, "{query}", 0L, null); + SerializableFunction< + ValueInSingleWindow<KV<NexmarkConfiguration, NexmarkPerf>>, TableDestination> + tableFunction = + input -> new TableDestination( + tableSpec.replace("{query}", String.valueOf(input.getValue().getKey().query)), + "perfkit queries"); + SerializableFunction<KV<NexmarkConfiguration, NexmarkPerf>, TableRow> rowFunction = + input -> { + NexmarkPerf nexmarkPerf = input.getValue(); 
+ TableRow row = new TableRow() + .set("Runtime(sec)", nexmarkPerf.runtimeSec) + .set("Events(/sec)", nexmarkPerf.eventsPerSec) + .set("Size of the result collection", nexmarkPerf.numResults); + return row; + }; + BigQueryIO.Write io = + BigQueryIO.<KV<NexmarkConfiguration, NexmarkPerf>>write() + .to(tableFunction) + .withSchema(tableSchema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) + .withFormatFunction(rowFunction); + if (testBigQueryServices != null){ + io = io.withTestServices(testBigQueryServices); + } + perfsPCollection.apply("savePerfsToBigQuery", io); + pipeline.run(); + } + /** * Append the pair of {@code configuration} and {@code perf} to perf file. */ @@ -146,13 +238,15 @@ private void appendPerf( private static final String LINE = "=========================================================================================="; - /** - * Print summary of {@code actual} vs (if non-null) {@code baseline}. - */ + /** Print summary of {@code actual} vs (if non-null) {@code baseline}. 
*/ private static void saveSummary( @Nullable String summaryFilename, - Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual, - @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) { + Iterable<NexmarkConfiguration> configurations, + Map<NexmarkConfiguration, NexmarkPerf> actual, + @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, + Instant start, + NexmarkOptions options) { + List<String> lines = new ArrayList<>(); lines.add(""); diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java index 904fcd5cbcb..5f834cf6405 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java @@ -47,6 +47,13 @@ @JsonProperty public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL; + /** + * If false, the summary is only output to the console. If true the summary is output to the + * console and it's content is written to bigquery tables according to {@link + * NexmarkOptions#getResourceNameMode()}. + */ + @JsonProperty public boolean exportSummaryToBigQuery = false; + /** * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated * into the overall query pipeline. 
@@ -253,6 +260,9 @@ public void overrideFromOptions(NexmarkOptions options) { if (options.getSinkType() != null) { sinkType = options.getSinkType(); } + if (options.getExportSummaryToBigQuery() != null) { + exportSummaryToBigQuery = options.getExportSummaryToBigQuery(); + } if (options.getPubSubMode() != null) { pubSubMode = options.getPubSubMode(); } @@ -367,6 +377,7 @@ public NexmarkConfiguration copy() { result.query = query; result.sourceType = sourceType; result.sinkType = sinkType; + result.exportSummaryToBigQuery = exportSummaryToBigQuery; result.pubSubMode = pubSubMode; result.numEvents = numEvents; result.numEventGenerators = numEventGenerators; @@ -420,6 +431,9 @@ public String toShortString() { if (sinkType != DEFAULT.sinkType) { sb.append(String.format("; sinkType:%s", sinkType)); } + if (exportSummaryToBigQuery != DEFAULT.exportSummaryToBigQuery) { + sb.append(String.format("; exportSummaryToBigQuery:%s", exportSummaryToBigQuery)); + } if (pubSubMode != DEFAULT.pubSubMode) { sb.append(String.format("; pubSubMode:%s", pubSubMode)); } @@ -553,6 +567,7 @@ public int hashCode() { query, sourceType, sinkType, + exportSummaryToBigQuery, pubSubMode, numEvents, numEventGenerators, @@ -695,6 +710,9 @@ public boolean equals(Object obj) { if (sinkType != other.sinkType) { return false; } + if (exportSummaryToBigQuery != other.exportSummaryToBigQuery) { + return false; + } if (sourceType != other.sourceType) { return false; } diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index ebc87054936..2725b1909b4 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import 
static org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.COMBINED; -import static org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.SUBSCRIBE_ONLY; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; @@ -686,6 +685,14 @@ private String shortTopic(long now) { return String.format("%s_%s_source", baseTopic, queryName); case QUERY_AND_SALT: return String.format("%s_%s_%d_source", baseTopic, queryName, now); + case QUERY_RUNNER_AND_MODE: + return String.format( + "%s_%s_%s_%s_source", + baseTopic, + queryName, + options.getRunner().getSimpleName(), + options.isStreaming()); + } throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); } @@ -705,6 +712,13 @@ private String shortSubscription(long now) { return String.format("%s_%s_source", baseSubscription, queryName); case QUERY_AND_SALT: return String.format("%s_%s_%d_source", baseSubscription, queryName, now); + case QUERY_RUNNER_AND_MODE: + return String.format( + "%s_%s_%s_%s_source", + baseSubscription, + queryName, + options.getRunner().getSimpleName(), + options.isStreaming()); } throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); } @@ -724,31 +738,17 @@ private String textFilename(long now) { return String.format("%s/nexmark_%s.txt", baseFilename, queryName); case QUERY_AND_SALT: return String.format("%s/nexmark_%s_%d.txt", baseFilename, queryName, now); + case QUERY_RUNNER_AND_MODE: + return String.format( + "%s/nexmark_%s_%s_%s", + baseFilename, + queryName, + options.getRunner().getSimpleName(), + options.isStreaming()); } throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); } - /** - * Return a BigQuery table spec. 
- */ - private String tableSpec(long now, String version) { - String baseTableName = options.getBigQueryTable(); - if (Strings.isNullOrEmpty(baseTableName)) { - throw new RuntimeException("Missing --bigQueryTable"); - } - switch (options.getResourceNameMode()) { - case VERBATIM: - return String.format("%s:nexmark.%s_%s", - options.getProject(), baseTableName, version); - case QUERY: - return String.format("%s:nexmark.%s_%s_%s", - options.getProject(), baseTableName, queryName, version); - case QUERY_AND_SALT: - return String.format("%s:nexmark.%s_%s_%s_%d", - options.getProject(), baseTableName, queryName, version, now); - } - throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); - } /** * Return a directory for logs. @@ -765,6 +765,14 @@ private String logsDir(long now) { return String.format("%s/logs_%s", baseFilename, queryName); case QUERY_AND_SALT: return String.format("%s/logs_%s_%d", baseFilename, queryName, now); + case QUERY_RUNNER_AND_MODE: + return String.format( + "%s/logs_%s_%s_%s", + baseFilename, + queryName, + options.getRunner().getSimpleName(), + options.isStreaming()); + } throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); } @@ -980,7 +988,7 @@ public void processElement(ProcessContext c) { private void sinkResultsToBigQuery( PCollection<String> formattedResults, long now, String version) { - String tableSpec = tableSpec(now, version); + String tableSpec = NexmarkUtils.tableSpec(options, queryName, now, version); TableSchema tableSchema = new TableSchema().setFields(ImmutableList.of( new TableFieldSchema().setName("result").setType("STRING"), diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java index 592d1aa9c06..58ab594bbbd 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java +++ 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -61,6 +61,12 @@ void setSinkType(NexmarkUtils.SinkType sinkType); + @Description("Shall we export the summary to BigQuery.") + @Default.Boolean(false) + Boolean getExportSummaryToBigQuery(); + + void setExportSummaryToBigQuery(Boolean exportSummaryToBigQuery); + @Description("Which mode to run in when source is PUBSUB.") @Nullable NexmarkUtils.PubSubMode getPubSubMode(); @@ -100,6 +106,12 @@ void setBigQueryTable(String bigQueryTable); + @Description("BigQuery dataset") + @Default.String("nexmark") + String getBigQueryDataset(); + + void setBigQueryDataset(String bigQueryDataset); + @Description("Approximate number of events to generate. " + "Zero for effectively unlimited in streaming mode.") @Nullable diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java index 5d89dbde439..28e713a20ea 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.nexmark; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; import com.google.common.hash.Hashing; import java.io.IOException; import java.io.InputStream; @@ -185,7 +186,56 @@ /** Names are suffixed with the query being run. */ QUERY, /** Names are suffixed with the query being run and a random number. */ - QUERY_AND_SALT + QUERY_AND_SALT, + /** Names are suffixed with the runner being used and a the mode (streaming/batch). */ + QUERY_RUNNER_AND_MODE + } + + /** Return a BigQuery table spec. 
*/ + static String tableSpec(NexmarkOptions options, String queryName, long now, String version) { + String baseTableName = options.getBigQueryTable(); + if (Strings.isNullOrEmpty(baseTableName)) { + throw new RuntimeException("Missing --bigQueryTable"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return String.format( + "%s:%s.%s_%s", + options.getProject(), options.getBigQueryDataset(), baseTableName, version); + case QUERY: + return String.format( + "%s:%s.%s_%s_%s", + options.getProject(), options.getBigQueryDataset(), baseTableName, queryName, version); + case QUERY_AND_SALT: + return String.format( + "%s:%s.%s_%s_%s_%d", + options.getProject(), + options.getBigQueryDataset(), + baseTableName, + queryName, + version, + now); + case QUERY_RUNNER_AND_MODE: + return (version != null) + ? String.format( + "%s:%s.%s_%s_%s_%s_%s", + options.getProject(), + options.getBigQueryDataset(), + baseTableName, + queryName, + options.getRunner().getSimpleName(), + options.isStreaming(), + version) + : String.format( + "%s:%s.%s_%s_%s_%s", + options.getProject(), + options.getBigQueryDataset(), + baseTableName, + queryName, + options.getRunner().getSimpleName(), + options.isStreaming()); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); } /** diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java new file mode 100644 index 00000000000..6fc5dabb50e --- /dev/null +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.apache.beam.runners.direct.DirectRunner; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; +import org.apache.beam.sdk.io.gcp.bigquery.FakeBigQueryServices; +import org.apache.beam.sdk.io.gcp.bigquery.FakeDatasetService; +import org.apache.beam.sdk.io.gcp.bigquery.FakeJobService; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** Test class for BigQuery sinks. 
*/ +public class PerfsToBigQueryTest { + + private static final int QUERY = 1; + private NexmarkOptions options; + private FakeDatasetService fakeDatasetService = new FakeDatasetService(); + private FakeJobService fakeJobService = new FakeJobService(); + private FakeBigQueryServices fakeBqServices = + new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withJobService(fakeJobService); + @Rule + public transient TemporaryFolder testFolder = new TemporaryFolder(); + + + @Before + public void before() throws IOException, InterruptedException { + options = PipelineOptionsFactory.create().as(NexmarkOptions.class); + options.setBigQueryTable("nexmark"); + options.setBigQueryDataset("nexmark"); + options.setRunner(DirectRunner.class); + options.setStreaming(true); + options.setProject("nexmark-test"); + options.setTempLocation(testFolder.getRoot().getAbsolutePath()); + options.setResourceNameMode(NexmarkUtils.ResourceNameMode.QUERY_RUNNER_AND_MODE); + FakeDatasetService.setUp(); + // BigQueryIO.clearCreatedTables(); + fakeDatasetService.createDataset( + options.getProject(), options.getBigQueryDataset(), "", "", null); + } + + @Test + public void testSavePerfsToBigQuery() throws IOException, InterruptedException { + NexmarkConfiguration nexmarkConfiguration1 = new NexmarkConfiguration(); + nexmarkConfiguration1.query = QUERY; + // just for the 2 configurations to be different to have different keys + nexmarkConfiguration1.cpuDelayMs = 100L; + NexmarkPerf nexmarkPerf1 = new NexmarkPerf(); + nexmarkPerf1.numResults = 1000L; + nexmarkPerf1.eventsPerSec = 0.5F; + nexmarkPerf1.runtimeSec = 0.325F; + + NexmarkConfiguration nexmarkConfiguration2 = new NexmarkConfiguration(); + nexmarkConfiguration2.query = QUERY; + // just for the 2 configurations to be different to have different keys + nexmarkConfiguration1.cpuDelayMs = 200L; + NexmarkPerf nexmarkPerf2 = new NexmarkPerf(); + nexmarkPerf2.numResults = 1001L; + nexmarkPerf2.eventsPerSec = 1.5F; + 
nexmarkPerf2.runtimeSec = 1.325F; + + // simulate 2 runs of the same query just to check that rows are apened correctly. + HashMap<NexmarkConfiguration, NexmarkPerf> perfs = new HashMap<>(2); + perfs.put(nexmarkConfiguration1, nexmarkPerf1); + perfs.put(nexmarkConfiguration2, nexmarkPerf2); + Main.savePerfsToBigQuery(options, perfs, fakeBqServices); + + String tableSpec = NexmarkUtils.tableSpec(options, String.valueOf(QUERY), 0L, null); + List<TableRow> actualRows = + fakeDatasetService.getAllRows( + options.getProject(), + options.getBigQueryDataset(), + BigQueryHelpers.parseTableSpec(tableSpec).getTableId()); + assertEquals("Wrong number of rows inserted", 2, actualRows.size()); + List<TableRow> expectedRows = new ArrayList<>(); + TableRow row1 = new TableRow() + .set("Runtime(sec)", nexmarkPerf1.runtimeSec).set("Events(/sec)", nexmarkPerf1.eventsPerSec) + // when read using TableRowJsonCoder the row field is boxed into an Integer, cast it to int + // to for bowing into Integer in the expectedRows. + .set("Size of the result collection", (int) nexmarkPerf1.numResults); + expectedRows.add(row1); + TableRow row2 = new TableRow() + .set("Runtime(sec)", nexmarkPerf2.runtimeSec).set("Events(/sec)", nexmarkPerf2.eventsPerSec) + // when read using TableRowJsonCoder the row field is boxed into an Integer, cast it to int + // to for bowing into Integer in the expectedRows. + .set("Size of the result collection", (int) nexmarkPerf2.numResults); + expectedRows.add(row2); + assertThat(actualRows, containsInAnyOrder(Iterables.toArray(expectedRows, TableRow.class))); + + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 112374) Time Spent: 8h 10m (was: 8h) > Export nexmark execution times to bigQuery > ------------------------------------------ > > Key: BEAM-4283 > URL: https://issues.apache.org/jira/browse/BEAM-4283 > Project: Beam > Issue Type: Sub-task > Components: examples-nexmark > Reporter: Etienne Chauchot > Assignee: Etienne Chauchot > Priority: Major > Time Spent: 8h 10m > Remaining Estimate: 0h > > Nexmark only outputs the results collection to BigQuery and prints the > execution times to the console. To supervise Nexmark execution times, we > also need to store them in BigQuery, per runner/query/mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)