Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3128#discussion_r97315979
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
---
@@ -64,122 +80,148 @@ private MetricDumpSerialization() {
* @param gauges gauges to serialize
* @param histograms histograms to serialize
* @return byte array containing the serialized metrics
- * @throws IOException
*/
- public byte[] serialize(
+ public MetricSerializationResult serialize(
Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
Map<Histogram, Tuple2<QueryScopeInfo, String>>
histograms,
- Map<Meter, Tuple2<QueryScopeInfo, String>> meters)
throws IOException {
-
- baos.reset();
- dos.writeInt(counters.size());
- dos.writeInt(gauges.size());
- dos.writeInt(histograms.size());
- dos.writeInt(meters.size());
+ Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+ buffer.clear();
+ int numCounters = 0;
for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>>
entry : counters.entrySet()) {
- serializeMetricInfo(dos, entry.getValue().f0);
- serializeString(dos, entry.getValue().f1);
- serializeCounter(dos, entry.getKey());
+ try {
+ serializeCounter(buffer,
entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ numCounters++;
+ } catch (Exception e) {
+ LOG.warn("Failed to serialize
counter.", e);
+ }
}
+ int numGauges = 0;
for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo,
String>> entry : gauges.entrySet()) {
- serializeMetricInfo(dos, entry.getValue().f0);
- serializeString(dos, entry.getValue().f1);
- serializeGauge(dos, entry.getKey());
+ try {
+ serializeGauge(buffer,
entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ numGauges++;
+ } catch (Exception e) {
+ LOG.warn("Failed to serialize gauge.",
e);
+ }
}
+ int numHistograms = 0;
for (Map.Entry<Histogram, Tuple2<QueryScopeInfo,
String>> entry : histograms.entrySet()) {
- serializeMetricInfo(dos, entry.getValue().f0);
- serializeString(dos, entry.getValue().f1);
- serializeHistogram(dos, entry.getKey());
+ try {
+ serializeHistogram(buffer,
entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ numHistograms++;
+ } catch (Exception e) {
+ LOG.warn("Failed to serialize
histogram.", e);
+ }
}
+ int numMeters = 0;
for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>>
entry : meters.entrySet()) {
- serializeMetricInfo(dos, entry.getValue().f0);
- serializeString(dos, entry.getValue().f1);
- serializeMeter(dos, entry.getKey());
+ try {
+ serializeMeter(buffer,
entry.getValue().f0, entry.getValue().f1, entry.getKey());
+ numMeters++;
+ } catch (Exception e) {
+ LOG.warn("Failed to serialize meter.",
e);
+ }
}
- return baos.toByteArray();
+ return new
MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges,
numMeters, numHistograms);
}
public void close() {
- try {
- dos.close();
- } catch (Exception e) {
- LOG.debug("Failed to close OutputStream.", e);
- }
- try {
- baos.close();
- } catch (Exception e) {
- LOG.debug("Failed to close OutputStream.", e);
- }
+ buffer = null;
}
}
- private static void serializeMetricInfo(DataOutputStream dos,
QueryScopeInfo info) throws IOException {
- serializeString(dos, info.scope);
- dos.writeByte(info.getCategory());
+ private static void serializeMetricInfo(DataOutput out, QueryScopeInfo
info) throws IOException {
+ out.writeUTF(info.scope);
+ out.writeByte(info.getCategory());
switch (info.getCategory()) {
case INFO_CATEGORY_JM:
break;
case INFO_CATEGORY_TM:
String tmID =
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
- serializeString(dos, tmID);
+ out.writeUTF(tmID);
break;
case INFO_CATEGORY_JOB:
QueryScopeInfo.JobQueryScopeInfo jobInfo =
(QueryScopeInfo.JobQueryScopeInfo) info;
- serializeString(dos, jobInfo.jobID);
+ out.writeUTF(jobInfo.jobID);
break;
case INFO_CATEGORY_TASK:
QueryScopeInfo.TaskQueryScopeInfo taskInfo =
(QueryScopeInfo.TaskQueryScopeInfo) info;
- serializeString(dos, taskInfo.jobID);
- serializeString(dos, taskInfo.vertexID);
- dos.writeInt(taskInfo.subtaskIndex);
+ out.writeUTF(taskInfo.jobID);
+ out.writeUTF(taskInfo.vertexID);
+ out.writeInt(taskInfo.subtaskIndex);
break;
case INFO_CATEGORY_OPERATOR:
QueryScopeInfo.OperatorQueryScopeInfo
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
- serializeString(dos, operatorInfo.jobID);
- serializeString(dos, operatorInfo.vertexID);
- dos.writeInt(operatorInfo.subtaskIndex);
- serializeString(dos, operatorInfo.operatorName);
+ out.writeUTF(operatorInfo.jobID);
+ out.writeUTF(operatorInfo.vertexID);
+ out.writeInt(operatorInfo.subtaskIndex);
+ out.writeUTF(operatorInfo.operatorName);
break;
+ default:
+ throw new IOException("Unknown scope category:
" + info.getCategory());
}
}
- private static void serializeString(DataOutputStream dos, String
string) throws IOException {
- byte[] bytes = string.getBytes();
- dos.writeInt(bytes.length);
- dos.write(bytes);
+ private static void serializeCounter(DataOutput out, QueryScopeInfo
info, String name, Counter counter) throws IOException {
+ long count = counter.getCount();
+ serializeMetricInfo(out, info);
+ out.writeUTF(name);
+ out.writeLong(count);
}
- private static void serializeCounter(DataOutputStream dos, Counter
counter) throws IOException {
- dos.writeLong(counter.getCount());
- }
-
- private static void serializeGauge(DataOutputStream dos, Gauge<?>
gauge) throws IOException {
- serializeString(dos, gauge.getValue().toString());
+ private static void serializeGauge(DataOutput out, QueryScopeInfo info,
String name, Gauge<?> gauge) throws IOException {
+ Object value = gauge.getValue();
+ if (value == null) {
+ throw new NullPointerException("Value returned by gauge
" + name + " was null.");
+ }
+ String stringValue = gauge.getValue().toString();
+ if (stringValue == null) {
+ throw new NullPointerException("toString() of the value
returned by gauge " + name + " returned null.");
+ }
+ serializeMetricInfo(out, info);
+ out.writeUTF(name);
+ out.writeUTF(stringValue);
}
- private static void serializeHistogram(DataOutputStream dos, Histogram
histogram) throws IOException {
+ private static void serializeHistogram(DataOutput out, QueryScopeInfo
info, String name, Histogram histogram) throws IOException {
HistogramStatistics stat = histogram.getStatistics();
-
- dos.writeLong(stat.getMin());
- dos.writeLong(stat.getMax());
- dos.writeDouble(stat.getMean());
- dos.writeDouble(stat.getQuantile(0.5));
- dos.writeDouble(stat.getStdDev());
- dos.writeDouble(stat.getQuantile(0.75));
- dos.writeDouble(stat.getQuantile(0.90));
- dos.writeDouble(stat.getQuantile(0.95));
- dos.writeDouble(stat.getQuantile(0.98));
- dos.writeDouble(stat.getQuantile(0.99));
- dos.writeDouble(stat.getQuantile(0.999));
+ long min = stat.getMin();
+ long max = stat.getMax();
+ double mean = stat.getMean();
+ double mediam = stat.getQuantile(0.5);
--- End diff --
Typo: the local variable `mediam` should be spelled `median`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---