GitHub user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/666#discussion_r131170639
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---
    @@ -102,15 +128,99 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx
           searchResult.setId(searchHit.getId());
           searchResult.setSource(searchHit.getSource());
           searchResult.setScore(searchHit.getScore());
    +      searchResult.setIndex(searchHit.getIndex());
           return searchResult;
         }).collect(Collectors.toList()));
         return searchResponse;
       }
     
       @Override
    -  public void init(Map<String, Object> globalConfig, AccessConfig config) {
    -    this.client = ElasticsearchUtils.getClient(globalConfig, config.getOptionalSettings());
    -    this.accessConfig = config;
    +  public synchronized void init(AccessConfig config) {
    +    if(this.client == null) {
    +      this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings());
    +      this.accessConfig = config;
    +    }
    +  }
    +
    +  @Override
    +  public Document getLatest(final String guid, final String sensorType) throws IOException {
    +    Optional<Document> ret = searchByGuid(
    +            guid
    +            , sensorType
    +            , hit -> {
    +              Long ts = 0L;
    +              String doc = hit.getSourceAsString();
    +              String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
    +              try {
    +                return Optional.of(new Document(doc, guid, sourceType, ts));
    +              } catch (IOException e) {
    +                throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
    +              }
    +            }
    +            );
    +    return ret.orElse(null);
    +  }
    +
    +  /**
    +   * Return the search hit based on the UUID and sensor type.
    +   * A callback can be specified to transform the hit into a type T.
    +   * If more than one hit happens, the first one will be returned.
    +   * @throws IOException
    +   */
    +  <T> Optional<T> searchByGuid(String guid, String sensorType, Function<SearchHit, Optional<T>> callback) throws IOException{
    +    QueryBuilder query =  QueryBuilders.matchQuery(Constants.GUID, guid);
    +    SearchRequestBuilder request = client.prepareSearch()
    +                                         .setTypes(sensorType + "_doc")
    +                                         .setQuery(query)
    +                                         .setSource("message")
    +                                         ;
    +    MultiSearchResponse response = client.prepareMultiSearch()
    +                                         .add(request)
    +                                         .get();
    +    for(MultiSearchResponse.Item i : response) {
    +      org.elasticsearch.action.search.SearchResponse resp = i.getResponse();
    +      SearchHits hits = resp.getHits();
    +      for(SearchHit hit : hits) {
    +        Optional<T> ret = callback.apply(hit);
    +        if(ret.isPresent()) {
    +          return ret;
    +        }
    +      }
    +    }
    +    return Optional.empty();
    +
    +  }
    +
    +  @Override
    +  public void update(Document update, Optional<String> index) throws IOException {
    +    String indexPostfix = ElasticsearchUtils.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
    +    String sensorType = update.getSensorType();
    +    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null);
    +
    +    String type = sensorType + "_doc";
    --- End diff --
    
    I'm not sure. Where would you expect that to be documented? This component
    relies on interacting seamlessly with the writer components, so it makes
    some assumptions about them. Perhaps I should add a comment here that
    notes that connection?
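
    One possible way to make that connection explicit, offered only as a
    sketch: keep the "_doc" suffix in a single helper that both the DAO and
    the writer could call. The class name DocTypeNames and its methods are
    hypothetical and are not part of Metron; "bro" is used purely as a sample
    sensor name. The sketch also assumes the writer builds the type as
    sensorType + "_doc", which is what the diff above relies on.

```java
// Hypothetical helper, not part of Metron: one home for the "_doc" convention.
final class DocTypeNames {
  /** Suffix the writer is ASSUMED to append to the sensor type when indexing. */
  static final String DOC_SUFFIX = "_doc";

  private DocTypeNames() {
    // static helpers only
  }

  /** Document type for a sensor, e.g. "bro" becomes "bro_doc". */
  static String docType(String sensorType) {
    return sensorType + DOC_SUFFIX;
  }

  /** Inverse of docType: strips one trailing suffix, else returns the input unchanged. */
  static String sensorType(String docType) {
    return docType.endsWith(DOC_SUFFIX)
        ? docType.substring(0, docType.length() - DOC_SUFFIX.length())
        : docType;
  }
}
```

    Note that this strips only a trailing suffix, whereas the
    Splitter.on("_doc") call in the diff keeps everything before the first
    occurrence of "_doc"; the two differ for a sensor name that itself
    contains "_doc".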

