[ 
https://issues.apache.org/jira/browse/BEAM-4138?focusedWorklogId=97804&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97804
 ]

ASF GitHub Bot logged work on BEAM-4138:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/May/18 07:45
            Start Date: 03/May/18 07:45
    Worklog Time Spent: 10m 
      Work Description: jbonofre closed pull request #5203: [BEAM-4138] Support 
runners that do not support committed metrics in MetricsHttpSink
URL: https://github.com/apache/beam/pull/5203
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSink.java
 
b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSink.java
index 0c8718e3981..e9cefe5d7ec 100644
--- 
a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSink.java
+++ 
b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSink.java
@@ -18,9 +18,12 @@
 
 package org.apache.beam.runners.extensions.metrics;
 
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.MapperFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
+import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
 import com.fasterxml.jackson.datatype.joda.JodaModule;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.DataOutputStream;
@@ -71,6 +74,26 @@ String serializeMetrics(MetricQueryResults 
metricQueryResults) throws Exception
     objectMapper.registerModule(new JodaModule());
     objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
     objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
-    return objectMapper.writeValueAsString(metricQueryResults);
+    // need to register a filter as soon as @JsonFilter annotation is 
specified.
+    // So specify an pass through filter
+    SimpleBeanPropertyFilter filter = SimpleBeanPropertyFilter.serializeAll();
+    SimpleFilterProvider filterProvider = new SimpleFilterProvider();
+    filterProvider.addFilter("committedMetrics", filter);
+    objectMapper.setFilterProvider(filterProvider);
+    String result;
+    try {
+      result = objectMapper.writeValueAsString(metricQueryResults);
+    } catch (JsonMappingException exception) {
+      if ((exception.getCause() instanceof UnsupportedOperationException)
+          && exception.getCause().getMessage().contains("committed metrics")) {
+        filterProvider.removeFilter("committedMetrics");
+        filter = SimpleBeanPropertyFilter.serializeAllExcept("committed");
+        filterProvider.addFilter("committedMetrics", filter);
+        result = objectMapper.writeValueAsString(metricQueryResults);
+      } else {
+        throw exception;
+      }
+    }
+    return result;
   }
 }
diff --git 
a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
 
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
index 6a99cd5a3de..af564abfd6b 100644
--- 
a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
+++ 
b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
@@ -35,191 +35,8 @@
 public class MetricsHttpSinkTest {
 
   @Test
-  public void testSerializer() throws Exception {
-    MetricQueryResults metricQueryResults =
-        new MetricQueryResults() {
-
-          @Override
-          public List<MetricResult<Long>> getCounters() {
-            return Collections.singletonList(
-                (MetricResult<Long>)
-                    new MetricResult<Long>() {
-
-                      @Override
-                      public MetricName getName() {
-                        return new MetricName() {
-
-                          @Override
-                          public String getNamespace() {
-                            return "ns1";
-                          }
-
-                          @Override
-                          public String getName() {
-                            return "n1";
-                          }
-                        };
-                      }
-
-                      @Override
-                      public String getStep() {
-                        return "s1";
-                      }
-
-                      @Override
-                      public Long getCommitted() {
-                        return 10L;
-                      }
-
-                      @Override
-                      public Long getAttempted() {
-                        return 20L;
-                      }
-                    });
-          }
-
-          @Override
-          public List<MetricResult<DistributionResult>> getDistributions() {
-            return Collections.singletonList(
-                (MetricResult<DistributionResult>)
-                    new MetricResult<DistributionResult>() {
-
-                      @Override
-                      public MetricName getName() {
-                        return new MetricName() {
-
-                          @Override
-                          public String getNamespace() {
-                            return "ns1";
-                          }
-
-                          @Override
-                          public String getName() {
-                            return "n2";
-                          }
-                        };
-                      }
-
-                      @Override
-                      public String getStep() {
-                        return "s2";
-                      }
-
-                      @Override
-                      public DistributionResult getCommitted() {
-                        return new DistributionResult() {
-
-                          @Override
-                          public long getSum() {
-                            return 10L;
-                          }
-
-                          @Override
-                          public long getCount() {
-                            return 2L;
-                          }
-
-                          @Override
-                          public long getMin() {
-                            return 5L;
-                          }
-
-                          @Override
-                          public long getMax() {
-                            return 8L;
-                          }
-                        };
-                      }
-
-                      @Override
-                      public DistributionResult getAttempted() {
-                        return new DistributionResult() {
-
-                          @Override
-                          public long getSum() {
-                            return 25L;
-                          }
-
-                          @Override
-                          public long getCount() {
-                            return 4L;
-                          }
-
-                          @Override
-                          public long getMin() {
-                            return 3L;
-                          }
-
-                          @Override
-                          public long getMax() {
-                            return 9L;
-                          }
-                        };
-                      }
-                    });
-          }
-
-          @Override
-          public List<MetricResult<GaugeResult>> getGauges() {
-            return Collections.singletonList(
-                (MetricResult<GaugeResult>)
-                    new MetricResult<GaugeResult>() {
-
-                      @Override
-                      public MetricName getName() {
-                        return new MetricName() {
-
-                          @Override
-                          public String getNamespace() {
-                            return "ns1";
-                          }
-
-                          @Override
-                          public String getName() {
-                            return "n3";
-                          }
-                        };
-                      }
-
-                      @Override
-                      public String getStep() {
-                        return "s3";
-                      }
-
-                      @Override
-                      public GaugeResult getCommitted() {
-                        return new GaugeResult() {
-
-                          @Override
-                          public long getValue() {
-                            return 100L;
-                          }
-
-                          @Override
-                          public Instant getTimestamp() {
-                            return new Instant(0L);
-                          }
-                        };
-                      }
-
-                      @Override
-                      public GaugeResult getAttempted() {
-                        return new GaugeResult() {
-
-                          @Override
-                          public long getValue() {
-                            return 120L;
-                          }
-
-                          @Override
-                          public Instant getTimestamp() {
-                            return new Instant(0L);
-                          }
-                        };
-                      }
-                    });
-          }
-        };
+  public void testSerializerWithCommittedSupported() throws Exception {
+    MetricQueryResults metricQueryResults = new CustomMetricQueryResults(true);
     MetricsHttpSink metricsHttpSink = new 
MetricsHttpSink(PipelineOptionsFactory.create());
     String serializeMetrics = 
metricsHttpSink.serializeMetrics(metricQueryResults);
     assertEquals(
@@ -234,4 +51,231 @@ public Instant getTimestamp() {
             + "\"ns1\"},\"step\":\"s3\"}]}",
         serializeMetrics);
   }
+
+  @Test
+  public void testSerializerWithCommittedUnSupported() throws Exception {
+    MetricQueryResults metricQueryResults = new 
CustomMetricQueryResults(false);
+    MetricsHttpSink metricsHttpSink = new 
MetricsHttpSink(PipelineOptionsFactory.create());
+    String serializeMetrics = 
metricsHttpSink.serializeMetrics(metricQueryResults);
+    assertEquals(
+        "Errror in serialization",
+        "{\"counters\":[{\"attempted\":20,\"name\":{\"name\":\"n1\","
+            + 
"\"namespace\":\"ns1\"},\"step\":\"s1\"}],\"distributions\":[{\"attempted\":"
+            + 
"{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\""
+            + 
",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
+            + 
"\"1970-01-01T00:00:00.000Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":"
+            + "\"ns1\"},\"step\":\"s3\"}]}",
+        serializeMetrics);
+  }
+
+  private static class CustomMetricQueryResults implements MetricQueryResults {
+
+    private boolean isCommittedSupported;
+
+    private CustomMetricQueryResults(boolean isCommittedSupported) {
+      this.isCommittedSupported = isCommittedSupported;
+    }
+
+    @Override
+    public List<MetricResult<Long>> getCounters() {
+      return Collections.singletonList(
+          (MetricResult<Long>)
+              new MetricResult<Long>() {
+
+                @Override
+                public MetricName getName() {
+                  return new MetricName() {
+
+                    @Override
+                    public String getNamespace() {
+                      return "ns1";
+                    }
+
+                    @Override
+                    public String getName() {
+                      return "n1";
+                    }
+                  };
+                }
+
+                @Override
+                public String getStep() {
+                  return "s1";
+                }
+
+                @Override
+                public Long getCommitted() {
+                  if (!isCommittedSupported) {
+                    // This is what getCommitted code is like for 
AccumulatedMetricResult on runners
+                    // that do not support committed metrics
+                    throw new UnsupportedOperationException(
+                        "This runner does not currently support committed"
+                            + " metrics results. Please use 'attempted' 
instead.");
+                  }
+                  return 10L;
+                }
+
+                @Override
+                public Long getAttempted() {
+                  return 20L;
+                }
+              });
+    }
+
+    @Override
+    public List<MetricResult<DistributionResult>> getDistributions() {
+      return Collections.singletonList(
+          (MetricResult<DistributionResult>)
+              new MetricResult<DistributionResult>() {
+
+                @Override
+                public MetricName getName() {
+                  return new MetricName() {
+
+                    @Override
+                    public String getNamespace() {
+                      return "ns1";
+                    }
+
+                    @Override
+                    public String getName() {
+                      return "n2";
+                    }
+                  };
+                }
+
+                @Override
+                public String getStep() {
+                  return "s2";
+                }
+
+                @Override
+                public DistributionResult getCommitted() {
+                  if (!isCommittedSupported) {
+                    // This is what getCommitted code is like for 
AccumulatedMetricResult on runners
+                    // that do not support committed metrics
+                    throw new UnsupportedOperationException(
+                        "This runner does not currently support committed"
+                            + " metrics results. Please use 'attempted' 
instead.");
+                  }
+                  return new DistributionResult() {
+
+                    @Override
+                    public long getSum() {
+                      return 10L;
+                    }
+
+                    @Override
+                    public long getCount() {
+                      return 2L;
+                    }
+
+                    @Override
+                    public long getMin() {
+                      return 5L;
+                    }
+
+                    @Override
+                    public long getMax() {
+                      return 8L;
+                    }
+                  };
+                }
+
+                @Override
+                public DistributionResult getAttempted() {
+                  return new DistributionResult() {
+
+                    @Override
+                    public long getSum() {
+                      return 25L;
+                    }
+
+                    @Override
+                    public long getCount() {
+                      return 4L;
+                    }
+
+                    @Override
+                    public long getMin() {
+                      return 3L;
+                    }
+
+                    @Override
+                    public long getMax() {
+                      return 9L;
+                    }
+                  };
+                }
+              });
+    }
+
+    @Override
+    public List<MetricResult<GaugeResult>> getGauges() {
+      return Collections.singletonList(
+          (MetricResult<GaugeResult>)
+              new MetricResult<GaugeResult>() {
+
+                @Override
+                public MetricName getName() {
+                  return new MetricName() {
+
+                    @Override
+                    public String getNamespace() {
+                      return "ns1";
+                    }
+
+                    @Override
+                    public String getName() {
+                      return "n3";
+                    }
+                  };
+                }
+
+                @Override
+                public String getStep() {
+                  return "s3";
+                }
+
+                @Override
+                public GaugeResult getCommitted() {
+                  if (!isCommittedSupported) {
+                    // This is what getCommitted code is like for 
AccumulatedMetricResult on runners
+                    // that do not support committed metrics
+                    throw new UnsupportedOperationException(
+                        "This runner does not currently support committed"
+                            + " metrics results. Please use 'attempted' 
instead.");
+                  }
+                  return new GaugeResult() {
+
+                    @Override
+                    public long getValue() {
+                      return 100L;
+                    }
+
+                    @Override
+                    public Instant getTimestamp() {
+                      return new Instant(0L);
+                    }
+                  };
+                }
+
+                @Override
+                public GaugeResult getAttempted() {
+                  return new GaugeResult() {
+
+                    @Override
+                    public long getValue() {
+                      return 120L;
+                    }
+
+                    @Override
+                    public Instant getTimestamp() {
+                      return new Instant(0L);
+                    }
+                  };
+                }
+              });
+    }
+  }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
index 3619a6a3078..6412154f10b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.metrics;
 
+import com.fasterxml.jackson.annotation.JsonFilter;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 
@@ -24,6 +25,7 @@
  * The results of a single current metric.
  */
 @Experimental(Kind.METRICS)
+@JsonFilter("committedMetrics")
 public interface MetricResult<T> {
   /** Return the name of the metric. */
   MetricName getName();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 97804)
    Time Spent: 20m  (was: 10m)

> support runners that do not support committed metrics in MetricsHttpSink
> ------------------------------------------------------------------------
>
>                 Key: BEAM-4138
>                 URL: https://issues.apache.org/jira/browse/BEAM-4138
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-ideas
>            Reporter: Etienne Chauchot
>            Assignee: Etienne Chauchot
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> {{MetricsHttpSink}} is currently more of a POC implementation of the new 
> {{MetricsSink}} interface. But if users want to use it on runners that do not 
> support committed metrics, then it needs to avoid calling the related 
> {{getCommitted()}} methods to avoid the {{UnsupportedOperationException}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to