asafm commented on code in PR #20498:
URL: https://github.com/apache/pulsar/pull/20498#discussion_r1221021917


##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -56,8 +58,18 @@ public class ElasticSearchClient implements AutoCloseable {
     final AtomicReference<Exception> irrecoverableError = new AtomicReference<>();
     private final IndexNameFormatter indexNameFormatter;
 
-    public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
+    // sink metrics
+    public static final String METRICS_TOTAL_INCOMING = "_elasticsearch_total_incoming_";

Review Comment:
   1. The metric name goes into a Prometheus Summary collector as a label. Why the `_` prefix?
   2. I would remove `_total_` everywhere. This is just a label in a Prometheus Summary, so it ends up exported as multiple quantiles plus a sum and a count. So maybe the name should just express what this counts.
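   A minimal sketch of names that follow both points. The names themselves are hypothetical placeholders, not the ones this PR settled on, and `followsConvention` exists only to make the convention checkable.

```java
import java.util.List;

// Hypothetical metric names: no leading or trailing "_" and no "_total" part.
// Each name is only the value of a label on a Prometheus Summary, which already
// exports its own count, sum and quantile series for every label value.
final class ElasticSearchMetricNames {
    static final String INCOMING = "elasticsearch_incoming";
    static final String SUCCESS = "elasticsearch_success";
    static final String FAILURES = "elasticsearch_failures";
    static final String SKIPPED_FAILURES = "elasticsearch_skipped_failures";
    static final String DELETES = "elasticsearch_deletes";

    static final List<String> ALL =
            List.of(INCOMING, SUCCESS, FAILURES, SKIPPED_FAILURES, DELETES);

    private ElasticSearchMetricNames() {
    }

    // Rejects a leading or trailing underscore and any "_total" part.
    static boolean followsConvention(String name) {
        return !name.startsWith("_") && !name.endsWith("_") && !name.contains("_total");
    }
}
```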
   



##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -96,6 +110,12 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
         this.client = retry(() -> RestClientFactory.createClient(config, bulkListener), -1, "client creation");
     }
 
+    public void incrementCounter(String counter, double value) {
+        if (sinkContext != null && counter != null) {

Review Comment:
   What is the `if` for? Why would either of these ever be null?
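   One way to make the check unnecessary, sketched with hypothetical names: validate the dependency once in the constructor, so per-event calls never test for null. `MetricRecorder` is an assumed stand-in for the sink context's `recordMetric(String, double)`, declared locally so the sketch compiles without Pulsar.

```java
import java.util.Objects;

// Assumed to mirror the sink context's recordMetric(String, double).
interface MetricRecorder {
    // Records nothing; for callers that genuinely have no context (e.g. tests).
    MetricRecorder NO_OP = (name, value) -> { };

    void recordMetric(String name, double value);
}

// Hypothetical client fragment: the dependency is checked once, at
// construction, so the per-event path needs no null checks.
final class ClientMetrics {
    private final MetricRecorder recorder;

    ClientMetrics(MetricRecorder recorder) {
        this.recorder = Objects.requireNonNull(recorder, "recorder");
    }

    void increment(String name) {
        recorder.recordMetric(name, 1);
    }
}
```

   If some callers, such as unit tests, legitimately have no context, passing `MetricRecorder.NO_OP` keeps the per-event path free of checks.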



##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java:
##########
@@ -123,12 +124,18 @@ public void write(Record<GenericObject> record) throws Exception {
                                     elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
                                 } else {
                                     elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                                    elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SUCCESS, 1);

Review Comment:
   This needs to be cleaner. Since two classes use this, let's extract it to `ElasticSearchSinkMetrics`, define the constants there, and record all those events through it.
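   A minimal sketch of the shape such a class could take, assuming one method per event. Everything beyond the class name `ElasticSearchSinkMetrics` is hypothetical, including the metric names and `MetricRecorder`, which stands in for the sink context's `recordMetric(String, double)`.

```java
import java.util.Objects;

// Assumed to mirror the sink context's recordMetric(String, double); declared
// here so the sketch compiles without Pulsar on the classpath.
interface MetricRecorder {
    void recordMetric(String name, double value);
}

// Hypothetical shape for the proposed class: metric names are defined in one
// place, and ElasticSearchSink and ElasticSearchClient call one method per
// event instead of passing name constants to a generic incrementCounter().
final class ElasticSearchSinkMetrics {
    static final String INCOMING = "elasticsearch_incoming";
    static final String SUCCESS = "elasticsearch_success";
    static final String FAILURES = "elasticsearch_failures";
    static final String SKIPPED_FAILURES = "elasticsearch_skipped_failures";
    static final String DELETES = "elasticsearch_deletes";

    private final MetricRecorder recorder;

    ElasticSearchSinkMetrics(MetricRecorder recorder) {
        this.recorder = Objects.requireNonNull(recorder, "recorder");
    }

    void recordIncoming() {
        recorder.recordMetric(INCOMING, 1);
    }

    void recordSuccess() {
        recorder.recordMetric(SUCCESS, 1);
    }

    void recordFailure() {
        recorder.recordMetric(FAILURES, 1);
    }

    void recordSkippedFailure() {
        recorder.recordMetric(SKIPPED_FAILURES, 1);
    }

    void recordDelete() {
        recorder.recordMetric(DELETES, 1);
    }
}
```

   The methods are package-private here, since only classes in `org.apache.pulsar.io.elasticsearch` would need to call them.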
   



##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java:
##########
@@ -109,6 +109,7 @@ void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {
 
     @Override
     public void write(Record<GenericObject> record) throws Exception {
+        this.elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_INCOMING, 1);

Review Comment:
   This doesn't make much sense. Let's keep this metric (actually, its constant) in `ElasticSearchSink` and just call `sinkContext.record` from there.
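   A trimmed-down sketch of what this could look like. `SketchSink`, `MetricRecorder` and the `String` record type are hypothetical simplifications of `ElasticSearchSink`, the sink context and `Record<GenericObject>`.

```java
import java.util.Objects;

// Assumed to mirror the sink context's recordMetric(String, double).
interface MetricRecorder {
    void recordMetric(String name, double value);
}

// Hypothetical, trimmed-down sink: the sink owns the "incoming" constant and
// records it straight through its context, not through the Elasticsearch client.
final class SketchSink {
    static final String INCOMING = "elasticsearch_incoming";

    private final MetricRecorder sinkContext;

    SketchSink(MetricRecorder sinkContext) {
        this.sinkContext = Objects.requireNonNull(sinkContext, "sinkContext");
    }

    void write(String record) {
        // Count every record on arrival, before any branching on its content.
        sinkContext.recordMetric(INCOMING, 1);
        // ... convert the record, then index or delete it via the client ...
    }
}
```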



##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -117,14 +137,17 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes
                 isMalformed = true;
                 switch (config.getMalformedDocAction()) {
                     case IGNORE:
+                        incrementCounter(METRICS_TOTAL_MALFORMED_IGNORE, 1);
                         break;
                     case WARN:
+                        incrementCounter(METRICS_TOTAL_SKIP, 1);
                         log.warn("Ignoring malformed document index={} id={}",
                                 result.getIndex(),
                                 result.getDocumentId(),
                                 error);
                         break;
                     case FAIL:
+                        incrementCounter(METRICS_TOTAL_FAILURE, 1);

Review Comment:
   To me, failed means not indexed, whether the reason was a malformed document or something else. So we need to decide how to differentiate the two.
   Maybe: `es_total_failures`, `es_skipped_failures`
   
   Also, I wouldn't differentiate between IGNORE and WARN; I don't see a good reason for it.
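   One possible reading of this suggestion, as a sketch: every malformed document that is not indexed counts as a failure, and the ones the connector deliberately moves past (IGNORE or WARN, treated the same) also count as skipped failures. The enum mirrors the three actions in the diff; the class and metric names are hypothetical.

```java
import java.util.List;

// Mirrors the three actions used in the diff (hypothetical local copy).
enum MalformedDocAction { IGNORE, WARN, FAIL }

// Hypothetical classification: "failure" means "not indexed"; "skipped
// failure" additionally marks the ones the connector chose to move past.
final class MalformedDocMetrics {
    static final String FAILURES = "elasticsearch_failures";
    static final String SKIPPED_FAILURES = "elasticsearch_skipped_failures";

    private MalformedDocMetrics() {
    }

    // Metrics to increment by one when a malformed document is rejected.
    static List<String> metricsFor(MalformedDocAction action) {
        switch (action) {
            case IGNORE:
            case WARN:
                // WARN only adds a log line, so it counts exactly like IGNORE.
                return List.of(FAILURES, SKIPPED_FAILURES);
            case FAIL:
                return List.of(FAILURES);
            default:
                throw new IllegalArgumentException("Unknown action: " + action);
        }
    }
}
```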
        



##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -78,6 +90,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
                         record.fail();
                         checkForIrrecoverableError(record, result);
                     } else {
+                        incrementCounter(METRICS_TOTAL_SUCCESS, index);

Review Comment:
   Aren't you supposed to increment failures as well, in the error branch above?
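   A sketch of a listener where every result in the batch moves exactly one counter, so successes plus failures always equals the batch size. `BulkResult`, `BulkMetricsListener` and `MetricRecorder` are hypothetical simplifications, not the connector's `BulkProcessor` types.

```java
import java.util.List;
import java.util.Objects;

// Assumed to mirror the sink context's recordMetric(String, double).
interface MetricRecorder {
    void recordMetric(String name, double value);
}

// Hypothetical, simplified stand-in for a bulk operation result.
final class BulkResult {
    final boolean error;

    BulkResult(boolean error) {
        this.error = error;
    }
}

// Hypothetical listener: each result increments exactly one counter by 1.
final class BulkMetricsListener {
    static final String SUCCESS = "elasticsearch_success";
    static final String FAILURES = "elasticsearch_failures";

    private final MetricRecorder recorder;

    BulkMetricsListener(MetricRecorder recorder) {
        this.recorder = Objects.requireNonNull(recorder, "recorder");
    }

    void afterBulk(List<BulkResult> results) {
        for (BulkResult result : results) {
            if (result.error) {
                // The real listener also fails the record and checks for
                // an irrecoverable error in this branch.
                recorder.recordMetric(FAILURES, 1);
            } else {
                recorder.recordMetric(SUCCESS, 1);
            }
        }
    }
}
```

   The sketch adds 1 per document, whereas the diff passes `index` as the increment. If the malformed-document path in `checkForIrrecoverableError` also counts failures, only one of the two places should do so.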



##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java:
##########
@@ -96,6 +110,12 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
         this.client = retry(() -> RestClientFactory.createClient(config, bulkListener), -1, "client creation");
     }
 
+    public void incrementCounter(String counter, double value) {

Review Comment:
   Why is this public?



##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java:
##########
@@ -123,12 +124,18 @@ public void write(Record<GenericObject> record) throws Exception {
                                     elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
                                 } else {
                                     elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
+                                    elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SUCCESS, 1);
+                                    elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_DELETE, 1);
                                 }
+                            } else {
+                                elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SKIP, 1);
                             }
                             break;
                         case IGNORE:
+                            elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_NULLVALUE_IGNORE, 1);
                             break;
                         case FAIL:
+                            elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);

Review Comment:
   In `elasticsearchClient.failed(...)`, which you call below, you also introduced increments, no? Won't this double count?
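   A sketch of counting the failure in exactly one place: `failed()` records it and the caller only delegates. All class names and signatures here are hypothetical simplifications of `ElasticSearchSink` and `ElasticSearchClient`.

```java
import java.util.Objects;

// Assumed to mirror the sink context's recordMetric(String, double).
interface MetricRecorder {
    void recordMetric(String name, double value);
}

// Hypothetical client: failed() is the single place that counts a failure.
final class SketchClient {
    static final String FAILURES = "elasticsearch_failures";

    private final MetricRecorder recorder;

    SketchClient(MetricRecorder recorder) {
        this.recorder = Objects.requireNonNull(recorder, "recorder");
    }

    void failed(String reason) {
        recorder.recordMetric(FAILURES, 1);
        // ... fail the record and remember the error ...
    }
}

// Hypothetical sink branch for the FAIL case: it delegates to failed() and
// does not record FAILURES itself, so each failure is counted once.
final class SketchSinkFailBranch {
    private final SketchClient client;

    SketchSinkFailBranch(SketchClient client) {
        this.client = Objects.requireNonNull(client, "client");
    }

    void onNullValueWithFailAction() {
        client.failed("null value");
    }
}
```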


