Naireen commented on code in PR #33013:
URL: https://github.com/apache/beam/pull/33013#discussion_r1842606109


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString payload) {
       throw new RuntimeException(e);
     }
   }
+
+  /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+    try {
+      int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+      DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+      if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
+        HistogramData.LinearBuckets buckets =
+            (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+        Linear linear = new Linear();
+        linear.setNumberOfBuckets(numberOfBuckets);
+        linear.setWidth(buckets.getWidth());
+        linear.setStart(buckets.getStart());
+        outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
+      } else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
+        HistogramData.ExponentialBuckets buckets =
+            (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+        Base2Exponent base2Exp = new Base2Exponent();
+        base2Exp.setNumberOfBuckets(numberOfBuckets);
+        base2Exp.setScale(buckets.getScale());
+        outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
+      }
+
+      outputHistogram2.setCount(inputHistogram.getTotalCount());
+
+      List<Long> bucketCounts = new ArrayList<>();
+
+      Arrays.stream(inputHistogram.getBucketCount())
+          .forEach(
+              val -> {
+                bucketCounts.add(val);
+              });
+
+      outputHistogram2.setBucketCounts(bucketCounts);
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      String jsonString = objectMapper.writeValueAsString(outputHistogram2);
+
+      return ByteString.copyFromUtf8(jsonString);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static HistogramData decodeInt64Histogram(ByteString payload) {
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+      JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards
+      DataflowHistogramValue newHist = new DataflowHistogramValue();
+      newHist.setCount(jsonNode.get("count").asLong());
+
+      List<Long> bucketCounts = new ArrayList<>();
+      Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
+      while (itr.hasNext()) {
+        Long item = itr.next().asLong();
+        bucketCounts.add(item);
+      }
+      newHist.setBucketCounts(bucketCounts);
+
+      if (jsonNode.get("bucketOptions").has("linear")) {
+        Linear linear = new Linear();
+        JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
+        linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
+        linear.setWidth(linearNode.get("width").asDouble());
+        linear.setStart(linearNode.get("start").asDouble());
+        newHist.setBucketOptions(new BucketOptions().setLinear(linear));
+      } else if (jsonNode.get("bucketOptions").has("exponential")) {
+        Base2Exponent base2Exp = new Base2Exponent();
+        JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
+        base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
+        base2Exp.setScale(expNode.get("scale").asInt());
+        newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not recognized");

Review Comment:
   I followed the existing style, where it throws an error. Essentially nothing happens if an error is thrown (the call site is here, where we don't actually return anything: https://github.com/apache/beam/blob/f38edb7c98a663afb00489561cd717e34ce334e2/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java#L518). This is separate from the data-processing path, so if there is an error in the metrics processing, it's not retried.
   We can update this to return an Optional from all the decoders instead if we want, but that should likely be a separate change and should be done for all of them at once.
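   For illustration, a minimal sketch (hypothetical, not part of this PR) of what an Optional-returning variant could look like. It simply wraps the existing decoder, so the method name `tryDecodeInt64Histogram` and its placement are assumptions:

```java
// Hypothetical sketch: could live next to decodeInt64Histogram in
// MonitoringInfoEncodings, reusing that class's imports for ByteString and
// HistogramData, plus java.util.Optional.
public static Optional<HistogramData> tryDecodeInt64Histogram(ByteString payload) {
  try {
    return Optional.of(decodeInt64Histogram(payload));
  } catch (RuntimeException e) {
    // Metrics decoding is off the data-processing path, so a failure here is
    // dropped rather than retried; callers can decide whether to log it.
    return Optional.empty();
  }
}
```

   Wrapping rather than rewriting would keep the current decoders' behavior unchanged until all of them are migrated together.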



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to