GitHub user cestella commented on a diff in the pull request:
https://github.com/apache/metron/pull/666#discussion_r131128726
--- Diff:
metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
---
@@ -102,15 +128,100 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx
searchResult.setId(searchHit.getId());
searchResult.setSource(searchHit.getSource());
searchResult.setScore(searchHit.getScore());
+ searchResult.setIndex(searchHit.getIndex());
return searchResult;
}).collect(Collectors.toList()));
return searchResponse;
}
@Override
- public void init(Map<String, Object> globalConfig, AccessConfig config) {
- this.client = ElasticsearchUtils.getClient(globalConfig, config.getOptionalSettings());
- this.accessConfig = config;
+ public synchronized void init(AccessConfig config) {
+ if(this.client == null) {
+ this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings());
+ this.accessConfig = config;
+ }
+ }
+
+ @Override
+ public Document getLatest(final String guid, final String sensorType) throws IOException {
+ Optional<Document> ret = searchByGuid(
+ guid
+ , sensorType
+ , hit -> {
+ Long ts = 0L;
+ String doc = hit.getSourceAsString();
+ String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
+ try {
+ return Optional.of(new Document(doc, guid, sourceType, ts));
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+ }
+ }
+ );
+ return ret.orElse(null);
+ }
+
+ /**
+ * Return the search hit based on the UUID and sensor type.
+ * A callback can be specified to transform the hit into a type T.
+ * If more than one hit happens, the first one will be returned.
+ * @throws IOException
+ */
+ <T> Optional<T> searchByGuid(String guid, String sensorType, Function<SearchHit, Optional<T>> callback) throws IOException{
+ QueryBuilder query = QueryBuilders.matchQuery(Constants.GUID, guid);
+ SearchRequestBuilder request = client.prepareSearch()
+ .setTypes(sensorType + "_doc")
+ .setQuery(query)
+ .setSource("message")
+ ;
+ MultiSearchResponse response = client.prepareMultiSearch()
+ .add(request)
+ .get();
+ for(MultiSearchResponse.Item i : response) {
+ org.elasticsearch.action.search.SearchResponse resp = i.getResponse();
+ SearchHits hits = resp.getHits();
+ for(SearchHit hit : hits) {
+ Optional<T> ret = callback.apply(hit);
+ if(ret.isPresent()) {
+ return ret;
+ }
+ }
+ }
+ return Optional.empty();
+
+ }
+
+ @Override
+ public void update(Document update, Optional<String> index) throws IOException {
+ String indexPostfix = ElasticsearchUtils.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
+ String sensorType = update.getSensorType();
+ String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null);
+
+ String type = sensorType + "_doc";
+ byte[] source = JSONUtils.INSTANCE.toJSON(update.getDocument());
--- End diff --
I am honestly not sure about the ES API subtleties, but I'm mimicking
[ElasticsearchWriter](https://github.com/apache/metron/blob/ec959d20efa72e9868d87a02d9407bbaad34c4c8/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java#L88).
It seems to me that it's six of one and half a dozen of the other: it
shouldn't matter much who converts to bytes, since the document gets
converted to bytes either way.
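
To illustrate the point about byte conversion, here is a minimal sketch that does not touch the Elasticsearch client at all. `sendBytes` and `sendString` are hypothetical stand-ins for a client method that takes pre-serialized bytes and one that takes a JSON string, and the sketch assumes the client would encode the string as UTF-8. Under that assumption the payload is identical whichever side does the conversion.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BytesEitherWay {

  // Hypothetical stand-in: the caller has already serialized the document.
  static byte[] sendBytes(byte[] source) {
    return source; // what would end up on the wire
  }

  // Hypothetical stand-in: the client serializes the JSON string itself.
  // Assumption: the client encodes as UTF-8.
  static byte[] sendString(String json) {
    return json.getBytes(StandardCharsets.UTF_8);
  }

  // True when both paths produce the same payload for the given JSON.
  static boolean samePayload(String json) {
    byte[] callerConverted = sendBytes(json.getBytes(StandardCharsets.UTF_8));
    byte[] clientConverted = sendString(json);
    return Arrays.equals(callerConverted, clientConverted);
  }

  public static void main(String[] args) {
    String json = "{\"guid\":\"abc\",\"source.type\":\"bro\"}";
    System.out.println("same payload: " + samePayload(json));
  }
}
```

If the real client used a different charset or re-encoded the document, the two paths could differ, so this only backs up the argument as far as that assumption holds.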