[ 
https://issues.apache.org/jira/browse/BEAM-5906?focusedWorklogId=163513&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163513
 ]

ASF GitHub Bot logged work on BEAM-5906:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Nov/18 17:26
            Start Date: 07/Nov/18 17:26
    Worklog Time Spent: 10m 
      Work Description: lgajowy commented on a change in pull request #6886: 
[BEAM-5906] Use dedicated BigQuery client for publishing Nexmark results
URL: https://github.com/apache/beam/pull/6886#discussion_r231601088
 
 

 ##########
 File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
 ##########
 @@ -177,77 +160,36 @@ void runAll(String[] args) throws IOException {
 
   @VisibleForTesting
   static void savePerfsToBigQuery(
+      BigQueryClient bigQueryClient,
       NexmarkOptions options,
       Map<NexmarkConfiguration, NexmarkPerf> perfs,
-      @Nullable BigQueryServices testBigQueryServices,
       Instant start) {
-    Pipeline pipeline = Pipeline.create(options);
-    PCollection<KV<NexmarkConfiguration, NexmarkPerf>> perfsPCollection =
-        pipeline.apply(
-            Create.of(perfs)
-                .withCoder(
-                    KvCoder.of(
-                        SerializableCoder.of(NexmarkConfiguration.class),
-                        new CustomCoder<NexmarkPerf>() {
-
-                          @Override
-                          public void encode(NexmarkPerf value, OutputStream outStream)
-                              throws CoderException, IOException {
-                            StringUtf8Coder.of().encode(value.toString(), outStream);
-                          }
-
-                          @Override
-                          public NexmarkPerf decode(InputStream inStream)
-                              throws CoderException, IOException {
-                            String perf = StringUtf8Coder.of().decode(inStream);
-                            return NexmarkPerf.fromString(perf);
-                          }
-                        })));
-
-    TableSchema tableSchema =
-        new TableSchema()
-            .setFields(
-                ImmutableList.of(
-                    new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
-                    new TableFieldSchema().setName("runtimeSec").setType("FLOAT"),
-                    new TableFieldSchema().setName("eventsPerSec").setType("FLOAT"),
-                    new TableFieldSchema().setName("numResults").setType("INTEGER")));
-
-    String queryName = "{query}";
-    if (options.getQueryLanguage() != null) {
-      queryName = queryName + "_" + options.getQueryLanguage();
-    }
-    final String tableSpec = NexmarkUtils.tableSpec(options, queryName, 0L, null);
-    SerializableFunction<
-            ValueInSingleWindow<KV<NexmarkConfiguration, NexmarkPerf>>, TableDestination>
-        tableFunction =
-            input ->
-                new TableDestination(
-                    tableSpec.replace("{query}", input.getValue().getKey().query.getNumberOrName()),
-                    "perfkit queries");
-    SerializableFunction<KV<NexmarkConfiguration, NexmarkPerf>, TableRow> rowFunction =
-        input -> {
-          NexmarkPerf nexmarkPerf = input.getValue();
-          TableRow row =
-              new TableRow()
-                  .set("timestamp", start.getMillis() / 1000)
-                  .set("runtimeSec", nexmarkPerf.runtimeSec)
-                  .set("eventsPerSec", nexmarkPerf.eventsPerSec)
-                  .set("numResults", nexmarkPerf.numResults);
-          return row;
-        };
-    BigQueryIO.Write io =
-        BigQueryIO.<KV<NexmarkConfiguration, NexmarkPerf>>write()
-            .to(tableFunction)
-            .withSchema(tableSchema)
-            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
-            .withFormatFunction(rowFunction);
-    if (testBigQueryServices != null) {
-      io = io.withTestServices(testBigQueryServices);
+
+    for (Map.Entry<NexmarkConfiguration, NexmarkPerf> entry : perfs.entrySet()) {
+      String queryName =
+          NexmarkUtils.fullQueryName(
+              options.getQueryLanguage(), entry.getKey().query.getNumberOrName());
+      String tableName = NexmarkUtils.tableName(options, queryName, 0L, null);
+
+      ImmutableMap<String, String> schema =
+          ImmutableMap.<String, String>builder()
+              .put("timestamp", "timestamp")
+              .put("runtimeSec", "float")
+              .put("eventsPerSec", "float")
+              .put("numResults", "integer")
+              .build();
+      bigQueryClient.createTableIfNotExists(tableName, schema);
+
+      Map<String, Object> record =
+          ImmutableMap.<String, Object>builder()
+              .put("timestamp", start.getMillis() / 1000)
+              .put("runtimeSec", entry.getValue().runtimeSec)
+              .put("eventsPerSec", entry.getValue().eventsPerSec)
+              .put("numResults", entry.getValue().numResults)
+              .build();
+
+      bigQueryClient.insertRow(record, tableName);
 
 Review comment:
   Yes, I tested this and spotted no differences or errors. However, work on 
enabling phrase triggering for Nexmark is now in progress 
([BEAM-6011](https://issues.apache.org/jira/browse/BEAM-6011)), so it may be 
reasonable to wait for that to finish before merging this (if the wait isn't 
too long). 
   
   The library I used handles connection retries, logs in to the Google account 
(the same way `BigQueryIO` did), etc. So IMO it is a perfect fit here. As you 
can see, the only thing I had to do was add a thin wrapper over it to fit our 
testing needs.  
   
   As for new table creation: there is a new method, `createTableIfNotExists()`, 
called in line 181 for that. 
   As for appending vs. replacing: let me check this one more time, but I think 
it creates new rows instead of replacing existing ones. 
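   
   To make the create-if-missing and append behavior concrete, here is a 
minimal, self-contained sketch. It is only an illustration under stated 
assumptions, not the code from this PR: `ResultsPublishSketch`, 
`InMemoryClient` and `rowsOf` are hypothetical names, and in-memory maps stand 
in for BigQuery. Only the method names `createTableIfNotExists` and 
`insertRow` and their argument order are taken from the diff. Whether the real 
client appends is exactly what still needs to be double-checked.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: shows create-if-missing plus append semantics without BigQuery. */
final class ResultsPublishSketch {

  /** Hypothetical in-memory stand-in; NOT the BigQueryClient added in the PR. */
  static final class InMemoryClient {
    // Table name -> (column name -> column type).
    private final Map<String, Map<String, String>> schemas = new LinkedHashMap<>();
    // Table name -> rows in insertion order.
    private final Map<String, List<Map<String, Object>>> rows = new LinkedHashMap<>();

    /** Creates the table only when absent; an existing table keeps its rows. */
    void createTableIfNotExists(String tableName, Map<String, String> schema) {
      if (!schemas.containsKey(tableName)) {
        schemas.put(tableName, new LinkedHashMap<>(schema));
        rows.put(tableName, new ArrayList<>());
      }
    }

    /** Appends one row; earlier rows are never overwritten. */
    void insertRow(Map<String, Object> record, String tableName) {
      List<Map<String, Object>> table = rows.get(tableName);
      if (table == null) {
        throw new IllegalStateException("Table does not exist: " + tableName);
      }
      table.add(new LinkedHashMap<>(record));
    }

    /** Returns a copy of the rows stored for the table (empty if the table is unknown). */
    List<Map<String, Object>> rowsOf(String tableName) {
      return new ArrayList<>(rows.getOrDefault(tableName, new ArrayList<>()));
    }
  }

  public static void main(String[] args) {
    InMemoryClient client = new InMemoryClient();

    // Same column names and types as the schema in the diff.
    Map<String, String> schema = new LinkedHashMap<>();
    schema.put("timestamp", "timestamp");
    schema.put("runtimeSec", "float");
    schema.put("eventsPerSec", "float");
    schema.put("numResults", "integer");

    // Two simulated suite runs publishing to the same (made-up) table name.
    for (long startMillis : new long[] {1541611560000L, 1541615160000L}) {
      client.createTableIfNotExists("nexmark_example", schema);

      Map<String, Object> record = new LinkedHashMap<>();
      record.put("timestamp", startMillis / 1000); // epoch seconds, as in the diff
      record.put("runtimeSec", 12.5);
      record.put("eventsPerSec", 8000.0);
      record.put("numResults", 100L);
      client.insertRow(record, "nexmark_example");
    }

    System.out.println("rows stored: " + client.rowsOf("nexmark_example").size());
  }
}
```

   In this sketch the second `createTableIfNotExists` call is a no-op, so both 
runs end up as separate rows in the same table, which matches the 
`CREATE_IF_NEEDED` and `WRITE_APPEND` dispositions of the removed pipeline.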

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 163513)
    Time Spent: 1h 50m  (was: 1h 40m)

> Remove pipeline for publishing nexmark results to bigQuery and publish using 
> BigQuery API only
> ----------------------------------------------------------------------------------------------
>
>                 Key: BEAM-5906
>                 URL: https://issues.apache.org/jira/browse/BEAM-5906
>             Project: Beam
>          Issue Type: Improvement
>          Components: examples-nexmark
>            Reporter: Lukasz Gajowy
>            Assignee: Lukasz Gajowy
>            Priority: Minor
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> There's no need to start a separate pipeline for uploading metrics results 
> from Nexmark suites to BigQuery. We can use an API designed for that and 
> place it in test-utils. Thanks to that: 
>  - it won't start a separate pipeline every time it publishes results
>  - other suites will be able to use that code
>  - we will not face problems like the special long-to-int conversion caused by 
> issues in BigQueryIO (e.g. BEAM-4734), because we will use a thin API instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
