Repository: incubator-streams Updated Branches: refs/heads/STREAMS-170 ddf359c50 -> cf4fd7724
integration test harness very basic test for persist writer using harness tests for new processor capability Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cf4fd772 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cf4fd772 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cf4fd772 Branch: refs/heads/STREAMS-170 Commit: cf4fd7724164db06f178c0ef2aade45797be897f Parents: ddf359c Author: sblackmon <[email protected]> Authored: Mon Oct 20 16:15:35 2014 -0500 Committer: sblackmon <[email protected]> Committed: Mon Oct 20 16:15:35 2014 -0500 ---------------------------------------------------------------------- .../streams-persist-elasticsearch/pom.xml | 47 ++++++++++++ .../processor/DocumentToMetadataProcessor.java | 29 +++---- .../ElasticsearchConfiguration.json | 3 +- .../test/TestDatumFromMetadataProcessor.java | 81 ++++++++++++++++++++ .../test/TestDocumentToMetadataProcessor.java | 63 +++++++++++++++ .../test/TestElasticsearchPersistWriter.java | 70 +++++++++++++++++ 6 files changed, 274 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml index 95f3357..b75c38d 100644 --- a/streams-contrib/streams-persist-elasticsearch/pom.xml +++ b/streams-contrib/streams-persist-elasticsearch/pom.xml @@ -13,10 +13,34 @@ <properties> <elasticsearch.version>1.1.0</elasticsearch.version> + <lucene.version>4.7.2</lucene.version> </properties> <dependencies> + <!-- Test includes --> <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-test-framework</artifactId> + <version>${lucene.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-codecs</artifactId> + <version>${lucene.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-config</artifactId> <version>${project.version}</version> @@ -55,6 +79,29 @@ <artifactId>elasticsearch</artifactId> <version>${elasticsearch.version}</version> </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <type>test-jar</type> + <version>${elasticsearch.version}</version> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-test-framework</artifactId> + <version>${lucene.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-codecs</artifactId> + <version>${lucene.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + </dependency> </dependencies> </dependencyManagement> <build> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java index c4c654f..ed449fd 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java @@ -18,6 +18,7 @@ package org.apache.streams.elasticsearch.processor; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -32,6 +33,8 @@ import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchConfigurator; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -49,32 +52,24 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab public final static String STREAMS_ID = "DatumFromMetadataProcessor"; - private ElasticsearchClientManager elasticsearchClientManager; - private ElasticsearchReaderConfiguration config; - private ObjectMapper mapper; - public DocumentToMetadataProcessor() { - Config config = StreamsConfigurator.config.getConfig("elasticsearch"); - this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); - } + private static final Logger LOGGER = LoggerFactory.getLogger(DocumentToMetadataProcessor.class); - public DocumentToMetadataProcessor(Config config) { - this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); - } - - public DocumentToMetadataProcessor(ElasticsearchReaderConfiguration config) { - this.config = config; + public DocumentToMetadataProcessor() { } @Override public List<StreamsDatum> process(StreamsDatum entry) { List<StreamsDatum> result = Lists.newArrayList(); + Object object = entry.getDocument(); ObjectNode metadataObjectNode; try { - metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class); - } catch (IOException e) { + String docAsJson = (object instanceof String) ? object.toString() : mapper.writeValueAsString(object); + metadataObjectNode = mapper.readValue(docAsJson, ObjectNode.class); + } catch (Throwable e) { + LOGGER.warn("Exception: %s", e.getMessage()); return result; } @@ -92,15 +87,13 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab @Override public void prepare(Object configurationObject) { - this.elasticsearchClientManager = new ElasticsearchClientManager(config); mapper = StreamsJacksonMapper.getInstance(); mapper.registerModule(new JsonOrgModule()); - } @Override public void cleanUp() { - this.elasticsearchClientManager.getClient().close(); + mapper = null; } public static Map<String, Object> asMap(JsonNode node) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json index 6524dcc..e2ed37c 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json +++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json @@ -14,7 +14,8 @@ }, "port": { "type": "integer", - "description": "Elasticsearch Transport API port" + "description": "Elasticsearch Transport API port", + "default": 9300 }, "clusterName": { "type": "string", http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java new file mode 100644 index 0000000..66434bc --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java @@ -0,0 +1,81 @@ +package org.apache.streams.elasticsearch.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.lang.SerializationUtils; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.elasticsearch.ElasticsearchConfiguration; +import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; +import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; +import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor; +import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Created by sblackmon on 10/20/14. + */ [email protected](scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1) +public class TestDatumFromMetadataProcessor extends ElasticsearchIntegrationTest { + + private final String TEST_INDEX = "TestDatumFromMetadataProcessor".toLowerCase(); + + private ElasticsearchReaderConfiguration testConfiguration; + + @Test + public void testSerializability() { + DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration); + + DatumFromMetadataProcessor clone = (DatumFromMetadataProcessor) SerializationUtils.clone(processor); + } + + @Before + public void prepareTest() { + + testConfiguration = new ElasticsearchReaderConfiguration(); + testConfiguration.setHosts(Lists.newArrayList("localhost")); + testConfiguration.setClusterName(cluster().getClusterName()); + + String testJsonString = "{\"dummy\":\"true\"}"; + + client().index(client().prepareIndex(TEST_INDEX, "activity", "id").setSource(testJsonString).request()).actionGet(5, TimeUnit.SECONDS); + + } + + @Test + public void testDatumFromMetadataProcessor() { + + Map<String, Object> metadata = Maps.newHashMap(); + + metadata.put("index", TEST_INDEX); + metadata.put("type", "activity"); + metadata.put("id", "id"); + + DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration); + + StreamsDatum testInput = new StreamsDatum(null); + + testInput.setMetadata(metadata); + + Assert.assertNull(testInput.document); + + processor.prepare(null); + + StreamsDatum testOutput = processor.process(testInput).get(0); + + processor.cleanUp(); + + Assert.assertNotNull(testOutput.document); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java new file mode 100644 index 0000000..b2bfa84 --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java @@ -0,0 +1,63 @@ +package org.apache.streams.elasticsearch.test; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import org.apache.commons.lang.SerializationUtils; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; +import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +/** + * Created by sblackmon on 10/20/14. + */ +public class TestDocumentToMetadataProcessor { + + private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + @Before + public void prepareTest() { + + } + + @Test + public void testSerializability() { + DocumentToMetadataProcessor processor = new DocumentToMetadataProcessor(); + + DocumentToMetadataProcessor clone = (DocumentToMetadataProcessor) SerializationUtils.clone(processor); + } + + @Test + public void testDocumentToMetadataProcessor() { + + ObjectNode document = MAPPER.createObjectNode() + .put("a", "a") + .put("b", "b") + .put("c", 6); + + DocumentToMetadataProcessor processor = new DocumentToMetadataProcessor(); + + StreamsDatum testInput = new StreamsDatum(document); + + Assert.assertNotNull(testInput.document); + Assert.assertNotNull(testInput.metadata); + Assert.assertEquals(testInput.metadata.size(), 0); + + processor.prepare(null); + + StreamsDatum testOutput = processor.process(testInput).get(0); + + processor.cleanUp(); + + Assert.assertNotNull(testOutput.metadata); + Assert.assertEquals(testInput.metadata.size(), 3); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java new file mode 100644 index 0000000..8452592 --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java @@ -0,0 +1,70 @@ +package org.apache.streams.elasticsearch.test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.commons.lang.SerializationUtils; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.elasticsearch.ElasticsearchConfiguration; +import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; +import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Before; +import org.junit.Test; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Created by sblackmon on 10/20/14. + */ [email protected](scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1) +public class TestElasticsearchPersistWriter extends ElasticsearchIntegrationTest { + + private final String TEST_INDEX = "TestElasticsearchPersistWriter".toLowerCase(); + + private ElasticsearchWriterConfiguration testConfiguration; + + public void prepareTest() { + + testConfiguration = new ElasticsearchWriterConfiguration(); + testConfiguration.setHosts(Lists.newArrayList("localhost")); + testConfiguration.setClusterName(cluster().getClusterName()); + + } + + @Test + public void testPersistWriterString() { + + ElasticsearchWriterConfiguration testConfiguration = new ElasticsearchWriterConfiguration(); + testConfiguration.setHosts(Lists.newArrayList("localhost")); + testConfiguration.setClusterName(cluster().getClusterName()); + testConfiguration.setBatchSize(1l); + testConfiguration.setIndex(TEST_INDEX); + testConfiguration.setType("string"); + ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration); + testPersistWriter.prepare(null); + + String testJsonString = "{\"dummy\":\"true\"}"; + + assert(!indexExists(TEST_INDEX)); + + testPersistWriter.write(new StreamsDatum(testJsonString, "test")); + + testPersistWriter.cleanUp(); + + flushAndRefresh(); + + assert(indexExists(TEST_INDEX)); + + long count = client().count(client().prepareCount().request()).actionGet().getCount(); + + assert(count > 0); + + } +}
