[
https://issues.apache.org/jira/browse/BEAM-4553?focusedWorklogId=154982&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154982
]
ASF GitHub Bot logged work on BEAM-4553:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Oct/18 16:26
Start Date: 16/Oct/18 16:26
Worklog Time Spent: 10m
Work Description: aromanenko-dev closed pull request #6569: [BEAM-4553]
Implement graphite sink for MetricsPusher and refactor MetricsHttpSink test
URL: https://github.com/apache/beam/pull/6569
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
new file mode 100644
index 00000000000..18810cbe3ca
--- /dev/null
+++
b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.extensions.metrics;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.charset.Charset;
+import java.util.Locale;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsSink;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Sink to push metrics to Graphite. Graphite requires a timestamp, so metrics are reported with
+ * the timestamp (seconds since the epoch) at which the push to the sink was done (except for
+ * gauges, which already carry their own timestamp value). The Graphite metric name has the form
+ * {@code beam.metricType.metricNamespace.metricName.[committed|attempted].metricValueType}, for
+ * example {@code beam.counter.throughput.nbRecords.attempted.value} or {@code
+ * beam.distribution.throughput.nbRecordsPerSec.attempted.mean}.
+ */
+public class MetricsGraphiteSink implements MetricsSink {
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+ private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
+ private static final String SPACE_REPLACEMENT = "_";
+ private final String address;
+ private final int port;
+ private final Charset charset;
+
+ /**
+  * Creates a sink that pushes to the Graphite endpoint named in the given options. Messages are
+  * always encoded as UTF-8.
+  *
+  * @param pipelineOptions options supplying the Graphite host and port
+  */
+ public MetricsGraphiteSink(PipelineOptions pipelineOptions) {
+   this.charset = UTF_8;
+   this.port = pipelineOptions.getMetricsGraphitePort();
+   this.address = pipelineOptions.getMetricsGraphiteHost();
+ }
+
+ @Experimental(Experimental.Kind.METRICS)
+ @Override
+ public void writeMetrics(MetricQueryResults metricQueryResults) throws
Exception {
+ final long metricTimestamp = System.currentTimeMillis() / 1000L;
+ Socket socket = new Socket(InetAddress.getByName(address), port);
+ BufferedWriter writer =
+ new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(),
charset));
+ StringBuilder messagePayload = new StringBuilder();
+ Iterable<MetricResult<Long>> counters = metricQueryResults.getCounters();
+ Iterable<MetricResult<GaugeResult>> gauges =
metricQueryResults.getGauges();
+ Iterable<MetricResult<DistributionResult>> distributions =
+ metricQueryResults.getDistributions();
+
+ for (MetricResult<Long> counter : counters) {
+ messagePayload.append(new CounterMetricMessage(counter, "value",
metricTimestamp).toString());
+ }
+
+ for (MetricResult<GaugeResult> gauge : gauges) {
+ messagePayload.append(new GaugeMetricMessage(gauge, "value").toString());
+ }
+
+ for (MetricResult<DistributionResult> distribution : distributions) {
+ messagePayload.append(
+ new DistributionMetricMessage(distribution, "min",
metricTimestamp).toString());
+ messagePayload.append(
+ new DistributionMetricMessage(distribution, "max",
metricTimestamp).toString());
+ messagePayload.append(
+ new DistributionMetricMessage(distribution, "count",
metricTimestamp).toString());
+ messagePayload.append(
+ new DistributionMetricMessage(distribution, "sum",
metricTimestamp).toString());
+ messagePayload.append(
+ new DistributionMetricMessage(distribution, "mean",
metricTimestamp).toString());
+ }
+ writer.write(messagePayload.toString());
+ writer.flush();
+ writer.close();
+ socket.close();
+ }
+
+ /**
+  * Base class for the text sent to Graphite for one metric value. {@link #toString()} yields the
+  * committed message (when the runner supports committed metrics) followed by the attempted
+  * message.
+  */
+ private abstract static class MetricMessage {
+   /**
+    * Concatenates the committed and attempted messages.
+    *
+    * @return the attempted message, preceded by the committed message when it is available
+    * @throws UnsupportedOperationException if building the committed message fails for a reason
+    *     other than missing committed-metrics support
+    */
+   @Override
+   public String toString() {
+     StringBuilder messagePayload = new StringBuilder();
+     try {
+       messagePayload.append(createCommittedMessage());
+     } catch (UnsupportedOperationException e) {
+       // Runners without committed-metrics support signal it with this exception; in that case
+       // only the attempted message is sent. getMessage() may be null, so guard before matching
+       // and rethrow anything that is not the "committed metrics" signal.
+       String reason = e.getMessage();
+       if (reason == null || !reason.contains("committed metrics")) {
+         throw e;
+       }
+     }
+     messagePayload.append(createAttemptedMessage());
+     return messagePayload.toString();
+   }
+
+   /** Builds the message for the committed value of the metric. */
+   protected abstract String createCommittedMessage();
+
+   /** Builds the message for the attempted value of the metric. */
+   protected abstract String createAttemptedMessage();
+ }
+
+ /** Graphite message for a counter, stamped with the push timestamp given at construction. */
+ private static class CounterMetricMessage extends MetricMessage {
+   private final MetricResult<Long> counter;
+   private final String valueType;
+   private final long metricTimestamp;
+
+   private CounterMetricMessage(
+       MetricResult<Long> counter, String valueType, long metricTimestamp) {
+     this.counter = counter;
+     this.valueType = valueType;
+     this.metricTimestamp = metricTimestamp;
+   }
+
+   @Override
+   protected String createCommittedMessage() {
+     // Resolve the name before reading the value, so the evaluation order stays name-then-value.
+     String name =
+         createNormalizedMetricName(
+             counter, "counter", valueType, CommittedOrAttemped.COMMITTED);
+     return formatLine(name, counter.getCommitted());
+   }
+
+   @Override
+   protected String createAttemptedMessage() {
+     String name =
+         createNormalizedMetricName(
+             counter, "counter", valueType, CommittedOrAttemped.ATTEMPTED);
+     return formatLine(name, counter.getAttempted());
+   }
+
+   /** Renders one {@code name value timestamp} line terminated by a newline. */
+   @SuppressFBWarnings(
+     value = "VA_FORMAT_STRING_USES_NEWLINE",
+     justification = "\\n is part of graphite protocol"
+   )
+   private String formatLine(String name, Long value) {
+     return String.format(Locale.US, "%s %s %s\n", name, value, metricTimestamp);
+   }
+ }
+
+ /**
+  * Graphite message for a gauge. Unlike counters and distributions, the timestamp comes from
+  * the gauge result itself, converted from milliseconds to seconds.
+  */
+ private static class GaugeMetricMessage extends MetricMessage {
+   private final MetricResult<GaugeResult> gauge;
+   private final String valueType;
+
+   private GaugeMetricMessage(MetricResult<GaugeResult> gauge, String valueType) {
+     this.gauge = gauge;
+     this.valueType = valueType;
+   }
+
+   @Override
+   protected String createCommittedMessage() {
+     // Resolve the name before reading the value, so the evaluation order stays name-then-value.
+     String name =
+         createNormalizedMetricName(gauge, "gauge", valueType, CommittedOrAttemped.COMMITTED);
+     return formatLine(name, gauge.getCommitted());
+   }
+
+   @Override
+   protected String createAttemptedMessage() {
+     String name =
+         createNormalizedMetricName(gauge, "gauge", valueType, CommittedOrAttemped.ATTEMPTED);
+     return formatLine(name, gauge.getAttempted());
+   }
+
+   /** Renders one {@code name value timestampSeconds} line terminated by a newline. */
+   @SuppressFBWarnings(
+     value = "VA_FORMAT_STRING_USES_NEWLINE",
+     justification = "\\n is part of graphite protocol"
+   )
+   private static String formatLine(String name, GaugeResult result) {
+     return String.format(
+         Locale.US,
+         "%s %s %s\n",
+         name,
+         result.getValue(),
+         result.getTimestamp().getMillis() / 1000L);
+   }
+ }
+
+ /**
+  * Graphite message for one aspect ({@code min}, {@code max}, {@code count}, {@code sum} or
+  * {@code mean}) of a distribution, stamped with the push timestamp given at construction.
+  */
+ private static class DistributionMetricMessage extends MetricMessage {
+
+   private final String valueType;
+   private final MetricResult<DistributionResult> distribution;
+   private final long metricTimestamp;
+
+   /**
+    * @param distribution the distribution metric to report
+    * @param valueType which aspect to report: min, max, count, sum or mean
+    * @param metricTimestamp timestamp, in seconds since the epoch, to attach to the message
+    */
+   public DistributionMetricMessage(
+       MetricResult<DistributionResult> distribution, String valueType, long metricTimestamp) {
+     this.valueType = valueType;
+     this.distribution = distribution;
+     this.metricTimestamp = metricTimestamp;
+   }
+
+   @Override
+   protected String createCommittedMessage() {
+     // getCommitted() is read first: on runners without committed metrics it throws before any
+     // formatting work is done.
+     Number value = selectValue(distribution.getCommitted());
+     return formatLine(CommittedOrAttemped.COMMITTED, value);
+   }
+
+   @Override
+   protected String createAttemptedMessage() {
+     Number value = selectValue(distribution.getAttempted());
+     return formatLine(CommittedOrAttemped.ATTEMPTED, value);
+   }
+
+   /**
+    * Picks the aspect named by {@code valueType} out of a distribution result.
+    *
+    * @throws IllegalArgumentException if {@code valueType} is not one of the supported aspects;
+    *     previously this case silently produced the literal text "null" as the metric value
+    */
+   private Number selectValue(DistributionResult result) {
+     switch (valueType) {
+       case "min":
+         return result.getMin();
+       case "max":
+         return result.getMax();
+       case "count":
+         return result.getCount();
+       case "sum":
+         return result.getSum();
+       case "mean":
+         return result.getMean();
+       default:
+         throw new IllegalArgumentException(
+             "Unsupported distribution value type: " + valueType);
+     }
+   }
+
+   /** Renders one {@code name value timestamp} line terminated by a newline. */
+   @SuppressFBWarnings(
+     value = "VA_FORMAT_STRING_USES_NEWLINE",
+     justification = "\\n is part of graphite protocol"
+   )
+   private String formatLine(CommittedOrAttemped committedOrAttemped, Number value) {
+     return String.format(
+         Locale.US,
+         "%s %s %s\n",
+         createNormalizedMetricName(
+             distribution, "distribution", valueType, committedOrAttemped),
+         value,
+         metricTimestamp);
+   }
+ }
+
+ /**
+  * Builds the dotted Graphite name
+  * {@code beam.<metricType>.<namespace>.<name>.<committed|attempted>.<valueType>} for a metric
+  * and replaces every run of whitespace in it with a single underscore.
+  *
+  * @param metric metric whose namespace and name are used
+  * @param metricType metric kind segment, e.g. {@code counter}
+  * @param valueType trailing segment naming the reported value
+  * @param committedOrAttemped whether the committed or the attempted value is reported
+  * @return the normalized metric name
+  */
+ private static <T> String createNormalizedMetricName(
+     MetricResult<T> metric,
+     String metricType,
+     String valueType,
+     CommittedOrAttemped committedOrAttemped) {
+   String rawName =
+       "beam."
+           + metricType
+           + "."
+           + metric.getName().getNamespace()
+           + "."
+           + metric.getName().getName()
+           + "."
+           + committedOrAttemped
+           + "."
+           + valueType;
+   return WHITESPACE.matcher(rawName).replaceAll(SPACE_REPLACEMENT);
+ }
+
+ /**
+  * Distinguishes the committed value of a metric from the attempted one. {@link #toString()}
+  * returns the lower-case label that is used as a segment of the Graphite metric name.
+  */
+ private enum CommittedOrAttemped {
+   COMMITTED("committed"),
+   ATTEMPTED("attempted");
+
+   private final String label;
+
+   CommittedOrAttemped(String label) {
+     this.label = label;
+   }
+
+   @Override
+   public String toString() {
+     return label;
+   }
+ }
+}
diff --git
a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSink.java
b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSink.java
index e9cefe5d7ec..4208007ccb1 100644
---
a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSink.java
+++
b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSink.java
@@ -25,7 +25,6 @@
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.fasterxml.jackson.datatype.joda.JodaModule;
-import com.google.common.annotations.VisibleForTesting;
import java.io.DataOutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
@@ -69,8 +68,7 @@ public void writeMetrics(MetricQueryResults
metricQueryResults) throws Exception
}
}
- @VisibleForTesting
- String serializeMetrics(MetricQueryResults metricQueryResults) throws
Exception {
+ private String serializeMetrics(MetricQueryResults metricQueryResults)
throws Exception {
objectMapper.registerModule(new JodaModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
diff --git
a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
new file mode 100644
index 00000000000..fd461700bac
--- /dev/null
+++
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.extensions.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsSink;
+import org.joda.time.Instant;
+
+/**
+ * Test fixture used as an input to {@link MetricsSink} implementation tests. It exposes exactly
+ * one counter, one distribution and one gauge with fixed values, and can simulate a runner that
+ * does not support committed metrics.
+ */
+class CustomMetricQueryResults implements MetricQueryResults {
+
+  private final boolean isCommittedSupported;
+
+  /**
+   * @param isCommittedSupported when {@code false}, every {@code getCommitted()} call on the
+   *     returned metrics throws {@link UnsupportedOperationException}
+   */
+  CustomMetricQueryResults(boolean isCommittedSupported) {
+    this.isCommittedSupported = isCommittedSupported;
+  }
+
+  /**
+   * Fails when committed metrics are configured as unsupported. This is what getCommitted code is
+   * like for AccumulatedMetricResult on runners that do not support committed metrics.
+   */
+  private void checkCommittedSupported() {
+    if (!isCommittedSupported) {
+      throw new UnsupportedOperationException(
+          "This runner does not currently support committed"
+              + " metrics results. Please use 'attempted' instead.");
+    }
+  }
+
+  /** Returns a single counter {@code ns1/n1} in step {@code s1}: committed 10, attempted 20. */
+  @Override
+  public List<MetricResult<Long>> getCounters() {
+    return Collections.singletonList(
+        new MetricResult<Long>() {
+
+          @Override
+          public MetricName getName() {
+            return MetricName.named("ns1", "n1");
+          }
+
+          @Override
+          public String getStep() {
+            return "s1";
+          }
+
+          @Override
+          public Long getCommitted() {
+            checkCommittedSupported();
+            return 10L;
+          }
+
+          @Override
+          public Long getAttempted() {
+            return 20L;
+          }
+        });
+  }
+
+  /** Returns a single distribution {@code ns1/n2} in step {@code s2}. */
+  @Override
+  public List<MetricResult<DistributionResult>> getDistributions() {
+    return Collections.singletonList(
+        new MetricResult<DistributionResult>() {
+
+          @Override
+          public MetricName getName() {
+            return MetricName.named("ns1", "n2");
+          }
+
+          @Override
+          public String getStep() {
+            return "s2";
+          }
+
+          @Override
+          public DistributionResult getCommitted() {
+            checkCommittedSupported();
+            return DistributionResult.create(10L, 2L, 5L, 8L);
+          }
+
+          @Override
+          public DistributionResult getAttempted() {
+            return DistributionResult.create(25L, 4L, 3L, 9L);
+          }
+        });
+  }
+
+  /** Returns a single gauge {@code ns1/n3} in step {@code s3}: committed 100, attempted 120. */
+  @Override
+  public List<MetricResult<GaugeResult>> getGauges() {
+    return Collections.singletonList(
+        new MetricResult<GaugeResult>() {
+
+          @Override
+          public MetricName getName() {
+            return MetricName.named("ns1", "n3");
+          }
+
+          @Override
+          public String getStep() {
+            return "s3";
+          }
+
+          @Override
+          public GaugeResult getCommitted() {
+            checkCommittedSupported();
+            return GaugeResult.create(100L, new Instant(345862800L));
+          }
+
+          @Override
+          public GaugeResult getAttempted() {
+            return GaugeResult.create(120L, new Instant(345862800L));
+          }
+        });
+  }
+}
diff --git
a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSinkTest.java
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSinkTest.java
new file mode 100644
index 00000000000..8c1b560b65f
--- /dev/null
+++
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSinkTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.extensions.metrics;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.CountDownLatch;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test class for MetricsGraphiteSink. Pushes the fixed metrics of {@link
+ * CustomMetricQueryResults} to a local {@link NetworkMockServer} and checks the lines received.
+ */
+public class MetricsGraphiteSinkTest {
+  /** Upper bound on how long a test waits for the mock server before failing. */
+  private static final long AWAIT_TIMEOUT_SECONDS = 30L;
+
+  private static NetworkMockServer graphiteServer;
+  private static int port;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException, InterruptedException {
+    // Get a free local port. The probe socket is closed before the mock server binds the port,
+    // so another process could in principle grab it in between.
+    try (ServerSocket serverSocket = new ServerSocket(0)) {
+      port = serverSocket.getLocalPort();
+    }
+    graphiteServer = new NetworkMockServer(port);
+    graphiteServer.clear();
+    graphiteServer.start();
+  }
+
+  @Before
+  public void before() {
+    graphiteServer.clear();
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException {
+    graphiteServer.stop();
+  }
+
+  @Test
+  public void testWriteMetricsWithCommittedSupported() throws Exception {
+    String join = pushAndCollect(true);
+    String regexpr =
+        "beam.counter.ns1.n1.committed.value 10 [0-9]+\\n"
+            + "beam.counter.ns1.n1.attempted.value 20 [0-9]+\\n"
+            + "beam.gauge.ns1.n3.committed.value 100 [0-9]+\\n"
+            + "beam.gauge.ns1.n3.attempted.value 120 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.committed.min 5 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.min 3 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.committed.max 8 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.max 9 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.committed.count 2 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.count 4 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.committed.sum 10 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.sum 25 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.committed.mean 5.0 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.mean 6.25 [0-9]+";
+    assertTrue("Unexpected messages received by the mock server: " + join, join.matches(regexpr));
+  }
+
+  @Test
+  public void testWriteMetricsWithCommittedUnSupported() throws Exception {
+    String join = pushAndCollect(false);
+    String regexpr =
+        "beam.counter.ns1.n1.attempted.value 20 [0-9]+\\n"
+            + "beam.gauge.ns1.n3.attempted.value 120 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.min 3 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.max 9 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.count 4 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.sum 25 [0-9]+\\n"
+            + "beam.distribution.ns1.n2.attempted.mean 6.25 [0-9]+";
+    assertTrue("Unexpected messages received by the mock server: " + join, join.matches(regexpr));
+  }
+
+  /**
+   * Pushes the fixture metrics through a fresh sink and returns what the mock server received,
+   * joined with newlines.
+   *
+   * @param isCommittedSupported whether the fixture should expose committed values
+   */
+  private static String pushAndCollect(boolean isCommittedSupported) throws Exception {
+    MetricQueryResults metricQueryResults = new CustomMetricQueryResults(isCommittedSupported);
+    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+    pipelineOptions.setMetricsGraphitePort(port);
+    pipelineOptions.setMetricsGraphiteHost("127.0.0.1");
+    MetricsGraphiteSink metricsGraphiteSink = new MetricsGraphiteSink(pipelineOptions);
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    graphiteServer.setCountDownLatch(countDownLatch);
+    metricsGraphiteSink.writeMetrics(metricQueryResults);
+    // Bounded wait: if the server never signals, fail the test instead of hanging the build.
+    assertTrue(
+        "Timed out waiting for the mock Graphite server to receive the metrics",
+        countDownLatch.await(AWAIT_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS));
+    return String.join("\n", graphiteServer.getMessages());
+  }
+}
diff --git
a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
index feccacbaa98..09478fe86cd 100644
---
a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
+++
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
@@ -20,162 +20,108 @@
import static org.junit.Assert.assertEquals;
-import java.util.Collections;
+import com.sun.net.httpserver.HttpServer;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.sdk.metrics.DistributionResult;
-import org.apache.beam.sdk.metrics.GaugeResult;
-import org.apache.beam.sdk.metrics.MetricName;
+import java.util.concurrent.CountDownLatch;
import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.joda.time.Instant;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
/** Test class for MetricsHttpSink. */
public class MetricsHttpSinkTest {
+ private static int port;
+ private static List<String> messages = new ArrayList<>();
+ private static HttpServer httpServer;
+ private static CountDownLatch countDownLatch;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ // get free local port
+ ServerSocket serverSocket = new ServerSocket(0);
+ port = serverSocket.getLocalPort();
+ serverSocket.close();
+ httpServer = HttpServer.create(new InetSocketAddress(port), 0);
+ httpServer
+ .createContext("/")
+ .setHandler(
+ httpExchange -> {
+ try (final BufferedReader in =
+ new BufferedReader(
+ new InputStreamReader(
+ httpExchange.getRequestBody(),
StandardCharsets.UTF_8))) {
+ String line;
+ while ((line = in.readLine()) != null) {
+ messages.add(line);
+ }
+ }
+ httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0L);
+ httpExchange.close();
+ countDownLatch.countDown();
+ });
+ httpServer.start();
+ }
+
+ @Before
+ public void before() {
+ messages.clear();
+ }
@Test
- public void testSerializerWithCommittedSupported() throws Exception {
+ public void testWriteMetricsWithCommittedSupported() throws Exception {
MetricQueryResults metricQueryResults = new CustomMetricQueryResults(true);
- MetricsHttpSink metricsHttpSink = new
MetricsHttpSink(PipelineOptionsFactory.create());
- String serializeMetrics =
metricsHttpSink.serializeMetrics(metricQueryResults);
- assertEquals(
- "Error in serialization",
+ PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+ pipelineOptions.setMetricsHttpSinkUrl(String.format("http://localhost:%s",
port));
+ MetricsHttpSink metricsHttpSink = new MetricsHttpSink(pipelineOptions);
+ countDownLatch = new CountDownLatch(1);
+ metricsHttpSink.writeMetrics(metricQueryResults);
+ countDownLatch.await();
+ String expected =
"{\"counters\":[{\"attempted\":20,\"committed\":10,\"name\":{\"name\":\"n1\","
+
"\"namespace\":\"ns1\"},\"step\":\"s1\"}],\"distributions\":[{\"attempted\":"
+
"{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"committed\":"
+
"{\"count\":2,\"max\":8,\"mean\":5.0,\"min\":5,\"sum\":10},\"name\":{\"name\":\"n2\","
+
"\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
- +
"\"1970-01-01T00:00:00.000Z\",\"value\":120},\"committed\":{\"timestamp\":"
- +
"\"1970-01-01T00:00:00.000Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":"
- + "\"ns1\"},\"step\":\"s3\"}]}",
- serializeMetrics);
+ +
"\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":"
+ +
"\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":"
+ + "\"ns1\"},\"step\":\"s3\"}]}";
+ assertEquals("Wrong number of messages sent to HTTP server", 1,
messages.size());
+ assertEquals("Wrong messages sent to HTTP server", expected,
messages.get(0));
}
@Test
- public void testSerializerWithCommittedUnSupported() throws Exception {
+ public void testWriteMetricsWithCommittedUnSupported() throws Exception {
MetricQueryResults metricQueryResults = new
CustomMetricQueryResults(false);
- MetricsHttpSink metricsHttpSink = new
MetricsHttpSink(PipelineOptionsFactory.create());
- String serializeMetrics =
metricsHttpSink.serializeMetrics(metricQueryResults);
- assertEquals(
- "Error in serialization",
+ PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+ pipelineOptions.setMetricsHttpSinkUrl(String.format("http://localhost:%s",
port));
+ MetricsHttpSink metricsHttpSink = new MetricsHttpSink(pipelineOptions);
+ countDownLatch = new CountDownLatch(1);
+ metricsHttpSink.writeMetrics(metricQueryResults);
+ countDownLatch.await();
+ String expected =
"{\"counters\":[{\"attempted\":20,\"name\":{\"name\":\"n1\","
+
"\"namespace\":\"ns1\"},\"step\":\"s1\"}],\"distributions\":[{\"attempted\":"
+
"{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\""
+
",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
- +
"\"1970-01-01T00:00:00.000Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":"
- + "\"ns1\"},\"step\":\"s3\"}]}",
- serializeMetrics);
+ +
"\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":"
+ + "\"ns1\"},\"step\":\"s3\"}]}";
+ assertEquals("Wrong number of messages sent to HTTP server", 1,
messages.size());
+ assertEquals("Wrong messages sent to HTTP server", expected,
messages.get(0));
}
- private static class CustomMetricQueryResults implements MetricQueryResults {
-
- private final boolean isCommittedSupported;
-
- private CustomMetricQueryResults(boolean isCommittedSupported) {
- this.isCommittedSupported = isCommittedSupported;
- }
-
- @Override
- public List<MetricResult<Long>> getCounters() {
- return Collections.singletonList(
- new MetricResult<Long>() {
-
- @Override
- public MetricName getName() {
- return MetricName.named("ns1", "n1");
- }
-
- @Override
- public String getStep() {
- return "s1";
- }
-
- @Override
- public Long getCommitted() {
- if (!isCommittedSupported) {
- // This is what getCommitted code is like for
AccumulatedMetricResult on runners
- // that do not support committed metrics
- throw new UnsupportedOperationException(
- "This runner does not currently support committed"
- + " metrics results. Please use 'attempted' instead.");
- }
- return 10L;
- }
-
- @Override
- public Long getAttempted() {
- return 20L;
- }
- });
- }
-
- @Override
- public List<MetricResult<DistributionResult>> getDistributions() {
- return Collections.singletonList(
- new MetricResult<DistributionResult>() {
-
- @Override
- public MetricName getName() {
- return MetricName.named("ns1", "n2");
- }
-
- @Override
- public String getStep() {
- return "s2";
- }
-
- @Override
- public DistributionResult getCommitted() {
- if (!isCommittedSupported) {
- // This is what getCommitted code is like for
AccumulatedMetricResult on runners
- // that do not support committed metrics
- throw new UnsupportedOperationException(
- "This runner does not currently support committed"
- + " metrics results. Please use 'attempted' instead.");
- }
- return DistributionResult.create(10L, 2L, 5L, 8L);
- }
-
- @Override
- public DistributionResult getAttempted() {
- return DistributionResult.create(25L, 4L, 3L, 9L);
- }
- });
- }
-
- @Override
- public List<MetricResult<GaugeResult>> getGauges() {
- return Collections.singletonList(
- new MetricResult<GaugeResult>() {
-
- @Override
- public MetricName getName() {
- return MetricName.named("ns1", "n3");
- }
-
- @Override
- public String getStep() {
- return "s3";
- }
-
- @Override
- public GaugeResult getCommitted() {
- if (!isCommittedSupported) {
- // This is what getCommitted code is like for
AccumulatedMetricResult on runners
- // that do not support committed metrics
- throw new UnsupportedOperationException(
- "This runner does not currently support committed"
- + " metrics results. Please use 'attempted' instead.");
- }
- return GaugeResult.create(100L, new Instant(0L));
- }
-
- @Override
- public GaugeResult getAttempted() {
- return GaugeResult.create(120L, new Instant(0L));
- }
- });
- }
+ @AfterClass
+ public static void after() {
+ httpServer.stop(0);
}
}
diff --git
a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/NetworkMockServer.java
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/NetworkMockServer.java
new file mode 100644
index 00000000000..ed7f366788d
--- /dev/null
+++
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/NetworkMockServer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.extensions.metrics;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** Mock of a network server. */
+class NetworkMockServer {
+ private final int port;
+ private ServerSocket serverSocket;
+ private ServerThread thread;
+ private CountDownLatch countDownLatch;
+
+ private Collection<String> messages = new CopyOnWriteArrayList<String>();
+
+ public NetworkMockServer(final int port) {
+ this.port = port;
+ }
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ public NetworkMockServer start() throws IOException {
+ serverSocket = new ServerSocket(port);
+ thread = new ServerThread(serverSocket, messages);
+ thread.start();
+ return this;
+ }
+
+ public void stop() throws IOException {
+ thread.shutdown();
+ serverSocket.close();
+ }
+
+ public Collection<String> getMessages() {
+ return messages;
+ }
+
+ public void clear() {
+ messages.clear();
+ }
+
+ private class ServerThread extends Thread {
+ private final Collection<String> messages;
+
+ private final AtomicBoolean done = new AtomicBoolean(false);
+ private final ServerSocket server;
+
+ public ServerThread(final ServerSocket server, final Collection<String> messages) {
+ this.messages = messages;
+ this.server = server;
+ setName("network-mock-server");
+ }
+
+ @Override
+ public void run() {
+ while (!done.get()) {
+ try {
+ final Socket s = server.accept();
+ synchronized (this) {
+ final InputStream is = s.getInputStream();
+ final BufferedReader reader =
+ new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
+ String line;
+
+ try {
+ while ((line = reader.readLine()) != null) {
+ messages.add(line);
+ }
+ } finally {
+ countDownLatch.countDown();
+ s.close();
+ }
+ }
+ } catch (final IOException e) {
+ if (!done.get()) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ public void shutdown() {
+ done.set(true);
+ }
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 0b763327e26..33157eb32f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -445,4 +445,15 @@ public String create(PipelineOptions options) {
String getMetricsHttpSinkUrl();
void setMetricsHttpSinkUrl(String metricsSink);
+
+ @Description("The graphite metrics host")
+ String getMetricsGraphiteHost();
+
+ void setMetricsGraphiteHost(String host);
+
+ @Description("The graphite metrics port")
+ @Default.Integer(2003)
+ Integer getMetricsGraphitePort();
+
+ void setMetricsGraphitePort(Integer port);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 154982)
Time Spent: 5.5h (was: 5h 20m)
> Implement a Graphite sink for the metrics pusher
> ------------------------------------------------
>
> Key: BEAM-4553
> URL: https://issues.apache.org/jira/browse/BEAM-4553
> Project: Beam
> Issue Type: Sub-task
> Components: runner-extensions-metrics
> Reporter: Etienne Chauchot
> Assignee: Etienne Chauchot
> Priority: Major
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Today only a REST HTTP sink that sends raw JSON metrics using a POST request to
> an HTTP server is available. It is more of a POC sink. It would be good to code
> the first real metrics sink. One of the most popular is Graphite.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)