asafm commented on code in PR #20498:
URL: https://github.com/apache/pulsar/pull/20498#discussion_r1221021917
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -56,8 +58,18 @@ public class ElasticSearchClient implements AutoCloseable {
final AtomicReference<Exception> irrecoverableError = new AtomicReference<>();
private final IndexNameFormatter indexNameFormatter;
- public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
+ // sink metrics
+ public static final String METRICS_TOTAL_INCOMING = "_elasticsearch_total_incoming_";
Review Comment:
1. The metric name goes in as a label on a Prometheus Summary collector. Why
the `_` prefix?
2. I would remove `_total_` everywhere. This is just a label on a Prometheus
Summary, so it ends up exported as multiple quantiles plus a sum and a count.
So let's just name what each metric counts.
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -96,6 +110,12 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
this.client = retry(() -> RestClientFactory.createClient(config, bulkListener), -1, "client creation");
}
+ public void incrementCounter(String counter, double value) {
+ if (sinkContext != null && counter != null) {
Review Comment:
What's with the `if`? Why would either of these ever be null?
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java:
##########
@@ -123,12 +124,18 @@ public void write(Record<GenericObject> record) throws Exception {
elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
} else {
elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+ elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SUCCESS, 1);
Review Comment:
This needs to be cleaner. If two classes use this, then let's extract it to
`ElasticSearchSinkMetrics`, define the constants there, and record all those
events there.
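
A minimal sketch of what such an extraction might look like. `MetricRecorder`
is a hypothetical stand-in for the `recordMetric(String, double)` method on
Pulsar's `SinkContext`, so the example compiles without Pulsar on the
classpath. The constant names and helper methods are illustrative assumptions,
not the ones from the PR.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for SinkContext#recordMetric(String, double).
interface MetricRecorder {
    void recordMetric(String metricName, double value);
}

// Hypothetical holder for the metric names and recording helpers, meant to be
// shared by ElasticSearchSink and ElasticSearchClient.
final class ElasticSearchSinkMetrics {
    // Illustrative names only: no leading underscore and no "_total_" infix.
    static final String INCOMING = "elasticsearch_incoming";
    static final String SUCCESS = "elasticsearch_success";
    static final String FAILURE = "elasticsearch_failure";
    static final String SKIPPED = "elasticsearch_skipped";

    private final MetricRecorder recorder;

    ElasticSearchSinkMetrics(MetricRecorder recorder) {
        // Fail fast at construction instead of null-checking on every call.
        this.recorder = Objects.requireNonNull(recorder, "recorder");
    }

    void recordIncoming() { recorder.recordMetric(INCOMING, 1); }
    void recordSuccess()  { recorder.recordMetric(SUCCESS, 1); }
    void recordFailure()  { recorder.recordMetric(FAILURE, 1); }
    void recordSkipped()  { recorder.recordMetric(SKIPPED, 1); }
}

// In-memory recorder used only to exercise the sketch.
final class InMemoryRecorder implements MetricRecorder {
    final Map<String, Double> totals = new ConcurrentHashMap<>();

    @Override
    public void recordMetric(String metricName, double value) {
        totals.merge(metricName, value, Double::sum);
    }
}
```

With this shape, both classes would hold one `ElasticSearchSinkMetrics`
instance and neither needs a public `incrementCounter` method.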
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java:
##########
@@ -109,6 +109,7 @@ void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {
@Override
public void write(Record<GenericObject> record) throws Exception {
+ this.elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_INCOMING, 1);
Review Comment:
This doesn't make much sense. Let's keep this metric name (actually a
constant) in `ElasticSearchSink` and just call `sinkContext.recordMetric` from
there.
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -117,14 +137,17 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes
isMalformed = true;
switch (config.getMalformedDocAction()) {
case IGNORE:
+ incrementCounter(METRICS_TOTAL_MALFORMED_IGNORE, 1);
break;
case WARN:
+ incrementCounter(METRICS_TOTAL_SKIP, 1);
log.warn("Ignoring malformed document index={} id={}",
result.getIndex(),
result.getDocumentId(),
error);
break;
case FAIL:
+ incrementCounter(METRICS_TOTAL_FAILURE, 1);
Review Comment:
For me, failed == has not been indexed, whether the reason was a malformed
document or not. So we need to decide how to differentiate.
Maybe: `es_total_failures`, `es_skipped_failures`.
Also, I wouldn't differentiate between IGNORE and WARN; I don't see a good
reason for it.
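
One possible reading of this suggestion, as a sketch: every malformed document
that was not indexed counts toward `es_total_failures`, and the ones that were
skipped (IGNORE or WARN, treated alike) additionally count toward
`es_skipped_failures`. The local `MalformedDocAction` enum is a stand-in for
the connector's config enum, and `MalformedDocMetrics` is a hypothetical
helper; whether FAIL should also count as skipped is an open design question.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for the connector's malformed-document action setting.
enum MalformedDocAction { IGNORE, WARN, FAIL }

// Hypothetical helper mapping an action to the metrics to increment.
final class MalformedDocMetrics {
    static final String TOTAL_FAILURES = "es_total_failures";
    static final String SKIPPED_FAILURES = "es_skipped_failures";

    // Returns the metric names to increment for a malformed document
    // that was not indexed.
    static List<String> metricsFor(MalformedDocAction action) {
        List<String> names = new ArrayList<>();
        // Not indexed means failed, whatever the configured action is.
        names.add(TOTAL_FAILURES);
        switch (action) {
            case IGNORE:
            case WARN:
                // Both actions skip the document, so they share one metric.
                names.add(SKIPPED_FAILURES);
                break;
            case FAIL:
                // The sink fails as a whole; nothing was skipped.
                break;
        }
        return names;
    }
}
```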
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -78,6 +90,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
record.fail();
checkForIrrecoverableError(record, result);
} else {
+ incrementCounter(METRICS_TOTAL_SUCCESS, index);
Review Comment:
Aren't you supposed to increment failures as well, in the branch above?
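
To illustrate the symmetry being asked for, here is a simplified sketch in
which both outcomes are tallied in the same loop. `ItemResult` and `BulkTally`
are hypothetical types invented for this example; the real callback works with
`BulkProcessor` request and result objects and would record through the sink
context rather than plain fields.

```java
import java.util.List;

// Hypothetical, simplified stand-in for one per-item result of a bulk request.
final class ItemResult {
    private final boolean error;

    ItemResult(boolean error) { this.error = error; }

    boolean isError() { return error; }
}

// Tallies both outcomes in one place so neither branch is forgotten.
final class BulkTally {
    long successes;
    long failures;

    void afterBulk(List<ItemResult> results) {
        for (ItemResult result : results) {
            if (result.isError()) {
                failures++;   // would sit next to record.fail() in the real callback
            } else {
                successes++;  // would sit in the success branch of the real callback
            }
        }
    }
}
```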
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -96,6 +110,12 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
this.client = retry(() -> RestClientFactory.createClient(config, bulkListener), -1, "client creation");
}
+ public void incrementCounter(String counter, double value) {
Review Comment:
Why public?
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java:
##########
@@ -123,12 +124,18 @@ public void write(Record<GenericObject> record) throws Exception {
elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
} else {
elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+ elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SUCCESS, 1);
+ elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_DELETE, 1);
}
+ } else {
+ elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SKIP, 1);
}
break;
case IGNORE:
+ elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_NULLVALUE_IGNORE, 1);
break;
case FAIL:
+ elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
Review Comment:
In the `elasticsearchClient.failed(` call below, you also introduced
increments, no? Won't this double count?
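
One way to rule out double counting is to record a failure in exactly one
place and have every call site delegate to it. The sketch below assumes that
single place is a `failed(Exception)` method; `FailureCountingSketch` and
`onNullValueFail` are hypothetical names, and the `AtomicLong` stands in for
whatever metric sink is actually used.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: failures are counted only inside failed().
final class FailureCountingSketch {
    final AtomicLong failures = new AtomicLong();

    // Single point where a failure is recorded.
    void failed(Exception e) {
        failures.incrementAndGet();
    }

    // Stand-in for the FAIL branch of write(): it delegates to failed()
    // and does not increment the counter a second time.
    void onNullValueFail() {
        failed(new IllegalStateException("null value is not allowed"));
    }
}
```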