m-trieu commented on code in PR #33013:
URL: https://github.com/apache/beam/pull/33013#discussion_r1841048796


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString 
payload) {
       throw new RuntimeException(e);
     }
   }
+
+  /** Encodes to {@link 
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+    try {
+      int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+      DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+      if (inputHistogram.getBucketType() instanceof 
HistogramData.LinearBuckets) {
+        HistogramData.LinearBuckets buckets =
+            (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+        Linear linear = new Linear();
+        linear.setNumberOfBuckets(numberOfBuckets);
+        linear.setWidth(buckets.getWidth());
+        linear.setStart(buckets.getStart());
+        outputHistogram2.setBucketOptions(new 
BucketOptions().setLinear(linear));
+      } else if (inputHistogram.getBucketType() instanceof 
HistogramData.ExponentialBuckets) {
+        HistogramData.ExponentialBuckets buckets =
+            (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+        Base2Exponent base2Exp = new Base2Exponent();
+        base2Exp.setNumberOfBuckets(numberOfBuckets);
+        base2Exp.setScale(buckets.getScale());
+        outputHistogram2.setBucketOptions(new 
BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not 
recognized");
+      }
+
+      outputHistogram2.setCount(inputHistogram.getTotalCount());
+
+      List<Long> bucketCounts = new ArrayList<>();
+
+      Arrays.stream(inputHistogram.getBucketCount())
+          .forEach(
+              val -> {
+                bucketCounts.add(val);
+              });
+
+      outputHistogram2.setBucketCounts(bucketCounts);
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      String jsonString = objectMapper.writeValueAsString(outputHistogram2);
+
+      return ByteString.copyFromUtf8(jsonString);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Decodes to {@link 
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static HistogramData decodeInt64Histogram(ByteString payload) {
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+      JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // 
parse afterwards
+      DataflowHistogramValue newHist = new DataflowHistogramValue();
+      newHist.setCount(jsonNode.get("count").asLong());
+
+      List<Long> bucketCounts = new ArrayList<>();
+      Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
+      while (itr.hasNext()) {
+        Long item = itr.next().asLong();
+        bucketCounts.add(item);
+      }
+      newHist.setBucketCounts(bucketCounts);
+
+      if (jsonNode.get("bucketOptions").has("linear")) {
+        Linear linear = new Linear();
+        JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
+        linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
+        linear.setWidth(linearNode.get("width").asDouble());
+        linear.setStart(linearNode.get("start").asDouble());
+        newHist.setBucketOptions(new BucketOptions().setLinear(linear));
+      } else if (jsonNode.get("bucketOptions").has("exponential")) {
+        Base2Exponent base2Exp = new Base2Exponent();
+        JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
+        base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
+        base2Exp.setScale(expNode.get("scale").asInt());
+        newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not 
recognized");

Review Comment:
   ditto above
   
   Can we throw a more specific exception here?
   
   It can still extend RuntimeException, but a dedicated type will help with debugging.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java:
##########
@@ -74,6 +74,42 @@ public HistogramData(BucketType bucketType) {
     this.sumOfSquaredDeviations = 0;
   }
 
+  /**
+   * Create a histogram from DataflowHistogramValue proto.
+   *
+   * @param histogramProto DataflowHistogramValue proto used to populate stats 
for the histogram.
+   */
+  public HistogramData(
+      com.google.api.services.dataflow.model.DataflowHistogramValue 
histogramProto) {

Review Comment:
   Is this fully qualified class name needed?



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString 
payload) {
       throw new RuntimeException(e);
     }
   }
+
+  /** Encodes to {@link 
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+    try {
+      int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+      DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+      if (inputHistogram.getBucketType() instanceof 
HistogramData.LinearBuckets) {
+        HistogramData.LinearBuckets buckets =
+            (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+        Linear linear = new Linear();
+        linear.setNumberOfBuckets(numberOfBuckets);
+        linear.setWidth(buckets.getWidth());
+        linear.setStart(buckets.getStart());
+        outputHistogram2.setBucketOptions(new 
BucketOptions().setLinear(linear));
+      } else if (inputHistogram.getBucketType() instanceof 
HistogramData.ExponentialBuckets) {
+        HistogramData.ExponentialBuckets buckets =
+            (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+        Base2Exponent base2Exp = new Base2Exponent();
+        base2Exp.setNumberOfBuckets(numberOfBuckets);
+        base2Exp.setScale(buckets.getScale());
+        outputHistogram2.setBucketOptions(new 
BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not 
recognized");
+      }
+
+      outputHistogram2.setCount(inputHistogram.getTotalCount());
+
+      List<Long> bucketCounts = new ArrayList<>();
+
+      Arrays.stream(inputHistogram.getBucketCount())
+          .forEach(
+              val -> {
+                bucketCounts.add(val);
+              });
+
+      outputHistogram2.setBucketCounts(bucketCounts);
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      String jsonString = objectMapper.writeValueAsString(outputHistogram2);
+
+      return ByteString.copyFromUtf8(jsonString);
+    } catch (Exception e) {
+      throw new RuntimeException(e);

Review Comment:
   ditto above 
   
   Can we throw a more specific exception here?
   
   It can still extend RuntimeException, but a dedicated type will help with debugging.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java:
##########
@@ -573,6 +613,40 @@ public double getRangeTo() {
     // Note: equals() and hashCode() are implemented by the AutoValue.
   }
 
+  // Used for testing unsupported Bucket formats
+  @AutoValue
+  public abstract static class UnsupportedBuckets implements BucketType {

Review Comment:
   nit: format this comment as Javadoc:
   
   `/** Used for testing unsupported Bucket formats. */`



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java:
##########
@@ -573,6 +613,40 @@ public double getRangeTo() {
     // Note: equals() and hashCode() are implemented by the AutoValue.
   }
 
+  // Used for testing unsupported Bucket formats
+  @AutoValue

Review Comment:
   If this is just for testing, add the `@VisibleForTesting` annotation, and since it's public 
also add Beam's `@Internal` annotation.



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString 
payload) {
       throw new RuntimeException(e);
     }
   }
+
+  /** Encodes to {@link 
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+    try {
+      int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+      DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+      if (inputHistogram.getBucketType() instanceof 
HistogramData.LinearBuckets) {
+        HistogramData.LinearBuckets buckets =
+            (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+        Linear linear = new Linear();
+        linear.setNumberOfBuckets(numberOfBuckets);
+        linear.setWidth(buckets.getWidth());
+        linear.setStart(buckets.getStart());
+        outputHistogram2.setBucketOptions(new 
BucketOptions().setLinear(linear));
+      } else if (inputHistogram.getBucketType() instanceof 
HistogramData.ExponentialBuckets) {
+        HistogramData.ExponentialBuckets buckets =
+            (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+        Base2Exponent base2Exp = new Base2Exponent();
+        base2Exp.setNumberOfBuckets(numberOfBuckets);
+        base2Exp.setScale(buckets.getScale());
+        outputHistogram2.setBucketOptions(new 
BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not 
recognized");
+      }
+
+      outputHistogram2.setCount(inputHistogram.getTotalCount());
+
+      List<Long> bucketCounts = new ArrayList<>();
+
+      Arrays.stream(inputHistogram.getBucketCount())
+          .forEach(
+              val -> {
+                bucketCounts.add(val);
+              });
+
+      outputHistogram2.setBucketCounts(bucketCounts);
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      String jsonString = objectMapper.writeValueAsString(outputHistogram2);
+
+      return ByteString.copyFromUtf8(jsonString);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Decodes to {@link 
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static HistogramData decodeInt64Histogram(ByteString payload) {
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+      JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // 
parse afterwards
+      DataflowHistogramValue newHist = new DataflowHistogramValue();
+      newHist.setCount(jsonNode.get("count").asLong());
+
+      List<Long> bucketCounts = new ArrayList<>();
+      Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
+      while (itr.hasNext()) {
+        Long item = itr.next().asLong();
+        bucketCounts.add(item);
+      }
+      newHist.setBucketCounts(bucketCounts);
+
+      if (jsonNode.get("bucketOptions").has("linear")) {
+        Linear linear = new Linear();
+        JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
+        linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
+        linear.setWidth(linearNode.get("width").asDouble());
+        linear.setStart(linearNode.get("start").asDouble());
+        newHist.setBucketOptions(new BucketOptions().setLinear(linear));
+      } else if (jsonNode.get("bucketOptions").has("exponential")) {
+        Base2Exponent base2Exp = new Base2Exponent();
+        JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
+        base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
+        base2Exp.setScale(expNode.get("scale").asInt());
+        newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not 
recognized");

Review Comment:
   Also what does the caller do if an exception is thrown?
   
   Do we want to LOG the error and return Optional<HistogramData> if we don't 
care about the exception?



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString 
payload) {
       throw new RuntimeException(e);
     }
   }
+
+  /** Encodes to {@link 
MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+    try {
+      int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+      DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+      if (inputHistogram.getBucketType() instanceof 
HistogramData.LinearBuckets) {
+        HistogramData.LinearBuckets buckets =
+            (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+        Linear linear = new Linear();
+        linear.setNumberOfBuckets(numberOfBuckets);
+        linear.setWidth(buckets.getWidth());
+        linear.setStart(buckets.getStart());
+        outputHistogram2.setBucketOptions(new 
BucketOptions().setLinear(linear));
+      } else if (inputHistogram.getBucketType() instanceof 
HistogramData.ExponentialBuckets) {
+        HistogramData.ExponentialBuckets buckets =
+            (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+        Base2Exponent base2Exp = new Base2Exponent();
+        base2Exp.setNumberOfBuckets(numberOfBuckets);
+        base2Exp.setScale(buckets.getScale());
+        outputHistogram2.setBucketOptions(new 
BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not 
recognized");

Review Comment:
   Also what does the caller do if an exception is thrown?
   
   Do we want to LOG the error and return Optional<ByteString> if we don't care 
about the exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to