michaeljmarshall commented on code in PR #20498:
URL: https://github.com/apache/pulsar/pull/20498#discussion_r1218504600
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java:
##########
@@ -277,7 +305,7 @@ public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) t
}
doc = sanitizeValue(doc);
return Pair.of(id, doc);
- } else {
+ } else {
Review Comment:
Nit: please revert this whitespace-only change.
```suggestion
} else {
```
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java:
##########
@@ -139,29 +161,35 @@ public void write(Record<GenericObject> record) throws Exception {
} else {
elasticsearchClient.indexDocument(record, idAndDoc);
}
+ incrementCounter(METRICS_TOTAL_SUCCESS, 1);
}
} catch (JsonProcessingException jsonProcessingException) {
switch (elasticSearchConfig.getMalformedDocAction()) {
case IGNORE:
+ incrementCounter(METRICS_TOTAL_SKIP, 1);
Review Comment:
It seems valuable to differentiate this case from the other `SKIP` case,
since this one is a failure case. What do you think?
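As a rough illustration of the split, here is a minimal, self-contained sketch. It is not the sink's actual code: the class `SkipMetricsSketch`, the constant `METRICS_TOTAL_MALFORMED_IGNORED`, both metric name strings, and the in-memory counter map are assumptions made for the example. Only `incrementCounter` and `METRICS_TOTAL_SKIP` appear in the diff, and their real definitions may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the sink's metric bookkeeping.
class SkipMetricsSketch {
    // Name taken from the diff; the string value here is assumed.
    static final String METRICS_TOTAL_SKIP = "total_skip";
    // Hypothetical new counter for malformed documents that were ignored.
    static final String METRICS_TOTAL_MALFORMED_IGNORED = "total_malformed_ignored";

    // In-memory counters; the real sink would report through its own context.
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    void incrementCounter(String name, long delta) {
        counters.computeIfAbsent(name, k -> new LongAdder()).add(delta);
    }

    long get(String name) {
        LongAdder adder = counters.get(name);
        return adder == null ? 0L : adder.sum();
    }

    // Non-failure skip: the record was intentionally not written.
    void onSkipped() {
        incrementCounter(METRICS_TOTAL_SKIP, 1);
    }

    // Failure skip: JSON processing failed and the malformed-doc action is IGNORE.
    void onMalformedDocIgnored() {
        incrementCounter(METRICS_TOTAL_MALFORMED_IGNORED, 1);
    }
}
```

With separate counters, an operator could alert on ignored malformed documents without that signal being diluted by ordinary skips.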
##########
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java:
##########
@@ -67,15 +67,24 @@ public class ElasticSearchSink implements Sink<GenericObject> {
private ElasticSearchClient elasticsearchClient;
private final ObjectMapper objectMapper = new ObjectMapper();
private ObjectMapper sortedObjectMapper;
+ private SinkContext sinkContext;
private List<String> primaryFields = null;
private final Pattern nonPrintableCharactersPattern = Pattern.compile("[\\p{C}]");
private final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
+ //
Review Comment:
Nit: this comment is empty. Please add text or remove the line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.