Repository: metron Updated Branches: refs/heads/feature/METRON-1416-upgrade-solr 9b25084c8 -> 8cc8aab8b
METRON-1448: Update SolrWriter to conform to new collection strategy this closes apache/incubator-metron#929 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/8cc8aab8 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/8cc8aab8 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/8cc8aab8 Branch: refs/heads/feature/METRON-1416-upgrade-solr Commit: 8cc8aab8b7c5fb103c4375f47d47754492547bb1 Parents: 9b25084 Author: cstella <ceste...@gmail.com> Authored: Fri Feb 9 09:42:12 2018 -0500 Committer: cstella <ceste...@gmail.com> Committed: Fri Feb 9 09:42:12 2018 -0500 ---------------------------------------------------------------------- metron-platform/metron-solr/README.md | 44 ++++ .../metron/solr/writer/MetronSolrClient.java | 34 +++ .../apache/metron/solr/writer/SolrWriter.java | 225 ++++++++++++++----- .../src/main/scripts/start_solr_topology.sh | 2 +- .../SolrIndexingIntegrationTest.java | 4 +- .../schema/SchemaValidationIntegrationTest.java | 31 ++- .../metron/solr/writer/SolrWriterTest.java | 213 +++++++++++++++--- 7 files changed, 442 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/README.md b/metron-platform/metron-solr/README.md index 0ec6972..164160e 100644 --- a/metron-platform/metron-solr/README.md +++ b/metron-platform/metron-solr/README.md @@ -26,6 +26,50 @@ limitations under the License. Metron ships with Solr 6.6.2 support. Solr Cloud can be used as the real-time portion of the datastore resulting from [metron-indexing](../metron-indexing/README.md). 
+## Configuration + +### The Indexing Topology + +Solr is a viable option for the `random access topology` and, similar to the Elasticsearch Writer, can be configured +via the global config. The following settings are possible as part of the global config: +* `solr.zookeeper` + * The ZooKeeper quorum associated with the SolrCloud instance. This is a required field with no default. +* `solr.commitPerBatch` + * This is a boolean which defines whether the writer commits every batch. The default is `true`. + * _WARNING_: If you set this to `false`, then commits will happen based on the SolrClient's internal mechanism and + worker failure *may* result in data being acknowledged in Storm but not written in Solr. +* `solr.commit.soft` + * This is a boolean which defines whether the writer makes a soft commit or a durable commit. See [here](https://lucene.apache.org/solr/guide/6_6/near-real-time-searching.html#NearRealTimeSearching-AutoCommits). The default is `false`. + * _WARNING_: If you set this to `true`, then each batch commit will be a soft (non-durable) commit and + worker failure *may* result in data being acknowledged in Storm but not written in Solr. +* `solr.commit.waitSearcher` + * This is a boolean which defines whether the writer blocks the commit until the data is available to search. See [here](https://lucene.apache.org/solr/guide/6_6/near-real-time-searching.html#NearRealTimeSearching-AutoCommits). The default is `true`. + * _WARNING_: If you set this to `false`, then the commit will not wait for the data to become searchable and + worker failure *may* result in data being acknowledged in Storm but not written in Solr. +* `solr.commit.waitFlush` + * This is a boolean which defines whether the writer blocks the commit until the data is flushed. See [here](https://lucene.apache.org/solr/guide/6_6/near-real-time-searching.html#NearRealTimeSearching-AutoCommits). The default is `true`.
+ * _WARNING_: If you set this to `false`, then the commit will not wait for the data to be flushed and + worker failure *may* result in data being acknowledged in Storm but not written in Solr. +* `solr.collection` + * The default Solr collection (if unspecified, the name is `metron`). By default, sensors will write to a collection associated with the index name in the + indexing config for that sensor. If that index name is the empty string, then the default collection will be used. +* `solr.http.config` + * This is a map which allows users to configure the Solr client's HTTP client. + * Possible fields here are: + * `socketTimeout` : Socket timeout measured in ms; closes a socket if a read takes longer than x ms to complete and + throws `java.net.SocketTimeoutException: Read timed out` + * `connTimeout` : Connection timeout measured in ms; closes a socket if a connection cannot be established within x ms + with a `java.net.SocketTimeoutException: Connection timed out` + * `maxConnectionsPerHost` : Maximum connections allowed per host + * `maxConnections` : Maximum total connections allowed + * `retry` : Retry HTTP requests on error + * `allowCompression` : Allow compression (deflate, gzip) if the server supports it + * `followRedirects` : Follow redirects + * `httpBasicAuthUser` : Basic auth username + * `httpBasicAuthPassword` : Basic auth password + * `solr.ssl.checkPeerName` : Check peer name + + ## Installing A script is provided in the installation for installing Solr Cloud in quick-start mode in the [full dev environment for CentOS](../../metron-deployment/development/centos6).
http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java index d3ef36f..5c27cce 100644 --- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java +++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java @@ -17,18 +17,22 @@ */ package org.apache.metron.solr.writer; +import com.google.common.collect.Iterables; import org.apache.metron.solr.SolrConstants; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; +import java.util.Map; public class MetronSolrClient extends CloudSolrClient { @@ -40,6 +44,36 @@ public class MetronSolrClient extends CloudSolrClient { super(zkHost); } + public MetronSolrClient(String zkHost, Map<String, Object> solrHttpConfig) { + super(zkHost, HttpClientUtil.createClient(toSolrProps(solrHttpConfig))); + } + + public static SolrParams toSolrProps(Map<String, Object> config) { + if(config == null || config.isEmpty()) { + return null; + } + + ModifiableSolrParams ret = new ModifiableSolrParams(); + for(Map.Entry<String, Object> kv : config.entrySet()) { + Object v = kv.getValue(); + if(v instanceof Boolean) { + 
ret.set(kv.getKey(), (Boolean)v); + } + else if(v instanceof Integer) { + ret.set(kv.getKey(), (Integer)v); + } + else if(v instanceof Iterable) { + Iterable vals = (Iterable)v; + String[] strVals = new String[Iterables.size(vals)]; + int i = 0; + for(Object o : (Iterable)v) { + strVals[i++] = o.toString(); + } + } + } + return ret; + } + public void createCollection(String name, int numShards, int replicationFactor) throws IOException, SolrServerException { if (!listCollections().contains(name)) { request(getCreateCollectionsRequest(name, numShards, replicationFactor)); http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java index 923a8dd..e2659e9 100644 --- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java +++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java @@ -17,10 +17,14 @@ */ package org.apache.metron.solr.writer; +import com.google.common.base.Joiner; +import org.apache.commons.lang3.StringUtils; +import org.apache.metron.common.Constants; +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.common.SolrException; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; -import org.apache.metron.common.configuration.Configurations; -import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; @@ -33,101 +37,206 @@ 
import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.util.List; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; import java.util.Map; +import java.util.Collection; +import java.util.Optional; +import java.util.List; +import java.util.ArrayList; +import java.util.UUID; +import java.util.function.Supplier; public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable { - public static final String DEFAULT_COLLECTION = "metron"; - private static final Logger LOG = LoggerFactory - .getLogger(SolrWriter.class); + public enum SolrProperties { + ZOOKEEPER_QUORUM("solr.zookeeper"), + COMMIT_PER_BATCH("solr.commitPerBatch", Optional.of(true)), + COMMIT_WAIT_SEARCHER("solr.commit.waitSearcher", Optional.of(true)), + COMMIT_WAIT_FLUSH("solr.commit.waitFlush", Optional.of(true)), + COMMIT_SOFT("solr.commit.soft", Optional.of(false)), + DEFAULT_COLLECTION("solr.collection", Optional.of("metron")), + HTTP_CONFIG("solr.http.config", Optional.of(new HashMap<>())) + ; + String name; + Optional<Object> defaultValue; + + SolrProperties(String name) { + this(name, Optional.empty()); + } + SolrProperties(String name, Optional<Object> defaultValue) { + this.name = name; + this.defaultValue = defaultValue; + } - private boolean shouldCommit = false; - private MetronSolrClient solr; + public <T> Optional<T> coerceOrDefault(Map<String, Object> globalConfig, Class<T> clazz) { + Object val = globalConfig.get(name); + if(val != null) { + T ret = null; + try { + ret = ConversionUtils.convert(val, clazz); + } + catch(ClassCastException cce) { + ret = null; + } + if(ret == null) { + //unable to convert value + LOG.warn("Unable to convert {} to {}, was {}", name, clazz.getName(), "" + val); + if(defaultValue.isPresent()) { + return Optional.ofNullable(ConversionUtils.convert(defaultValue.get(), clazz)); + } + else { + return Optional.empty(); + } + } + else { + return Optional.ofNullable(ret); + } + } + else { + 
if(defaultValue.isPresent()) { + return Optional.ofNullable(ConversionUtils.convert(defaultValue.get(), clazz)); + } + else { + return Optional.empty(); + } + } + } + + public Supplier<IllegalArgumentException> errorOut(Map<String, Object> globalConfig) { + String message = "Unable to retrieve " + name + " from global config, value associated is " + globalConfig.get(name); + return () -> new IllegalArgumentException(message); + } + + public <T> T coerceOrDefaultOrExcept(Map<String, Object> globalConfig, Class<T> clazz) { + return this.coerceOrDefault(globalConfig, clazz).orElseThrow(this.errorOut(globalConfig)); + } - public SolrWriter withShouldCommit(boolean shouldCommit) { - this.shouldCommit = shouldCommit; - return this; } + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private Boolean shouldCommit; + private Boolean softCommit; + private Boolean waitSearcher; + private Boolean waitFlush; + private String zookeeperUrl; + private String defaultCollection; + private Map<String, Object> solrHttpConfig; + + private MetronSolrClient solr; + public SolrWriter withMetronSolrClient(MetronSolrClient solr) { this.solr = solr; return this; } + public void initializeFromGlobalConfig(Map<String, Object> globalConfiguration) { + zookeeperUrl = SolrProperties.ZOOKEEPER_QUORUM.coerceOrDefaultOrExcept(globalConfiguration, String.class); + defaultCollection = SolrProperties.DEFAULT_COLLECTION.coerceOrDefaultOrExcept(globalConfiguration, String.class); + solrHttpConfig = SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept(globalConfiguration, Map.class); + shouldCommit = SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept(globalConfiguration, Boolean.class); + softCommit = SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept(globalConfiguration, Boolean.class); + waitSearcher = SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept(globalConfiguration, Boolean.class); + waitFlush = 
SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept(globalConfiguration, Boolean.class); + } + @Override public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) throws IOException, SolrServerException { Map<String, Object> globalConfiguration = configurations.getGlobalConfig(); - if(solr == null) solr = new MetronSolrClient((String) globalConfiguration.get("solr.zookeeper")); - String collection = getCollection(configurations); - solr.createCollection(collection, (Integer) globalConfiguration.get("solr.numShards"), (Integer) globalConfiguration.get("solr.replicationFactor")); - solr.setDefaultCollection(collection); + initializeFromGlobalConfig(globalConfiguration); + LOG.info("Initializing SOLR writer: {}", zookeeperUrl); + LOG.info("Forcing commit per batch: {}", shouldCommit); + LOG.info("Soft commit: {}", softCommit); + LOG.info("Commit Wait Searcher: {}", waitSearcher); + LOG.info("Commit Wait Flush: {}", waitFlush); + LOG.info("Default Collection: {}", "" + defaultCollection ); + if(solr == null) { + solr = new MetronSolrClient(zookeeperUrl, solrHttpConfig); + } + solr.setDefaultCollection(defaultCollection); + } - @Override - public BulkWriterResponse write(String sourceType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception { + public Collection<SolrInputDocument> toDocs(Iterable<JSONObject> messages) { + Collection<SolrInputDocument> ret = new ArrayList<>(); for(JSONObject message: messages) { SolrInputDocument document = new SolrInputDocument(); - document.addField("sensorType", sourceType); - for(Object key: message.keySet()) { + for (Object key : message.keySet()) { Object value = message.get(key); - if(value instanceof Iterable) { - for(Object v : (Iterable)value) { - document.addField(getFieldName(key, v), v); + if (value instanceof Iterable) { + for (Object v : (Iterable) value) { + document.addField("" + key, v); } - } - else { - 
document.addField(getFieldName(key, value), value); + } else { + document.addField("" + key, value); } } - if(!document.containsKey("id")) { - document.addField("id", getIdValue(message)); + if (!document.containsKey(Constants.GUID)) { + document.addField(Constants.GUID, UUID.randomUUID().toString()); } - UpdateResponse response = solr.add(document); - } - if (shouldCommit) { - solr.commit(getCollection(configurations)); + ret.add(document); } + return ret; + } - // Solr commits the entire batch or throws an exception for it. There's no way to get partial failures. - BulkWriterResponse response = new BulkWriterResponse(); - response.addAllSuccesses(tuples); - return response; + protected String getCollection(String sourceType, WriterConfiguration configurations) { + String collection = configurations.getIndex(sourceType); + if(StringUtils.isEmpty(collection)) { + return solr.getDefaultCollection(); + } + return collection; } @Override - public String getName() { - return "solr"; - } + public BulkWriterResponse write(String sourceType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception { + String collection = getCollection(sourceType, configurations); + BulkWriterResponse bulkResponse = new BulkWriterResponse(); + Collection<SolrInputDocument> docs = toDocs(messages); + try { + Optional<SolrException> exceptionOptional = fromUpdateResponse(solr.add(collection, docs)); + // Solr commits the entire batch or throws an exception for it. There's no way to get partial failures. 
+ if(exceptionOptional.isPresent()) { + bulkResponse.addAllErrors(exceptionOptional.get(), tuples); + } + else { + if (shouldCommit) { + exceptionOptional = fromUpdateResponse(solr.commit(collection, waitFlush, waitSearcher, softCommit)); + if(exceptionOptional.isPresent()) { + bulkResponse.addAllErrors(exceptionOptional.get(), tuples); + } + } + if(!exceptionOptional.isPresent()) { + bulkResponse.addAllSuccesses(tuples); + } + } + } + catch(HttpSolrClient.RemoteSolrException sse) { + bulkResponse.addAllErrors(sse, tuples); + } - protected String getCollection(WriterConfiguration configurations) { - String collection = (String) configurations.getGlobalConfig().get("solr.collection"); - return collection != null ? collection : DEFAULT_COLLECTION; + return bulkResponse; } - protected Object getIdValue(JSONObject message) { - return message.toJSONString().hashCode(); + protected Optional<SolrException> fromUpdateResponse(UpdateResponse response) { + if(response != null && response.getStatus() > 0) { + String message = "Solr Update response: " + Joiner.on(",").join(response.getResponse()); + return Optional.of(new SolrException(SolrException.ErrorCode.BAD_REQUEST, message)); + } + return Optional.empty(); } - protected String getFieldName(Object key, Object value) { - String field; - if (value instanceof Integer) { - field = key + "_i"; - } else if (value instanceof Long) { - field = key + "_l"; - } else if (value instanceof Float) { - field = key + "_f"; - } else if (value instanceof Double) { - field = key + "_d"; - } else { - field = key + "_s"; - } - return field; + @Override + public String getName() { + return "solr"; } @Override public void close() throws Exception { - solr.close(); + if(solr != null) { + solr.close(); + } } } http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh b/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh index cae0c3c..614423e 100755 --- a/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh +++ b/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh @@ -19,4 +19,4 @@ METRON_VERSION=${project.version} METRON_HOME=/usr/metron/$METRON_VERSION TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar -storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/remote.yaml --filter $METRON_HOME/config/solr.properties +storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/random_access/remote.yaml --filter $METRON_HOME/config/solr.properties http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java index 09e88a4..10239f1 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java @@ -39,7 +39,7 @@ import org.apache.metron.solr.integration.components.SolrComponent; public class SolrIndexingIntegrationTest extends IndexingIntegrationTest { - private String collection = "metron"; + private String collection = "yaf"; private FieldNameConverter fieldNameConverter = fieldName -> fieldName; @Override public FieldNameConverter getFieldNameConverter() { @@ -49,7 +49,7 @@ public class SolrIndexingIntegrationTest extends 
IndexingIntegrationTest { @Override public InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception { SolrComponent solrComponent = new SolrComponent.Builder() - .addCollection(collection, "../metron-solr/src/test/resources/solr/conf") + .addCollection(collection, "../metron-solr/src/main/config/schema/yaf") .withPostStartCallback(new Function<SolrComponent, Void>() { @Nullable @Override http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java index 075fdda..e655428 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java @@ -21,18 +21,22 @@ import com.google.common.collect.Iterables; import com.google.common.io.Files; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.writer.BulkWriterResponse; import org.apache.metron.solr.integration.components.SolrComponent; import org.apache.metron.solr.writer.SolrWriter; import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.junit.Assert; import org.junit.Test; - +import org.mockito.Mock; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.util.*; +import static org.mockito.Mockito.mock; + public class 
SchemaValidationIntegrationTest { public static Iterable<String> getData(String sensor) throws IOException { @@ -45,15 +49,12 @@ public class SchemaValidationIntegrationTest { public static Map<String, Object> getGlobalConfig(String sensorType, SolrComponent component) { Map<String, Object> globalConfig = new HashMap<>(); globalConfig.put("solr.zookeeper", component.getZookeeperUrl()); - globalConfig.put("solr.collection", sensorType + "_doc"); - globalConfig.put("solr.numShards", 1); - globalConfig.put("solr.replicationFactor", 1); return globalConfig; } public static SolrComponent createSolrComponent(String sensor) throws Exception { return new SolrComponent.Builder() - .addCollection(String.format("%s_doc", sensor), String.format("src/main/config/schema/%s", sensor)) + .addCollection(String.format("%s", sensor), String.format("src/main/config/schema/%s", sensor)) .build(); } @@ -94,9 +95,12 @@ public class SchemaValidationIntegrationTest { Map<String, Object> globalConfig = getGlobalConfig(sensorType, component); List<JSONObject> inputs = new ArrayList<>(); + List<Tuple> tuples = new ArrayList<>(); Map<String, Map<String, Object>> index = new HashMap<>(); for (String message : getData(sensorType)) { if (message.trim().length() > 0) { + Tuple t = mock(Tuple.class); + tuples.add(t); Map<String, Object> m = JSONUtils.INSTANCE.load(message.trim(), JSONUtils.MAP_SUPPLIER); String guid = getGuid(m); index.put(guid, m); @@ -105,18 +109,8 @@ public class SchemaValidationIntegrationTest { } Assert.assertTrue(inputs.size() > 0); - SolrWriter solrWriter = new SolrWriter() { - @Override - protected String getFieldName(Object key, Object value) { - return "" + key; - } - - @Override - protected Object getIdValue(JSONObject message) { - return message.get("guid"); - } + SolrWriter solrWriter = new SolrWriter(); - }; WriterConfiguration writerConfig = new WriterConfiguration() { @Override public int getBatchSize(String sensorName) { @@ -165,8 +159,9 @@ public class 
SchemaValidationIntegrationTest { solrWriter.init(null, null, writerConfig); - solrWriter.write(sensorType, writerConfig, new ArrayList<>(), inputs); - for (Map<String, Object> m : component.getAllIndexedDocs(sensorType + "_doc")) { + BulkWriterResponse response = solrWriter.write(sensorType, writerConfig, tuples, inputs); + Assert.assertTrue(response.getErrors().isEmpty()); + for (Map<String, Object> m : component.getAllIndexedDocs(sensorType)) { Map<String, Object> expected = index.get(getGuid(m)); for (Map.Entry<String, Object> field : expected.entrySet()) { if (field.getValue() instanceof Collection && ((Collection) field.getValue()).size() == 0) { http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java index a56916f..685c5fd 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java @@ -17,6 +17,10 @@ */ package org.apache.metron.solr.writer; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; @@ -25,14 +29,18 @@ import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.SolrInputDocument; import org.hamcrest.Description; import org.json.simple.JSONObject; +import 
org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -60,33 +68,37 @@ public class SolrWriterTest { } } - static class SolrInputDocumentMatcher extends ArgumentMatcher<SolrInputDocument> { + static class SolrInputDocumentMatcher extends ArgumentMatcher<Collection<SolrInputDocument>> { - private int expectedId; - private String expectedSourceType; - private int expectedInt; - private double expectedDouble; - public SolrInputDocumentMatcher(int expectedId, String expectedSourceType, int expectedInt, double expectedDouble) { - this.expectedId = expectedId; - this.expectedSourceType = expectedSourceType; - this.expectedInt = expectedInt; - this.expectedDouble = expectedDouble; + List<Map<String, Object>> expectedDocs; + + public SolrInputDocumentMatcher(List<Map<String, Object>> expectedDocs) { + this.expectedDocs = expectedDocs; } @Override public boolean matches(Object o) { - SolrInputDocument solrInputDocument = (SolrInputDocument) o; - int actualId = (Integer) solrInputDocument.get("id").getValue(); - String actualName = (String) solrInputDocument.get("sensorType").getValue(); - int actualInt = (Integer) solrInputDocument.get("intField_i").getValue(); - double actualDouble = (Double) solrInputDocument.get("doubleField_d").getValue(); - return expectedId == actualId && expectedSourceType.equals(actualName) && expectedInt == actualInt && expectedDouble == actualDouble; + List<SolrInputDocument> docs = (List<SolrInputDocument>)o; + int size = docs.size(); + if(size != expectedDocs.size()) { + return false; + } + for(int i = 0; i < size;++i) { + SolrInputDocument doc = docs.get(i); + Map<String, Object> expectedDoc = 
expectedDocs.get(i); + for(Map.Entry<String, Object> expectedKv : expectedDoc.entrySet()) { + if(!expectedKv.getValue().equals(doc.get(expectedKv.getKey()).getValue())) { + return false; + } + } + } + return true; } @Override public void describeTo(Description description) { - description.appendText(String.format("fields: [id=%d, doubleField_d=%f, name=%s, intField_i=%d]", expectedId, expectedDouble, expectedSourceType, expectedInt)); + description.appendText(expectedDocs.toString()); } } @@ -95,9 +107,13 @@ public class SolrWriterTest { public void testWriter() throws Exception { IndexingConfigurations configurations = SampleUtil.getSampleIndexingConfigs(); JSONObject message1 = new JSONObject(); + message1.put(Constants.GUID, "guid-1"); + message1.put(Constants.SENSOR_TYPE, "test"); message1.put("intField", 100); message1.put("doubleField", 100.0); JSONObject message2 = new JSONObject(); + message2.put(Constants.GUID, "guid-2"); + message2.put(Constants.SENSOR_TYPE, "test"); message2.put("intField", 200); message2.put("doubleField", 200.0); List<JSONObject> messages = new ArrayList<>(); @@ -108,33 +124,166 @@ public class SolrWriterTest { MetronSolrClient solr = Mockito.mock(MetronSolrClient.class); SolrWriter writer = new SolrWriter().withMetronSolrClient(solr); writer.init(null, null,new IndexingWriterConfiguration("solr", configurations)); - verify(solr, times(1)).createCollection(collection, 1, 1); verify(solr, times(1)).setDefaultCollection(collection); collection = "metron2"; - int numShards = 4; - int replicationFactor = 2; Map<String, Object> globalConfig = configurations.getGlobalConfig(); globalConfig.put("solr.collection", collection); - globalConfig.put("solr.numShards", numShards); - globalConfig.put("solr.replicationFactor", replicationFactor); configurations.updateGlobalConfig(globalConfig); writer = new SolrWriter().withMetronSolrClient(solr); writer.init(null, null, new IndexingWriterConfiguration("solr", configurations)); - verify(solr, 
times(1)).createCollection(collection, numShards, replicationFactor); verify(solr, times(1)).setDefaultCollection(collection); writer.write("test", new IndexingWriterConfiguration("solr", configurations), new ArrayList<>(), messages); - verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0))); - verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0))); - verify(solr, times(0)).commit(collection); + verify(solr, times(1)).add(eq("yaf"), argThat(new SolrInputDocumentMatcher(ImmutableList.of(message1, message2)))); + verify(solr, times(1)).commit("yaf" + , (boolean)SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.defaultValue.get() + , (boolean)SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.defaultValue.get() + , (boolean)SolrWriter.SolrProperties.COMMIT_SOFT.defaultValue.get() + ); - writer = new SolrWriter().withMetronSolrClient(solr).withShouldCommit(true); - writer.init(null, null, new IndexingWriterConfiguration("solr", configurations)); - writer.write("test", new IndexingWriterConfiguration("solr", configurations), new ArrayList<>(), messages); - verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0))); - verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0))); - verify(solr, times(1)).commit(collection); + } + + @Test + public void configTest_zookeeperQuorumSpecified() throws Exception { + String expected = "test"; + Assert.assertEquals(expected, + SolrWriter.SolrProperties.ZOOKEEPER_QUORUM.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.ZOOKEEPER_QUORUM.name, expected) + , String.class)); + } + + @Test(expected=IllegalArgumentException.class) + public void configTest_zookeeperQuorumUnpecified() throws Exception { + SolrWriter.SolrProperties.ZOOKEEPER_QUORUM.coerceOrDefaultOrExcept( + new 
HashMap<>() + , String.class); + } + + @Test + public void configTest_commitPerBatchSpecified() throws Exception { + Object expected = false; + Assert.assertEquals(expected, + SolrWriter.SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_PER_BATCH.name, false) + , Boolean.class)); + } + + @Test + public void configTest_commitPerBatchUnpecified() throws Exception { + Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_PER_BATCH.defaultValue.get(), + SolrWriter.SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept( + new HashMap<>() + , Boolean.class)); + Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_PER_BATCH.defaultValue.get(), + SolrWriter.SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_PER_BATCH.name, new DummyClass()) + , Boolean.class)); + } + + @Test + public void configTest_commitSoftSpecified() throws Exception { + Object expected = true; + Assert.assertEquals(expected, + SolrWriter.SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_SOFT.name, expected) + , Boolean.class)); + } + + @Test + public void configTest_commitSoftUnpecified() throws Exception { + Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_SOFT.defaultValue.get(), + SolrWriter.SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept( + new HashMap<>() + , Boolean.class)); + Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_SOFT.defaultValue.get(), + SolrWriter.SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_SOFT.name, new DummyClass()) + , Boolean.class)); + } + + @Test + public void configTest_commitWaitFlushSpecified() throws Exception { + Object expected = false; + Assert.assertEquals(expected, + SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.name, expected) + , 
Boolean.class)); } + + @Test + public void configTest_commitWaitFlushUnspecified() throws Exception { + Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.defaultValue.get(), + SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept( + new HashMap<>() + , Boolean.class)); + Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.defaultValue.get(), + SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.name, new DummyClass()) + , Boolean.class)); + } + + @Test + public void configTest_commitWaitSearcherSpecified() throws Exception { + Object expected = false; + Assert.assertEquals(expected, + SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.name, expected) + , Boolean.class)); + } + + @Test + public void configTest_commitWaitSearcherUnspecified() throws Exception { + Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.defaultValue.get(), + SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept( + new HashMap<>() + , Boolean.class)); + Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.defaultValue.get(), + SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.name, new DummyClass()) + , Boolean.class)); + } + + @Test + public void configTest_defaultCollectionSpecified() throws Exception { + Object expected = "mycollection"; + Assert.assertEquals(expected, + SolrWriter.SolrProperties.DEFAULT_COLLECTION.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.DEFAULT_COLLECTION.name, expected) + , String.class)); + } + + @Test + public void configTest_defaultCollectionUnspecified() throws Exception { + Assert.assertEquals(SolrWriter.SolrProperties.DEFAULT_COLLECTION.defaultValue.get(), + 
SolrWriter.SolrProperties.DEFAULT_COLLECTION.coerceOrDefaultOrExcept( + new HashMap<>() + , String.class)); + } + + @Test + public void configTest_httpConfigSpecified() throws Exception { + Object expected = new HashMap<String, Object>() {{ + put("name", "metron"); + }}; + Assert.assertEquals(expected, + SolrWriter.SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.HTTP_CONFIG.name, expected) + , Map.class)); + } + + @Test + public void configTest_httpConfigUnspecified() throws Exception { + Assert.assertEquals(SolrWriter.SolrProperties.HTTP_CONFIG.defaultValue.get(), + SolrWriter.SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept( + new HashMap<>() + , Map.class)); + Assert.assertEquals(SolrWriter.SolrProperties.HTTP_CONFIG.defaultValue.get(), + SolrWriter.SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept( + ImmutableMap.of( SolrWriter.SolrProperties.HTTP_CONFIG.name, new DummyClass()) + , Map.class)); + } + + public static class DummyClass {} }