Rename NexmarkDriver to Main and NexmarkRunner to NexmarkLauncher
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/683680b1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/683680b1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/683680b1 Branch: refs/heads/master Commit: 683680b1655e79d696a1d0f4588753a7d8ff2b82 Parents: 77eabba Author: Etienne Chauchot <[email protected]> Authored: Tue May 9 10:17:06 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:28 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/integration/nexmark/Main.java | 304 +++++ .../beam/integration/nexmark/NexmarkDriver.java | 304 ----- .../integration/nexmark/NexmarkLauncher.java | 1172 ++++++++++++++++++ .../beam/integration/nexmark/NexmarkRunner.java | 1172 ------------------ 4 files changed, 1476 insertions(+), 1476 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/683680b1/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java new file mode 100644 index 0000000..da4d446 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Person; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * An implementation of the 'NEXMark queries' for Google Dataflow. + * These are multiple queries over a three table schema representing an online auction system: + * <ul> + * <li>{@link Person} represents a person submitting an item for auction and/or making a bid + * on an auction. + * <li>{@link Auction} represents an item under auction. + * <li>{@link Bid} represents a bid for an item under auction. + * </ul> + * The queries exercise many aspects of streaming dataflow. + * + * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not + * particularly sensible. + * + * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/"> + * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a> + */ +public class Main<OptionT extends NexmarkOptions> { + + /** + * Entry point. 
+ */ + void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) { + Instant start = Instant.now(); + Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename()); + Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>(); + Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options); + + boolean successful = true; + try { + // Run all the configurations. + for (NexmarkConfiguration configuration : configurations) { + NexmarkPerf perf = nexmarkLauncher.run(configuration); + if (perf != null) { + if (perf.errors == null || perf.errors.size() > 0) { + successful = false; + } + appendPerf(options.getPerfFilename(), configuration, perf); + actual.put(configuration, perf); + // Summarize what we've run so far. + saveSummary(null, configurations, actual, baseline, start); + } + } + } finally { + if (options.getMonitorJobs()) { + // Report overall performance. + saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start); + saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start); + } + } + + if (!successful) { + throw new RuntimeException("Execution was not successful"); + } + } + + /** + * Append the pair of {@code configuration} and {@code perf} to perf file. 
+ */ + private void appendPerf( + @Nullable String perfFilename, NexmarkConfiguration configuration, + NexmarkPerf perf) { + if (perfFilename == null) { + return; + } + List<String> lines = new ArrayList<>(); + lines.add(""); + lines.add(String.format("# %s", Instant.now())); + lines.add(String.format("# %s", configuration.toShortString())); + lines.add(configuration.toString()); + lines.add(perf.toString()); + try { + Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE, + StandardOpenOption.APPEND); + } catch (IOException e) { + throw new RuntimeException("Unable to write perf file: ", e); + } + NexmarkUtils.console("appended results to perf file %s.", perfFilename); + } + + /** + * Load the baseline perf. + */ + @Nullable + private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline( + @Nullable String baselineFilename) { + if (baselineFilename == null) { + return null; + } + Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>(); + List<String> lines; + try { + lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Unable to read baseline perf file: ", e); + } + for (int i = 0; i < lines.size(); i++) { + if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) { + continue; + } + NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++)); + NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i)); + baseline.put(configuration, perf); + } + NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(), + baselineFilename); + return baseline; + } + + private static final String LINE = + "=========================================================================================="; + + /** + * Print summary of {@code actual} vs (if non-null) {@code baseline}. 
+ */ + private static void saveSummary( + @Nullable String summaryFilename, + Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual, + @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) { + List<String> lines = new ArrayList<>(); + + lines.add(""); + lines.add(LINE); + + lines.add( + String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now()))); + lines.add(""); + + lines.add("Default configuration:"); + lines.add(NexmarkConfiguration.DEFAULT.toString()); + lines.add(""); + + lines.add("Configurations:"); + lines.add(" Conf Description"); + int conf = 0; + for (NexmarkConfiguration configuration : configurations) { + lines.add(String.format(" %04d %s", conf++, configuration.toShortString())); + NexmarkPerf actualPerf = actual.get(configuration); + if (actualPerf != null && actualPerf.jobId != null) { + lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId)); + } + } + + lines.add(""); + lines.add("Performance:"); + lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)", + "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)")); + conf = 0; + for (NexmarkConfiguration configuration : configurations) { + String line = String.format(" %04d ", conf++); + NexmarkPerf actualPerf = actual.get(configuration); + if (actualPerf == null) { + line += "*** not run ***"; + } else { + NexmarkPerf baselinePerf = baseline == null ? 
null : baseline.get(configuration); + double runtimeSec = actualPerf.runtimeSec; + line += String.format("%12.1f ", runtimeSec); + if (baselinePerf == null) { + line += String.format("%12s ", ""); + } else { + double baselineRuntimeSec = baselinePerf.runtimeSec; + double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0; + line += String.format("%+11.2f%% ", diff); + } + + double eventsPerSec = actualPerf.eventsPerSec; + line += String.format("%12.1f ", eventsPerSec); + if (baselinePerf == null) { + line += String.format("%12s ", ""); + } else { + double baselineEventsPerSec = baselinePerf.eventsPerSec; + double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0; + line += String.format("%+11.2f%% ", diff); + } + + long numResults = actualPerf.numResults; + line += String.format("%12d ", numResults); + if (baselinePerf == null) { + line += String.format("%12s", ""); + } else { + long baselineNumResults = baselinePerf.numResults; + long diff = numResults - baselineNumResults; + line += String.format("%+12d", diff); + } + } + lines.add(line); + + if (actualPerf != null) { + List<String> errors = actualPerf.errors; + if (errors == null) { + errors = new ArrayList<>(); + errors.add("NexmarkGoogleRunner returned null errors list"); + } + for (String error : errors) { + lines.add(String.format(" %4s *** %s ***", "", error)); + } + } + } + + lines.add(LINE); + lines.add(""); + + for (String line : lines) { + System.out.println(line); + } + + if (summaryFilename != null) { + try { + Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8, + StandardOpenOption.CREATE, StandardOpenOption.APPEND); + } catch (IOException e) { + throw new RuntimeException("Unable to save summary file: ", e); + } + NexmarkUtils.console("appended summary to summary file %s.", summaryFilename); + } + } + + /** + * Write all perf data and any baselines to a javascript file which can be used by + * graphing page etc. 
+ */ + private static void saveJavascript( + @Nullable String javascriptFilename, + Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual, + @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) { + if (javascriptFilename == null) { + return; + } + + List<String> lines = new ArrayList<>(); + lines.add(String.format( + "// Run started %s and ran for %s", start, new Duration(start, Instant.now()))); + lines.add("var all = ["); + + for (NexmarkConfiguration configuration : configurations) { + lines.add(" {"); + lines.add(String.format(" config: %s", configuration)); + NexmarkPerf actualPerf = actual.get(configuration); + if (actualPerf != null) { + lines.add(String.format(" ,perf: %s", actualPerf)); + } + NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration); + if (baselinePerf != null) { + lines.add(String.format(" ,baseline: %s", baselinePerf)); + } + lines.add(" },"); + } + + lines.add("];"); + + try { + Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8, + StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + } catch (IOException e) { + throw new RuntimeException("Unable to save javascript file: ", e); + } + NexmarkUtils.console("saved javascript to file %s.", javascriptFilename); + } + + public static void main(String[] args) { + NexmarkOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(NexmarkOptions.class); + NexmarkLauncher<NexmarkOptions> nexmarkLauncher = new NexmarkLauncher<>(options); + new Main<>().runAll(options, nexmarkLauncher); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/683680b1/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java deleted file mode 100644 index a982a8d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; - -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * An implementation of the 'NEXMark queries' for Google Dataflow. 
- * These are multiple queries over a three table schema representing an online auction system: - * <ul> - * <li>{@link Person} represents a person submitting an item for auction and/or making a bid - * on an auction. - * <li>{@link Auction} represents an item under auction. - * <li>{@link Bid} represents a bid for an item under auction. - * </ul> - * The queries exercise many aspects of streaming dataflow. - * - * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not - * particularly sensible. - * - * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/"> - * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a> - */ -public class NexmarkDriver<OptionT extends NexmarkOptions> { - - /** - * Entry point. - */ - void runAll(OptionT options, NexmarkRunner runner) { - Instant start = Instant.now(); - Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename()); - Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>(); - Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options); - - boolean successful = true; - try { - // Run all the configurations. - for (NexmarkConfiguration configuration : configurations) { - NexmarkPerf perf = runner.run(configuration); - if (perf != null) { - if (perf.errors == null || perf.errors.size() > 0) { - successful = false; - } - appendPerf(options.getPerfFilename(), configuration, perf); - actual.put(configuration, perf); - // Summarize what we've run so far. - saveSummary(null, configurations, actual, baseline, start); - } - } - } finally { - if (options.getMonitorJobs()) { - // Report overall performance. 
- saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start); - saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start); - } - } - - if (!successful) { - throw new RuntimeException("Execution was not successful"); - } - } - - /** - * Append the pair of {@code configuration} and {@code perf} to perf file. - */ - private void appendPerf( - @Nullable String perfFilename, NexmarkConfiguration configuration, - NexmarkPerf perf) { - if (perfFilename == null) { - return; - } - List<String> lines = new ArrayList<>(); - lines.add(""); - lines.add(String.format("# %s", Instant.now())); - lines.add(String.format("# %s", configuration.toShortString())); - lines.add(configuration.toString()); - lines.add(perf.toString()); - try { - Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE, - StandardOpenOption.APPEND); - } catch (IOException e) { - throw new RuntimeException("Unable to write perf file: ", e); - } - NexmarkUtils.console("appended results to perf file %s.", perfFilename); - } - - /** - * Load the baseline perf. 
- */ - @Nullable - private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline( - @Nullable String baselineFilename) { - if (baselineFilename == null) { - return null; - } - Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>(); - List<String> lines; - try { - lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException("Unable to read baseline perf file: ", e); - } - for (int i = 0; i < lines.size(); i++) { - if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) { - continue; - } - NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++)); - NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i)); - baseline.put(configuration, perf); - } - NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(), - baselineFilename); - return baseline; - } - - private static final String LINE = - "=========================================================================================="; - - /** - * Print summary of {@code actual} vs (if non-null) {@code baseline}. 
- */ - private static void saveSummary( - @Nullable String summaryFilename, - Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual, - @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) { - List<String> lines = new ArrayList<>(); - - lines.add(""); - lines.add(LINE); - - lines.add( - String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now()))); - lines.add(""); - - lines.add("Default configuration:"); - lines.add(NexmarkConfiguration.DEFAULT.toString()); - lines.add(""); - - lines.add("Configurations:"); - lines.add(" Conf Description"); - int conf = 0; - for (NexmarkConfiguration configuration : configurations) { - lines.add(String.format(" %04d %s", conf++, configuration.toShortString())); - NexmarkPerf actualPerf = actual.get(configuration); - if (actualPerf != null && actualPerf.jobId != null) { - lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId)); - } - } - - lines.add(""); - lines.add("Performance:"); - lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)", - "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)")); - conf = 0; - for (NexmarkConfiguration configuration : configurations) { - String line = String.format(" %04d ", conf++); - NexmarkPerf actualPerf = actual.get(configuration); - if (actualPerf == null) { - line += "*** not run ***"; - } else { - NexmarkPerf baselinePerf = baseline == null ? 
null : baseline.get(configuration); - double runtimeSec = actualPerf.runtimeSec; - line += String.format("%12.1f ", runtimeSec); - if (baselinePerf == null) { - line += String.format("%12s ", ""); - } else { - double baselineRuntimeSec = baselinePerf.runtimeSec; - double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0; - line += String.format("%+11.2f%% ", diff); - } - - double eventsPerSec = actualPerf.eventsPerSec; - line += String.format("%12.1f ", eventsPerSec); - if (baselinePerf == null) { - line += String.format("%12s ", ""); - } else { - double baselineEventsPerSec = baselinePerf.eventsPerSec; - double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0; - line += String.format("%+11.2f%% ", diff); - } - - long numResults = actualPerf.numResults; - line += String.format("%12d ", numResults); - if (baselinePerf == null) { - line += String.format("%12s", ""); - } else { - long baselineNumResults = baselinePerf.numResults; - long diff = numResults - baselineNumResults; - line += String.format("%+12d", diff); - } - } - lines.add(line); - - if (actualPerf != null) { - List<String> errors = actualPerf.errors; - if (errors == null) { - errors = new ArrayList<>(); - errors.add("NexmarkGoogleRunner returned null errors list"); - } - for (String error : errors) { - lines.add(String.format(" %4s *** %s ***", "", error)); - } - } - } - - lines.add(LINE); - lines.add(""); - - for (String line : lines) { - System.out.println(line); - } - - if (summaryFilename != null) { - try { - Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8, - StandardOpenOption.CREATE, StandardOpenOption.APPEND); - } catch (IOException e) { - throw new RuntimeException("Unable to save summary file: ", e); - } - NexmarkUtils.console("appended summary to summary file %s.", summaryFilename); - } - } - - /** - * Write all perf data and any baselines to a javascript file which can be used by - * graphing page etc. 
- */ - private static void saveJavascript( - @Nullable String javascriptFilename, - Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual, - @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) { - if (javascriptFilename == null) { - return; - } - - List<String> lines = new ArrayList<>(); - lines.add(String.format( - "// Run started %s and ran for %s", start, new Duration(start, Instant.now()))); - lines.add("var all = ["); - - for (NexmarkConfiguration configuration : configurations) { - lines.add(" {"); - lines.add(String.format(" config: %s", configuration)); - NexmarkPerf actualPerf = actual.get(configuration); - if (actualPerf != null) { - lines.add(String.format(" ,perf: %s", actualPerf)); - } - NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration); - if (baselinePerf != null) { - lines.add(String.format(" ,baseline: %s", baselinePerf)); - } - lines.add(" },"); - } - - lines.add("];"); - - try { - Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8, - StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); - } catch (IOException e) { - throw new RuntimeException("Unable to save javascript file: ", e); - } - NexmarkUtils.console("saved javascript to file %s.", javascriptFilename); - } - - public static void main(String[] args) { - NexmarkOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(NexmarkOptions.class); - NexmarkRunner<NexmarkOptions> runner = new NexmarkRunner<>(options); - new NexmarkDriver<>().runAll(options, runner); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/683680b1/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java new file mode 100644 index 0000000..ea4ff58 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java @@ -0,0 +1,1172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.integration.nexmark; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Event; +import org.apache.beam.integration.nexmark.model.KnownSize; +import org.apache.beam.integration.nexmark.model.Person; +import org.apache.beam.integration.nexmark.queries.NexmarkQuery; +import org.apache.beam.integration.nexmark.queries.NexmarkQueryModel; +import org.apache.beam.integration.nexmark.queries.Query0; +import org.apache.beam.integration.nexmark.queries.Query0Model; +import org.apache.beam.integration.nexmark.queries.Query1; +import org.apache.beam.integration.nexmark.queries.Query10; +import org.apache.beam.integration.nexmark.queries.Query11; +import org.apache.beam.integration.nexmark.queries.Query12; +import org.apache.beam.integration.nexmark.queries.Query1Model; +import org.apache.beam.integration.nexmark.queries.Query2; +import org.apache.beam.integration.nexmark.queries.Query2Model; +import org.apache.beam.integration.nexmark.queries.Query3; +import org.apache.beam.integration.nexmark.queries.Query3Model; +import org.apache.beam.integration.nexmark.queries.Query4; +import org.apache.beam.integration.nexmark.queries.Query4Model; +import 
org.apache.beam.integration.nexmark.queries.Query5; +import org.apache.beam.integration.nexmark.queries.Query5Model; +import org.apache.beam.integration.nexmark.queries.Query6; +import org.apache.beam.integration.nexmark.queries.Query6Model; +import org.apache.beam.integration.nexmark.queries.Query7; +import org.apache.beam.integration.nexmark.queries.Query7Model; +import org.apache.beam.integration.nexmark.queries.Query8; +import org.apache.beam.integration.nexmark.queries.Query8Model; +import org.apache.beam.integration.nexmark.queries.Query9; +import org.apache.beam.integration.nexmark.queries.Query9Model; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.joda.time.Duration; + +/** + * Run a single Nexmark query using a given configuration. + */ +public class NexmarkLauncher<OptionT extends NexmarkOptions> { + /** + * Minimum number of samples needed for 'stead-state' rate calculation. 
+ */ + private static final int MIN_SAMPLES = 9; + /** + * Minimum length of time over which to consider samples for 'steady-state' rate calculation. + */ + private static final Duration MIN_WINDOW = Duration.standardMinutes(2); + /** + * Delay between perf samples. + */ + private static final Duration PERF_DELAY = Duration.standardSeconds(15); + /** + * How long to let streaming pipeline run after all events have been generated and we've + * seen no activity. + */ + private static final Duration DONE_DELAY = Duration.standardMinutes(1); + /** + * How long to allow no activity without warning. + */ + private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10); + /** + * How long to let streaming pipeline run after we've + * seen no activity, even if all events have not been generated. + */ + private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3); + /** + * NexmarkOptions shared by all runs. + */ + private final OptionT options; + + /** + * Which configuration we are running. + */ + @Nullable + private NexmarkConfiguration configuration; + + /** + * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null. + */ + @Nullable + private Monitor<Event> publisherMonitor; + + /** + * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null. + */ + @Nullable + private PipelineResult publisherResult; + + /** + * Result for the main pipeline. + */ + @Nullable + private PipelineResult mainResult; + + /** + * Query name we are running. + */ + @Nullable + private String queryName; + + public NexmarkLauncher(OptionT options) { + this.options = options; + } + + + /** + * Is this query running in streaming mode? + */ + private boolean isStreaming() { + return options.isStreaming(); + } + + /** + * Return number of cores per worker. + */ + protected int coresPerWorker() { + return 4; + } + + /** + * Return maximum number of workers. 
+ */ + private int maxNumWorkers() { + return 5; + } + + /** + * Return the current value for a long counter, or a default value if can't be retrieved. + * Note this uses only attempted metrics because some runners don't support committed metrics. + */ + private long getCounterMetric(PipelineResult result, String namespace, String name, + long defaultValue) { + //TODO Ismael calc this only once + MetricQueryResults metrics = result.metrics().queryMetrics( + MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); + Iterable<MetricResult<Long>> counters = metrics.counters(); + try { + MetricResult<Long> metricResult = counters.iterator().next(); + return metricResult.attempted(); + } catch (NoSuchElementException e) { + //TODO Ismael + } + return defaultValue; + } + + /** + * Return the current value for a long counter, or a default value if can't be retrieved. + * Note this uses only attempted metrics because some runners don't support committed metrics. + */ + private long getDistributionMetric(PipelineResult result, String namespace, String name, + DistributionType distType, long defaultValue) { + MetricQueryResults metrics = result.metrics().queryMetrics( + MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); + Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions(); + try { + MetricResult<DistributionResult> distributionResult = distributions.iterator().next(); + if (distType.equals(DistributionType.MIN)) { + return distributionResult.attempted().min(); + } else if (distType.equals(DistributionType.MAX)) { + return distributionResult.attempted().max(); + } else { + //TODO Ismael + } + } catch (NoSuchElementException e) { + //TODO Ismael + } + return defaultValue; + } + + private enum DistributionType {MIN, MAX} + + /** + * Return the current value for a time counter, or -1 if can't be retrieved. 
+ */ + private long getTimestampMetric(long now, long value) { + //TODO Ismael improve doc + if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) { + return -1; + } + return value; + } + + /** + * Find a 'steady state' events/sec from {@code snapshots} and + * store it in {@code perf} if found. + */ + private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) { + if (!options.isStreaming()) { + return; + } + + // Find the first sample with actual event and result counts. + int dataStart = 0; + for (; dataStart < snapshots.size(); dataStart++) { + if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) { + break; + } + } + + // Find the last sample which demonstrated progress. + int dataEnd = snapshots.size() - 1; + for (; dataEnd > dataStart; dataEnd--) { + if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) { + break; + } + } + + int numSamples = dataEnd - dataStart + 1; + if (numSamples < MIN_SAMPLES) { + // Not enough samples. + NexmarkUtils.console("%d samples not enough to calculate steady-state event rate", + numSamples); + return; + } + + // We'll look at only the middle third samples. + int sampleStart = dataStart + numSamples / 3; + int sampleEnd = dataEnd - numSamples / 3; + + double sampleSec = + snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart; + if (sampleSec < MIN_WINDOW.getStandardSeconds()) { + // Not sampled over enough time. + NexmarkUtils.console( + "sample of %.1f sec not long enough to calculate steady-state event rate", + sampleSec); + return; + } + + // Find rate with least squares error. + double sumxx = 0.0; + double sumxy = 0.0; + long prevNumEvents = -1; + for (int i = sampleStart; i <= sampleEnd; i++) { + if (prevNumEvents == snapshots.get(i).numEvents) { + // Skip samples with no change in number of events since they contribute no data. 
+ continue; + } + // Use the effective runtime instead of wallclock time so we can + // insulate ourselves from delays and stutters in the query manager. + double x = snapshots.get(i).runtimeSec; + prevNumEvents = snapshots.get(i).numEvents; + double y = prevNumEvents; + sumxx += x * x; + sumxy += x * y; + } + double eventsPerSec = sumxy / sumxx; + NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec); + perf.eventsPerSec = eventsPerSec; + } + + /** + * Return the current performance given {@code eventMonitor} and {@code resultMonitor}. + */ + private NexmarkPerf currentPerf( + long startMsSinceEpoch, long now, PipelineResult result, + List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor, + Monitor<?> resultMonitor) { + NexmarkPerf perf = new NexmarkPerf(); + + long numEvents = + getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1); + long numEventBytes = + getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1); + long eventStart = + getTimestampMetric(now, + getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime", + DistributionType.MIN, -1)); + long eventEnd = + getTimestampMetric(now, + getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime", + DistributionType.MAX, -1)); + + long numResults = + getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1); + long numResultBytes = + getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1); + long resultStart = + getTimestampMetric(now, + getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime", + DistributionType.MIN, -1)); + long resultEnd = + getTimestampMetric(now, + getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime", + DistributionType.MAX, -1)); + long timestampStart = + getTimestampMetric(now, + getDistributionMetric(result, + 
resultMonitor.name, resultMonitor.prefix + ".startTimestamp", + DistributionType.MIN, -1)); + long timestampEnd = + getTimestampMetric(now, + getDistributionMetric(result, + resultMonitor.name, resultMonitor.prefix + ".endTimestamp", + DistributionType.MAX, -1)); + + long effectiveEnd = -1; + if (eventEnd >= 0 && resultEnd >= 0) { + // It is possible for events to be generated after the last result was emitted. + // (Eg Query 2, which only yields results for a small prefix of the event stream.) + // So use the max of last event and last result times. + effectiveEnd = Math.max(eventEnd, resultEnd); + } else if (resultEnd >= 0) { + effectiveEnd = resultEnd; + } else if (eventEnd >= 0) { + // During startup we may have no result yet, but we would still like to track how + // long the pipeline has been running. + effectiveEnd = eventEnd; + } + + if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) { + perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0; + } + + if (numEvents >= 0) { + perf.numEvents = numEvents; + } + + if (numEvents >= 0 && perf.runtimeSec > 0.0) { + // For streaming we may later replace this with a 'steady-state' value calculated + // from the progress snapshots. 
+ perf.eventsPerSec = numEvents / perf.runtimeSec; + } + + if (numEventBytes >= 0 && perf.runtimeSec > 0.0) { + perf.eventBytesPerSec = numEventBytes / perf.runtimeSec; + } + + if (numResults >= 0) { + perf.numResults = numResults; + } + + if (numResults >= 0 && perf.runtimeSec > 0.0) { + perf.resultsPerSec = numResults / perf.runtimeSec; + } + + if (numResultBytes >= 0 && perf.runtimeSec > 0.0) { + perf.resultBytesPerSec = numResultBytes / perf.runtimeSec; + } + + if (eventStart >= 0) { + perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0; + } + + if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) { + perf.processingDelaySec = (resultStart - eventStart) / 1000.0; + } + + if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) { + double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0; + perf.timeDilation = eventRuntimeSec / perf.runtimeSec; + } + + if (resultEnd >= 0) { + // Fill in the shutdown delay assuming the job has now finished. + perf.shutdownDelaySec = (now - resultEnd) / 1000.0; + } + + // As soon as available, try to capture cumulative cost at this point too. + + NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot(); + snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0; + snapshot.runtimeSec = perf.runtimeSec; + snapshot.numEvents = numEvents; + snapshot.numResults = numResults; + snapshots.add(snapshot); + + captureSteadyState(perf, snapshots); + + return perf; + } + + /** + * Build and run a pipeline using specified options. + */ + interface PipelineBuilder<OptionT extends NexmarkOptions> { + void build(OptionT publishOnlyOptions); + } + + /** + * Invoke the builder with options suitable for running a publish-only child pipeline. 
+ */ + private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) { + builder.build(options); +// throw new UnsupportedOperationException( +// "Cannot use --pubSubMode=COMBINED with DirectRunner"); + } + + /** + * If monitoring, wait until the publisher pipeline has run long enough to establish + * a backlog on the Pubsub topic. Otherwise, return immediately. + */ + private void waitForPublisherPreload() { + throw new UnsupportedOperationException(); + } + + /** + * Monitor the performance and progress of a running job. Return final performance if + * it was measured. + */ + @Nullable + private NexmarkPerf monitor(NexmarkQuery query) { + if (!options.getMonitorJobs()) { + return null; + } + + if (configuration.debug) { + NexmarkUtils.console("Waiting for main pipeline to 'finish'"); + } else { + NexmarkUtils.console("--debug=false, so job will not self-cancel"); + } + + PipelineResult job = mainResult; + PipelineResult publisherJob = publisherResult; + List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>(); + long startMsSinceEpoch = System.currentTimeMillis(); + long endMsSinceEpoch = -1; + if (options.getRunningTimeMinutes() != null) { + endMsSinceEpoch = startMsSinceEpoch + + Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis() + - Duration.standardSeconds(configuration.preloadSeconds).getMillis(); + } + long lastActivityMsSinceEpoch = -1; + NexmarkPerf perf = null; + boolean waitingForShutdown = false; + boolean publisherCancelled = false; + List<String> errors = new ArrayList<>(); + + while (true) { + long now = System.currentTimeMillis(); + if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) { + NexmarkUtils.console("Reached end of test, cancelling job"); + try { + job.cancel(); + } catch (IOException e) { + throw new RuntimeException("Unable to cancel main job: ", e); + } + if (publisherResult != null) { + try { + publisherJob.cancel(); + } catch (IOException e) { + throw new 
RuntimeException("Unable to cancel publisher job: ", e); + } + publisherCancelled = true; + } + waitingForShutdown = true; + } + + PipelineResult.State state = job.getState(); + NexmarkUtils.console("%s %s%s", state, queryName, + waitingForShutdown ? " (waiting for shutdown)" : ""); + + NexmarkPerf currPerf; + if (configuration.debug) { + currPerf = currentPerf(startMsSinceEpoch, now, job, snapshots, + query.eventMonitor, query.resultMonitor); + } else { + currPerf = null; + } + + if (perf == null || perf.anyActivity(currPerf)) { + lastActivityMsSinceEpoch = now; + } + + if (options.isStreaming() && !waitingForShutdown) { + Duration quietFor = new Duration(lastActivityMsSinceEpoch, now); + long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0); + if (fatalCount > 0) { + NexmarkUtils.console("job has fatal errors, cancelling."); + errors.add(String.format("Pipeline reported %s fatal errors", fatalCount)); + waitingForShutdown = true; + } else if (configuration.debug && configuration.numEvents > 0 + && currPerf.numEvents == configuration.numEvents + && currPerf.numResults >= 0 && quietFor.isLongerThan(DONE_DELAY)) { + NexmarkUtils.console("streaming query appears to have finished, cancelling job."); + waitingForShutdown = true; + } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) { + NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job."); + errors.add("Streaming job was cancelled since appeared stuck"); + waitingForShutdown = true; + } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) { + NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.", + quietFor.getStandardMinutes()); + errors.add( + String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes())); + } + + if (waitingForShutdown) { + try { + job.cancel(); + } catch (IOException e) { + throw new RuntimeException("Unable to cancel main job: ", e); + } + } + } + + perf = currPerf; + + boolean running = 
true; + switch (state) { + case UNKNOWN: + case STOPPED: + case RUNNING: + // Keep going. + break; + case DONE: + // All done. + running = false; + break; + case CANCELLED: + running = false; + if (!waitingForShutdown) { + errors.add("Job was unexpectedly cancelled"); + } + break; + case FAILED: + case UPDATED: + // Abnormal termination. + running = false; + errors.add("Job was unexpectedly updated"); + break; + } + + if (!running) { + break; + } + + if (lastActivityMsSinceEpoch == now) { + NexmarkUtils.console("new perf %s", perf); + } else { + NexmarkUtils.console("no activity"); + } + + try { + Thread.sleep(PERF_DELAY.getMillis()); + } catch (InterruptedException e) { + Thread.interrupted(); + NexmarkUtils.console("Interrupted: pipeline is still running"); + } + } + + perf.errors = errors; + perf.snapshots = snapshots; + + if (publisherResult != null) { + NexmarkUtils.console("Shutting down publisher pipeline."); + try { + if (!publisherCancelled) { + publisherJob.cancel(); + } + publisherJob.waitUntilFinish(Duration.standardMinutes(5)); + } catch (IOException e) { + throw new RuntimeException("Unable to cancel publisher job: ", e); + } //TODO Ismael +// catch (InterruptedException e) { +// Thread.interrupted(); +// throw new RuntimeException("Interrupted: publish job still running.", e); +// } + } + + return perf; + } + + // ================================================================================ + // Basic sources and sinks + // ================================================================================ + + /** + * Return a topic name. 
+ */ + private String shortTopic(long now) { + String baseTopic = options.getPubsubTopic(); + if (Strings.isNullOrEmpty(baseTopic)) { + throw new RuntimeException("Missing --pubsubTopic"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return baseTopic; + case QUERY: + return String.format("%s_%s_source", baseTopic, queryName); + case QUERY_AND_SALT: + return String.format("%s_%s_%d_source", baseTopic, queryName, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a subscription name. + */ + private String shortSubscription(long now) { + String baseSubscription = options.getPubsubSubscription(); + if (Strings.isNullOrEmpty(baseSubscription)) { + throw new RuntimeException("Missing --pubsubSubscription"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return baseSubscription; + case QUERY: + return String.format("%s_%s_source", baseSubscription, queryName); + case QUERY_AND_SALT: + return String.format("%s_%s_%d_source", baseSubscription, queryName, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a file name for plain text. + */ + private String textFilename(long now) { + String baseFilename = options.getOutputPath(); + if (Strings.isNullOrEmpty(baseFilename)) { + throw new RuntimeException("Missing --outputPath"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return baseFilename; + case QUERY: + return String.format("%s/nexmark_%s.txt", baseFilename, queryName); + case QUERY_AND_SALT: + return String.format("%s/nexmark_%s_%d.txt", baseFilename, queryName, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a BigQuery table spec. 
+ */ + private String tableSpec(long now, String version) { + String baseTableName = options.getBigQueryTable(); + if (Strings.isNullOrEmpty(baseTableName)) { + throw new RuntimeException("Missing --bigQueryTable"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return String.format("%s:nexmark.%s_%s", + options.getProject(), baseTableName, version); + case QUERY: + return String.format("%s:nexmark.%s_%s_%s", + options.getProject(), baseTableName, queryName, version); + case QUERY_AND_SALT: + return String.format("%s:nexmark.%s_%s_%s_%d", + options.getProject(), baseTableName, queryName, version, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a directory for logs. + */ + private String logsDir(long now) { + String baseFilename = options.getOutputPath(); + if (Strings.isNullOrEmpty(baseFilename)) { + throw new RuntimeException("Missing --outputPath"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return baseFilename; + case QUERY: + return String.format("%s/logs_%s", baseFilename, queryName); + case QUERY_AND_SALT: + return String.format("%s/logs_%s_%d", baseFilename, queryName, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a source of synthetic events. + */ + private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) { + if (isStreaming()) { + NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents); + return p.apply(queryName + ".ReadUnbounded", NexmarkUtils.streamEventsSource(configuration)); + } else { + NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents); + return p.apply(queryName + ".ReadBounded", NexmarkUtils.batchEventsSource(configuration)); + } + } + + /** + * Return source of events from Pubsub. 
+ */ + private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) { + String shortSubscription = shortSubscription(now); + NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription); + + PubsubIO.Read<PubsubMessage> io = + PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); + if (!configuration.usePubsubPublishTime) { + io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); + } + + return p + .apply(queryName + ".ReadPubsubEvents", io) + .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() { + @ProcessElement + public void processElement(ProcessContext c) { + byte[] payload = c.element().getPayload(); + try { + Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); + c.output(event); + } catch (CoderException e) { + // TODO Log decoding Event error + } + } + })); + } + + /** + * Return Avro source of events from {@code options.getInputFilePrefix}. + */ + private PCollection<Event> sourceEventsFromAvro(Pipeline p) { + String filename = options.getInputPath(); + if (Strings.isNullOrEmpty(filename)) { + throw new RuntimeException("Missing --inputPath"); + } + NexmarkUtils.console("Reading events from Avro files at %s", filename); + return p + .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class) + .from(filename + "*.avro")) + .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA); + } + + /** + * Send {@code events} to Pubsub. 
+ */ + private void sinkEventsToPubsub(PCollection<Event> events, long now) { + String shortTopic = shortTopic(now); + NexmarkUtils.console("Writing events to Pubsub %s", shortTopic); + + PubsubIO.Write<PubsubMessage> io = + PubsubIO.writePubsubMessages().to(shortTopic) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); + if (!configuration.usePubsubPublishTime) { + io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); + } + + events.apply(queryName + ".EventToPubsubMessage", + ParDo.of(new DoFn<Event, PubsubMessage>() { + @ProcessElement + public void processElement(ProcessContext c) { + try { + byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element()); + c.output(new PubsubMessage(payload, new HashMap<String, String>())); + } catch (CoderException e1) { + // TODO Log encoding Event error + } + } + }) + ) + .apply(queryName + ".WritePubsubEvents", io); + } + + /** + * Send {@code formattedResults} to Pubsub. + */ + private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) { + String shortTopic = shortTopic(now); + NexmarkUtils.console("Writing results to Pubsub %s", shortTopic); + PubsubIO.Write<String> io = + PubsubIO.writeStrings().to(shortTopic) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); + if (!configuration.usePubsubPublishTime) { + io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); + } + formattedResults.apply(queryName + ".WritePubsubResults", io); + } + + /** + * Sink all raw Events in {@code source} to {@code options.getOutputPath}. + * This will configure the job to write the following files: + * <ul> + * <li>{@code $outputPath/event*.avro} All Event entities. + * <li>{@code $outputPath/auction*.avro} Auction entities. + * <li>{@code $outputPath/bid*.avro} Bid entities. + * <li>{@code $outputPath/person*.avro} Person entities. + * </ul> + * + * @param source A PCollection of events. 
+ */ + private void sinkEventsToAvro(PCollection<Event> source) { + String filename = options.getOutputPath(); + if (Strings.isNullOrEmpty(filename)) { + throw new RuntimeException("Missing --outputPath"); + } + NexmarkUtils.console("Writing events to Avro files at %s", filename); + source.apply(queryName + ".WriteAvroEvents", + AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro")); + source.apply(NexmarkQuery.JUST_BIDS) + .apply(queryName + ".WriteAvroBids", + AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro")); + source.apply(NexmarkQuery.JUST_NEW_AUCTIONS) + .apply(queryName + ".WriteAvroAuctions", + AvroIO.write(Auction.class).to(filename + "/auction").withSuffix(".avro")); + source.apply(NexmarkQuery.JUST_NEW_PERSONS) + .apply(queryName + ".WriteAvroPeople", + AvroIO.write(Person.class).to(filename + "/person").withSuffix(".avro")); + } + + /** + * Send {@code formattedResults} to text files. + */ + private void sinkResultsToText(PCollection<String> formattedResults, long now) { + String filename = textFilename(now); + NexmarkUtils.console("Writing results to text files at %s", filename); + formattedResults.apply(queryName + ".WriteTextResults", + TextIO.write().to(filename)); + } + + private static class StringToTableRow extends DoFn<String, TableRow> { + @ProcessElement + public void processElement(ProcessContext c) { + int n = ThreadLocalRandom.current().nextInt(10); + List<TableRow> records = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + records.add(new TableRow().set("index", i).set("value", Integer.toString(i))); + } + c.output(new TableRow().set("result", c.element()).set("records", records)); + } + } + + /** + * Send {@code formattedResults} to BigQuery. 
+ */ + private void sinkResultsToBigQuery( + PCollection<String> formattedResults, long now, + String version) { + String tableSpec = tableSpec(now, version); + TableSchema tableSchema = + new TableSchema().setFields(ImmutableList.of( + new TableFieldSchema().setName("result").setType("STRING"), + new TableFieldSchema().setName("records").setMode("REPEATED").setType("RECORD") + .setFields(ImmutableList.of( + new TableFieldSchema().setName("index").setType("INTEGER"), + new TableFieldSchema().setName("value").setType("STRING"))))); + NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec); + BigQueryIO.Write io = + BigQueryIO.write().to(tableSpec) + .withSchema(tableSchema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND); + formattedResults + .apply(queryName + ".StringToTableRow", ParDo.of(new StringToTableRow())) + .apply(queryName + ".WriteBigQueryResults", io); + } + + // ================================================================================ + // Construct overall pipeline + // ================================================================================ + + /** + * Return source of events for this run, or null if we are simply publishing events + * to Pubsub. + */ + private PCollection<Event> createSource(Pipeline p, final long now) { + PCollection<Event> source = null; + switch (configuration.sourceType) { + case DIRECT: + source = sourceEventsFromSynthetic(p); + break; + case AVRO: + source = sourceEventsFromAvro(p); + break; + case PUBSUB: + // Setup the sink for the publisher. + switch (configuration.pubSubMode) { + case SUBSCRIBE_ONLY: + // Nothing to publish. + break; + case PUBLISH_ONLY: + // Send synthesized events to Pubsub in this job. 
+ sinkEventsToPubsub(sourceEventsFromSynthetic(p).apply(queryName + ".Snoop", + NexmarkUtils.snoop(queryName)), now); + break; + case COMBINED: + // Send synthesized events to Pubsub in separate publisher job. + // We won't start the main pipeline until the publisher has sent the pre-load events. + // We'll shutdown the publisher job when we notice the main job has finished. + invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() { + @Override + public void build(NexmarkOptions publishOnlyOptions) { + Pipeline sp = Pipeline.create(options); + NexmarkUtils.setupPipeline(configuration.coderStrategy, sp); + publisherMonitor = new Monitor<>(queryName, "publisher"); + sinkEventsToPubsub( + sourceEventsFromSynthetic(sp) + .apply(queryName + ".Monitor", publisherMonitor.getTransform()), + now); + publisherResult = sp.run(); + } + }); + break; + } + + // Setup the source for the consumer. + switch (configuration.pubSubMode) { + case PUBLISH_ONLY: + // Nothing to consume. Leave source null. + break; + case SUBSCRIBE_ONLY: + case COMBINED: + // Read events from pubsub. + source = sourceEventsFromPubsub(p, now); + break; + } + break; + } + return source; + } + + private static final TupleTag<String> MAIN = new TupleTag<String>(){}; + private static final TupleTag<String> SIDE = new TupleTag<String>(){}; + + private static class PartitionDoFn extends DoFn<String, String> { + @ProcessElement + public void processElement(ProcessContext c) { + if (c.element().hashCode() % 2 == 0) { + c.output(c.element()); + } else { + c.output(SIDE, c.element()); + } + } + } + + /** + * Consume {@code results}. + */ + private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) { + if (configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) { + // Avoid the cost of formatting the results. 
+ results.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName)); + return; + } + + PCollection<String> formattedResults = + results.apply(queryName + ".Format", NexmarkUtils.format(queryName)); + if (options.getLogResults()) { + formattedResults = formattedResults.apply(queryName + ".Results.Log", + NexmarkUtils.<String>log(queryName + ".Results")); + } + + switch (configuration.sinkType) { + case DEVNULL: + // Discard all results + formattedResults.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName)); + break; + case PUBSUB: + sinkResultsToPubsub(formattedResults, now); + break; + case TEXT: + sinkResultsToText(formattedResults, now); + break; + case AVRO: + NexmarkUtils.console( + "WARNING: with --sinkType=AVRO, actual query results will be discarded."); + break; + case BIGQUERY: + // Multiple BigQuery backends to mimic what most customers do. + PCollectionTuple res = formattedResults.apply(queryName + ".Partition", + ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE))); + sinkResultsToBigQuery(res.get(MAIN), now, "main"); + sinkResultsToBigQuery(res.get(SIDE), now, "side"); + sinkResultsToBigQuery(formattedResults, now, "copy"); + break; + case COUNT_ONLY: + // Short-circuited above. + throw new RuntimeException(); + } + } + + // ================================================================================ + // Entry point + // ================================================================================ + + /** + * Calculate the distribution of the expected rate of results per minute (in event time, not + * wallclock time). 
+ */ + private void modelResultRates(NexmarkQueryModel model) { + List<Long> counts = Lists.newArrayList(model.simulator().resultsPerWindow()); + Collections.sort(counts); + int n = counts.size(); + if (n < 5) { + NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, n); + } else { + NexmarkUtils.console("Query%d: N:%d; min:%d; 1st%%:%d; mean:%d; 3rd%%:%d; max:%d", + model.configuration.query, n, counts.get(0), counts.get(n / 4), + counts.get(n / 2), + counts.get(n - 1 - n / 4), counts.get(n - 1)); + } + } + + /** + * Run {@code configuration} and return its performance if possible. + */ + @Nullable + public NexmarkPerf run(NexmarkConfiguration runConfiguration) { + if (options.getManageResources() && !options.getMonitorJobs()) { + throw new RuntimeException("If using --manageResources then must also use --monitorJobs."); + } + + // + // Setup per-run state. + // + checkState(configuration == null); + checkState(queryName == null); + configuration = runConfiguration; + + try { + NexmarkUtils.console("Running %s", configuration.toShortString()); + + if (configuration.numEvents < 0) { + NexmarkUtils.console("skipping since configuration is disabled"); + return null; + } + + List<NexmarkQuery> queries = Arrays.asList(new Query0(configuration), + new Query1(configuration), + new Query2(configuration), + new Query3(configuration), + new Query4(configuration), + new Query5(configuration), + new Query6(configuration), + new Query7(configuration), + new Query8(configuration), + new Query9(configuration), + new Query10(configuration), + new Query11(configuration), + new Query12(configuration)); + NexmarkQuery query = queries.get(configuration.query); + queryName = query.getName(); + + List<NexmarkQueryModel> models = Arrays.asList( + new Query0Model(configuration), + new Query1Model(configuration), + new Query2Model(configuration), + new Query3Model(configuration), + new Query4Model(configuration), + new Query5Model(configuration), + new 
Query6Model(configuration), + new Query7Model(configuration), + new Query8Model(configuration), + new Query9Model(configuration), + null, + null, + null); + NexmarkQueryModel model = models.get(configuration.query); + + if (options.getJustModelResultRate()) { + if (model == null) { + throw new RuntimeException(String.format("No model for %s", queryName)); + } + modelResultRates(model); + return null; + } + + long now = System.currentTimeMillis(); + Pipeline p = Pipeline.create(options); + NexmarkUtils.setupPipeline(configuration.coderStrategy, p); + + // Generate events. + PCollection<Event> source = createSource(p, now); + + if (options.getLogEvents()) { + source = source.apply(queryName + ".Events.Log", + NexmarkUtils.<Event>log(queryName + ".Events")); + } + + // Source will be null if source type is PUBSUB and mode is PUBLISH_ONLY. + // In that case there's nothing more to add to pipeline. + if (source != null) { + // Optionally sink events in Avro format. + // (Query results are ignored). + if (configuration.sinkType == NexmarkUtils.SinkType.AVRO) { + sinkEventsToAvro(source); + } + + // Special hacks for Query 10 (big logger). + if (configuration.query == 10) { + String path = null; + if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) { + path = logsDir(now); + } + ((Query10) query).setOutputPath(path); + ((Query10) query).setMaxNumWorkers(maxNumWorkers()); + } + + // Apply query. + PCollection<TimestampedValue<KnownSize>> results = source.apply(query); + + if (options.getAssertCorrectness()) { + if (model == null) { + throw new RuntimeException(String.format("No model for %s", queryName)); + } + // We know all our streams have a finite number of elements. + results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); + // If we have a finite number of events then assert our pipeline's + // results match those of a model using the same sequence of events. 
+ PAssert.that(results).satisfies(model.assertionFor()); + } + + // Output results. + sink(results, now); + } + + if (publisherResult != null) { + waitForPublisherPreload(); + } + mainResult = p.run(); + mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout)); + return monitor(query); + } finally { + configuration = null; + queryName = null; + } + } +}
