Clean some code that is specific to Dataflow
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/77eabbaa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/77eabbaa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/77eabbaa Branch: refs/heads/master Commit: 77eabbaaddad88784c8ce2e775b4b8e8fea3f868 Parents: 902050b Author: Etienne Chauchot <[email protected]> Authored: Fri May 5 15:19:07 2017 +0200 Committer: Ismaël MejÃa <[email protected]> Committed: Wed Aug 23 19:07:28 2017 +0200 ---------------------------------------------------------------------- .../beam/integration/nexmark/NexmarkRunner.java | 106 ------------------- 1 file changed, 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/77eabbaa/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index 6df76f0..935544e 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -157,9 +157,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { this.options = options; } - // ================================================================================ - // Overridden by each runner. - // ================================================================================ /** * Is this query running in streaming mode? @@ -414,7 +411,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { perf.shutdownDelaySec = (now - resultEnd) / 1000.0; } - perf.jobId = getJobId(result); // As soon as available, try to capture cumulative cost at this point too. NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot(); @@ -429,105 +425,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { return perf; } - private String getJobId(PipelineResult job) { - return ""; - } - - // TODO specific to dataflow, see if we can find an equivalent -/* - protected MetricType getMetricType(MetricUpdate metric) { - String metricName = metric.getKey().metricName().name(); - if (metricName.endsWith("windmill-system-watermark")) { - return MetricType.SYSTEM_WATERMARK; - } else if (metricName.endsWith("windmill-data-watermark")) { - return MetricType.DATA_WATERMARK; - } else { - return MetricType.OTHER; - } - } -*/ - - /** - * Check that watermarks are not too far behind. - * - * <p>Returns a list of errors detected. - */ - // TODO specific to dataflow, see if we can find an equivalent - /* - private List<String> checkWatermarks(DataflowPipelineJob job, long startMsSinceEpoch) { - long now = System.currentTimeMillis(); - List<String> errors = new ArrayList<>(); - try { - JobMetrics metricResponse = job.getDataflowClient() - .projects() - .jobs() - .getMetrics(job.getProjectId(), job.getJobId()) - .execute(); - List<MetricUpdate> metrics = metricResponse.getMetrics(); - - - - if (metrics != null) { - boolean foundWatermarks = false; - for (MetricUpdate metric : metrics) { - MetricType type = getMetricType(metric); - if (type == MetricType.OTHER) { - continue; - } - foundWatermarks = true; - @SuppressWarnings("unchecked") - BigDecimal scalar = (BigDecimal) metric.getScalar(); - if (scalar.signum() < 0) { - continue; - } - Instant value = - new Instant(scalar.divideToIntegralValue(new BigDecimal(1000)).longValueExact()); - Instant updateTime = Instant.parse(metric.getUpdateTime()); - - if (options.getWatermarkValidationDelaySeconds() == null - || now > startMsSinceEpoch - + Duration.standardSeconds(options.getWatermarkValidationDelaySeconds()) - .getMillis()) { - Duration threshold = null; - if (type == MetricType.SYSTEM_WATERMARK && options.getMaxSystemLagSeconds() != null) { - threshold = Duration.standardSeconds(options.getMaxSystemLagSeconds()); - } else if (type == MetricType.DATA_WATERMARK - && options.getMaxDataLagSeconds() != null) { - threshold = Duration.standardSeconds(options.getMaxDataLagSeconds()); - } - - if (threshold != null && value.isBefore(updateTime.minus(threshold))) { - String msg = String.format("High lag for %s: %s vs %s (allowed lag of %s)", - metric.getKey().metricName().name(), value, updateTime, threshold); - errors.add(msg); - NexmarkUtils.console(msg); - } - } - } - if (!foundWatermarks) { - NexmarkUtils.console("No known watermarks in update: " + metrics); - if (now > startMsSinceEpoch + Duration.standardMinutes(5).getMillis()) { - errors.add("No known watermarks found. Metrics were " + metrics); - } - } - } - } catch (IOException e) { - NexmarkUtils.console("Warning: failed to get JobMetrics: " + e); - } - - return errors; - } -*/ - - // TODO specific to dataflow, see if we can find an equivalent -/* - enum MetricType { - SYSTEM_WATERMARK, - DATA_WATERMARK, - OTHER - } -*/ - /** * Build and run a pipeline using specified options. */ @@ -643,9 +540,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes())); } - // TODO specific to dataflow, see if we can find an equivalent -// errors.addAll(checkWatermarks(job, startMsSinceEpoch)); - if (waitingForShutdown) { try { job.cancel();
