[ https://issues.apache.org/jira/browse/BEAM-5906?focusedWorklogId=163409&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163409 ]
ASF GitHub Bot logged work on BEAM-5906:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Nov/18 10:26
Start Date: 07/Nov/18 10:26
Worklog Time Spent: 10m
Work Description: echauchot commented on a change in pull request #6886:
[BEAM-5906] Use dedicated BigQuery client for publishing Nexmark results
URL: https://github.com/apache/beam/pull/6886#discussion_r231440583
##########
File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
##########
@@ -177,77 +160,36 @@ void runAll(String[] args) throws IOException {
@VisibleForTesting
static void savePerfsToBigQuery(
+ BigQueryClient bigQueryClient,
NexmarkOptions options,
Map<NexmarkConfiguration, NexmarkPerf> perfs,
- @Nullable BigQueryServices testBigQueryServices,
Instant start) {
- Pipeline pipeline = Pipeline.create(options);
- PCollection<KV<NexmarkConfiguration, NexmarkPerf>> perfsPCollection =
- pipeline.apply(
- Create.of(perfs)
- .withCoder(
- KvCoder.of(
- SerializableCoder.of(NexmarkConfiguration.class),
- new CustomCoder<NexmarkPerf>() {
-
- @Override
- public void encode(NexmarkPerf value, OutputStream outStream)
- throws CoderException, IOException {
- StringUtf8Coder.of().encode(value.toString(), outStream);
- }
-
- @Override
- public NexmarkPerf decode(InputStream inStream)
- throws CoderException, IOException {
- String perf = StringUtf8Coder.of().decode(inStream);
- return NexmarkPerf.fromString(perf);
- }
- })));
-
- TableSchema tableSchema =
- new TableSchema()
- .setFields(
- ImmutableList.of(
- new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
- new TableFieldSchema().setName("runtimeSec").setType("FLOAT"),
- new TableFieldSchema().setName("eventsPerSec").setType("FLOAT"),
- new TableFieldSchema().setName("numResults").setType("INTEGER")));
-
- String queryName = "{query}";
- if (options.getQueryLanguage() != null) {
- queryName = queryName + "_" + options.getQueryLanguage();
- }
- final String tableSpec = NexmarkUtils.tableSpec(options, queryName, 0L, null);
- SerializableFunction<
- ValueInSingleWindow<KV<NexmarkConfiguration, NexmarkPerf>>, TableDestination>
- tableFunction =
- input ->
- new TableDestination(
- tableSpec.replace("{query}", input.getValue().getKey().query.getNumberOrName()),
- "perfkit queries");
- SerializableFunction<KV<NexmarkConfiguration, NexmarkPerf>, TableRow> rowFunction =
- input -> {
- NexmarkPerf nexmarkPerf = input.getValue();
- TableRow row =
- new TableRow()
- .set("timestamp", start.getMillis() / 1000)
- .set("runtimeSec", nexmarkPerf.runtimeSec)
- .set("eventsPerSec", nexmarkPerf.eventsPerSec)
- .set("numResults", nexmarkPerf.numResults);
- return row;
- };
- BigQueryIO.Write io =
- BigQueryIO.<KV<NexmarkConfiguration, NexmarkPerf>>write()
- .to(tableFunction)
- .withSchema(tableSchema)
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
- .withFormatFunction(rowFunction);
- if (testBigQueryServices != null) {
- io = io.withTestServices(testBigQueryServices);
+
+ for (Map.Entry<NexmarkConfiguration, NexmarkPerf> entry : perfs.entrySet()) {
+ String queryName =
+ NexmarkUtils.fullQueryName(
+ options.getQueryLanguage(), entry.getKey().query.getNumberOrName());
+ String tableName = NexmarkUtils.tableName(options, queryName, 0L, null);
+
+ ImmutableMap<String, String> schema =
+ ImmutableMap.<String, String>builder()
+ .put("timestamp", "timestamp")
+ .put("runtimeSec", "float")
+ .put("eventsPerSec", "float")
+ .put("numResults", "integer")
+ .build();
+ bigQueryClient.createTableIfNotExists(tableName, schema);
+
+ Map<String, Object> record =
+ ImmutableMap.<String, Object>builder()
+ .put("timestamp", start.getMillis() / 1000)
+ .put("runtimeSec", entry.getValue().runtimeSec)
+ .put("eventsPerSec", entry.getValue().eventsPerSec)
+ .put("numResults", entry.getValue().numResults)
+ .build();
+
+ bigQueryClient.insertRow(record, tableName);
Review comment:
As the BQ mock is very simple (which is fine), are you sure this works against
a real BQ? Have you tested this code against a real instance?
Also, does the insert method behave like the previous code (creating the table
if it does not exist and appending rows instead of replacing them)?
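
To make the two expectations above concrete (create the table only if it is
missing, and append rows rather than replace them), here is a minimal sketch of
an in-memory stand-in. It is an assumption for illustration only: the class
name `InMemoryBigQueryClient` and the `rows` accessor are hypothetical, and
only the two method shapes (`createTableIfNotExists(tableName, schema)` and
`insertRow(record, tableName)`) are taken from the calls in this diff. It says
nothing about how the real client or a real BigQuery instance behaves.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in; NOT the real BigQueryClient from this PR. */
class InMemoryBigQueryClient {
  // Table name -> column name -> column type, as passed by the caller.
  private final Map<String, Map<String, String>> schemas = new LinkedHashMap<>();
  // Table name -> rows in insertion order.
  private final Map<String, List<Map<String, Object>>> tables = new LinkedHashMap<>();

  /** CREATE_IF_NEEDED-like behavior: an existing table and its rows are left untouched. */
  void createTableIfNotExists(String tableName, Map<String, String> schema) {
    if (!schemas.containsKey(tableName)) {
      schemas.put(tableName, new LinkedHashMap<>(schema));
      tables.put(tableName, new ArrayList<>());
    }
  }

  /** WRITE_APPEND-like behavior: the row is added after the existing rows. */
  void insertRow(Map<String, Object> row, String tableName) {
    List<Map<String, Object>> rows = tables.get(tableName);
    if (rows == null) {
      // Assumption of this sketch: inserting does not create the table implicitly.
      throw new IllegalStateException("Table does not exist: " + tableName);
    }
    rows.add(new LinkedHashMap<>(row));
  }

  /** Returns a copy of the rows stored for the table (empty if the table is unknown). */
  List<Map<String, Object>> rows(String tableName) {
    List<Map<String, Object>> rows = tables.get(tableName);
    return rows == null ? new ArrayList<>() : new ArrayList<>(rows);
  }
}
```

A test against such a stand-in could call `createTableIfNotExists` twice with an
`insertRow` in between and check that the first row survives. That would only
document the intended contract; a run against a real instance is still needed
to answer the question.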
Issue Time Tracking
-------------------
Worklog Id: (was: 163409)
Time Spent: 1.5h (was: 1h 20m)
> Remove pipeline for publishing nexmark results to bigQuery and publish using
> BigQuery API only
> ----------------------------------------------------------------------------------------------
>
> Key: BEAM-5906
> URL: https://issues.apache.org/jira/browse/BEAM-5906
> Project: Beam
> Issue Type: Improvement
> Components: examples-nexmark
> Reporter: Lukasz Gajowy
> Assignee: Lukasz Gajowy
> Priority: Minor
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> There's no need to start a separate pipeline for uploading metrics results
> from Nexmark suites to BigQuery. We can use an API designed for that and
> place it in test-utils. Thanks to that:
> - it won't start a separate pipeline every time it publishes results
> - other suites will be able to use that code
> - we will not face problems like the special long-to-int conversion caused by
> problems in BigQueryIO (e.g. BEAM-4734), because we will use a thin API instead.