mosche commented on code in PR #22260:
URL: https://github.com/apache/beam/pull/22260#discussion_r957109098


##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -83,73 +138,48 @@ private static void publishWithCheck(
     }
   }
 
-  private static void publishNexmark(
-      final Collection<Map<String, Object>> results,
-      final InfluxDBSettings settings,
-      final Map<String, String> tags)
-      throws Exception {
-
-    final HttpClientBuilder builder = provideHttpBuilder(settings);
-    final HttpPost postRequest = providePOSTRequest(settings);
-    final StringBuilder metricBuilder = new StringBuilder();
-
+  @VisibleForTesting
+  static String nexmarkDataPoints(
+      final Collection<Map<String, Object>> results, final Map<String, String> 
tags) {
+    final StringBuilder builder = new StringBuilder();
+    final Set<String> fields = ImmutableSet.of("runtimeMs", "numResults");
     results.forEach(
         map -> {
-          
metricBuilder.append(map.get("measurement")).append(",").append(getKV(map, 
"runner"));
-          if (tags != null && !tags.isEmpty()) {
-            tags.entrySet().stream()
-                .forEach(
-                    entry -> {
-                      metricBuilder
-                          .append(",")
-                          .append(entry.getKey())
-                          .append("=")
-                          .append(entry.getValue());
-                    });
-          }
-          metricBuilder
-              .append(" ")
-              .append(getKV(map, "runtimeMs"))
-              .append(",")
-              .append(getKV(map, "numResults"))
-              .append(" ")
-              .append(map.get("timestamp"))
+          String measurement = 
checkArgumentNotNull(map.get("measurement")).toString();
+          addMeasurement(builder, measurement, tags, filterKeys(map, fields), 
map.get("timestamp"))
               .append('\n');
         });
+    return builder.toString();
+  }
 
-    postRequest.setEntity(
-        new GzipCompressingEntity(new 
ByteArrayEntity(metricBuilder.toString().getBytes(UTF_8))));
-
-    executeWithVerification(postRequest, builder);
+  @SuppressWarnings("nullness")
+  private static <K, V> Map<K, V> filterKeys(final Map<K, V> map, final Set<K> 
keys) {
+    return Maps.filterKeys(map, keys::contains);
   }
 
-  private static String getKV(final Map<String, Object> map, final String key) 
{
-    return key + "=" + map.get(key);
+  // fix types once nexmarkMeasurements is removed
+  private static StringBuilder addMeasurement(
+      StringBuilder builder,
+      String measurement,
+      Map<String, ?> tags,
+      Map<String, ?> fields,
+      @Nullable Object timestamp) {
+    checkState(!fields.isEmpty(), "fields cannot be empty");
+    builder.append(measurement);
+    tags.forEach((k, v) -> 
builder.append(',').append(k).append('=').append(v));
+    builder.append(' ');
+    fields.forEach((k, v) -> 
builder.append(k).append('=').append(fieldValue(v)).append(','));
+    builder.setLength(builder.length() - 1); // skip last comma
+    if (timestamp != null) {
+      builder.append(' ').append(timestamp);

Review Comment:
   Thanks for having a close look, @lukecwik!
   Turns out the current precision is in seconds. The only place where the actual 
timestamp of the test is used is Nexmark:
   ```
   if (options.getExportSummaryToInfluxDB()) {
           final long timestamp = start.getMillis() / 1000; // seconds
           savePerfsToInfluxDB(options, actual, timestamp);
         }
   ```
   The publisher configures `precision` as follows to override the default 
precision of InfluxDB. Again, it is set to seconds (`precision=s`):
   ``` 
   return new HttpPost(settings.host + "/write?db=" + settings.database + "&" + 
retentionPolicy + "&precision=s");
   ```



##########
sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java:
##########
@@ -46,35 +56,80 @@
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
+import org.checkerframework.dataflow.qual.Pure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public final class InfluxDBPublisher {
   private static final Logger LOG = 
LoggerFactory.getLogger(InfluxDBPublisher.class);
 
   private InfluxDBPublisher() {}
 
+  /** InfluxDB data point. */
+  @AutoValue
+  public abstract static class DataPoint {
+    DataPoint() {}
+
+    public abstract @Pure String measurement();
+
+    public abstract @Pure Map<String, String> tags();
+
+    public abstract @Pure Map<String, Number> fields();
+
+    @Nullable
+    public abstract @Pure Long timestamp();

Review Comment:
   I renamed it to timestampSecs and added a comment to explain. Also, see below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to