[
https://issues.apache.org/jira/browse/BEAM-5996?focusedWorklogId=167696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167696
]
ASF GitHub Bot logged work on BEAM-5996:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Nov/18 10:04
Start Date: 20/Nov/18 10:04
Worklog Time Spent: 10m
Work Description: echauchot closed pull request #7041: [BEAM-5996] Append
query name to temp location.
URL: https://github.com/apache/beam/pull/7041
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index 2461584847f..8e6dbc9f4fa 100644
---
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -80,13 +80,13 @@ private Result(NexmarkConfiguration configuration,
NexmarkPerf perf) {
private final NexmarkConfiguration configuration;
private Run(NexmarkOptions options, NexmarkConfiguration configuration) {
- this.nexmarkLauncher = new NexmarkLauncher<>(options);
+ this.nexmarkLauncher = new NexmarkLauncher<>(options, configuration);
this.configuration = configuration;
}
@Override
public Result call() throws IOException {
- NexmarkPerf perf = nexmarkLauncher.run(configuration);
+ NexmarkPerf perf = nexmarkLauncher.run();
return new Result(configuration, perf);
}
}
@@ -95,10 +95,7 @@ public Result call() throws IOException {
void runAll(String[] args) throws IOException {
NexmarkOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class);
- runAll(options);
- }
- void runAll(NexmarkOptions options) throws IOException {
Instant start = Instant.now();
Map<NexmarkConfiguration, NexmarkPerf> baseline =
loadBaseline(options.getBaselineFilename());
Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
@@ -111,7 +108,8 @@ void runAll(NexmarkOptions options) throws IOException {
try {
// Schedule all the configurations.
for (NexmarkConfiguration configuration : configurations) {
- completion.submit(new Run(options, configuration));
+ NexmarkOptions optionsCopy =
PipelineOptionsFactory.fromArgs(args).as(NexmarkOptions.class);
+ completion.submit(new Run(optionsCopy, configuration));
}
// Collect all the results.
diff --git
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index cbc1de0d270..73b8344e21c 100644
---
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -129,11 +129,12 @@
* not been generated.
*/
private static final Duration STUCK_TERMINATE_DELAY =
Duration.standardDays(3);
- /** NexmarkOptions shared by all runs. */
+
+ /** NexmarkOptions for this run. */
private final OptionT options;
/** Which configuration we are running. */
- @Nullable private NexmarkConfiguration configuration;
+ private NexmarkConfiguration configuration;
/** If in --pubsubMode=COMBINED, the event monitor for the publisher
pipeline. Otherwise null. */
@Nullable private Monitor<Event> publisherMonitor;
@@ -157,8 +158,9 @@
@Nullable private PubsubHelper pubsubHelper;
- public NexmarkLauncher(OptionT options) {
+ public NexmarkLauncher(OptionT options, NexmarkConfiguration configuration) {
this.options = options;
+ this.configuration = configuration;
}
/** Is this query running in streaming mode? */
@@ -1064,7 +1066,7 @@ private void modelResultRates(NexmarkQueryModel model) {
/** Run {@code configuration} and return its performance if possible. */
@Nullable
- public NexmarkPerf run(NexmarkConfiguration runConfiguration) throws
IOException {
+ public NexmarkPerf run() throws IOException {
if (options.getManageResources() && !options.getMonitorJobs()) {
throw new RuntimeException("If using --manageResources then must also
use --monitorJobs.");
}
@@ -1072,9 +1074,7 @@ public NexmarkPerf run(NexmarkConfiguration
runConfiguration) throws IOException
//
// Setup per-run state.
//
- checkState(configuration == null);
checkState(queryName == null);
- configuration = runConfiguration;
if (configuration.sourceType.equals(SourceType.PUBSUB)) {
pubsubHelper = PubsubHelper.create(options);
}
@@ -1095,6 +1095,11 @@ public NexmarkPerf run(NexmarkConfiguration
runConfiguration) throws IOException
queryName = query.getName();
+ // Append queryName to temp location
+ if (!"".equals(options.getTempLocation())) {
+ options.setTempLocation(options.getTempLocation() + "/" + queryName);
+ }
+
NexmarkQueryModel model = getNexmarkQueryModel();
if (options.getJustModelResultRate()) {
diff --git
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/MainTest.java
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/MainTest.java
index 23211d30fa4..d7580176e88 100644
---
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/MainTest.java
+++
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/MainTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.nexmark;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Test;
/**
@@ -27,11 +26,8 @@
public class MainTest {
@Test
public void testSmokeSuiteOnDirectRunner() throws Exception {
- NexmarkOptions options =
PipelineOptionsFactory.create().as(NexmarkOptions.class);
// Default for SMOKE is 100k or 10k for heavier queries - way overkill for
"smoke" test
- options.setNumEvents(500L);
- options.setSuite(NexmarkSuite.SMOKE);
- options.setManageResources(false);
- new Main().runAll(options);
+ final String[] pipelineArgs = {"--numEvents=500", "--suite=SMOKE",
"--manageResources=false"};
+ new Main().runAll(pipelineArgs);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 167696)
Time Spent: 1h 20m (was: 1h 10m)
> Nexmark postCommits are failing for Dataflow
> --------------------------------------------
>
> Key: BEAM-5996
> URL: https://issues.apache.org/jira/browse/BEAM-5996
> Project: Beam
> Issue Type: Improvement
> Components: examples-nexmark, test-failures
> Reporter: Etienne Chauchot
> Assignee: Andrew Pilloud
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Here is the gradle build scan: [https://scans.gradle.com/s/co2uh5xame2pc]
> [~apilloud] I took the liberty of assigning it to you, as you did the
> Nexmark-to-Dataflow integration. Feel free to reassign if needed.
>
> {code:java}
> java.io.IOException:
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 410 Gone
> {
> "code" : 429,
> "errors" : [ {
> "domain" : "usageLimits",
> "message" : "The total number of changes to the object
> temp-storage-for-perf-tests/nexmark/staging/beam-runners-google-cloud-dataflow-java-legacy-worker-2.9.0-SNAPSHOT-WjJ-Xb9pjar2Y6llqnj5Zg.jar
> exceeds the rate limit. Please reduce the rate of create, update, and delete
> requests.",
> "reason" : "rateLimitExceeded"
> } ],
> "message" : "The total number of changes to the object
> temp-storage-for-perf-tests/nexmark/staging/beam-runners-google-cloud-dataflow-java-legacy-worker-2.9.0-SNAPSHOT-WjJ-Xb9pjar2Y6llqnj5Zg.jar
> exceeds the rate limit. Please reduce the rate of create, update, and delete
> requests."
> }
> at
> com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:432)
> at
> com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:287)
> at
> org.apache.beam.runners.dataflow.util.PackageUtil.$closeResource(PackageUtil.java:260)
> at
> org.apache.beam.runners.dataflow.util.PackageUtil.tryStagePackage(PackageUtil.java:260)
> at
> org.apache.beam.runners.dataflow.util.PackageUtil.tryStagePackageWithRetry(PackageUtil.java:203)
> at
> org.apache.beam.runners.dataflow.util.PackageUtil.stagePackageSynchronously(PackageUtil.java:187)
> at
> org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stagePackage$1(PackageUtil.java:171)
> at org.apache.beam.sdk.util.MoreFutures.lambda$supplyAsync$0(MoreFutures.java:104)
> at
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748){code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)