Naireen commented on code in PR #33013:
URL: https://github.com/apache/beam/pull/33013#discussion_r1842788011
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString payload) {
throw new RuntimeException(e);
}
}
+
+ /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+ public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+ try {
+ int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+ DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+ if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
+ HistogramData.LinearBuckets buckets =
+ (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+ Linear linear = new Linear();
+ linear.setNumberOfBuckets(numberOfBuckets);
+ linear.setWidth(buckets.getWidth());
+ linear.setStart(buckets.getStart());
+ outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
+ } else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
+ HistogramData.ExponentialBuckets buckets =
+ (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+ Base2Exponent base2Exp = new Base2Exponent();
+ base2Exp.setNumberOfBuckets(numberOfBuckets);
+ base2Exp.setScale(buckets.getScale());
+ outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+ } else {
+ throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
+ }
+
+ outputHistogram2.setCount(inputHistogram.getTotalCount());
+
+ List<Long> bucketCounts = new ArrayList<>();
+
+ Arrays.stream(inputHistogram.getBucketCount())
+ .forEach(
+ val -> {
+ bucketCounts.add(val);
+ });
+
+ outputHistogram2.setBucketCounts(bucketCounts);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonString = objectMapper.writeValueAsString(outputHistogram2);
+
+ return ByteString.copyFromUtf8(jsonString);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
Review Comment:
To clarify: when encoding and decoding, I raise HistogramParsingException for
specific errors, such as when the format isn't as expected or the bucket type
isn't supported. The generic
catch (IOException e) {
throw new RuntimeException(e);
}
at the end remains, since it is part of all the encoding and decoding methods.
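
The pattern described above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code from the PR: the class `HistogramEncodingSketch`, the stand-in bucket types, and the binary layout are all hypothetical, and it assumes `HistogramParsingException` is an unchecked exception so that it passes through the generic `catch (IOException e)` untouched.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Illustrative sketch only; none of these types are Beam's actual classes. */
final class HistogramEncodingSketch {

  /** Hypothetical unchecked exception for format or bucket-type problems. */
  static final class HistogramParsingException extends RuntimeException {
    HistogramParsingException(String message) {
      super(message);
    }
  }

  /** Stand-in for a histogram bucket type. */
  interface BucketType {}

  /** Stand-in for a supported, linear bucket layout. */
  static final class LinearBuckets implements BucketType {
    final double start;
    final double width;

    LinearBuckets(double start, double width) {
      this.start = start;
      this.width = width;
    }
  }

  /** Stand-in for a bucket layout the encoder does not understand. */
  static final class UnsupportedBuckets implements BucketType {}

  /** Encodes bucket options and counts into a simple, made-up binary layout. */
  static byte[] encode(BucketType bucketType, long[] bucketCounts) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);

      if (bucketType instanceof LinearBuckets) {
        LinearBuckets linear = (LinearBuckets) bucketType;
        out.writeDouble(linear.start);
        out.writeDouble(linear.width);
      } else {
        // Specific, recognizable failure: the bucket type is not supported.
        throw new HistogramParsingException(
            "Unable to encode histogram, bucket type is not recognized");
      }

      out.writeInt(bucketCounts.length);
      for (long count : bucketCounts) {
        out.writeLong(count);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      // Generic fallback for I/O failures, wrapped as an unchecked exception.
      throw new RuntimeException(e);
    }
  }
}
```

With this split, a caller can catch `HistogramParsingException` to handle unsupported or malformed histograms specifically, while unexpected I/O failures still surface as a plain `RuntimeException`.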