[ 
https://issues.apache.org/jira/browse/BEAM-5906?focusedWorklogId=166479&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166479
 ]

ASF GitHub Bot logged work on BEAM-5906:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Nov/18 16:16
            Start Date: 15/Nov/18 16:16
    Worklog Time Spent: 10m 
      Work Description: echauchot closed pull request #6886: [BEAM-5906] Use 
dedicated BigQuery client for publishing Nexmark results
URL: https://github.com/apache/beam/pull/6886
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the complete diff is reproduced below:

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index ff1d38b859f..2ef7b5f1654 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -374,6 +374,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_api_services_storage                 : 
"com.google.apis:google-api-services-storage:v1-rev136-$google_clients_version",
         google_auth_library_credentials             : 
"com.google.auth:google-auth-library-credentials:$google_auth_version",
         google_auth_library_oauth2_http             : 
"com.google.auth:google-auth-library-oauth2-http:$google_auth_version",
+        google_cloud_bigquery                       : 
"com.google.cloud:google-cloud-bigquery:$google_clients_version",
         google_cloud_core                           : 
"com.google.cloud:google-cloud-core:$google_cloud_core_version",
         google_cloud_core_grpc                      : 
"com.google.cloud:google-cloud-core-grpc:$google_cloud_core_version",
         google_cloud_dataflow_java_proto_library_all: 
"com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304",
diff --git a/sdks/java/testing/nexmark/build.gradle 
b/sdks/java/testing/nexmark/build.gradle
index 1944d638285..16d144e8ee9 100644
--- a/sdks/java/testing/nexmark/build.gradle
+++ b/sdks/java/testing/nexmark/build.gradle
@@ -71,6 +71,7 @@ dependencies {
   provided library.java.hamcrest_core
   shadow project(path: ":beam-sdks-java-io-google-cloud-platform", 
configuration: "shadow")
   shadowTest project(path: ":beam-sdks-java-io-google-cloud-platform", 
configuration: "shadowTest")
+  shadowTest project(path: ":beam-sdks-java-test-utils", configuration: 
"shadowTest")
   testCompile library.java.hamcrest_core
   testCompile library.java.hamcrest_library
 
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index fcccf8d2431..2461584847f 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -17,14 +17,9 @@
  */
 package org.apache.beam.sdk.nexmark;
 
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -41,24 +36,11 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
-import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Person;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.testutils.publishing.BigQueryClient;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -162,7 +144,8 @@ void runAll(NexmarkOptions options) throws IOException {
       }
 
       if (options.getExportSummaryToBigQuery()) {
-        savePerfsToBigQuery(options, actual, null, start);
+        BigQueryClient publisher = 
BigQueryClient.create(options.getBigQueryDataset());
+        savePerfsToBigQuery(publisher, options, actual, start);
       }
     } finally {
       if (options.getMonitorJobs()) {
@@ -180,77 +163,37 @@ void runAll(NexmarkOptions options) throws IOException {
 
   @VisibleForTesting
   static void savePerfsToBigQuery(
+      BigQueryClient bigQueryClient,
       NexmarkOptions options,
       Map<NexmarkConfiguration, NexmarkPerf> perfs,
-      @Nullable BigQueryServices testBigQueryServices,
       Instant start) {
-    Pipeline pipeline = Pipeline.create(options);
-    PCollection<KV<NexmarkConfiguration, NexmarkPerf>> perfsPCollection =
-        pipeline.apply(
-            Create.of(perfs)
-                .withCoder(
-                    KvCoder.of(
-                        SerializableCoder.of(NexmarkConfiguration.class),
-                        new CustomCoder<NexmarkPerf>() {
-
-                          @Override
-                          public void encode(NexmarkPerf value, OutputStream 
outStream)
-                              throws CoderException, IOException {
-                            StringUtf8Coder.of().encode(value.toString(), 
outStream);
-                          }
-
-                          @Override
-                          public NexmarkPerf decode(InputStream inStream)
-                              throws CoderException, IOException {
-                            String perf = 
StringUtf8Coder.of().decode(inStream);
-                            return NexmarkPerf.fromString(perf);
-                          }
-                        })));
-
-    TableSchema tableSchema =
-        new TableSchema()
-            .setFields(
-                ImmutableList.of(
-                    new 
TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
-                    new 
TableFieldSchema().setName("runtimeSec").setType("FLOAT"),
-                    new 
TableFieldSchema().setName("eventsPerSec").setType("FLOAT"),
-                    new 
TableFieldSchema().setName("numResults").setType("INTEGER")));
-
-    String queryName = "{query}";
-    if (options.getQueryLanguage() != null) {
-      queryName = queryName + "_" + options.getQueryLanguage();
-    }
-    final String tableSpec = NexmarkUtils.tableSpec(options, queryName, 0L, 
null);
-    SerializableFunction<
-            ValueInSingleWindow<KV<NexmarkConfiguration, NexmarkPerf>>, 
TableDestination>
-        tableFunction =
-            input ->
-                new TableDestination(
-                    tableSpec.replace("{query}", 
input.getValue().getKey().query.getNumberOrName()),
-                    "perfkit queries");
-    SerializableFunction<KV<NexmarkConfiguration, NexmarkPerf>, TableRow> 
rowFunction =
-        input -> {
-          NexmarkPerf nexmarkPerf = input.getValue();
-          TableRow row =
-              new TableRow()
-                  .set("timestamp", start.getMillis() / 1000)
-                  .set("runtimeSec", nexmarkPerf.runtimeSec)
-                  .set("eventsPerSec", nexmarkPerf.eventsPerSec)
-                  .set("numResults", nexmarkPerf.numResults);
-          return row;
-        };
-    BigQueryIO.Write io =
-        BigQueryIO.<KV<NexmarkConfiguration, NexmarkPerf>>write()
-            .to(tableFunction)
-            .withSchema(tableSchema)
-            
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-            
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
-            .withFormatFunction(rowFunction);
-    if (testBigQueryServices != null) {
-      io = io.withTestServices(testBigQueryServices);
+
+    for (Map.Entry<NexmarkConfiguration, NexmarkPerf> entry : 
perfs.entrySet()) {
+      String queryName =
+          NexmarkUtils.fullQueryName(
+              options.getQueryLanguage(), 
entry.getKey().query.getNumberOrName());
+      String tableName = NexmarkUtils.tableName(options, queryName, 0L, null);
+
+      ImmutableMap<String, String> schema =
+          ImmutableMap.<String, String>builder()
+              .put("timestamp", "timestamp")
+              .put("runtimeSec", "float")
+              .put("eventsPerSec", "float")
+              .put("numResults", "integer")
+              .build();
+      bigQueryClient.createTableIfNotExists(tableName, schema);
+
+      // convert millis to seconds (it's a BigQuery's requirement).
+      Map<String, Object> record =
+          ImmutableMap.<String, Object>builder()
+              .put("timestamp", start.getMillis() / 1000)
+              .put("runtimeSec", entry.getValue().runtimeSec)
+              .put("eventsPerSec", entry.getValue().eventsPerSec)
+              .put("numResults", entry.getValue().numResults)
+              .build();
+
+      bigQueryClient.insertRow(record, tableName);
     }
-    perfsPCollection.apply("savePerfsToBigQuery", io);
-    pipeline.run();
   }
 
   /** Append the pair of {@code configuration} and {@code perf} to perf file. 
*/
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index 72dccc4df34..df1c7908cd5 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -165,53 +165,51 @@
     QUERY_RUNNER_AND_MODE
   }
 
+  /** Return a query name with query language (if applicable). */
+  static String fullQueryName(String queryLanguage, String query) {
+    return queryLanguage != null ? query + "_" + queryLanguage : query;
+  }
+
   /** Return a BigQuery table spec. */
   static String tableSpec(NexmarkOptions options, String queryName, long now, 
String version) {
+    return String.format(
+        "%s:%s.%s",
+        options.getProject(),
+        options.getBigQueryDataset(),
+        tableName(options, queryName, now, version));
+  }
+
+  /** Return a BigQuery table name. */
+  static String tableName(NexmarkOptions options, String queryName, long now, 
String version) {
     String baseTableName = options.getBigQueryTable();
     if (Strings.isNullOrEmpty(baseTableName)) {
       throw new RuntimeException("Missing --bigQueryTable");
     }
+
     switch (options.getResourceNameMode()) {
       case VERBATIM:
-        return String.format(
-            "%s:%s.%s_%s",
-            options.getProject(), options.getBigQueryDataset(), baseTableName, 
version);
+        return String.format("%s_%s", baseTableName, version);
       case QUERY:
-        return String.format(
-            "%s:%s.%s_%s_%s",
-            options.getProject(), options.getBigQueryDataset(), baseTableName, 
queryName, version);
+        return String.format("%s_%s_%s", baseTableName, queryName, version);
       case QUERY_AND_SALT:
-        return String.format(
-            "%s:%s.%s_%s_%s_%d",
-            options.getProject(),
-            options.getBigQueryDataset(),
-            baseTableName,
-            queryName,
-            version,
-            now);
+        return String.format("%s_%s_%s_%d", baseTableName, queryName, version, 
now);
       case QUERY_RUNNER_AND_MODE:
-        return (version != null)
-            ? String.format(
-                "%s:%s.%s_%s_%s_%s_%s",
-                options.getProject(),
-                options.getBigQueryDataset(),
-                baseTableName,
-                queryName,
-                options.getRunner().getSimpleName(),
-                options.isStreaming() ? "streaming" : "batch",
-                version)
-            : String.format(
-                "%s:%s.%s_%s_%s_%s",
-                options.getProject(),
-                options.getBigQueryDataset(),
-                baseTableName,
-                queryName,
-                options.getRunner().getSimpleName(),
-                options.isStreaming() ? "streaming" : "batch");
+        String runnerName = options.getRunner().getSimpleName();
+        boolean isStreaming = options.isStreaming();
+
+        String tableName =
+            String.format(
+                "%s_%s_%s_%s", baseTableName, queryName, runnerName, 
processingMode(isStreaming));
+
+        return version != null ? String.format("%s_%s", tableName, version) : 
tableName;
     }
     throw new RuntimeException("Unrecognized enum " + 
options.getResourceNameMode());
   }
 
+  private static String processingMode(boolean isStreaming) {
+    return isStreaming ? "streaming" : "batch";
+  }
+
   /** Units for rates. */
   public enum RateUnit {
     PER_SECOND(1_000_000L),
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
index 9a255895726..df1927d343b 100644
--- 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
@@ -17,11 +17,21 @@
  */
 package org.apache.beam.sdk.nexmark;
 
+import static org.apache.beam.sdk.nexmark.NexmarkUtils.ResourceNameMode.QUERY;
+import static 
org.apache.beam.sdk.nexmark.NexmarkUtils.ResourceNameMode.QUERY_AND_SALT;
+import static 
org.apache.beam.sdk.nexmark.NexmarkUtils.ResourceNameMode.QUERY_RUNNER_AND_MODE;
+import static 
org.apache.beam.sdk.nexmark.NexmarkUtils.ResourceNameMode.VERBATIM;
+import static org.testng.Assert.assertEquals;
+
 import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
@@ -31,7 +41,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Test the Nexmark utils. */
+/** Tests the {@link NexmarkUtils}. */
 @RunWith(JUnit4.class)
 public class NexmarkUtilsTest {
 
@@ -64,4 +74,100 @@ public void testPrepareCsvSideInput() throws Exception {
       NexmarkUtils.cleanUpSideInput(config);
     }
   }
+
+  @Test
+  public void testFullQueryNameAppendsLanguageIfNeeded() {
+    String fullName = NexmarkUtils.fullQueryName("sql", "1");
+    assertEquals(fullName, "1_sql");
+  }
+
+  @Test
+  public void testFullQueryNameDoesntContainNullLanguage() {
+    String fullName = NexmarkUtils.fullQueryName(null, "1");
+    assertEquals(fullName, "1");
+  }
+
+  @Test
+  public void testTableName() {
+    String table = "nexmark";
+    String query = "query";
+    long salt = 1111;
+    String version = "version";
+    Class runner = Runner.class;
+    boolean isStreaming = true;
+
+    testTableName(VERBATIM, table, query, salt, version, runner, isStreaming, 
"nexmark_version");
+
+    testTableName(QUERY, table, query, salt, version, runner, isStreaming, 
"nexmark_query_version");
+
+    testTableName(
+        QUERY_AND_SALT,
+        table,
+        query,
+        salt,
+        version,
+        runner,
+        isStreaming,
+        "nexmark_query_version_1111");
+
+    testTableName(
+        QUERY_RUNNER_AND_MODE,
+        table,
+        query,
+        salt,
+        version,
+        runner,
+        isStreaming,
+        "nexmark_query_Runner_streaming_version");
+
+    testTableName(
+        QUERY_RUNNER_AND_MODE,
+        table,
+        query,
+        salt,
+        version,
+        runner,
+        !isStreaming,
+        "nexmark_query_Runner_batch_version");
+
+    testTableName(
+        QUERY_RUNNER_AND_MODE,
+        table,
+        query,
+        salt,
+        null,
+        runner,
+        isStreaming,
+        "nexmark_query_Runner_streaming");
+  }
+
+  private void testTableName(
+      NexmarkUtils.ResourceNameMode nameMode,
+      String baseTableName,
+      String queryName,
+      Long salt,
+      String version,
+      Class runner,
+      Boolean isStreaming,
+      String expected) {
+    NexmarkOptions options = PipelineOptionsFactory.as(NexmarkOptions.class);
+    options.setResourceNameMode(nameMode);
+    options.setBigQueryTable(baseTableName);
+    options.setRunner(runner);
+    options.setStreaming(isStreaming);
+
+    String tableName = NexmarkUtils.tableName(options, queryName, salt, 
version);
+
+    assertEquals(tableName, expected);
+  }
+
+  private static class Runner extends PipelineRunner<PipelineResult> {
+
+    private Runner() {}
+
+    @Override
+    public PipelineResult run(Pipeline pipeline) {
+      return null;
+    }
+  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
index 363c3176ea8..d3849e6ea35 100644
--- 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
@@ -17,58 +17,42 @@
  */
 package org.apache.beam.sdk.nexmark;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.util.ArrayList;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
-import org.apache.beam.sdk.io.gcp.bigquery.FakeBigQueryServices;
-import org.apache.beam.sdk.io.gcp.bigquery.FakeDatasetService;
-import org.apache.beam.sdk.io.gcp.bigquery.FakeJobService;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testutils.fakes.FakeBigQueryClient;
 import org.joda.time.Instant;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 /** Test class for BigQuery sinks. */
 public class PerfsToBigQueryTest {
 
   private static final NexmarkQueryName QUERY = 
NexmarkQueryName.CURRENCY_CONVERSION;
   private NexmarkOptions options;
-  private FakeDatasetService fakeDatasetService = new FakeDatasetService();
-  private FakeJobService fakeJobService = new FakeJobService();
-  private FakeBigQueryServices fakeBqServices =
-      new FakeBigQueryServices()
-          .withDatasetService(fakeDatasetService)
-          .withJobService(fakeJobService);
-  @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
+
+  private FakeBigQueryClient bigQueryClient;
 
   @Before
-  public void before() throws IOException, InterruptedException {
+  public void before() {
     options = PipelineOptionsFactory.create().as(NexmarkOptions.class);
     options.setBigQueryTable("nexmark");
     options.setBigQueryDataset("nexmark");
     options.setRunner(DirectRunner.class);
     options.setStreaming(true);
     options.setProject("nexmark-test");
-    options.setTempLocation(testFolder.getRoot().getAbsolutePath());
     
options.setResourceNameMode(NexmarkUtils.ResourceNameMode.QUERY_RUNNER_AND_MODE);
-    FakeDatasetService.setUp();
-    fakeDatasetService.createDataset(
-        options.getProject(), options.getBigQueryDataset(), "", "", null);
+
+    bigQueryClient = new FakeBigQueryClient(options.getBigQueryDataset());
   }
 
   @Test
-  public void testSavePerfsToBigQuery() throws IOException, 
InterruptedException {
+  public void testSavePerfsToBigQuery() {
     NexmarkConfiguration nexmarkConfiguration1 = new NexmarkConfiguration();
     nexmarkConfiguration1.query = QUERY;
     // just for the 2 configurations to be different to have different keys
@@ -87,41 +71,48 @@ public void testSavePerfsToBigQuery() throws IOException, 
InterruptedException {
     nexmarkPerf2.eventsPerSec = 1.5F;
     nexmarkPerf2.runtimeSec = 1.325F;
 
-    // simulate 2 runs of the same query just to check that rows are apened 
correctly.
+    // simulate 2 runs of the same query just to check that rows are appended 
correctly.
     HashMap<NexmarkConfiguration, NexmarkPerf> perfs = new HashMap<>(2);
     perfs.put(nexmarkConfiguration1, nexmarkPerf1);
     perfs.put(nexmarkConfiguration2, nexmarkPerf2);
 
-    // cast to int due to BEAM-4734. To avoid overflow on int capacity,
-    // set the instant to a fixed date (and not Instant.now())
-    int startTimestampSeconds = 1454284800;
+    long startTimestampMilliseconds = 1454284800000L;
     Main.savePerfsToBigQuery(
-        options, perfs, fakeBqServices, new Instant(startTimestampSeconds * 
1000L));
-
-    String tableSpec = NexmarkUtils.tableSpec(options, 
QUERY.getNumberOrName(), 0L, null);
-    List<TableRow> actualRows =
-        fakeDatasetService.getAllRows(
-            options.getProject(),
-            options.getBigQueryDataset(),
-            BigQueryHelpers.parseTableSpec(tableSpec).getTableId());
-    assertEquals("Wrong number of rows inserted", 2, actualRows.size());
-    List<TableRow> expectedRows = new ArrayList<>();
-    TableRow row1 =
-        new TableRow()
-            .set("timestamp", startTimestampSeconds)
-            .set("runtimeSec", nexmarkPerf1.runtimeSec)
-            .set("eventsPerSec", nexmarkPerf1.eventsPerSec)
-            // cast to int due to BEAM-4734.
-            .set("numResults", (int) nexmarkPerf1.numResults);
-    expectedRows.add(row1);
-    TableRow row2 =
-        new TableRow()
-            .set("timestamp", startTimestampSeconds)
-            .set("runtimeSec", nexmarkPerf2.runtimeSec)
-            .set("eventsPerSec", nexmarkPerf2.eventsPerSec)
-            // cast to int  due to BEAM-4734.
-            .set("numResults", (int) nexmarkPerf2.numResults);
-    expectedRows.add(row2);
-    assertThat(actualRows, containsInAnyOrder(Iterables.toArray(expectedRows, 
TableRow.class)));
+        bigQueryClient, options, perfs, new 
Instant(startTimestampMilliseconds));
+
+    String tableName = NexmarkUtils.tableName(options, 
QUERY.getNumberOrName(), 0L, null);
+    List<Map<String, ?>> rows = bigQueryClient.getRows(tableName);
+
+    // savePerfsToBigQuery converts millis to seconds (it's a BigQuery's 
requirement).
+    assertContains(nexmarkRecord(nexmarkPerf1, startTimestampMilliseconds / 
1000), rows);
+    assertContains(nexmarkRecord(nexmarkPerf2, startTimestampMilliseconds / 
1000), rows);
+  }
+
+  private Map<String, Object> nexmarkRecord(NexmarkPerf nexmarkPerf, long 
startTimestampSeconds) {
+    return ImmutableMap.<String, Object>builder()
+        .put("timestamp", startTimestampSeconds)
+        .put("runtimeSec", nexmarkPerf.runtimeSec)
+        .put("eventsPerSec", nexmarkPerf.eventsPerSec)
+        .put("numResults", nexmarkPerf.numResults)
+        .build();
+  }
+
+  private void assertContains(Map<String, ?> expectedRecord, List<Map<String, 
?>> actualRecords) {
+    assertTrue(
+        String.format("Record not found: %s", expectedRecord),
+        actualRecords
+            .stream()
+            .anyMatch(actualRecord -> recordEquals(actualRecord, 
expectedRecord)));
+  }
+
+  private boolean recordEquals(Map<String, ?> expected, Map<String, ?> actual) 
{
+    if (expected == null || actual == null) {
+      return false;
+    }
+
+    return expected.get("timestamp").equals(actual.get("timestamp"))
+        && expected.get("runtimeSec").equals(actual.get("runtimeSec"))
+        && expected.get("eventsPerSec").equals(actual.get("eventsPerSec"))
+        && expected.get("numResults").equals(actual.get("numResults"));
   }
 }
diff --git a/sdks/java/testing/test-utils/build.gradle 
b/sdks/java/testing/test-utils/build.gradle
index 4672093f39a..380b31dcea9 100644
--- a/sdks/java/testing/test-utils/build.gradle
+++ b/sdks/java/testing/test-utils/build.gradle
@@ -24,6 +24,8 @@ description = "Apache Beam :: SDKs :: Java :: Test Utils"
 dependencies {
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   compile library.java.guava
+  shadow library.java.google_cloud_bigquery
+  shadow project(path: 
":beam-sdks-java-extensions-google-cloud-platform-core", configuration: 
"shadow")
 
   shadowTest library.java.junit
   shadowTest library.java.mockito_core
diff --git 
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
new file mode 100644
index 00000000000..fd8b4823d2a
--- /dev/null
+++ 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.publishing;
+
+import static java.lang.String.format;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.InsertAllRequest;
+import com.google.cloud.bigquery.InsertAllResponse;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps {@link BigQuery} to provide high level useful methods for working 
with big query database.
+ */
+public class BigQueryClient {
+
+  private BigQuery client;
+
+  private String projectId;
+
+  private String dataset;
+
+  protected BigQueryClient(BigQuery client, String projectId, String dataset) {
+    this.client = client;
+    this.projectId = projectId;
+    this.dataset = dataset;
+  }
+
+  /**
+   * Creates the publisher with a application default credentials from the 
environment and default
+   * project name.
+   */
+  public static BigQueryClient create(String dataset) {
+    BigQueryOptions options = BigQueryOptions.newBuilder().build();
+
+    return new BigQueryClient(options.getService(), options.getProjectId(), 
dataset);
+  }
+
+  /**
+   * Creates a new table with given schema when it not exists.
+   *
+   * @param tableName name of the desired table
+   * @param schema schema of consequent table fields (name, type pairs).
+   */
+  public void createTableIfNotExists(String tableName, Map<String, String> 
schema) {
+    TableId tableId = TableId.of(projectId, dataset, tableName);
+
+    if (client.getTable(tableId) == null) {
+      List<Field> schemaFields =
+          schema
+              .entrySet()
+              .stream()
+              .map(entry -> Field.of(entry.getKey(), 
LegacySQLTypeName.valueOf(entry.getValue())))
+              .collect(Collectors.toList());
+
+      createTable(tableId, Schema.of(schemaFields));
+    }
+  }
+
+  private void createTable(TableId tableId, Schema schema) {
+    TableInfo tableInfo =
+        TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema))
+            .setFriendlyName(tableId.getTable())
+            .build();
+
+    client.create(tableInfo);
+  }
+
+  /**
+   * Inserts one row to BigQuery table.
+   *
+   * @see #insertAll(Collection, String) for more details
+   */
+  public void insertRow(Map<String, ?> row, String table) {
+    insertAll(Collections.singletonList(row), table);
+  }
+
+  /** Inserts multiple rows of the same schema to a BigQuery table. */
+  public void insertAll(Collection<Map<String, ?>> rows, String table) {
+    TableId tableId = TableId.of(projectId, dataset, table);
+
+    InsertAllRequest.Builder builder = InsertAllRequest.newBuilder(tableId);
+
+    for (Map<String, ?> row : rows) {
+      builder.addRow(row);
+    }
+
+    InsertAllResponse response = client.insertAll(builder.build());
+    handleBigQueryResponseExceptions(response);
+  }
+
+  private void handleBigQueryResponseExceptions(InsertAllResponse response) {
+    if (response.hasErrors()) {
+      throw new RuntimeException(
+          format(
+              "The following errors occurred while inserting to BigQuery: %s",
+              response.getInsertErrors()));
+    }
+  }
+}
diff --git 
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/package-info.java
 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/package-info.java
new file mode 100644
index 00000000000..3bbc1d5de6b
--- /dev/null
+++ 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Tools for publishing data to various data stores (such as BigQuery etc.). 
*/
+package org.apache.beam.sdk.testutils.publishing;
diff --git 
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryClient.java
 
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryClient.java
new file mode 100644
index 00000000000..313958840ca
--- /dev/null
+++ 
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/fakes/FakeBigQueryClient.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.fakes;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.testutils.publishing.BigQueryClient;
+
+/**
+ * A fake implementation of BigQuery client for testing purposes only.
+ *
+ * @see BigQueryClient
+ */
+public class FakeBigQueryClient extends BigQueryClient {
+
+  private Map<String, List<Map<String, ?>>> rowsPerTable;
+
+  public FakeBigQueryClient(String dataset) {
+    super(null, null, dataset);
+    rowsPerTable = new HashMap<>();
+  }
+
+  @Override
+  public void createTableIfNotExists(String tableName, Map<String, String> 
schema) {
+    // do nothing. Assume the table exists.
+  }
+
+  @Override
+  public void insertRow(Map<String, ?> newRow, String table) {
+    List<Map<String, ?>> rows = rowsPerTable.get(table);
+
+    if (rows == null) {
+      rows = new ArrayList<>();
+      rows.add(newRow);
+
+      rowsPerTable.put(table, rows);
+    } else {
+      rows.add(newRow);
+    }
+  }
+
+  public List<Map<String, ?>> getRows(String table) {
+    return rowsPerTable.get(table);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 166479)
    Time Spent: 7h  (was: 6h 50m)

> Remove pipeline for publishing nexmark results to bigQuery and publish using 
> BigQuery API only
> ----------------------------------------------------------------------------------------------
>
>                 Key: BEAM-5906
>                 URL: https://issues.apache.org/jira/browse/BEAM-5906
>             Project: Beam
>          Issue Type: Improvement
>          Components: examples-nexmark
>            Reporter: Lukasz Gajowy
>            Assignee: Lukasz Gajowy
>            Priority: Minor
>          Time Spent: 7h
>  Remaining Estimate: 0h
>
> There's no need to start a separate pipeline for uploading metrics results 
> from Nexmark suites to BigQuery. We can use an API designed for that and 
> place it in test-utils. Thanks to that: 
>  - it won't start a separate pipeline every time it publishes results
>  - other suites will be able to use that code
>  - We will not face problems like special long to int conversion due to 
> problems in BigQueryIO (eg. BEAM-4734) because we will use a thin API instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to