http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java deleted file mode 100644 index a609975..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java +++ /dev/null @@ -1,1158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Event; -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.integration.nexmark.queries.NexmarkQuery; -import org.apache.beam.integration.nexmark.queries.NexmarkQueryModel; -import org.apache.beam.integration.nexmark.queries.Query0; -import org.apache.beam.integration.nexmark.queries.Query0Model; -import org.apache.beam.integration.nexmark.queries.Query1; -import org.apache.beam.integration.nexmark.queries.Query10; -import org.apache.beam.integration.nexmark.queries.Query11; -import org.apache.beam.integration.nexmark.queries.Query12; -import org.apache.beam.integration.nexmark.queries.Query1Model; -import org.apache.beam.integration.nexmark.queries.Query2; -import org.apache.beam.integration.nexmark.queries.Query2Model; -import org.apache.beam.integration.nexmark.queries.Query3; -import org.apache.beam.integration.nexmark.queries.Query3Model; -import org.apache.beam.integration.nexmark.queries.Query4; -import org.apache.beam.integration.nexmark.queries.Query4Model; -import org.apache.beam.integration.nexmark.queries.Query5; -import org.apache.beam.integration.nexmark.queries.Query5Model; -import org.apache.beam.integration.nexmark.queries.Query6; -import org.apache.beam.integration.nexmark.queries.Query6Model; -import org.apache.beam.integration.nexmark.queries.Query7; -import org.apache.beam.integration.nexmark.queries.Query7Model; -import org.apache.beam.integration.nexmark.queries.Query8; -import org.apache.beam.integration.nexmark.queries.Query8Model; -import org.apache.beam.integration.nexmark.queries.Query9; -import org.apache.beam.integration.nexmark.queries.Query9Model; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; -import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.MetricsFilter; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.joda.time.Duration; -import org.slf4j.LoggerFactory; - -/** - * Run a single Nexmark query using a given configuration. - */ -public class NexmarkLauncher<OptionT extends NexmarkOptions> { - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class); - /** - * Minimum number of samples needed for 'stead-state' rate calculation. - */ - private static final int MIN_SAMPLES = 9; - /** - * Minimum length of time over which to consider samples for 'steady-state' rate calculation. - */ - private static final Duration MIN_WINDOW = Duration.standardMinutes(2); - /** - * Delay between perf samples. - */ - private static final Duration PERF_DELAY = Duration.standardSeconds(15); - /** - * How long to let streaming pipeline run after all events have been generated and we've - * seen no activity. - */ - private static final Duration DONE_DELAY = Duration.standardMinutes(1); - /** - * How long to allow no activity without warning. - */ - private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10); - /** - * How long to let streaming pipeline run after we've - * seen no activity, even if all events have not been generated. - */ - private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3); - /** - * NexmarkOptions shared by all runs. - */ - private final OptionT options; - - /** - * Which configuration we are running. - */ - @Nullable - private NexmarkConfiguration configuration; - - /** - * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null. - */ - @Nullable - private Monitor<Event> publisherMonitor; - - /** - * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null. - */ - @Nullable - private PipelineResult publisherResult; - - /** - * Result for the main pipeline. - */ - @Nullable - private PipelineResult mainResult; - - /** - * Query name we are running. - */ - @Nullable - private String queryName; - - public NexmarkLauncher(OptionT options) { - this.options = options; - } - - - /** - * Is this query running in streaming mode? - */ - private boolean isStreaming() { - return options.isStreaming(); - } - - /** - * Return maximum number of workers. - */ - private int maxNumWorkers() { - return 5; - } - - /** - * Return the current value for a long counter, or a default value if can't be retrieved. - * Note this uses only attempted metrics because some runners don't support committed metrics. - */ - private long getCounterMetric(PipelineResult result, String namespace, String name, - long defaultValue) { - MetricQueryResults metrics = result.metrics().queryMetrics( - MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); - Iterable<MetricResult<Long>> counters = metrics.counters(); - try { - MetricResult<Long> metricResult = counters.iterator().next(); - return metricResult.attempted(); - } catch (NoSuchElementException e) { - LOG.error("Failed to get metric {}, from namespace {}", name, namespace); - } - return defaultValue; - } - - /** - * Return the current value for a long counter, or a default value if can't be retrieved. - * Note this uses only attempted metrics because some runners don't support committed metrics. - */ - private long getDistributionMetric(PipelineResult result, String namespace, String name, - DistributionType distType, long defaultValue) { - MetricQueryResults metrics = result.metrics().queryMetrics( - MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); - Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions(); - try { - MetricResult<DistributionResult> distributionResult = distributions.iterator().next(); - switch (distType) - { - case MIN: - return distributionResult.attempted().min(); - case MAX: - return distributionResult.attempted().max(); - default: - return defaultValue; - } - } catch (NoSuchElementException e) { - LOG.error( - "Failed to get distribution metric {} for namespace {}", - name, - namespace); - } - return defaultValue; - } - - private enum DistributionType {MIN, MAX} - - /** - * Return the current value for a time counter, or -1 if can't be retrieved. - */ - private long getTimestampMetric(long now, long value) { - // timestamp metrics are used to monitor time of execution of transforms. - // If result timestamp metric is too far from now, consider that metric is erroneous - - if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) { - return -1; - } - return value; - } - - /** - * Find a 'steady state' events/sec from {@code snapshots} and - * store it in {@code perf} if found. - */ - private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) { - if (!options.isStreaming()) { - return; - } - - // Find the first sample with actual event and result counts. - int dataStart = 0; - for (; dataStart < snapshots.size(); dataStart++) { - if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) { - break; - } - } - - // Find the last sample which demonstrated progress. - int dataEnd = snapshots.size() - 1; - for (; dataEnd > dataStart; dataEnd--) { - if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) { - break; - } - } - - int numSamples = dataEnd - dataStart + 1; - if (numSamples < MIN_SAMPLES) { - // Not enough samples. - NexmarkUtils.console("%d samples not enough to calculate steady-state event rate", - numSamples); - return; - } - - // We'll look at only the middle third samples. - int sampleStart = dataStart + numSamples / 3; - int sampleEnd = dataEnd - numSamples / 3; - - double sampleSec = - snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart; - if (sampleSec < MIN_WINDOW.getStandardSeconds()) { - // Not sampled over enough time. - NexmarkUtils.console( - "sample of %.1f sec not long enough to calculate steady-state event rate", - sampleSec); - return; - } - - // Find rate with least squares error. - double sumxx = 0.0; - double sumxy = 0.0; - long prevNumEvents = -1; - for (int i = sampleStart; i <= sampleEnd; i++) { - if (prevNumEvents == snapshots.get(i).numEvents) { - // Skip samples with no change in number of events since they contribute no data. - continue; - } - // Use the effective runtime instead of wallclock time so we can - // insulate ourselves from delays and stutters in the query manager. - double x = snapshots.get(i).runtimeSec; - prevNumEvents = snapshots.get(i).numEvents; - double y = prevNumEvents; - sumxx += x * x; - sumxy += x * y; - } - double eventsPerSec = sumxy / sumxx; - NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec); - perf.eventsPerSec = eventsPerSec; - } - - /** - * Return the current performance given {@code eventMonitor} and {@code resultMonitor}. - */ - private NexmarkPerf currentPerf( - long startMsSinceEpoch, long now, PipelineResult result, - List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor, - Monitor<?> resultMonitor) { - NexmarkPerf perf = new NexmarkPerf(); - - long numEvents = - getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1); - long numEventBytes = - getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1); - long eventStart = - getTimestampMetric(now, - getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime", - DistributionType.MIN, -1)); - long eventEnd = - getTimestampMetric(now, - getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime", - DistributionType.MAX, -1)); - - long numResults = - getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1); - long numResultBytes = - getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1); - long resultStart = - getTimestampMetric(now, - getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime", - DistributionType.MIN, -1)); - long resultEnd = - getTimestampMetric(now, - getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime", - DistributionType.MAX, -1)); - long timestampStart = - getTimestampMetric(now, - getDistributionMetric(result, - resultMonitor.name, resultMonitor.prefix + ".startTimestamp", - DistributionType.MIN, -1)); - long timestampEnd = - getTimestampMetric(now, - getDistributionMetric(result, - resultMonitor.name, resultMonitor.prefix + ".endTimestamp", - DistributionType.MAX, -1)); - - long effectiveEnd = -1; - if (eventEnd >= 0 && resultEnd >= 0) { - // It is possible for events to be generated after the last result was emitted. - // (Eg Query 2, which only yields results for a small prefix of the event stream.) - // So use the max of last event and last result times. - effectiveEnd = Math.max(eventEnd, resultEnd); - } else if (resultEnd >= 0) { - effectiveEnd = resultEnd; - } else if (eventEnd >= 0) { - // During startup we may have no result yet, but we would still like to track how - // long the pipeline has been running. - effectiveEnd = eventEnd; - } - - if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) { - perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0; - } - - if (numEvents >= 0) { - perf.numEvents = numEvents; - } - - if (numEvents >= 0 && perf.runtimeSec > 0.0) { - // For streaming we may later replace this with a 'steady-state' value calculated - // from the progress snapshots. - perf.eventsPerSec = numEvents / perf.runtimeSec; - } - - if (numEventBytes >= 0 && perf.runtimeSec > 0.0) { - perf.eventBytesPerSec = numEventBytes / perf.runtimeSec; - } - - if (numResults >= 0) { - perf.numResults = numResults; - } - - if (numResults >= 0 && perf.runtimeSec > 0.0) { - perf.resultsPerSec = numResults / perf.runtimeSec; - } - - if (numResultBytes >= 0 && perf.runtimeSec > 0.0) { - perf.resultBytesPerSec = numResultBytes / perf.runtimeSec; - } - - if (eventStart >= 0) { - perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0; - } - - if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) { - perf.processingDelaySec = (resultStart - eventStart) / 1000.0; - } - - if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) { - double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0; - perf.timeDilation = eventRuntimeSec / perf.runtimeSec; - } - - if (resultEnd >= 0) { - // Fill in the shutdown delay assuming the job has now finished. - perf.shutdownDelaySec = (now - resultEnd) / 1000.0; - } - - // As soon as available, try to capture cumulative cost at this point too. - - NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot(); - snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0; - snapshot.runtimeSec = perf.runtimeSec; - snapshot.numEvents = numEvents; - snapshot.numResults = numResults; - snapshots.add(snapshot); - - captureSteadyState(perf, snapshots); - - return perf; - } - - /** - * Build and run a pipeline using specified options. - */ - interface PipelineBuilder<OptionT extends NexmarkOptions> { - void build(OptionT publishOnlyOptions); - } - - /** - * Invoke the builder with options suitable for running a publish-only child pipeline. - */ - private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) { - builder.build(options); - } - - /** - * Monitor the performance and progress of a running job. Return final performance if - * it was measured. - */ - @Nullable - private NexmarkPerf monitor(NexmarkQuery query) { - if (!options.getMonitorJobs()) { - return null; - } - - if (configuration.debug) { - NexmarkUtils.console("Waiting for main pipeline to 'finish'"); - } else { - NexmarkUtils.console("--debug=false, so job will not self-cancel"); - } - - PipelineResult job = mainResult; - PipelineResult publisherJob = publisherResult; - List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>(); - long startMsSinceEpoch = System.currentTimeMillis(); - long endMsSinceEpoch = -1; - if (options.getRunningTimeMinutes() != null) { - endMsSinceEpoch = startMsSinceEpoch - + Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis() - - Duration.standardSeconds(configuration.preloadSeconds).getMillis(); - } - long lastActivityMsSinceEpoch = -1; - NexmarkPerf perf = null; - boolean waitingForShutdown = false; - boolean publisherCancelled = false; - List<String> errors = new ArrayList<>(); - - while (true) { - long now = System.currentTimeMillis(); - if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) { - NexmarkUtils.console("Reached end of test, cancelling job"); - try { - job.cancel(); - } catch (IOException e) { - throw new RuntimeException("Unable to cancel main job: ", e); - } - if (publisherResult != null) { - try { - publisherJob.cancel(); - } catch (IOException e) { - throw new RuntimeException("Unable to cancel publisher job: ", e); - } - publisherCancelled = true; - } - waitingForShutdown = true; - } - - PipelineResult.State state = job.getState(); - NexmarkUtils.console("%s %s%s", state, queryName, - waitingForShutdown ? " (waiting for shutdown)" : ""); - - NexmarkPerf currPerf; - if (configuration.debug) { - currPerf = currentPerf(startMsSinceEpoch, now, job, snapshots, - query.eventMonitor, query.resultMonitor); - } else { - currPerf = null; - } - - if (perf == null || perf.anyActivity(currPerf)) { - lastActivityMsSinceEpoch = now; - } - - if (options.isStreaming() && !waitingForShutdown) { - Duration quietFor = new Duration(lastActivityMsSinceEpoch, now); - long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0); - if (fatalCount > 0) { - NexmarkUtils.console("job has fatal errors, cancelling."); - errors.add(String.format("Pipeline reported %s fatal errors", fatalCount)); - waitingForShutdown = true; - } else if (configuration.debug && configuration.numEvents > 0 - && currPerf.numEvents == configuration.numEvents - && currPerf.numResults >= 0 && quietFor.isLongerThan(DONE_DELAY)) { - NexmarkUtils.console("streaming query appears to have finished, cancelling job."); - waitingForShutdown = true; - } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) { - NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job."); - errors.add("Streaming job was cancelled since appeared stuck"); - waitingForShutdown = true; - } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) { - NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.", - quietFor.getStandardMinutes()); - errors.add( - String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes())); - } - - if (waitingForShutdown) { - try { - job.cancel(); - } catch (IOException e) { - throw new RuntimeException("Unable to cancel main job: ", e); - } - } - } - - perf = currPerf; - - boolean running = true; - switch (state) { - case UNKNOWN: - case STOPPED: - case RUNNING: - // Keep going. - break; - case DONE: - // All done. - running = false; - break; - case CANCELLED: - running = false; - if (!waitingForShutdown) { - errors.add("Job was unexpectedly cancelled"); - } - break; - case FAILED: - case UPDATED: - // Abnormal termination. - running = false; - errors.add("Job was unexpectedly updated"); - break; - } - - if (!running) { - break; - } - - if (lastActivityMsSinceEpoch == now) { - NexmarkUtils.console("new perf %s", perf); - } else { - NexmarkUtils.console("no activity"); - } - - try { - Thread.sleep(PERF_DELAY.getMillis()); - } catch (InterruptedException e) { - Thread.interrupted(); - NexmarkUtils.console("Interrupted: pipeline is still running"); - } - } - - perf.errors = errors; - perf.snapshots = snapshots; - - if (publisherResult != null) { - NexmarkUtils.console("Shutting down publisher pipeline."); - try { - if (!publisherCancelled) { - publisherJob.cancel(); - } - publisherJob.waitUntilFinish(Duration.standardMinutes(5)); - } catch (IOException e) { - throw new RuntimeException("Unable to cancel publisher job: ", e); - } - } - - return perf; - } - - // ================================================================================ - // Basic sources and sinks - // ================================================================================ - - /** - * Return a topic name. - */ - private String shortTopic(long now) { - String baseTopic = options.getPubsubTopic(); - if (Strings.isNullOrEmpty(baseTopic)) { - throw new RuntimeException("Missing --pubsubTopic"); - } - switch (options.getResourceNameMode()) { - case VERBATIM: - return baseTopic; - case QUERY: - return String.format("%s_%s_source", baseTopic, queryName); - case QUERY_AND_SALT: - return String.format("%s_%s_%d_source", baseTopic, queryName, now); - } - throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); - } - - /** - * Return a subscription name. - */ - private String shortSubscription(long now) { - String baseSubscription = options.getPubsubSubscription(); - if (Strings.isNullOrEmpty(baseSubscription)) { - throw new RuntimeException("Missing --pubsubSubscription"); - } - switch (options.getResourceNameMode()) { - case VERBATIM: - return baseSubscription; - case QUERY: - return String.format("%s_%s_source", baseSubscription, queryName); - case QUERY_AND_SALT: - return String.format("%s_%s_%d_source", baseSubscription, queryName, now); - } - throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); - } - - /** - * Return a file name for plain text. - */ - private String textFilename(long now) { - String baseFilename = options.getOutputPath(); - if (Strings.isNullOrEmpty(baseFilename)) { - throw new RuntimeException("Missing --outputPath"); - } - switch (options.getResourceNameMode()) { - case VERBATIM: - return baseFilename; - case QUERY: - return String.format("%s/nexmark_%s.txt", baseFilename, queryName); - case QUERY_AND_SALT: - return String.format("%s/nexmark_%s_%d.txt", baseFilename, queryName, now); - } - throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); - } - - /** - * Return a BigQuery table spec. - */ - private String tableSpec(long now, String version) { - String baseTableName = options.getBigQueryTable(); - if (Strings.isNullOrEmpty(baseTableName)) { - throw new RuntimeException("Missing --bigQueryTable"); - } - switch (options.getResourceNameMode()) { - case VERBATIM: - return String.format("%s:nexmark.%s_%s", - options.getProject(), baseTableName, version); - case QUERY: - return String.format("%s:nexmark.%s_%s_%s", - options.getProject(), baseTableName, queryName, version); - case QUERY_AND_SALT: - return String.format("%s:nexmark.%s_%s_%s_%d", - options.getProject(), baseTableName, queryName, version, now); - } - throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); - } - - /** - * Return a directory for logs. - */ - private String logsDir(long now) { - String baseFilename = options.getOutputPath(); - if (Strings.isNullOrEmpty(baseFilename)) { - throw new RuntimeException("Missing --outputPath"); - } - switch (options.getResourceNameMode()) { - case VERBATIM: - return baseFilename; - case QUERY: - return String.format("%s/logs_%s", baseFilename, queryName); - case QUERY_AND_SALT: - return String.format("%s/logs_%s_%d", baseFilename, queryName, now); - } - throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); - } - - /** - * Return a source of synthetic events. - */ - private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) { - if (isStreaming()) { - NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents); - return p.apply(queryName + ".ReadUnbounded", NexmarkUtils.streamEventsSource(configuration)); - } else { - NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents); - return p.apply(queryName + ".ReadBounded", NexmarkUtils.batchEventsSource(configuration)); - } - } - - /** - * Return source of events from Pubsub. - */ - private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) { - String shortSubscription = shortSubscription(now); - NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription); - - PubsubIO.Read<PubsubMessage> io = - PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription) - .withIdAttribute(NexmarkUtils.PUBSUB_ID); - if (!configuration.usePubsubPublishTime) { - io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); - } - - return p - .apply(queryName + ".ReadPubsubEvents", io) - .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() { - @ProcessElement - public void processElement(ProcessContext c) { - byte[] payload = c.element().getPayload(); - try { - Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); - c.output(event); - } catch (CoderException e) { - LOG.error("Error while decoding Event from pusbSub message: serialization error"); - } - } - })); - } - - /** - * Return Avro source of events from {@code options.getInputFilePrefix}. - */ - private PCollection<Event> sourceEventsFromAvro(Pipeline p) { - String filename = options.getInputPath(); - if (Strings.isNullOrEmpty(filename)) { - throw new RuntimeException("Missing --inputPath"); - } - NexmarkUtils.console("Reading events from Avro files at %s", filename); - return p - .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class) - .from(filename + "*.avro")) - .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA); - } - - /** - * Send {@code events} to Pubsub. - */ - private void sinkEventsToPubsub(PCollection<Event> events, long now) { - String shortTopic = shortTopic(now); - NexmarkUtils.console("Writing events to Pubsub %s", shortTopic); - - PubsubIO.Write<PubsubMessage> io = - PubsubIO.writeMessages().to(shortTopic) - .withIdAttribute(NexmarkUtils.PUBSUB_ID); - if (!configuration.usePubsubPublishTime) { - io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); - } - - events.apply(queryName + ".EventToPubsubMessage", - ParDo.of(new DoFn<Event, PubsubMessage>() { - @ProcessElement - public void processElement(ProcessContext c) { - try { - byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element()); - c.output(new PubsubMessage(payload, new HashMap<String, String>())); - } catch (CoderException e1) { - LOG.error("Error while sending Event {} to pusbSub: serialization error", - c.element().toString()); - } - } - }) - ) - .apply(queryName + ".WritePubsubEvents", io); - } - - /** - * Send {@code formattedResults} to Pubsub. - */ - private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) { - String shortTopic = shortTopic(now); - NexmarkUtils.console("Writing results to Pubsub %s", shortTopic); - PubsubIO.Write<String> io = - PubsubIO.writeStrings().to(shortTopic) - .withIdAttribute(NexmarkUtils.PUBSUB_ID); - if (!configuration.usePubsubPublishTime) { - io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); - } - formattedResults.apply(queryName + ".WritePubsubResults", io); - } - - /** - * Sink all raw Events in {@code source} to {@code options.getOutputPath}. - * This will configure the job to write the following files: - * <ul> - * <li>{@code $outputPath/event*.avro} All Event entities. - * <li>{@code $outputPath/auction*.avro} Auction entities. - * <li>{@code $outputPath/bid*.avro} Bid entities. - * <li>{@code $outputPath/person*.avro} Person entities. - * </ul> - * - * @param source A PCollection of events. - */ - private void sinkEventsToAvro(PCollection<Event> source) { - String filename = options.getOutputPath(); - if (Strings.isNullOrEmpty(filename)) { - throw new RuntimeException("Missing --outputPath"); - } - NexmarkUtils.console("Writing events to Avro files at %s", filename); - source.apply(queryName + ".WriteAvroEvents", - AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro")); - source.apply(NexmarkQuery.JUST_BIDS) - .apply(queryName + ".WriteAvroBids", - AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro")); - source.apply(NexmarkQuery.JUST_NEW_AUCTIONS) - .apply(queryName + ".WriteAvroAuctions", - AvroIO.write(Auction.class).to(filename + "/auction").withSuffix(".avro")); - source.apply(NexmarkQuery.JUST_NEW_PERSONS) - .apply(queryName + ".WriteAvroPeople", - AvroIO.write(Person.class).to(filename + "/person").withSuffix(".avro")); - } - - /** - * Send {@code formattedResults} to text files. - */ - private void sinkResultsToText(PCollection<String> formattedResults, long now) { - String filename = textFilename(now); - NexmarkUtils.console("Writing results to text files at %s", filename); - formattedResults.apply(queryName + ".WriteTextResults", - TextIO.write().to(filename)); - } - - private static class StringToTableRow extends DoFn<String, TableRow> { - @ProcessElement - public void processElement(ProcessContext c) { - int n = ThreadLocalRandom.current().nextInt(10); - List<TableRow> records = new ArrayList<>(n); - for (int i = 0; i < n; i++) { - records.add(new TableRow().set("index", i).set("value", Integer.toString(i))); - } - c.output(new TableRow().set("result", c.element()).set("records", records)); - } - } - - /** - * Send {@code formattedResults} to BigQuery. - */ - private void sinkResultsToBigQuery( - PCollection<String> formattedResults, long now, - String version) { - String tableSpec = tableSpec(now, version); - TableSchema tableSchema = - new TableSchema().setFields(ImmutableList.of( - new TableFieldSchema().setName("result").setType("STRING"), - new TableFieldSchema().setName("records").setMode("REPEATED").setType("RECORD") - .setFields(ImmutableList.of( - new TableFieldSchema().setName("index").setType("INTEGER"), - new TableFieldSchema().setName("value").setType("STRING"))))); - NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec); - BigQueryIO.Write io = - BigQueryIO.write().to(tableSpec) - .withSchema(tableSchema) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND); - formattedResults - .apply(queryName + ".StringToTableRow", ParDo.of(new StringToTableRow())) - .apply(queryName + ".WriteBigQueryResults", io); - } - - // ================================================================================ - // Construct overall pipeline - // ================================================================================ - - /** - * Return source of events for this run, or null if we are simply publishing events - * to Pubsub. - */ - private PCollection<Event> createSource(Pipeline p, final long now) { - PCollection<Event> source = null; - switch (configuration.sourceType) { - case DIRECT: - source = sourceEventsFromSynthetic(p); - break; - case AVRO: - source = sourceEventsFromAvro(p); - break; - case PUBSUB: - // Setup the sink for the publisher. - switch (configuration.pubSubMode) { - case SUBSCRIBE_ONLY: - // Nothing to publish. - break; - case PUBLISH_ONLY: - // Send synthesized events to Pubsub in this job. - sinkEventsToPubsub(sourceEventsFromSynthetic(p).apply(queryName + ".Snoop", - NexmarkUtils.snoop(queryName)), now); - break; - case COMBINED: - // Send synthesized events to Pubsub in separate publisher job. - // We won't start the main pipeline until the publisher has sent the pre-load events. - // We'll shutdown the publisher job when we notice the main job has finished. - invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() { - @Override - public void build(NexmarkOptions publishOnlyOptions) { - Pipeline sp = Pipeline.create(options); - NexmarkUtils.setupPipeline(configuration.coderStrategy, sp); - publisherMonitor = new Monitor<>(queryName, "publisher"); - sinkEventsToPubsub( - sourceEventsFromSynthetic(sp) - .apply(queryName + ".Monitor", publisherMonitor.getTransform()), - now); - publisherResult = sp.run(); - } - }); - break; - } - - // Setup the source for the consumer. - switch (configuration.pubSubMode) { - case PUBLISH_ONLY: - // Nothing to consume. Leave source null. - break; - case SUBSCRIBE_ONLY: - case COMBINED: - // Read events from pubsub. - source = sourceEventsFromPubsub(p, now); - break; - } - break; - } - return source; - } - - private static final TupleTag<String> MAIN = new TupleTag<String>(){}; - private static final TupleTag<String> SIDE = new TupleTag<String>(){}; - - private static class PartitionDoFn extends DoFn<String, String> { - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().hashCode() % 2 == 0) { - c.output(c.element()); - } else { - c.output(SIDE, c.element()); - } - } - } - - /** - * Consume {@code results}. - */ - private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) { - if (configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) { - // Avoid the cost of formatting the results. - results.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName)); - return; - } - - PCollection<String> formattedResults = - results.apply(queryName + ".Format", NexmarkUtils.format(queryName)); - if (options.getLogResults()) { - formattedResults = formattedResults.apply(queryName + ".Results.Log", - NexmarkUtils.<String>log(queryName + ".Results")); - } - - switch (configuration.sinkType) { - case DEVNULL: - // Discard all results - formattedResults.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName)); - break; - case PUBSUB: - sinkResultsToPubsub(formattedResults, now); - break; - case TEXT: - sinkResultsToText(formattedResults, now); - break; - case AVRO: - NexmarkUtils.console( - "WARNING: with --sinkType=AVRO, actual query results will be discarded."); - break; - case BIGQUERY: - // Multiple BigQuery backends to mimic what most customers do. - PCollectionTuple res = formattedResults.apply(queryName + ".Partition", - ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE))); - sinkResultsToBigQuery(res.get(MAIN), now, "main"); - sinkResultsToBigQuery(res.get(SIDE), now, "side"); - sinkResultsToBigQuery(formattedResults, now, "copy"); - break; - case COUNT_ONLY: - // Short-circuited above. - throw new RuntimeException(); - } - } - - // ================================================================================ - // Entry point - // ================================================================================ - - /** - * Calculate the distribution of the expected rate of results per minute (in event time, not - * wallclock time). - */ - private void modelResultRates(NexmarkQueryModel model) { - List<Long> counts = Lists.newArrayList(model.simulator().resultsPerWindow()); - Collections.sort(counts); - int n = counts.size(); - if (n < 5) { - NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, n); - } else { - NexmarkUtils.console("Query%d: N:%d; min:%d; 1st%%:%d; mean:%d; 3rd%%:%d; max:%d", - model.configuration.query, n, counts.get(0), counts.get(n / 4), - counts.get(n / 2), - counts.get(n - 1 - n / 4), counts.get(n - 1)); - } - } - - /** - * Run {@code configuration} and return its performance if possible. - */ - @Nullable - public NexmarkPerf run(NexmarkConfiguration runConfiguration) { - if (options.getManageResources() && !options.getMonitorJobs()) { - throw new RuntimeException("If using --manageResources then must also use --monitorJobs."); - } - - // - // Setup per-run state. - // - checkState(configuration == null); - checkState(queryName == null); - configuration = runConfiguration; - - try { - NexmarkUtils.console("Running %s", configuration.toShortString()); - - if (configuration.numEvents < 0) { - NexmarkUtils.console("skipping since configuration is disabled"); - return null; - } - - List<NexmarkQuery> queries = Arrays.asList(new Query0(configuration), - new Query1(configuration), - new Query2(configuration), - new Query3(configuration), - new Query4(configuration), - new Query5(configuration), - new Query6(configuration), - new Query7(configuration), - new Query8(configuration), - new Query9(configuration), - new Query10(configuration), - new Query11(configuration), - new Query12(configuration)); - NexmarkQuery query = queries.get(configuration.query); - queryName = query.getName(); - - List<NexmarkQueryModel> models = Arrays.asList( - new Query0Model(configuration), - new Query1Model(configuration), - new Query2Model(configuration), - new Query3Model(configuration), - new Query4Model(configuration), - new Query5Model(configuration), - new Query6Model(configuration), - new Query7Model(configuration), - new Query8Model(configuration), - new Query9Model(configuration), - null, - null, - null); - NexmarkQueryModel model = models.get(configuration.query); - - if (options.getJustModelResultRate()) { - if (model == null) { - throw new RuntimeException(String.format("No model for %s", queryName)); - } - modelResultRates(model); - return null; - } - - long now = System.currentTimeMillis(); - Pipeline p = Pipeline.create(options); - NexmarkUtils.setupPipeline(configuration.coderStrategy, p); - - // Generate events. - PCollection<Event> source = createSource(p, now); - - if (options.getLogEvents()) { - source = source.apply(queryName + ".Events.Log", - NexmarkUtils.<Event>log(queryName + ".Events")); - } - - // Source will be null if source type is PUBSUB and mode is PUBLISH_ONLY. - // In that case there's nothing more to add to pipeline. - if (source != null) { - // Optionally sink events in Avro format. - // (Query results are ignored). - if (configuration.sinkType == NexmarkUtils.SinkType.AVRO) { - sinkEventsToAvro(source); - } - - // Query 10 logs all events to Google Cloud storage files. It could generate a lot of logs, - // so, set parallelism. Also set the output path where to write log files. - if (configuration.query == 10) { - String path = null; - if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) { - path = logsDir(now); - } - ((Query10) query).setOutputPath(path); - ((Query10) query).setMaxNumWorkers(maxNumWorkers()); - } - - // Apply query. - PCollection<TimestampedValue<KnownSize>> results = source.apply(query); - - if (options.getAssertCorrectness()) { - if (model == null) { - throw new RuntimeException(String.format("No model for %s", queryName)); - } - // We know all our streams have a finite number of elements. - results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); - // If we have a finite number of events then assert our pipeline's - // results match those of a model using the same sequence of events. - PAssert.that(results).satisfies(model.assertionFor()); - } - - // Output results. - sink(results, now); - } - - mainResult = p.run(); - mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout)); - return monitor(query); - } finally { - configuration = null; - queryName = null; - } - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java deleted file mode 100644 index fbd3e74..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import javax.annotation.Nullable; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.options.ApplicationNameOptions; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.StreamingOptions; - -/** - * Command line flags. - */ -public interface NexmarkOptions - extends ApplicationNameOptions, GcpOptions, PipelineOptions, StreamingOptions { - @Description("Which suite to run. Default is to use command line arguments for one job.") - @Default.Enum("DEFAULT") - NexmarkSuite getSuite(); - - void setSuite(NexmarkSuite suite); - - @Description("If true, monitor the jobs as they run.") - @Default.Boolean(false) - boolean getMonitorJobs(); - - void setMonitorJobs(boolean monitorJobs); - - @Description("Where the events come from.") - @Nullable - NexmarkUtils.SourceType getSourceType(); - - void setSourceType(NexmarkUtils.SourceType sourceType); - - @Description("Prefix for input files if using avro input") - @Nullable - String getInputPath(); - - void setInputPath(String inputPath); - - @Description("Where results go.") - @Nullable - NexmarkUtils.SinkType getSinkType(); - - void setSinkType(NexmarkUtils.SinkType sinkType); - - @Description("Which mode to run in when source is PUBSUB.") - @Nullable - NexmarkUtils.PubSubMode getPubSubMode(); - - void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode); - - @Description("Which query to run.") - @Nullable - Integer getQuery(); - - void setQuery(Integer query); - - @Description("Prefix for output files if using text output for results or running Query 10.") - @Nullable - String getOutputPath(); - - void setOutputPath(String outputPath); - - @Description("Base name of pubsub topic to publish to in streaming mode.") - @Nullable - @Default.String("nexmark") - String getPubsubTopic(); - - void setPubsubTopic(String pubsubTopic); - - @Description("Base name of pubsub subscription to read from in streaming mode.") - @Nullable - @Default.String("nexmark") - String getPubsubSubscription(); - - void setPubsubSubscription(String pubsubSubscription); - - @Description("Base name of BigQuery table name if using BigQuery output.") - @Nullable - @Default.String("nexmark") - String getBigQueryTable(); - - void setBigQueryTable(String bigQueryTable); - - @Description("Approximate number of events to generate. " - + "Zero for effectively unlimited in streaming mode.") - @Nullable - Long getNumEvents(); - - void setNumEvents(Long numEvents); - - @Description("Time in seconds to preload the subscription with data, at the initial input rate " - + "of the pipeline.") - @Nullable - Integer getPreloadSeconds(); - - void setPreloadSeconds(Integer preloadSeconds); - - @Description( - "Time in seconds to wait in pipelineResult.waitUntilFinish(), useful in streaming mode") - @Nullable - Integer getStreamTimeout(); - - void setStreamTimeout(Integer streamTimeout); - - @Description("Number of unbounded sources to create events.") - @Nullable - Integer getNumEventGenerators(); - - void setNumEventGenerators(Integer numEventGenerators); - - @Description("Shape of event rate curve.") - @Nullable - NexmarkUtils.RateShape getRateShape(); - - void setRateShape(NexmarkUtils.RateShape rateShape); - - @Description("Initial overall event rate (in --rateUnit).") - @Nullable - Integer getFirstEventRate(); - - void setFirstEventRate(Integer firstEventRate); - - @Description("Next overall event rate (in --rateUnit).") - @Nullable - Integer getNextEventRate(); - - void setNextEventRate(Integer nextEventRate); - - @Description("Unit for rates.") - @Nullable - NexmarkUtils.RateUnit getRateUnit(); - - void setRateUnit(NexmarkUtils.RateUnit rateUnit); - - @Description("Overall period of rate shape, in seconds.") - @Nullable - Integer getRatePeriodSec(); - - void setRatePeriodSec(Integer ratePeriodSec); - - @Description("If true, relay events in real time in streaming mode.") - @Nullable - Boolean getIsRateLimited(); - - void setIsRateLimited(Boolean isRateLimited); - - @Description("If true, use wallclock time as event time. Otherwise, use a deterministic" - + " time in the past so that multiple runs will see exactly the same event streams" - + " and should thus have exactly the same results.") - @Nullable - Boolean getUseWallclockEventTime(); - - void setUseWallclockEventTime(Boolean useWallclockEventTime); - - @Description("Assert pipeline results match model results.") - @Nullable - boolean getAssertCorrectness(); - - void setAssertCorrectness(boolean assertCorrectness); - - @Description("Log all input events.") - @Nullable - boolean getLogEvents(); - - void setLogEvents(boolean logEvents); - - @Description("Log all query results.") - @Nullable - boolean getLogResults(); - - void setLogResults(boolean logResults); - - @Description("Average size in bytes for a person record.") - @Nullable - Integer getAvgPersonByteSize(); - - void setAvgPersonByteSize(Integer avgPersonByteSize); - - @Description("Average size in bytes for an auction record.") - @Nullable - Integer getAvgAuctionByteSize(); - - void setAvgAuctionByteSize(Integer avgAuctionByteSize); - - @Description("Average size in bytes for a bid record.") - @Nullable - Integer getAvgBidByteSize(); - - void setAvgBidByteSize(Integer avgBidByteSize); - - @Description("Ratio of bids for 'hot' auctions above the background.") - @Nullable - Integer getHotAuctionRatio(); - - void setHotAuctionRatio(Integer hotAuctionRatio); - - @Description("Ratio of auctions for 'hot' sellers above the background.") - @Nullable - Integer getHotSellersRatio(); - - void setHotSellersRatio(Integer hotSellersRatio); - - @Description("Ratio of auctions for 'hot' bidders above the background.") - @Nullable - Integer getHotBiddersRatio(); - - void setHotBiddersRatio(Integer hotBiddersRatio); - - @Description("Window size in seconds.") - @Nullable - Long getWindowSizeSec(); - - void setWindowSizeSec(Long windowSizeSec); - - @Description("Window period in seconds.") - @Nullable - Long getWindowPeriodSec(); - - void setWindowPeriodSec(Long windowPeriodSec); - - @Description("If in streaming mode, the holdback for watermark in seconds.") - @Nullable - Long getWatermarkHoldbackSec(); - - void setWatermarkHoldbackSec(Long watermarkHoldbackSec); - - @Description("Roughly how many auctions should be in flight for each generator.") - @Nullable - Integer getNumInFlightAuctions(); - - void setNumInFlightAuctions(Integer numInFlightAuctions); - - - @Description("Maximum number of people to consider as active for placing auctions or bids.") - @Nullable - Integer getNumActivePeople(); - - void setNumActivePeople(Integer numActivePeople); - - @Description("Filename of perf data to append to.") - @Nullable - String getPerfFilename(); - - void setPerfFilename(String perfFilename); - - @Description("Filename of baseline perf data to read from.") - @Nullable - String getBaselineFilename(); - - void setBaselineFilename(String baselineFilename); - - @Description("Filename of summary perf data to append to.") - @Nullable - String getSummaryFilename(); - - void setSummaryFilename(String summaryFilename); - - @Description("Filename for javascript capturing all perf data and any baselines.") - @Nullable - String getJavascriptFilename(); - - void setJavascriptFilename(String javascriptFilename); - - @Description("If true, don't run the actual query. Instead, calculate the distribution " - + "of number of query results per (event time) minute according to the query model.") - @Nullable - boolean getJustModelResultRate(); - - void setJustModelResultRate(boolean justModelResultRate); - - @Description("Coder strategy to use.") - @Nullable - NexmarkUtils.CoderStrategy getCoderStrategy(); - - void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy); - - @Description("Delay, in milliseconds, for each event. We will peg one core for this " - + "number of milliseconds to simulate CPU-bound computation.") - @Nullable - Long getCpuDelayMs(); - - void setCpuDelayMs(Long cpuDelayMs); - - @Description("Extra data, in bytes, to save to persistent state for each event. " - + "This will force I/O all the way to durable storage to simulate an " - + "I/O-bound computation.") - @Nullable - Long getDiskBusyBytes(); - - void setDiskBusyBytes(Long diskBusyBytes); - - @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction") - @Nullable - Integer getAuctionSkip(); - - void setAuctionSkip(Integer auctionSkip); - - @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).") - @Nullable - Integer getFanout(); - - void setFanout(Integer fanout); - - @Description("Maximum waiting time to clean personState in query3 " - + "(ie maximum waiting of the auctions related to person in state in seconds in event time).") - @Nullable - Integer getMaxAuctionsWaitingTime(); - - void setMaxAuctionsWaitingTime(Integer fanout); - - @Description("Length of occasional delay to impose on events (in seconds).") - @Nullable - Long getOccasionalDelaySec(); - - void setOccasionalDelaySec(Long occasionalDelaySec); - - @Description("Probability that an event will be delayed by delayS.") - @Nullable - Double getProbDelayedEvent(); - - void setProbDelayedEvent(Double probDelayedEvent); - - @Description("Maximum size of each log file (in events). For Query10 only.") - @Nullable - Integer getMaxLogEvents(); - - void setMaxLogEvents(Integer maxLogEvents); - - @Description("How to derive names of resources.") - @Default.Enum("QUERY_AND_SALT") - NexmarkUtils.ResourceNameMode getResourceNameMode(); - - void setResourceNameMode(NexmarkUtils.ResourceNameMode mode); - - @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.") - @Default.Boolean(true) - boolean getManageResources(); - - void setManageResources(boolean manageResources); - - @Description("If true, use pub/sub publish time instead of event time.") - @Nullable - Boolean getUsePubsubPublishTime(); - - void setUsePubsubPublishTime(Boolean usePubsubPublishTime); - - @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. " - + "1000 implies every 1000 events per generator are emitted in pseudo-random order.") - @Nullable - Long getOutOfOrderGroupSize(); - - void setOutOfOrderGroupSize(Long outOfOrderGroupSize); - - @Description("If false, do not add the Monitor and Snoop transforms.") - @Nullable - Boolean getDebug(); - - void setDebug(Boolean value); - - @Description("If set, cancel running pipelines after this long") - @Nullable - Long getRunningTimeMinutes(); - - void setRunningTimeMinutes(Long value); - - @Description("If set and --monitorJobs is true, check that the system watermark is never more " - + "than this far behind real time") - @Nullable - Long getMaxSystemLagSeconds(); - - void setMaxSystemLagSeconds(Long value); - - @Description("If set and --monitorJobs is true, check that the data watermark is never more " - + "than this far behind real time") - @Nullable - Long getMaxDataLagSeconds(); - - void setMaxDataLagSeconds(Long value); - - @Description("Only start validating watermarks after this many seconds") - @Nullable - Long getWatermarkValidationDelaySeconds(); - - void setWatermarkValidationDelaySeconds(Long value); -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java deleted file mode 100644 index e7f59c8..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.util.List; -import javax.annotation.Nullable; - -/** - * Summary of performance for a particular run of a configuration. - */ -public class NexmarkPerf { - /** - * A sample of the number of events and number of results (if known) generated at - * a particular time. - */ - public static class ProgressSnapshot { - /** Seconds since job was started (in wallclock time). */ - @JsonProperty - double secSinceStart; - - /** Job runtime in seconds (time from first event to last generated event or output result). */ - @JsonProperty - double runtimeSec; - - /** Cumulative number of events generated. -1 if not known. */ - @JsonProperty - long numEvents; - - /** Cumulative number of results emitted. -1 if not known. */ - @JsonProperty - long numResults; - - /** - * Return true if there looks to be activity between {@code this} and {@code that} - * snapshots. - */ - public boolean anyActivity(ProgressSnapshot that) { - if (runtimeSec != that.runtimeSec) { - // An event or result end timestamp looks to have changed. - return true; - } - if (numEvents != that.numEvents) { - // Some more events were generated. - return true; - } - if (numResults != that.numResults) { - // Some more results were emitted. - return true; - } - return false; - } - } - - /** - * Progess snapshots. Null if not yet calculated. - */ - @JsonProperty - @Nullable - public List<ProgressSnapshot> snapshots = null; - - /** - * Effective runtime, in seconds. Measured from timestamp of first generated event to latest of - * timestamp of last generated event and last emitted result. -1 if not known. - */ - @JsonProperty - public double runtimeSec = -1.0; - - /** - * Number of events generated. -1 if not known. - */ - @JsonProperty - public long numEvents = -1; - - /** - * Number of events generated per second of runtime. For batch this is number of events - * over the above runtime. For streaming this is the 'steady-state' event generation rate sampled - * over the lifetime of the job. -1 if not known. - */ - @JsonProperty - public double eventsPerSec = -1.0; - - /** - * Number of event bytes generated per second of runtime. -1 if not known. - */ - @JsonProperty - public double eventBytesPerSec = -1.0; - - /** - * Number of results emitted. -1 if not known. - */ - @JsonProperty - public long numResults = -1; - - /** - * Number of results generated per second of runtime. -1 if not known. - */ - @JsonProperty - public double resultsPerSec = -1.0; - - /** - * Number of result bytes generated per second of runtime. -1 if not known. - */ - @JsonProperty - public double resultBytesPerSec = -1.0; - - /** - * Delay between start of job and first event in second. -1 if not known. - */ - @JsonProperty - public double startupDelaySec = -1.0; - - /** - * Delay between first event and first result in seconds. -1 if not known. - */ - @JsonProperty - public double processingDelaySec = -1.0; - - /** - * Delay between last result and job completion in seconds. -1 if not known. - */ - @JsonProperty - public double shutdownDelaySec = -1.0; - - /** - * Time-dilation factor. Calculate as event time advancement rate relative to real time. - * Greater than one implies we processed events faster than they would have been generated - * in real time. Less than one implies we could not keep up with events in real time. - * -1 if not known. - */ - @JsonProperty - double timeDilation = -1.0; - - /** - * List of errors encountered during job execution. - */ - @JsonProperty - @Nullable - public List<String> errors = null; - - /** - * The job id this perf was drawn from. Null if not known. - */ - @JsonProperty - @Nullable - public String jobId = null; - - /** - * Return a JSON representation of performance. - */ - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - /** - * Parse a {@link NexmarkPerf} object from JSON {@code string}. - */ - public static NexmarkPerf fromString(String string) { - try { - return NexmarkUtils.MAPPER.readValue(string, NexmarkPerf.class); - } catch (IOException e) { - throw new RuntimeException("Unable to parse nexmark perf: ", e); - } - } - - /** - * Return true if there looks to be activity between {@code this} and {@code that} - * perf values. - */ - public boolean anyActivity(NexmarkPerf that) { - if (runtimeSec != that.runtimeSec) { - // An event or result end timestamp looks to have changed. - return true; - } - if (numEvents != that.numEvents) { - // Some more events were generated. - return true; - } - if (numResults != that.numResults) { - // Some more results were emitted. - return true; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java deleted file mode 100644 index 0d98a5d..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.util.ArrayList; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; - -/** - * A set of {@link NexmarkConfiguration}s. - */ -public enum NexmarkSuite { - /** - * The default. - */ - DEFAULT(defaultConf()), - - /** - * Sweep through all queries using the default configuration. - * 100k/10k events (depending on query). - */ - SMOKE(smoke()), - - /** - * As for SMOKE, but with 10m/1m events. - */ - STRESS(stress()), - - /** - * As for SMOKE, but with 1b/100m events. - */ - FULL_THROTTLE(fullThrottle()); - - private static List<NexmarkConfiguration> defaultConf() { - List<NexmarkConfiguration> configurations = new ArrayList<>(); - NexmarkConfiguration configuration = new NexmarkConfiguration(); - configurations.add(configuration); - return configurations; - } - - private static List<NexmarkConfiguration> smoke() { - List<NexmarkConfiguration> configurations = new ArrayList<>(); - for (int query = 0; query <= 12; query++) { - NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy(); - configuration.query = query; - configuration.numEvents = 100_000; - if (query == 4 || query == 6 || query == 9) { - // Scale back so overall runtimes are reasonably close across all queries. - configuration.numEvents /= 10; - } - configurations.add(configuration); - } - return configurations; - } - - private static List<NexmarkConfiguration> stress() { - List<NexmarkConfiguration> configurations = smoke(); - for (NexmarkConfiguration configuration : configurations) { - if (configuration.numEvents >= 0) { - configuration.numEvents *= 1000; - } - } - return configurations; - } - - private static List<NexmarkConfiguration> fullThrottle() { - List<NexmarkConfiguration> configurations = smoke(); - for (NexmarkConfiguration configuration : configurations) { - if (configuration.numEvents >= 0) { - configuration.numEvents *= 1000; - } - } - return configurations; - } - - private final List<NexmarkConfiguration> configurations; - - NexmarkSuite(List<NexmarkConfiguration> configurations) { - this.configurations = configurations; - } - - /** - * Return the configurations corresponding to this suite. We'll override each configuration - * with any set command line flags, except for --isStreaming which is only respected for - * the {@link #DEFAULT} suite. - */ - public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) { - Set<NexmarkConfiguration> results = new LinkedHashSet<>(); - for (NexmarkConfiguration configuration : configurations) { - NexmarkConfiguration result = configuration.copy(); - result.overrideFromOptions(options); - results.add(result); - } - return results; - } -}
