This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0432f13 [BEAM-11092] Add protos for new process wide HarnessMonitoringInfos, and BigQuery IO metrics
new 7eb0e95 Merge pull request #13163 from ajamato/bq_api_metrics_protos
0432f13 is described below
commit 0432f138f2bfb8d4d9543c4569581bdd3f8782db
Author: Alex Amato <[email protected]>
AuthorDate: Wed Oct 21 15:19:09 2020 -0700
[BEAM-11092] Add protos for new process wide HarnessMonitoringInfos, and BigQuery IO metrics
---
.../fn-execution/src/main/proto/beam_fn_api.proto | 56 ++++++++++++++++++----
model/pipeline/src/main/proto/metrics.proto | 32 +++++++++++++
2 files changed, 78 insertions(+), 10 deletions(-)
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 906b366..eaf5dc3 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -108,6 +108,7 @@ message InstructionRequest {
ProcessBundleSplitRequest process_bundle_split = 1003;
FinalizeBundleRequest finalize_bundle = 1004;
MonitoringInfosMetadataRequest monitoring_infos = 1005;
+ HarnessMonitoringInfosRequest harness_monitoring_infos = 1006;
// DEPRECATED
RegisterRequest register = 1000;
@@ -135,12 +136,44 @@ message InstructionResponse {
ProcessBundleSplitResponse process_bundle_split = 1003;
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
+ HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// A request to provide full MonitoringInfo associated with the entire SDK
+// harness process, not specific to a bundle.
+//
+// An SDK can report metrics using an identifier that only contains the
+// associated payload. A runner who wants to receive the full metrics
+// information can request all the monitoring metadata via a
+// MonitoringInfosMetadataRequest providing a list of ids as necessary.
+//
+// The SDK is allowed to reuse the identifiers
+// for the lifetime of the associated control connection as long
+// as the MonitoringInfo could be reconstructed fully by overwriting its
+// payload field with the bytes specified here.
+message HarnessMonitoringInfosRequest {
+}
+
+message HarnessMonitoringInfosResponse {
+ // An identifier to MonitoringInfo.payload mapping containing
+ // Metrics associated with the SDK harness, not a specific bundle.
+ //
+ // An SDK can report metrics using an identifier that only contains the
+ // associated payload. A runner who wants to receive the full metrics
+ // information can request all the monitoring metadata via a
+ // MonitoringInfosMetadataRequest providing a list of ids as necessary.
+ //
+ // The SDK is allowed to reuse the identifiers
+ // for the lifetime of the associated control connection as long
+ // as the MonitoringInfo could be reconstructed fully by overwriting its
+ // payload field with the bytes specified here.
+ map<string, bytes> monitoring_data = 1;
+}
+
// A list of objects which can be referred to by the runner in
// future requests.
// Stable
@@ -306,18 +339,19 @@ message ProcessBundleProgressRequest {
string instruction_id = 1;
}
-// A request to provide full MonitoringInfo for a given id.
+// A request to provide full MonitoringInfo for a set of provided ids.
//
// An SDK can report metrics using an identifier that only contains the
// associated payload. A runner who wants to receive the full metrics
// information can request all the monitoring metadata via a
// MonitoringInfosMetadataRequest providing a list of ids as necessary.
//
-// The MonitoringInfo ids are scoped to the associated control connection. For
-// example, an SDK may reuse the ids across multiple bundles.
+// The SDK is allowed to reuse the identifiers for the lifetime of the
+// associated control connection as long as the MonitoringInfo could be
+// reconstructed fully by overwriting its payload field with the bytes specified
+// here.
message MonitoringInfosMetadataRequest {
- // A list of ids for which MonitoringInfo are requested. All but the payload
- // field will be populated.
+ // A list of ids for which the full MonitoringInfo is requested for.
repeated string monitoring_info_id = 1;
}
@@ -333,7 +367,8 @@ message ProcessBundleProgressResponse {
// information can request all the monitoring metadata via a
// MonitoringInfosMetadataRequest providing a list of ids as necessary.
//
- // The SDK is allowed to reuse the identifiers across multiple bundles as long
+ // The SDK is allowed to reuse the identifiers
+ // for the lifetime of the associated control connection as long
// as the MonitoringInfo could be reconstructed fully by overwriting its
// payload field with the bytes specified here.
map<string, bytes> monitoring_data = 5;
@@ -349,11 +384,12 @@ message ProcessBundleProgressResponse {
// information can request all the monitoring metadata via a
// MonitoringInfosMetadataRequest providing a list of ids as necessary.
//
-// The MonitoringInfo ids are scoped to the associated control connection. For
-// example an SDK may reuse the ids across multiple bundles.
+// The SDK is allowed to reuse the identifiers
+// for the lifetime of the associated control connection as long
+// as the MonitoringInfo could be reconstructed fully by overwriting its
+// payload field with the bytes specified here.
message MonitoringInfosMetadataResponse {
- // A mapping from a requested identifier to a MonitoringInfo. All fields
- // except for the payload of the MonitoringInfo will be specified.
+ // A mapping from an identifier to the full metrics information.
map<string, org.apache.beam.model.pipeline.v1.MonitoringInfo>
monitoring_info = 1;
}
diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index af4b9e6..86114a8 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -306,6 +306,29 @@ message MonitoringInfoSpecs {
value: "The read index of the data channel."
}]
}];
+
+ API_REQUEST_COUNT = 19 [(monitoring_info_spec) = {
+ urn: "beam:metric:io:api_request_count:v1",
+ type: "beam:metrics:sum_int64:v1",
+ required_labels: [
+ "SERVICE",
+ "METHOD",
+ "RESOURCE",
+ "PTRANSFORM",
+ "STATUS"
+ ],
+ annotations: [
+ {
+ key: "description",
+ value: "Request counts with status made to an IOs service APIs to batch read or write elements."
+ },
+ {
+ key: "process_metric", // Should be reported as a process metric
+ // instead of a bundle metric
+ value: "true"
+ }
+ ]
+ }];
}
}
@@ -354,6 +377,15 @@ message MonitoringInfo {
ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
NAME = 6 [(label_props) = { name: "NAME" }];
+ SERVICE = 7 [(label_props) = { name: "SERVICE" }];
+ METHOD = 8 [(label_props) = { name: "METHOD" }];
+ RESOURCE = 9 [(label_props) = { name: "RESOURCE" }];
+ STATUS = 10 [(label_props) = { name: "STATUS" }];
+ BIGQUERY_PROJECT_ID = 11 [(label_props) = { name: "BIGQUERY_PROJECT_ID" }];
+ BIGQUERY_DATASET = 12 [(label_props) = { name: "BIGQUERY_DATASET" }];
+ BIGQUERY_TABLE = 13 [(label_props) = { name: "BIGQUERY_TABLE" }];
+ BIGQUERY_VIEW = 14 [(label_props) = { name: "BIGQUERY_VIEW" }];
+ BIGQUERY_QUERY_NAME = 15 [(label_props) = { name: "BIGQUERY_QUERY_NAME" }];
}
// A set of key and value labels which define the scope of the metric. For