Activate monitoring on NexmarkSparkRunner and on specific runners issue #28
Fix compilation issue after rebase + make checkstyle happy again Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a1fe33bc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a1fe33bc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a1fe33bc Branch: refs/heads/master Commit: a1fe33bc122b26960697c87620ca0dc2ed522e56 Parents: a095e40 Author: Etienne Chauchot <[email protected]> Authored: Wed Mar 15 15:25:41 2017 +0100 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/pom.xml | 32 ++++++++++---------- .../integration/nexmark/NexmarkApexRunner.java | 2 -- .../nexmark/NexmarkDirectRunner.java | 5 --- .../integration/nexmark/NexmarkFlinkRunner.java | 12 +------- .../nexmark/NexmarkGoogleDriver.java | 2 -- .../nexmark/NexmarkGoogleRunner.java | 2 +- .../beam/integration/nexmark/NexmarkRunner.java | 13 ++++---- .../integration/nexmark/NexmarkSparkDriver.java | 4 +-- .../integration/nexmark/NexmarkSparkRunner.java | 11 +------ .../beam/integration/nexmark/NexmarkUtils.java | 3 +- .../apache/beam/integration/nexmark/Query5.java | 3 +- 11 files changed, 31 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index 07d14c2..febd96d 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -179,28 +179,28 @@ <artifactId>beam-runners-flink_2.10</artifactId> </dependency> - <!--<dependency>--> - <!--<groupId>org.apache.flink</groupId>--> - <!--<artifactId>flink-shaded-hadoop2</artifactId>--> - <!--<version>${flink.version}</version>--> - 
<!--<scope>provided</scope>--> - <!--</dependency>--> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop2</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> <!-- Spark runner --> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-spark</artifactId> </dependency> - <!--<dependency>--> - <!--<groupId>org.apache.spark</groupId>--> - <!--<artifactId>spark-core_2.10</artifactId>--> - <!--<version>${spark.version}</version>--> - <!--</dependency>--> - <!--<dependency>--> - <!--<groupId>org.apache.spark</groupId>--> - <!--<artifactId>spark-streaming_2.10</artifactId>--> - <!--<version>${spark.version}</version>--> - <!--</dependency>--> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> + </dependency> <!-- Apex runner --> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java index f2da1c7..ea46082 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java @@ -18,8 +18,6 @@ package org.apache.beam.integration.nexmark; import javax.annotation.Nullable; -import org.apache.beam.runners.apex.ApexRunnerResult; -import org.apache.beam.sdk.PipelineResult; /** * Run a query using the Apex runner. 
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java index ee234b1..c70e41e 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java @@ -17,11 +17,6 @@ */ package org.apache.beam.integration.nexmark; -import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.DataflowPipelineJob; -import org.apache.beam.runners.direct.DirectRunner; -import org.apache.beam.sdk.PipelineResult; - /** * Run a single query using the Direct Runner. */ http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java index a8b4401..8e22917 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java @@ -17,10 +17,6 @@ */ package org.apache.beam.integration.nexmark; -import javax.annotation.Nullable; -import org.apache.beam.runners.flink.FlinkRunnerResult; -import org.apache.beam.sdk.PipelineResult; - /** * Run a query using the Flink runner. 
*/ @@ -42,7 +38,7 @@ public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.Nexmark @Override protected boolean canMonitor() { - return false; + return true; } @Override @@ -56,12 +52,6 @@ public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.Nexmark throw new UnsupportedOperationException(); } - @Override - @Nullable - protected NexmarkPerf monitor(NexmarkQuery query) { - return null; - } - public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) { super(options); } http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java index 67c4aeb..50c2a7c 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java @@ -17,10 +17,8 @@ */ package org.apache.beam.integration.nexmark; -import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; /** http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java 
index c78bb42..135d428 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java @@ -66,7 +66,7 @@ class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogl @Override protected String getJobId(PipelineResult job) { - return ((DataflowPipelineJob)job).getJobId(); + return ((DataflowPipelineJob) job).getJobId(); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index 5365dbe..8d4c1f1 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -53,8 +53,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; -import static org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; - /** * Run a single Nexmark query using a given configuration. */ @@ -203,7 +201,8 @@ public abstract class NexmarkRunner<OptionT extends Options> { * Find a 'steady state' events/sec from {@code snapshots} and * store it in {@code perf} if found. 
*/ - protected void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) { + protected void captureSteadyState(NexmarkPerf perf, + List<NexmarkPerf.ProgressSnapshot> snapshots) { if (!options.isStreaming()) { return; } @@ -365,7 +364,9 @@ public abstract class NexmarkRunner<OptionT extends Options> { return perf; } - String getJobId(PipelineResult job){return "";} + String getJobId(PipelineResult job) { + return ""; + } // TODO specific to dataflow, see if we can find an equivalent /* @@ -926,8 +927,8 @@ public abstract class NexmarkRunner<OptionT extends Options> { new TableFieldSchema().setName("index").setType("INTEGER"), new TableFieldSchema().setName("value").setType("STRING"))))); NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec); - BigQueryIO.Write.Bound io = - BigQueryIO.Write.to(tableSpec) + BigQueryIO.Write io = + BigQueryIO.write().to(tableSpec) .withSchema(tableSchema) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND); http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java index 1ea963d..a46d38a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java @@ -18,7 +18,6 @@ package org.apache.beam.integration.nexmark; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkRunner; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; /** @@ -39,7 +38,8 @@ class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOp PipelineOptionsFactory.fromArgs(args) .withValidation() .as(NexmarkSparkOptions.class); - options.setRunner(SparkRunner.class); +// options.setRunner(org.apache.beam.runners.spark.SparkRunner.class); + options.setRunner(org.apache.beam.runners.spark.SparkRunnerDebugger.class); NexmarkSparkRunner runner = new NexmarkSparkRunner(options); new NexmarkSparkDriver().runAll(options, runner); } http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java index 109e8a0..32fee30 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java @@ -17,10 +17,6 @@ */ package org.apache.beam.integration.nexmark; -import javax.annotation.Nullable; -import org.apache.beam.runners.spark.SparkPipelineResult; -import org.apache.beam.sdk.PipelineResult; - /** * Run a query using the Spark runner. 
*/ @@ -42,7 +38,7 @@ public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.Nexmark @Override protected boolean canMonitor() { - return false; + return true; } @Override @@ -56,11 +52,6 @@ public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.Nexmark throw new UnsupportedOperationException(); } - @Override - @Nullable - protected NexmarkPerf monitor(NexmarkQuery query) { - return null; - } public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) { super(options); http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java index 6588f85..8f4cb22 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; @@ -325,8 +324,8 @@ public class NexmarkUtils { * Setup pipeline with codes and some other options. 
*/ public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) { - PipelineRunner<?> runner = p.getRunner(); //TODO Ismael check +// PipelineRunner<?> runner = p.getRunner(); // if (runner instanceof DirectRunner) { // // Disable randomization of output since we want to check batch and streaming match the // // model both locally and on the cloud. http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java index 7001986..9020494 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java @@ -67,7 +67,8 @@ class Query5 extends NexmarkQuery { // Count the number of bids per auction id. .apply(Count.<Long>perElement()) - // We'll want to keep all auctions with the maximal number of bids. + //TODO replace by simple key + // We'll want to keep all auctions with the maximal number of bids. // Start by lifting each into a singleton list. .apply(name + ".ToSingletons", ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
