Naireen commented on code in PR #33013:
URL: https://github.com/apache/beam/pull/33013#discussion_r1842606109
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java:
##########
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString payload) {
      throw new RuntimeException(e);
    }
  }
+
+  /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
+    try {
+      int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+      DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
+      if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
+        HistogramData.LinearBuckets buckets =
+            (HistogramData.LinearBuckets) inputHistogram.getBucketType();
+        Linear linear = new Linear();
+        linear.setNumberOfBuckets(numberOfBuckets);
+        linear.setWidth(buckets.getWidth());
+        linear.setStart(buckets.getStart());
+        outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
+      } else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
+        HistogramData.ExponentialBuckets buckets =
+            (HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
+        Base2Exponent base2Exp = new Base2Exponent();
+        base2Exp.setNumberOfBuckets(numberOfBuckets);
+        base2Exp.setScale(buckets.getScale());
+        outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
+      }
+
+      outputHistogram2.setCount(inputHistogram.getTotalCount());
+
+      List<Long> bucketCounts = new ArrayList<>();
+
+      Arrays.stream(inputHistogram.getBucketCount())
+          .forEach(
+              val -> {
+                bucketCounts.add(val);
+              });
+
+      outputHistogram2.setBucketCounts(bucketCounts);
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      String jsonString = objectMapper.writeValueAsString(outputHistogram2);
+
+      return ByteString.copyFromUtf8(jsonString);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  public static HistogramData decodeInt64Histogram(ByteString payload) {
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+      JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards
+      DataflowHistogramValue newHist = new DataflowHistogramValue();
+      newHist.setCount(jsonNode.get("count").asLong());
+
+      List<Long> bucketCounts = new ArrayList<>();
+      Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
+      while (itr.hasNext()) {
+        Long item = itr.next().asLong();
+        bucketCounts.add(item);
+      }
+      newHist.setBucketCounts(bucketCounts);
+
+      if (jsonNode.get("bucketOptions").has("linear")) {
+        Linear linear = new Linear();
+        JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
+        linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
+        linear.setWidth(linearNode.get("width").asDouble());
+        linear.setStart(linearNode.get("start").asDouble());
+        newHist.setBucketOptions(new BucketOptions().setLinear(linear));
+      } else if (jsonNode.get("bucketOptions").has("exponential")) {
+        Base2Exponent base2Exp = new Base2Exponent();
+        JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
+        base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
+        base2Exp.setScale(expNode.get("scale").asInt());
+        newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+      } else {
+        throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
Review Comment:
I followed the existing style, where it throws an error. Essentially nothing
happens if an error is thrown; the call site is here, and we don't actually
return anything:
https://github.com/apache/beam/blob/f38edb7c98a663afb00489561cd717e34ce334e2/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java#L518
This is separate from the data processing path, so if there is an error in the
metrics processing, it's not retried.
We could update all the decoders to return an Optional instead if we want, but
that should likely be a separate change and should be done for all of them at
once.
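
For reference, a minimal sketch of the Optional-based approach mentioned above. This is not part of the PR; `tryDecodeInt64Histogram` is a hypothetical name, and it assumes the method would sit next to `decodeInt64Histogram` in `MonitoringInfoEncodings` (so `ByteString` and `HistogramData` are already imported; `java.util.Optional` would need to be added):

```java
// Hypothetical sketch only, not in this PR: an Optional-returning variant of the
// decoder so callers can decide how to handle a malformed payload instead of
// catching a RuntimeException.
public static Optional<HistogramData> tryDecodeInt64Histogram(ByteString payload) {
  try {
    return Optional.of(decodeInt64Histogram(payload));
  } catch (RuntimeException e) {
    // Metrics decoding is off the data processing path and is not retried, so
    // returning empty here only drops a metric update, not data.
    return Optional.empty();
  }
}

// A call site (e.g. in MetricsContainerImpl) could then skip the update rather
// than propagate the failure:
//   MonitoringInfoEncodings.tryDecodeInt64Histogram(payload)
//       .ifPresent(histogram -> /* merge into the container */ ...);
```

Returning Optional would keep the metrics path from throwing while leaving the no-retry semantics unchanged, but as noted it would have to be applied to all the decoders at once.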