Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/666#discussion_r131170639 --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java --- @@ -102,15 +128,99 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx searchResult.setId(searchHit.getId()); searchResult.setSource(searchHit.getSource()); searchResult.setScore(searchHit.getScore()); + searchResult.setIndex(searchHit.getIndex()); return searchResult; }).collect(Collectors.toList())); return searchResponse; } @Override - public void init(Map<String, Object> globalConfig, AccessConfig config) { - this.client = ElasticsearchUtils.getClient(globalConfig, config.getOptionalSettings()); - this.accessConfig = config; + public synchronized void init(AccessConfig config) { + if(this.client == null) { + this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings()); + this.accessConfig = config; + } + } + + @Override + public Document getLatest(final String guid, final String sensorType) throws IOException { + Optional<Document> ret = searchByGuid( + guid + , sensorType + , hit -> { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); + try { + return Optional.of(new Document(doc, guid, sourceType, ts)); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + ); + return ret.orElse(null); + } + + /** + * Return the search hit based on the UUID and sensor type. + * A callback can be specified to transform the hit into a type T. + * If more than one hit happens, the first one will be returned. 
+ * @throws IOException + */ + <T> Optional<T> searchByGuid(String guid, String sensorType, Function<SearchHit, Optional<T>> callback) throws IOException{ + QueryBuilder query = QueryBuilders.matchQuery(Constants.GUID, guid); + SearchRequestBuilder request = client.prepareSearch() + .setTypes(sensorType + "_doc") + .setQuery(query) + .setSource("message") + ; + MultiSearchResponse response = client.prepareMultiSearch() + .add(request) + .get(); + for(MultiSearchResponse.Item i : response) { + org.elasticsearch.action.search.SearchResponse resp = i.getResponse(); + SearchHits hits = resp.getHits(); + for(SearchHit hit : hits) { + Optional<T> ret = callback.apply(hit); + if(ret.isPresent()) { + return ret; + } + } + } + return Optional.empty(); + + } + + @Override + public void update(Document update, Optional<String> index) throws IOException { + String indexPostfix = ElasticsearchUtils.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); + String sensorType = update.getSensorType(); + String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null); + + String type = sensorType + "_doc"; --- End diff -- I'm not sure, where would you expect that to be documented? I'm relying on this component interacting with the writer components in a seamless manner, so there are some assumptions being made. Perhaps I should make a comment here and denote that connection?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---