[ 
https://issues.apache.org/jira/browse/BEAM-4283?focusedWorklogId=112374&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112374
 ]

ASF GitHub Bot logged work on BEAM-4283:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/18 16:06
            Start Date: 15/Jun/18 16:06
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #5464: [BEAM-4283] 
Write Nexmark execution times to bigquery
URL: https://github.com/apache/beam/pull/5464
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 29fb8924f22..19aef00094d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1452,7 +1452,10 @@ static String getExtractDestinationUri(String extractDestinationDir) {
     }
 
     @VisibleForTesting
-    Write<T> withTestServices(BigQueryServices testServices) {
+    /**
+     * This method is for test usage only
+     */
+    public Write<T> withTestServices(BigQueryServices testServices) {
       checkArgument(testServices != null, "testServices can not be null");
       return toBuilder().setBigQueryServices(testServices).build();
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 1295cc0fe2c..c4e5462306f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -32,10 +32,12 @@
 import java.io.Serializable;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /** An interface for real, mock, or fake implementations of Cloud BigQuery services. */
-interface BigQueryServices extends Serializable {
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public interface BigQueryServices extends Serializable {
 
   /**
    * Returns a real, mock, or fake {@link JobService}.
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index 0a384e74e17..d581a925f6c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -23,6 +23,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.ListCoder;
 
@@ -30,16 +31,17 @@
 /**
  * A fake implementation of BigQuery's query service..
  */
-class FakeBigQueryServices implements BigQueryServices {
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class FakeBigQueryServices implements BigQueryServices {
   private JobService jobService;
   private FakeDatasetService datasetService;
 
-  FakeBigQueryServices withJobService(JobService jobService) {
+  public FakeBigQueryServices withJobService(JobService jobService) {
     this.jobService = jobService;
     return this;
   }
 
-  FakeBigQueryServices withDatasetService(FakeDatasetService datasetService) {
+  public FakeBigQueryServices withDatasetService(FakeDatasetService datasetService) {
     this.datasetService = datasetService;
     return this;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 3526ed5ddd0..50d4b7af06d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -39,6 +39,7 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -46,7 +47,8 @@
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /** A fake dataset service that can be serialized, for use in testReadFromTable. */
-class FakeDatasetService implements DatasetService, Serializable {
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class FakeDatasetService implements DatasetService, Serializable {
   // Table information must be static, as each ParDo will get a separate instance of
   // FakeDatasetServices, and they must all modify the same storage.
   static com.google.common.collect.Table<String, String, Map<String, TableContainer>>
@@ -76,7 +78,7 @@ public Table getTable(TableReference tableRef)
     }
   }
 
-  List<TableRow> getAllRows(String projectId, String datasetId, String tableId)
+  public List<TableRow> getAllRows(String projectId, String datasetId, String tableId)
       throws InterruptedException, IOException {
     synchronized (tables) {
       return getTableContainer(projectId, datasetId, tableId).getRows();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 46c03f44f97..6f1e715389b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -62,6 +62,7 @@
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.io.FileSystems;
@@ -78,7 +79,8 @@
 /**
  * A fake implementation of BigQuery's job service.
  */
-class FakeJobService implements JobService, Serializable {
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class FakeJobService implements JobService, Serializable {
   private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
   // Whenever a job is started, the first 2 calls to GetJob will report the job as pending,
   // the next 2 will return the job as running, and only then will the job report as done.
@@ -103,7 +105,7 @@
   private static com.google.common.collect.Table<String, String, JobStatistics>
       dryRunQueryResults;
 
-  FakeJobService() {
+  public FakeJobService() {
     this.datasetService = new FakeDatasetService();
   }
 
diff --git a/sdks/java/nexmark/build.gradle b/sdks/java/nexmark/build.gradle
index 703495007f1..6dc6d87d179 100644
--- a/sdks/java/nexmark/build.gradle
+++ b/sdks/java/nexmark/build.gradle
@@ -63,8 +63,11 @@ dependencies {
   shadow library.java.slf4j_jdk14
   provided library.java.junit
   provided library.java.hamcrest_core
+  shadow project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadow")
+  shadowTest project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadowTest")
   testCompile library.java.hamcrest_core
   testCompile library.java.hamcrest_library
+
   testCompileOnly library.java.findbugs_annotations
 
   gradleRun project(path: project.path, configuration: "shadow")
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index 1cada92e0ec..ceed04774c1 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -17,7 +17,14 @@
  */
 package org.apache.beam.sdk.nexmark;
 
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -27,10 +34,24 @@
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Person;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -74,22 +95,93 @@ void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) throws IOException
           appendPerf(options.getPerfFilename(), configuration, perf);
           actual.put(configuration, perf);
           // Summarize what we've run so far.
-          saveSummary(null, configurations, actual, baseline, start);
+          saveSummary(null, configurations, actual, baseline, start, options);
         }
       }
+      if (options.getExportSummaryToBigQuery()){
+        savePerfsToBigQuery(options, actual, null);
+      }
     } finally {
       if (options.getMonitorJobs()) {
         // Report overall performance.
-        saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
+        saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start, options);
         saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
       }
     }
-
     if (!successful) {
       throw new RuntimeException("Execution was not successful");
     }
   }
 
+  @VisibleForTesting
+  static void savePerfsToBigQuery(
+      NexmarkOptions options,
+      Map<NexmarkConfiguration, NexmarkPerf> perfs,
+      @Nullable BigQueryServices testBigQueryServices) {
+    Pipeline pipeline = Pipeline.create(options);
+    PCollection<KV<NexmarkConfiguration, NexmarkPerf>> perfsPCollection =
+        pipeline.apply(
+            Create.of(perfs)
+                .withCoder(
+                    KvCoder.of(
+                        SerializableCoder.of(NexmarkConfiguration.class),
+                        new CustomCoder<NexmarkPerf>() {
+
+                          @Override
+                          public void encode(NexmarkPerf value, OutputStream outStream)
+                              throws CoderException, IOException {
+                            StringUtf8Coder.of().encode(value.toString(), outStream);
+                          }
+
+                          @Override
+                          public NexmarkPerf decode(InputStream inStream)
+                              throws CoderException, IOException {
+                            String perf = StringUtf8Coder.of().decode(inStream);
+                            return NexmarkPerf.fromString(perf);
+                          }
+                        })));
+
+    TableSchema tableSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("Runtime(sec)").setType("FLOAT"),
+                    new TableFieldSchema().setName("Events(/sec)").setType("FLOAT"),
+                    new TableFieldSchema()
+                        .setName("Size of the result collection")
+                        .setType("INTEGER")));
+
+    String tableSpec =
+        NexmarkUtils.tableSpec(options, "{query}", 0L, null);
+    SerializableFunction<
+            ValueInSingleWindow<KV<NexmarkConfiguration, NexmarkPerf>>, TableDestination>
+        tableFunction =
+            input -> new TableDestination(
+                tableSpec.replace("{query}", String.valueOf(input.getValue().getKey().query)),
+                "perfkit queries");
+    SerializableFunction<KV<NexmarkConfiguration, NexmarkPerf>, TableRow> rowFunction =
+        input -> {
+          NexmarkPerf nexmarkPerf = input.getValue();
+          TableRow row = new TableRow()
+              .set("Runtime(sec)", nexmarkPerf.runtimeSec)
+              .set("Events(/sec)", nexmarkPerf.eventsPerSec)
+              .set("Size of the result collection", nexmarkPerf.numResults);
+          return row;
+        };
+    BigQueryIO.Write io =
+        BigQueryIO.<KV<NexmarkConfiguration, NexmarkPerf>>write()
+            .to(tableFunction)
+            .withSchema(tableSchema)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+            .withFormatFunction(rowFunction);
+    if (testBigQueryServices != null){
+      io = io.withTestServices(testBigQueryServices);
+    }
+    perfsPCollection.apply("savePerfsToBigQuery", io);
+    pipeline.run();
+  }
+
   /**
    * Append the pair of {@code configuration} and {@code perf} to perf file.
    */
@@ -146,13 +238,15 @@ private void appendPerf(
   private static final String LINE =
       
"==========================================================================================";
 
-  /**
-   * Print summary  of {@code actual} vs (if non-null) {@code baseline}.
-   */
+  /** Print summary of {@code actual} vs (if non-null) {@code baseline}. */
   private static void saveSummary(
       @Nullable String summaryFilename,
-      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
-      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+      Iterable<NexmarkConfiguration> configurations,
+      Map<NexmarkConfiguration, NexmarkPerf> actual,
+      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline,
+      Instant start,
+      NexmarkOptions options) {
+
     List<String> lines = new ArrayList<>();
 
     lines.add("");
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
index 904fcd5cbcb..5f834cf6405 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
@@ -47,6 +47,13 @@
   @JsonProperty
   public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL;
 
+  /**
+   * If false, the summary is only output to the console. If true, the summary is output to the
+   * console and its content is also written to BigQuery tables according to {@link
+   * NexmarkOptions#getResourceNameMode()}.
+   */
+  @JsonProperty public boolean exportSummaryToBigQuery = false;
+
   /**
   * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated
    * into the overall query pipeline.
@@ -253,6 +260,9 @@ public void overrideFromOptions(NexmarkOptions options) {
     if (options.getSinkType() != null) {
       sinkType = options.getSinkType();
     }
+    if (options.getExportSummaryToBigQuery() != null) {
+      exportSummaryToBigQuery = options.getExportSummaryToBigQuery();
+    }
     if (options.getPubSubMode() != null) {
       pubSubMode = options.getPubSubMode();
     }
@@ -367,6 +377,7 @@ public NexmarkConfiguration copy() {
     result.query = query;
     result.sourceType = sourceType;
     result.sinkType = sinkType;
+    result.exportSummaryToBigQuery = exportSummaryToBigQuery;
     result.pubSubMode = pubSubMode;
     result.numEvents = numEvents;
     result.numEventGenerators = numEventGenerators;
@@ -420,6 +431,9 @@ public String toShortString() {
     if (sinkType != DEFAULT.sinkType) {
       sb.append(String.format("; sinkType:%s", sinkType));
     }
+    if (exportSummaryToBigQuery != DEFAULT.exportSummaryToBigQuery) {
+      sb.append(String.format("; exportSummaryToBigQuery:%s", exportSummaryToBigQuery));
+    }
     if (pubSubMode != DEFAULT.pubSubMode) {
       sb.append(String.format("; pubSubMode:%s", pubSubMode));
     }
@@ -553,6 +567,7 @@ public int hashCode() {
         query,
         sourceType,
         sinkType,
+        exportSummaryToBigQuery,
         pubSubMode,
         numEvents,
         numEventGenerators,
@@ -695,6 +710,9 @@ public boolean equals(Object obj) {
     if (sinkType != other.sinkType) {
       return false;
     }
+    if (exportSummaryToBigQuery != other.exportSummaryToBigQuery) {
+      return false;
+    }
     if (sourceType != other.sourceType) {
       return false;
     }
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index ebc87054936..2725b1909b4 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -20,7 +20,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.COMBINED;
-import static org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.SUBSCRIBE_ONLY;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
@@ -686,6 +685,14 @@ private String shortTopic(long now) {
         return String.format("%s_%s_source", baseTopic, queryName);
       case QUERY_AND_SALT:
         return String.format("%s_%s_%d_source", baseTopic, queryName, now);
+      case QUERY_RUNNER_AND_MODE:
+        return String.format(
+            "%s_%s_%s_%s_source",
+            baseTopic,
+            queryName,
+            options.getRunner().getSimpleName(),
+            options.isStreaming());
+
     }
     throw new RuntimeException("Unrecognized enum " + 
options.getResourceNameMode());
   }
@@ -705,6 +712,13 @@ private String shortSubscription(long now) {
         return String.format("%s_%s_source", baseSubscription, queryName);
       case QUERY_AND_SALT:
         return String.format("%s_%s_%d_source", baseSubscription, queryName, 
now);
+      case QUERY_RUNNER_AND_MODE:
+        return String.format(
+            "%s_%s_%s_%s_source",
+            baseSubscription,
+            queryName,
+            options.getRunner().getSimpleName(),
+            options.isStreaming());
     }
     throw new RuntimeException("Unrecognized enum " + 
options.getResourceNameMode());
   }
@@ -724,31 +738,17 @@ private String textFilename(long now) {
         return String.format("%s/nexmark_%s.txt", baseFilename, queryName);
       case QUERY_AND_SALT:
         return String.format("%s/nexmark_%s_%d.txt", baseFilename, queryName, 
now);
+      case QUERY_RUNNER_AND_MODE:
+        return String.format(
+            "%s/nexmark_%s_%s_%s",
+            baseFilename,
+            queryName,
+            options.getRunner().getSimpleName(),
+            options.isStreaming());
     }
     throw new RuntimeException("Unrecognized enum " + 
options.getResourceNameMode());
   }
 
-  /**
-   * Return a BigQuery table spec.
-   */
-  private String tableSpec(long now, String version) {
-    String baseTableName = options.getBigQueryTable();
-    if (Strings.isNullOrEmpty(baseTableName)) {
-      throw new RuntimeException("Missing --bigQueryTable");
-    }
-    switch (options.getResourceNameMode()) {
-      case VERBATIM:
-        return String.format("%s:nexmark.%s_%s",
-                             options.getProject(), baseTableName, version);
-      case QUERY:
-        return String.format("%s:nexmark.%s_%s_%s",
-                             options.getProject(), baseTableName, queryName, 
version);
-      case QUERY_AND_SALT:
-        return String.format("%s:nexmark.%s_%s_%s_%d",
-                             options.getProject(), baseTableName, queryName, 
version, now);
-    }
-    throw new RuntimeException("Unrecognized enum " + 
options.getResourceNameMode());
-  }
 
   /**
    * Return a directory for logs.
@@ -765,6 +765,14 @@ private String logsDir(long now) {
         return String.format("%s/logs_%s", baseFilename, queryName);
       case QUERY_AND_SALT:
         return String.format("%s/logs_%s_%d", baseFilename, queryName, now);
+      case QUERY_RUNNER_AND_MODE:
+        return String.format(
+            "%s/logs_%s_%s_%s",
+            baseFilename,
+            queryName,
+            options.getRunner().getSimpleName(),
+            options.isStreaming());
+
     }
     throw new RuntimeException("Unrecognized enum " + 
options.getResourceNameMode());
   }
@@ -980,7 +988,7 @@ public void processElement(ProcessContext c) {
   private void sinkResultsToBigQuery(
       PCollection<String> formattedResults, long now,
       String version) {
-    String tableSpec = tableSpec(now, version);
+    String tableSpec = NexmarkUtils.tableSpec(options, queryName, now, version);
     TableSchema tableSchema =
         new TableSchema().setFields(ImmutableList.of(
             new TableFieldSchema().setName("result").setType("STRING"),
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index 592d1aa9c06..58ab594bbbd 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -61,6 +61,12 @@
 
   void setSinkType(NexmarkUtils.SinkType sinkType);
 
+  @Description("Shall we export the summary to BigQuery.")
+  @Default.Boolean(false)
+  Boolean getExportSummaryToBigQuery();
+
+  void setExportSummaryToBigQuery(Boolean exportSummaryToBigQuery);
+
   @Description("Which mode to run in when source is PUBSUB.")
   @Nullable
   NexmarkUtils.PubSubMode getPubSubMode();
@@ -100,6 +106,12 @@
 
   void setBigQueryTable(String bigQueryTable);
 
+  @Description("BigQuery dataset")
+  @Default.String("nexmark")
+  String getBigQueryDataset();
+
+  void setBigQueryDataset(String bigQueryDataset);
+
   @Description("Approximate number of events to generate. "
                + "Zero for effectively unlimited in streaming mode.")
   @Nullable
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index 5d89dbde439..28e713a20ea 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.nexmark;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.io.InputStream;
@@ -185,7 +186,56 @@
     /** Names are suffixed with the query being run. */
     QUERY,
     /** Names are suffixed with the query being run and a random number. */
-    QUERY_AND_SALT
+    QUERY_AND_SALT,
+    /** Names are suffixed with the query being run, the runner being used, and the mode (streaming/batch). */
+    QUERY_RUNNER_AND_MODE
+  }
+
+  /** Return a BigQuery table spec. */
+  static String tableSpec(NexmarkOptions options, String queryName, long now, String version) {
+    String baseTableName = options.getBigQueryTable();
+    if (Strings.isNullOrEmpty(baseTableName)) {
+      throw new RuntimeException("Missing --bigQueryTable");
+    }
+    switch (options.getResourceNameMode()) {
+      case VERBATIM:
+        return String.format(
+            "%s:%s.%s_%s",
+            options.getProject(), options.getBigQueryDataset(), baseTableName, version);
+      case QUERY:
+        return String.format(
+            "%s:%s.%s_%s_%s",
+            options.getProject(), options.getBigQueryDataset(), baseTableName, queryName, version);
+      case QUERY_AND_SALT:
+        return String.format(
+            "%s:%s.%s_%s_%s_%d",
+            options.getProject(),
+            options.getBigQueryDataset(),
+            baseTableName,
+            queryName,
+            version,
+            now);
+      case QUERY_RUNNER_AND_MODE:
+        return (version != null)
+            ? String.format(
+                "%s:%s.%s_%s_%s_%s_%s",
+                options.getProject(),
+                options.getBigQueryDataset(),
+                baseTableName,
+                queryName,
+                options.getRunner().getSimpleName(),
+                options.isStreaming(),
+                version)
+            : String.format(
+                "%s:%s.%s_%s_%s_%s",
+                options.getProject(),
+                options.getBigQueryDataset(),
+                baseTableName,
+                queryName,
+                options.getRunner().getSimpleName(),
+                options.isStreaming());
+    }
+    throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
   }
 
   /**
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
new file mode 100644
index 00000000000..6fc5dabb50e
--- /dev/null
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.bigquery.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.FakeJobService;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Test class for BigQuery sinks. */
+public class PerfsToBigQueryTest {
+
+  private static final int QUERY = 1;
+  private NexmarkOptions options;
+  private FakeDatasetService fakeDatasetService = new FakeDatasetService();
+  private FakeJobService fakeJobService = new FakeJobService();
+  private FakeBigQueryServices fakeBqServices =
+      new FakeBigQueryServices()
+          .withDatasetService(fakeDatasetService)
+          .withJobService(fakeJobService);
+  @Rule
+  public transient TemporaryFolder testFolder = new TemporaryFolder();
+
+
+  @Before
+  public void before() throws IOException, InterruptedException {
+    options = PipelineOptionsFactory.create().as(NexmarkOptions.class);
+    options.setBigQueryTable("nexmark");
+    options.setBigQueryDataset("nexmark");
+    options.setRunner(DirectRunner.class);
+    options.setStreaming(true);
+    options.setProject("nexmark-test");
+    options.setTempLocation(testFolder.getRoot().getAbsolutePath());
+    options.setResourceNameMode(NexmarkUtils.ResourceNameMode.QUERY_RUNNER_AND_MODE);
+    FakeDatasetService.setUp();
+    //    BigQueryIO.clearCreatedTables();
+    fakeDatasetService.createDataset(
+        options.getProject(), options.getBigQueryDataset(), "", "", null);
+  }
+
+  @Test
+  public void testSavePerfsToBigQuery() throws IOException, InterruptedException {
+    NexmarkConfiguration nexmarkConfiguration1 = new NexmarkConfiguration();
+    nexmarkConfiguration1.query = QUERY;
+    // Just so the two configurations differ and produce distinct map keys.
+    nexmarkConfiguration1.cpuDelayMs = 100L;
+    NexmarkPerf nexmarkPerf1 = new NexmarkPerf();
+    nexmarkPerf1.numResults = 1000L;
+    nexmarkPerf1.eventsPerSec = 0.5F;
+    nexmarkPerf1.runtimeSec = 0.325F;
+
+    NexmarkConfiguration nexmarkConfiguration2 = new NexmarkConfiguration();
+    nexmarkConfiguration2.query = QUERY;
+    // Just so the two configurations differ and produce distinct map keys.
+    nexmarkConfiguration2.cpuDelayMs = 200L;
+    NexmarkPerf nexmarkPerf2 = new NexmarkPerf();
+    nexmarkPerf2.numResults = 1001L;
+    nexmarkPerf2.eventsPerSec = 1.5F;
+    nexmarkPerf2.runtimeSec = 1.325F;
+
+    // Simulate 2 runs of the same query just to check that rows are appended correctly.
+    HashMap<NexmarkConfiguration, NexmarkPerf> perfs = new HashMap<>(2);
+    perfs.put(nexmarkConfiguration1, nexmarkPerf1);
+    perfs.put(nexmarkConfiguration2, nexmarkPerf2);
+    Main.savePerfsToBigQuery(options, perfs, fakeBqServices);
+
+    String tableSpec = NexmarkUtils.tableSpec(options, String.valueOf(QUERY), 0L, null);
+    List<TableRow> actualRows =
+        fakeDatasetService.getAllRows(
+            options.getProject(),
+            options.getBigQueryDataset(),
+            BigQueryHelpers.parseTableSpec(tableSpec).getTableId());
+    assertEquals("Wrong number of rows inserted", 2, actualRows.size());
+    List<TableRow> expectedRows = new ArrayList<>();
+    TableRow row1 = new TableRow()
+        .set("Runtime(sec)", nexmarkPerf1.runtimeSec).set("Events(/sec)", nexmarkPerf1.eventsPerSec)
+        // When read back using TableRowJsonCoder the field is boxed into an Integer, so cast the
+        // long to int to get the same Integer boxing in the expectedRows.
+        .set("Size of the result collection", (int) nexmarkPerf1.numResults);
+    expectedRows.add(row1);
+    TableRow row2 = new TableRow()
+        .set("Runtime(sec)", nexmarkPerf2.runtimeSec).set("Events(/sec)", nexmarkPerf2.eventsPerSec)
+        // When read back using TableRowJsonCoder the field is boxed into an Integer, so cast the
+        // long to int to get the same Integer boxing in the expectedRows.
+        .set("Size of the result collection", (int) nexmarkPerf2.numResults);
+    expectedRows.add(row2);
+    assertThat(actualRows, containsInAnyOrder(Iterables.toArray(expectedRows, TableRow.class)));
+
+  }
+}
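
For reviewers who want to exercise the new export path outside of the unit test, a minimal sketch follows. It mirrors the setup used in PerfsToBigQueryTest; the class name is hypothetical, and it must live in the org.apache.beam.sdk.nexmark package because Main.savePerfsToBigQuery is package-private and marked @VisibleForTesting. Passing null as the third argument uses the real BigQueryServices, so a valid --project and BigQuery credentials are assumed.

package org.apache.beam.sdk.nexmark;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Sketch only: pushes one fabricated perf entry through Main.savePerfsToBigQuery. */
public class ExportSummarySketch {
  public static void main(String[] args) {
    NexmarkOptions options = PipelineOptionsFactory.fromArgs(args).as(NexmarkOptions.class);
    options.setRunner(DirectRunner.class);
    options.setStreaming(false);
    options.setBigQueryTable("nexmark");       // assumed base table name
    options.setBigQueryDataset("nexmark");     // new option, defaults to "nexmark"
    options.setExportSummaryToBigQuery(true);  // new option, defaults to false
    options.setResourceNameMode(NexmarkUtils.ResourceNameMode.QUERY_RUNNER_AND_MODE);

    // Fabricated summary values, standing in for a real Nexmark run.
    NexmarkConfiguration configuration = new NexmarkConfiguration();
    configuration.query = 1;
    NexmarkPerf perf = new NexmarkPerf();
    perf.runtimeSec = 1.0F;
    perf.eventsPerSec = 100.0F;
    perf.numResults = 42L;

    Map<NexmarkConfiguration, NexmarkPerf> perfs = new HashMap<>();
    perfs.put(configuration, perf);

    // null -> use the real BigQueryServices; the unit test injects FakeBigQueryServices instead.
    Main.savePerfsToBigQuery(options, perfs, null);
  }
}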


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 112374)
    Time Spent: 8h 10m  (was: 8h)

> Export nexmark execution times to bigQuery
> ------------------------------------------
>
>                 Key: BEAM-4283
>                 URL: https://issues.apache.org/jira/browse/BEAM-4283
>             Project: Beam
>          Issue Type: Sub-task
>          Components: examples-nexmark
>            Reporter: Etienne Chauchot
>            Assignee: Etienne Chauchot
>            Priority: Major
>          Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> Nexmark only outputs the results collection to BigQuery and prints the execution times to
> the console. To supervise Nexmark execution times, we need to store them in BigQuery as
> well, per runner/query/mode.
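
The per runner/query/mode naming this change introduces can be illustrated with the sketch below. The values are assumed; the format string is the one in the QUERY_RUNNER_AND_MODE branch added to NexmarkUtils.tableSpec (a non-null version appends one more _{version} segment).

public class TableSpecNamingSketch {
  public static void main(String[] args) {
    // Mirrors the QUERY_RUNNER_AND_MODE branch of NexmarkUtils.tableSpec for a null version:
    // {project}:{dataset}.{table}_{queryName}_{runnerSimpleName}_{isStreaming}
    String tableSpec =
        String.format(
            "%s:%s.%s_%s_%s_%s",
            "nexmark-test",  // --project (assumed)
            "nexmark",       // --bigQueryDataset (default)
            "nexmark",       // --bigQueryTable (assumed)
            "1",             // query name
            "DirectRunner",  // runner simple name
            true);           // streaming mode
    System.out.println(tableSpec);
    // Prints: nexmark-test:nexmark.nexmark_1_DirectRunner_true
  }
}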



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
