http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java index d34f53f..e356aff 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java @@ -18,27 +18,24 @@ package org.apache.streams.elasticsearch.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater; -import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.search.SearchRequestBuilder; @@ -53,151 +50,150 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FileInputStream; import java.io.InputStream; import java.util.List; -import java.util.Properties; import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; /** - * Created by sblackmon on 10/20/14. + * Integration Test for + * @see org.apache.streams.elasticsearch.ElasticsearchPersistUpdater */ public class ElasticsearchPersistUpdaterIT { - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdaterIT.class); - - private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - protected ElasticsearchWriterConfiguration testConfiguration; - protected Client testClient; - - @Before - public void prepareTest() throws Exception { - - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/ElasticsearchPersistUpdaterIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); - testClient = new ElasticsearchClientManager(testConfiguration).getClient(); - - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertTrue(indicesExistsResponse.isExists()); - + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdaterIT.class); + + private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + protected ElasticsearchWriterConfiguration testConfiguration; + protected Client testClient; + + @Before + public void prepareTest() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/ElasticsearchPersistUpdaterIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); + testClient = new ElasticsearchClientManager(testConfiguration).getClient(); + + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertTrue(indicesExistsResponse.isExists()); + + } + + @Test + public void testPersistUpdater() throws Exception { + + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertTrue(indicesExistsResponse.isExists()); + + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); + + long count = countResponse.getHits().getTotalHits(); + + ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(testConfiguration); + testPersistUpdater.prepare(null); + + InputStream testActivityFolderStream = ElasticsearchPersistUpdaterIT.class.getClassLoader() + .getResourceAsStream("activities"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + + for( String file : files) { + LOGGER.info("File: " + file ); + InputStream testActivityFileStream = ElasticsearchPersistUpdaterIT.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + Activity update = new Activity(); + update.setAdditionalProperty("updated", Boolean.TRUE); + update.setAdditionalProperty("str", "str"); + update.setAdditionalProperty("long", 10l); + update.setActor( + new ActivityObject() + .withAdditionalProperty("updated", Boolean.TRUE) + .withAdditionalProperty("double", 10d) + .withAdditionalProperty("map", + MAPPER.createObjectNode().set("field", MAPPER.createArrayNode().add("item")))); + + StreamsDatum datum = new StreamsDatum(update, activity.getVerb()); + testPersistUpdater.write( datum ); + LOGGER.info("Updated: " + activity.getVerb() ); } - @Test - public void testPersistUpdater() throws Exception { - - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertTrue(indicesExistsResponse.isExists()); - - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getIndex()) - .setTypes(testConfiguration.getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); + testPersistUpdater.cleanUp(); - long count = countResponse.getHits().getTotalHits(); + SearchRequestBuilder updatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.existsQuery("updated")); + SearchResponse updatedCount = updatedCountRequest.execute().actionGet(); - ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(testConfiguration); - testPersistUpdater.prepare(null); + LOGGER.info("updated: {}", updatedCount.getHits().getTotalHits()); - InputStream testActivityFolderStream = ElasticsearchPersistUpdaterIT.class.getClassLoader() - .getResourceAsStream("activities"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + assertEquals(count, updatedCount.getHits().getTotalHits()); - for( String file : files) { - LOGGER.info("File: " + file ); - InputStream testActivityFileStream = ElasticsearchPersistUpdaterIT.class.getClassLoader() - .getResourceAsStream("activities/" + file); - Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); - Activity update = new Activity(); - update.setAdditionalProperty("updated", Boolean.TRUE); - update.setAdditionalProperty("str", "str"); - update.setAdditionalProperty("long", 10l); - update.setActor( - new ActivityObject() - .withAdditionalProperty("updated", Boolean.TRUE) - .withAdditionalProperty("double", 10d) - .withAdditionalProperty("map", - MAPPER.createObjectNode().set("field", MAPPER.createArrayNode().add("item")))); + SearchRequestBuilder actorUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.termQuery("actor.updated", true)); + SearchResponse actorUpdatedCount = actorUpdatedCountRequest.execute().actionGet(); - StreamsDatum datum = new StreamsDatum(update, activity.getVerb()); - testPersistUpdater.write( datum ); - LOGGER.info("Updated: " + activity.getVerb() ); - } + LOGGER.info("actor.updated: {}", actorUpdatedCount.getHits().getTotalHits()); - testPersistUpdater.cleanUp(); + assertEquals(count, actorUpdatedCount.getHits().getTotalHits()); - SearchRequestBuilder updatedCountRequest = testClient - .prepareSearch(testConfiguration.getIndex()) - .setTypes(testConfiguration.getType()) - .setQuery(QueryBuilders.existsQuery("updated")); - SearchResponse updatedCount = updatedCountRequest.execute().actionGet(); + SearchRequestBuilder strUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.termQuery("str", "str")); + SearchResponse strUpdatedCount = strUpdatedCountRequest.execute().actionGet(); - LOGGER.info("updated: {}", updatedCount.getHits().getTotalHits()); + LOGGER.info("strupdated: {}", strUpdatedCount.getHits().getTotalHits()); - assertEquals(count, updatedCount.getHits().getTotalHits()); + assertEquals(count, strUpdatedCount.getHits().getTotalHits()); - SearchRequestBuilder actorUpdatedCountRequest = testClient - .prepareSearch(testConfiguration.getIndex()) - .setTypes(testConfiguration.getType()) - .setQuery(QueryBuilders.termQuery("actor.updated", true)); - SearchResponse actorUpdatedCount = actorUpdatedCountRequest.execute().actionGet(); + SearchRequestBuilder longUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11)); + SearchResponse longUpdatedCount = longUpdatedCountRequest.execute().actionGet(); - LOGGER.info("actor.updated: {}", actorUpdatedCount.getHits().getTotalHits()); + LOGGER.info("longupdated: {}", longUpdatedCount.getHits().getTotalHits()); - assertEquals(count, actorUpdatedCount.getHits().getTotalHits()); + assertEquals(count, longUpdatedCount.getHits().getTotalHits()); - SearchRequestBuilder strUpdatedCountRequest = testClient - .prepareSearch(testConfiguration.getIndex()) - .setTypes(testConfiguration.getType()) - .setQuery(QueryBuilders.termQuery("str", "str")); - SearchResponse strUpdatedCount = strUpdatedCountRequest.execute().actionGet(); + SearchRequestBuilder doubleUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11)); + SearchResponse doubleUpdatedCount = doubleUpdatedCountRequest.execute().actionGet(); - LOGGER.info("strupdated: {}", strUpdatedCount.getHits().getTotalHits()); + LOGGER.info("doubleupdated: {}", doubleUpdatedCount.getHits().getTotalHits()); - assertEquals(count, strUpdatedCount.getHits().getTotalHits()); + assertEquals(count, doubleUpdatedCount.getHits().getTotalHits()); - SearchRequestBuilder longUpdatedCountRequest = testClient - .prepareSearch(testConfiguration.getIndex()) - .setTypes(testConfiguration.getType()) - .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11)); - SearchResponse longUpdatedCount = longUpdatedCountRequest.execute().actionGet(); + SearchRequestBuilder mapUpdatedCountRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()) + .setQuery(QueryBuilders.termQuery("actor.map.field", "item")); + SearchResponse mapUpdatedCount = mapUpdatedCountRequest.execute().actionGet(); - LOGGER.info("longupdated: {}", longUpdatedCount.getHits().getTotalHits()); + LOGGER.info("mapfieldupdated: {}", mapUpdatedCount.getHits().getTotalHits()); - assertEquals(count, longUpdatedCount.getHits().getTotalHits()); + assertEquals(count, mapUpdatedCount.getHits().getTotalHits()); - SearchRequestBuilder doubleUpdatedCountRequest = testClient - .prepareSearch(testConfiguration.getIndex()) - .setTypes(testConfiguration.getType()) - .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11)); - SearchResponse doubleUpdatedCount = doubleUpdatedCountRequest.execute().actionGet(); - - LOGGER.info("doubleupdated: {}", doubleUpdatedCount.getHits().getTotalHits()); - - assertEquals(count, doubleUpdatedCount.getHits().getTotalHits()); - - SearchRequestBuilder mapUpdatedCountRequest = testClient - .prepareSearch(testConfiguration.getIndex()) - .setTypes(testConfiguration.getType()) - .setQuery(QueryBuilders.termQuery("actor.map.field", "item")); - SearchResponse mapUpdatedCount = mapUpdatedCountRequest.execute().actionGet(); - - LOGGER.info("mapfieldupdated: {}", mapUpdatedCount.getHits().getTotalHits()); - - assertEquals(count, mapUpdatedCount.getHits().getTotalHits()); - - } + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java index f291dcd..f290971 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java @@ -18,21 +18,21 @@ package org.apache.streams.elasticsearch.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -58,71 +58,72 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; /** - * Created by sblackmon on 10/20/14. + * Integration Test for + * @see org.apache.streams.elasticsearch.ElasticsearchPersistWriter */ public class ElasticsearchPersistWriterIT { - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriterIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriterIT.class); - private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - protected ElasticsearchWriterConfiguration testConfiguration; - protected Client testClient; + protected ElasticsearchWriterConfiguration testConfiguration; + protected Client testClient; - @Before - public void prepareTest() throws Exception { + @Before + public void prepareTest() throws Exception { - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/ElasticsearchPersistWriterIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); - testClient = new ElasticsearchClientManager(testConfiguration).getClient(); + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/ElasticsearchPersistWriterIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); + testClient = new ElasticsearchClientManager(testConfiguration).getClient(); - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - if(indicesExistsResponse.isExists()) { - DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getIndex()); - DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); - assertTrue(deleteIndexResponse.isAcknowledged()); - }; + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + if(indicesExistsResponse.isExists()) { + DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getIndex()); + DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); + assertTrue(deleteIndexResponse.isAcknowledged()); + }; - } + } - @Test - public void testPersistWriter() throws Exception { + @Test + public void testPersistWriter() throws Exception { - ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration); - testPersistWriter.prepare(null); + ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration); + testPersistWriter.prepare(null); - InputStream testActivityFolderStream = ElasticsearchPersistWriterIT.class.getClassLoader() - .getResourceAsStream("activities"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + InputStream testActivityFolderStream = ElasticsearchPersistWriterIT.class.getClassLoader() + .getResourceAsStream("activities"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - for( String file : files) { - LOGGER.info("File: " + file ); - InputStream testActivityFileStream = ElasticsearchPersistWriterIT.class.getClassLoader() - .getResourceAsStream("activities/" + file); - Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); - StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); - testPersistWriter.write( datum ); - LOGGER.info("Wrote: " + activity.getVerb() ); - } + for( String file : files) { + LOGGER.info("File: " + file ); + InputStream testActivityFileStream = ElasticsearchPersistWriterIT.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); + testPersistWriter.write( datum ); + LOGGER.info("Wrote: " + activity.getVerb() ); + } - testPersistWriter.cleanUp(); + testPersistWriter.cleanUp(); - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getIndex()) - .setTypes(testConfiguration.getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getIndex()) + .setTypes(testConfiguration.getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); - assertEquals(89, countResponse.getHits().getTotalHits()); + assertEquals(89, countResponse.getHits().getTotalHits()); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java index ab45cf3..76f10b1 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java @@ -18,25 +18,20 @@ package org.apache.streams.elasticsearch.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Sets; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.SerializationUtils; import org.apache.streams.core.StreamsDatum; -import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor; import org.apache.streams.elasticsearch.processor.MetadataFromDocumentProcessor; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; -import org.junit.Assert; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; + +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.SerializationUtils; import org.junit.Before; import org.junit.Test; -import org.reflections.Reflections; -import org.reflections.scanners.SubTypesScanner; -import org.reflections.util.ClasspathHelper; -import org.reflections.util.ConfigurationBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,88 +40,89 @@ import java.util.List; import java.util.Set; /** - * Created by sblackmon on 10/20/14. + * Unit Test for + * @see org.apache.streams.elasticsearch.processor.MetadataFromDocumentProcessor */ public class TestMetadataFromDocumentProcessor { - private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private final static Logger LOGGER = LoggerFactory.getLogger(TestMetadataFromDocumentProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TestMetadataFromDocumentProcessor.class); - @Before - public void prepareTest() { + @Before + public void prepareTest() { - } + } - @Test - public void testSerializability() { - MetadataFromDocumentProcessor processor = new MetadataFromDocumentProcessor(); + @Test + public void testSerializability() { + MetadataFromDocumentProcessor processor = new MetadataFromDocumentProcessor(); - MetadataFromDocumentProcessor clone = (MetadataFromDocumentProcessor) SerializationUtils.clone(processor); - } + MetadataFromDocumentProcessor clone = (MetadataFromDocumentProcessor) SerializationUtils.clone(processor); + } - @Test - public void testMetadataFromDocumentProcessor() throws Exception { + @Test + public void testMetadataFromDocumentProcessor() throws Exception { - MetadataFromDocumentProcessor processor = new MetadataFromDocumentProcessor(); + MetadataFromDocumentProcessor processor = new MetadataFromDocumentProcessor(); - processor.prepare(null); + processor.prepare(null); - InputStream testActivityFolderStream = TestMetadataFromDocumentProcessor.class.getClassLoader() - .getResourceAsStream("activities"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + InputStream testActivityFolderStream = TestMetadataFromDocumentProcessor.class.getClassLoader() + .getResourceAsStream("activities"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - Set<ActivityObject> objects = Sets.newHashSet(); + Set<ActivityObject> objects = Sets.newHashSet(); - for( String file : files) { - LOGGER.info("File: " + file ); - InputStream testActivityFileStream = TestMetadataFromDocumentProcessor.class.getClassLoader() - .getResourceAsStream("activities/" + file); - Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); - activity.setId(activity.getVerb()); - activity.getAdditionalProperties().remove("$license"); + for( String file : files) { + LOGGER.info("File: " + file ); + InputStream testActivityFileStream = TestMetadataFromDocumentProcessor.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + activity.setId(activity.getVerb()); + activity.getAdditionalProperties().remove("$license"); - if( activity.getActor().getObjectType() != null) - objects.add(activity.getActor()); - if( activity.getObject().getObjectType() != null) - objects.add(activity.getObject()); + if( activity.getActor().getObjectType() != null) + objects.add(activity.getActor()); + if( activity.getObject().getObjectType() != null) + objects.add(activity.getObject()); - StreamsDatum datum = new StreamsDatum(activity); + StreamsDatum datum = new StreamsDatum(activity); - List<StreamsDatum> resultList = processor.process(datum); - assert(resultList != null); - assert(resultList.size() == 1); + List<StreamsDatum> resultList = processor.process(datum); + assert(resultList != null); + assert(resultList.size() == 1); - StreamsDatum result = resultList.get(0); - assert(result != null); - assert(result.getDocument() != null); - assert(result.getId() != null); - assert(result.getMetadata() != null); - assert(result.getMetadata().get("id") != null); - assert(result.getMetadata().get("type") != null); + StreamsDatum result = resultList.get(0); + assert(result != null); + assert(result.getDocument() != null); + assert(result.getId() != null); + assert(result.getMetadata() != null); + assert(result.getMetadata().get("id") != null); + assert(result.getMetadata().get("type") != null); - LOGGER.info("valid: " + activity.getVerb() ); - } + LOGGER.info("valid: " + activity.getVerb() ); + } - for( ActivityObject activityObject : objects) { - LOGGER.info("Object: " + MAPPER.writeValueAsString(activityObject)); + for( ActivityObject activityObject : objects) { + LOGGER.info("Object: " + MAPPER.writeValueAsString(activityObject)); - activityObject.setId(activityObject.getObjectType()); - StreamsDatum datum = new StreamsDatum(activityObject); + activityObject.setId(activityObject.getObjectType()); + StreamsDatum datum = new StreamsDatum(activityObject); - List<StreamsDatum> resultList = processor.process(datum); - assert(resultList != null); - assert(resultList.size() == 1); + List<StreamsDatum> resultList = processor.process(datum); + assert(resultList != null); + assert(resultList.size() == 1); - StreamsDatum result = resultList.get(0); - assert(result != null); - assert(result.getDocument() != null); - assert(result.getId() != null); - assert(result.getMetadata() != null); - assert(result.getMetadata().get("id") != null); - assert(result.getMetadata().get("type") != null); + StreamsDatum result = resultList.get(0); + assert(result != null); + assert(result.getDocument() != null); + assert(result.getId() != null); + assert(result.getMetadata() != null); + assert(result.getMetadata().get("id") != null); + assert(result.getMetadata().get("type") != null); - LOGGER.info("valid: " + activityObject.getObjectType() ); - } + LOGGER.info("valid: " + activityObject.getObjectType() ); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java index 504ea5e..b921ba5 100644 --- a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java +++ b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java @@ -18,15 +18,17 @@ package org.apache.streams.filebuffer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Queues; -import com.squareup.tape.QueueFile; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistReader; import org.apache.streams.core.StreamsResultSet; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Queues; +import com.squareup.tape.QueueFile; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,135 +50,135 @@ import java.util.concurrent.Executors; */ public class FileBufferPersistReader implements StreamsPersistReader, Serializable { - public static final String STREAMS_ID = "FileBufferPersistReader"; + public static final String STREAMS_ID = "FileBufferPersistReader"; - private static final Logger LOGGER = LoggerFactory.getLogger(FileBufferPersistReader.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FileBufferPersistReader.class); - protected volatile Queue<StreamsDatum> persistQueue; + protected volatile Queue<StreamsDatum> persistQueue; - private ObjectMapper mapper; + private ObjectMapper mapper; - private FileBufferConfiguration config; + private FileBufferConfiguration config; - private QueueFile queueFile; + private QueueFile queueFile; - private boolean isStarted = false; - private boolean isStopped = false; + private boolean isStarted = false; + private boolean isStopped = false; - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private ExecutorService executor = Executors.newSingleThreadExecutor(); - public FileBufferPersistReader() { - this(new ComponentConfigurator<>(FileBufferConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer"))); - } + public FileBufferPersistReader() { + this(new ComponentConfigurator<>(FileBufferConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer"))); + } - public FileBufferPersistReader(FileBufferConfiguration config) { - this.config = config; - } + public FileBufferPersistReader(FileBufferConfiguration config) { + this.config = config; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public StreamsResultSet readAll() { - return readCurrent(); - } + @Override + public StreamsResultSet readAll() { + return readCurrent(); + } - @Override - public void startStream() { - isStarted = true; - } + @Override + public void startStream() { + isStarted = true; + } - @Override - public StreamsResultSet readCurrent() { - - while (!queueFile.isEmpty()) { - try { - byte[] bytes = queueFile.peek(); - ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - BufferedReader buf = new BufferedReader(new InputStreamReader(bais)); - String s = buf.readLine(); - LOGGER.debug(s); - write(new StreamsDatum(s)); - queueFile.remove(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - StreamsResultSet current; - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); - persistQueue.clear(); - - return current; - } + @Override + public StreamsResultSet readCurrent() { - private void write( StreamsDatum entry ) { - persistQueue.offer(entry); + while (!queueFile.isEmpty()) { + try { + byte[] bytes = queueFile.peek(); + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + BufferedReader buf = new BufferedReader(new InputStreamReader(bais)); + String line = buf.readLine(); + LOGGER.debug(line); + write(new StreamsDatum(line)); + queueFile.remove(); + } catch (IOException ex) { + ex.printStackTrace(); + } } - @Override - public StreamsResultSet readNew(BigInteger bigInteger) { - return null; - } + StreamsResultSet current; + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + persistQueue.clear(); - @Override - public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { - return null; - } + return current; + } - @Override - public boolean isRunning() { - return isStarted && !isStopped; - } + private void write( StreamsDatum entry ) { + persistQueue.offer(entry); + } - @Override - public void prepare(Object configurationObject) { + @Override + public StreamsResultSet readNew(BigInteger bigInteger) { + return null; + } - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - //Handle exception - } + @Override + public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { + return null; + } - mapper = new ObjectMapper(); + @Override + public boolean isRunning() { + return isStarted && !isStopped; + } - File file = new File( config.getPath()); + @Override + public void prepare(Object configurationObject) { - if( !file.exists() ) { - try { - file.createNewFile(); - } catch (IOException e) { - LOGGER.error(e.getMessage()); - } - } + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + //Handle exception + } - Preconditions.checkArgument(file.exists()); - Preconditions.checkArgument(file.canRead()); + mapper = new ObjectMapper(); - try { - queueFile = new QueueFile(file); - } catch (IOException e) { - LOGGER.error(e.getMessage()); - } + File file = new File( config.getPath()); - Preconditions.checkNotNull(queueFile); + if ( !file.exists() ) { + try { + file.createNewFile(); + } catch (IOException ex) { + LOGGER.error(ex.getMessage()); + } + } - this.persistQueue = new ConcurrentLinkedQueue<>(); + Preconditions.checkArgument(file.exists()); + Preconditions.checkArgument(file.canRead()); + try { + queueFile = new QueueFile(file); + } catch (IOException ex) { + LOGGER.error(ex.getMessage()); } - @Override - public void cleanUp() { - try { - queueFile.close(); - } catch (IOException e) { - e.printStackTrace(); - } finally { - queueFile = null; - isStopped = true; - } + Preconditions.checkNotNull(queueFile); + + this.persistQueue = new ConcurrentLinkedQueue<>(); + + } + + @Override + public void cleanUp() { + try { + queueFile.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + queueFile = null; + isStopped = true; } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java index 4dea85c..76dfafc 100644 --- a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java +++ b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java @@ -18,15 +18,17 @@ package org.apache.streams.filebuffer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.squareup.tape.QueueFile; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.util.GuidUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.squareup.tape.QueueFile; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,79 +43,79 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class FileBufferPersistWriter implements StreamsPersistWriter, Serializable { - public final static String STREAMS_ID = "FileBufferPersistWriter"; + public static final String STREAMS_ID = "FileBufferPersistWriter"; - private static final Logger LOGGER = LoggerFactory.getLogger(FileBufferPersistWriter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FileBufferPersistWriter.class); - protected volatile Queue<StreamsDatum> persistQueue; + protected volatile Queue<StreamsDatum> persistQueue; - private ObjectMapper mapper; + private ObjectMapper mapper; - private FileBufferConfiguration config; + private FileBufferConfiguration config; - private QueueFile queueFile; + private QueueFile queueFile; - public FileBufferPersistWriter() { - this(new ComponentConfigurator<>(FileBufferConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer"))); - } + public FileBufferPersistWriter() { + this(new ComponentConfigurator<>(FileBufferConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer"))); + } - public FileBufferPersistWriter(FileBufferConfiguration config) { - this.config = config; - } + public FileBufferPersistWriter(FileBufferConfiguration config) { + this.config = config; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public void write(StreamsDatum entry) { + @Override + public void write(StreamsDatum entry) { - String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("filewriter"); + String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("filewriter"); - Preconditions.checkArgument(!Strings.isNullOrEmpty(key)); - Preconditions.checkArgument(entry.getDocument() instanceof String); - Preconditions.checkArgument(!Strings.isNullOrEmpty((String) entry.getDocument())); + Preconditions.checkArgument(!Strings.isNullOrEmpty(key)); + Preconditions.checkArgument(entry.getDocument() instanceof String); + Preconditions.checkArgument(!Strings.isNullOrEmpty((String) entry.getDocument())); - byte[] item = ((String)entry.getDocument()).getBytes(); - try { - queueFile.add(item); - } catch (IOException e) { - e.printStackTrace(); - } + byte[] item = ((String)entry.getDocument()).getBytes(); + try { + queueFile.add(item); + } catch (IOException ex) { + ex.printStackTrace(); } + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - mapper = new ObjectMapper(); + mapper = new ObjectMapper(); - File file = new File( config.getPath()); + File file = new File( config.getPath()); - try { - queueFile = new QueueFile(file); - } catch (IOException e) { - e.printStackTrace(); - } + try { + queueFile = new QueueFile(file); + } catch (IOException ex) { + ex.printStackTrace(); + } - Preconditions.checkArgument(file.exists()); - Preconditions.checkArgument(file.canWrite()); + Preconditions.checkArgument(file.exists()); + Preconditions.checkArgument(file.canWrite()); - Preconditions.checkNotNull(queueFile); + Preconditions.checkNotNull(queueFile); - this.persistQueue = new ConcurrentLinkedQueue<>(); + this.persistQueue = new ConcurrentLinkedQueue<>(); - } + } - @Override - public void cleanUp() { - try { - queueFile.close(); - } catch (IOException e) { - e.printStackTrace(); - } finally { - queueFile = null; - } + @Override + public void cleanUp() { + try { + queueFile.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + queueFile = null; } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java index 3c97fd7..847328a 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java @@ -18,16 +18,6 @@ package org.apache.streams.graph; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.util.EntityUtils; import org.apache.streams.components.http.HttpPersistWriterConfiguration; import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter; import org.apache.streams.config.ComponentConfigurator; @@ -39,6 +29,18 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Provider; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,190 +51,203 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Adds activityobjects as vertices and activities as edges to a graph database with - * an http rest endpoint (such as neo4j) + * an http rest endpoint (such as neo4j). */ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { - public static final String STREAMS_ID = GraphHttpPersistWriter.class.getCanonicalName(); + public static final String STREAMS_ID = GraphHttpPersistWriter.class.getCanonicalName(); - private final static Logger LOGGER = LoggerFactory.getLogger(GraphHttpPersistWriter.class); - private final static long MAX_WRITE_LATENCY = 1000; + private static final Logger LOGGER = LoggerFactory.getLogger(GraphHttpPersistWriter.class); + private static final long MAX_WRITE_LATENCY = 1000; - protected GraphHttpConfiguration configuration; + protected GraphHttpConfiguration configuration; - protected QueryGraphHelper queryGraphHelper; - protected HttpGraphHelper httpGraphHelper; + protected QueryGraphHelper queryGraphHelper; + protected HttpGraphHelper httpGraphHelper; - private static ObjectMapper mapper; + private static ObjectMapper mapper; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - public GraphHttpPersistWriter() { - this(new ComponentConfigurator<GraphHttpConfiguration>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); - } + /** + * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from JVM 'graph'. + */ + public GraphHttpPersistWriter() { + this(new ComponentConfigurator<GraphHttpConfiguration>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); + } - public GraphHttpPersistWriter(GraphHttpConfiguration configuration) { - super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class)); - if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) { - super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit/"); + /** + * GraphHttpPersistWriter constructor - use supplied GraphHttpConfiguration. + * @param configuration GraphHttpConfiguration + */ + public GraphHttpPersistWriter(GraphHttpConfiguration configuration) { + super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class)); + if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) { + super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit/"); + } else if ( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) { + super.configuration.setResourcePath("/graphs/" + configuration.getGraph()); + } + this.configuration = configuration; + } + + @Override + protected ObjectNode preparePayload(StreamsDatum entry) throws Exception { + + Activity activity = null; + ActivityObject activityObject = null; + Object document = entry.getDocument(); + + if (document instanceof Activity) { + activity = (Activity) document; + activityObject = activity.getObject(); + } else if (document instanceof ActivityObject) { + activityObject = (ActivityObject) document; + } else { + ObjectNode objectNode; + if (document instanceof ObjectNode) { + objectNode = (ObjectNode) document; + } else if ( document instanceof String) { + try { + objectNode = mapper.readValue((String) document, ObjectNode.class); + } catch (IOException ex) { + LOGGER.error("Can't handle input: ", entry); + throw ex; } - else if( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) { - super.configuration.setResourcePath("/graphs/" + configuration.getGraph()); + } else { + LOGGER.error("Can't handle input: ", entry); + throw new Exception("Can't create payload from datum."); + } + + if ( objectNode.get("verb") != null ) { + try { + activity = mapper.convertValue(objectNode, Activity.class); + activityObject = activity.getObject(); + } catch (Exception ex) { + activityObject = mapper.convertValue(objectNode, ActivityObject.class); } - this.configuration = configuration; + } else { + activityObject = mapper.convertValue(objectNode, ActivityObject.class); + } } - @Override - protected ObjectNode preparePayload(StreamsDatum entry) throws Exception { - - Activity activity = null; - ActivityObject activityObject = null; - Object document = entry.getDocument(); - - if (document instanceof Activity) { - activity = (Activity) document; - activityObject = activity.getObject(); - } else if (document instanceof ActivityObject) { - activityObject = (ActivityObject) document; - } else { - ObjectNode objectNode; - if (document instanceof ObjectNode) { - objectNode = (ObjectNode) document; - } else if( document instanceof String) { - try { - objectNode = mapper.readValue((String) document, ObjectNode.class); - } catch (IOException e) { - LOGGER.error("Can't handle input: ", entry); - throw e; - } - } else { - LOGGER.error("Can't handle input: ", entry); - throw new Exception("Can't create payload from datum."); - } - - if( objectNode.get("verb") != null ) { - try { - activity = mapper.convertValue(objectNode, Activity.class); - activityObject = activity.getObject(); - } catch (Exception e) { - activityObject = mapper.convertValue(objectNode, ActivityObject.class); - } - } else { - activityObject = mapper.convertValue(objectNode, ActivityObject.class); - } - } + Preconditions.checkArgument(activity != null || activityObject != null); + + ObjectNode request = mapper.createObjectNode(); + ArrayNode statements = mapper.createArrayNode(); + + // always add vertices first - Preconditions.checkArgument(activity != null || activityObject != null); + List<String> labels = Lists.newArrayList("streams"); - ObjectNode request = mapper.createObjectNode(); - ArrayNode statements = mapper.createArrayNode(); + if ( activityObject != null ) { + if ( activityObject.getObjectType() != null ) { + labels.add(activityObject.getObjectType()); + } + statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject))); + } - // always add vertices first + if ( activity != null ) { - List<String> labels = Lists.newArrayList("streams"); + ActivityObject actor = activity.getActor(); + Provider provider = activity.getProvider(); - if( activityObject != null ) { - if (activityObject.getObjectType() != null) - labels.add(activityObject.getObjectType()); - statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject))); + if ( provider != null + && !Strings.isNullOrEmpty(provider.getId()) ) { + labels.add(provider.getId()); + } + if (actor != null + && !Strings.isNullOrEmpty(actor.getId())) { + if (actor.getObjectType() != null) { + labels.add(actor.getObjectType()); } + statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor))); + } - if( activity != null ) { - - ActivityObject actor = activity.getActor(); - Provider provider = activity.getProvider(); - - if( provider != null && - !Strings.isNullOrEmpty(provider.getId()) ) { - labels.add(provider.getId()); - } - if (actor != null && - !Strings.isNullOrEmpty(actor.getId())) { - if (actor.getObjectType() != null) - labels.add(actor.getObjectType()); - statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor))); - } - - if (activityObject != null && - !Strings.isNullOrEmpty(activityObject.getId())) { - if (activityObject.getObjectType() != null) - labels.add(activityObject.getObjectType()); - statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject))); - } - - // then add edge - - if (!Strings.isNullOrEmpty(activity.getVerb())) { - statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity))); - } + if (activityObject != null + && !Strings.isNullOrEmpty(activityObject.getId())) { + if (activityObject.getObjectType() != null) { + labels.add(activityObject.getObjectType()); } + statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject))); + } - request.put("statements", statements); - return request; + // then add edge + if (!Strings.isNullOrEmpty(activity.getVerb())) { + statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity))); + } } - @Override - protected ObjectNode executePost(HttpPost httpPost) { - - Preconditions.checkNotNull(httpPost); - - ObjectNode result = null; - - CloseableHttpResponse response = null; - - String entityString = null; - try { - response = httpclient.execute(httpPost); - HttpEntity entity = response.getEntity(); - if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) { - entityString = EntityUtils.toString(entity); - result = mapper.readValue(entityString, ObjectNode.class); - } - LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString); - if( result == null || - ( - result.get("errors") != null && - result.get("errors").isArray() && - result.get("errors").iterator().hasNext() - ) - ) { - LOGGER.error("Write Error: " + result.get("errors")); - } else { - LOGGER.debug("Write Success"); - } - } catch (IOException e) { - LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage()); - } catch (Exception e) { - LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage()); - } finally { - try { - if( response != null) response.close(); - } catch (IOException e) {} + request.put("statements", statements); + return request; + + } + + @Override + protected ObjectNode executePost(HttpPost httpPost) { + + Preconditions.checkNotNull(httpPost); + + ObjectNode result = null; + + CloseableHttpResponse response = null; + + String entityString = null; + try { + response = httpclient.execute(httpPost); + HttpEntity entity = response.getEntity(); + if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) { + entityString = EntityUtils.toString(entity); + result = mapper.readValue(entityString, ObjectNode.class); + } + LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString); + if ( result == null + || ( + result.get("errors") != null + && result.get("errors").isArray() + && result.get("errors").iterator().hasNext() + ) + ) { + LOGGER.error("Write Error: " + result.get("errors")); + } else { + LOGGER.debug("Write Success"); + } + } catch (IOException ex) { + LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage()); + } catch (Exception ex) { + LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage()); + } finally { + try { + if ( response != null) { + response.close(); } - return result; + } catch (IOException ignored) { + LOGGER.trace("ignored IOException", ignored); + } } + return result; + } - @Override - public void prepare(Object configurationObject) { - - super.prepare(configuration); - mapper = StreamsJacksonMapper.getInstance(); + @Override + public void prepare(Object configurationObject) { - if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) { - queryGraphHelper = new CypherQueryGraphHelper(); - httpGraphHelper = new Neo4jHttpGraphHelper(); - } + super.prepare(configuration); + mapper = StreamsJacksonMapper.getInstance(); - Preconditions.checkNotNull(queryGraphHelper); - Preconditions.checkNotNull(httpGraphHelper); + if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) { + queryGraphHelper = new CypherQueryGraphHelper(); + httpGraphHelper = new Neo4jHttpGraphHelper(); } - @Override - public void cleanUp() { + Preconditions.checkNotNull(queryGraphHelper); + Preconditions.checkNotNull(httpGraphHelper); + } - LOGGER.info("exiting"); + @Override + public void cleanUp() { - } + LOGGER.info("exiting"); + + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java index 731159f..7c6e341 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java @@ -18,10 +18,6 @@ package org.apache.streams.graph; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import org.apache.streams.components.http.HttpProviderConfiguration; import org.apache.streams.components.http.provider.SimpleHttpProvider; import org.apache.streams.config.ComponentConfigurator; @@ -33,6 +29,12 @@ import org.apache.streams.graph.neo4j.CypherQueryResponse; import org.apache.streams.graph.neo4j.ItemData; import org.apache.streams.graph.neo4j.ItemMetadata; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,76 +42,86 @@ import java.util.List; /** * Reads a stream of activityobjects from vertices in a graph database with - * an http rest endpoint (such as neo4j) + * an http rest endpoint (such as neo4j). */ public class GraphVertexReader extends SimpleHttpProvider implements StreamsPersistReader { - public static final String STREAMS_ID = GraphVertexReader.class.getCanonicalName(); + public static final String STREAMS_ID = GraphVertexReader.class.getCanonicalName(); - private final static Logger LOGGER = LoggerFactory.getLogger(GraphVertexReader.class); + private static final Logger LOGGER = LoggerFactory.getLogger(GraphVertexReader.class); - protected GraphReaderConfiguration configuration; + protected GraphReaderConfiguration configuration; - private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - public GraphVertexReader() { - this(new ComponentConfigurator<GraphReaderConfiguration>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); - } + /** + * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 'graph'. + */ + public GraphVertexReader() { + this(new ComponentConfigurator<GraphReaderConfiguration>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); + } - public GraphVertexReader(GraphReaderConfiguration configuration) { - super(mapper.convertValue(configuration, HttpProviderConfiguration.class)); - if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) - super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit"); - else if( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) - super.configuration.setResourcePath("/graphs/" + configuration.getGraph()); - this.configuration = configuration; + /** + * GraphVertexReader constructor - use supplied GraphReaderConfiguration. + * @param configuration GraphReaderConfiguration + */ + public GraphVertexReader(GraphReaderConfiguration configuration) { + super(mapper.convertValue(configuration, HttpProviderConfiguration.class)); + if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) { + super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit"); + } else if ( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) { + super.configuration.setResourcePath("/graphs/" + configuration.getGraph()); } + this.configuration = configuration; + } - /* - * Neo API query returns something like this: - * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { "data": { props }, etc... } ] ] } - * - */ - public List<ObjectNode> parse(JsonNode jsonNode) { - List<ObjectNode> results = Lists.newArrayList(); + /** + * Neo API query returns something like this: + * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { "data": { props }, etc... } ] ] } + * + * @param jsonNode jsonNode + * @return result + */ + public List<ObjectNode> parse(JsonNode jsonNode) { + List<ObjectNode> results = Lists.newArrayList(); - ObjectNode root = (ObjectNode) jsonNode; + ObjectNode root = (ObjectNode) jsonNode; - CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, CypherQueryResponse.class); + CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, CypherQueryResponse.class); - for( List<List<ItemMetadata>> dataWrapper : cypherQueryResponse.getData()) { + for ( List<List<ItemMetadata>> dataWrapper : cypherQueryResponse.getData()) { - for (List<ItemMetadata> itemMetadatas : dataWrapper) { + for (List<ItemMetadata> itemMetadatas : dataWrapper) { - for (ItemMetadata itemMetadata : itemMetadatas) { + for (ItemMetadata itemMetadata : itemMetadatas) { - ItemData itemData = itemMetadata.getData(); + ItemData itemData = itemMetadata.getData(); - LOGGER.debug("itemData: " + itemData); + LOGGER.debug("itemData: " + itemData); - results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.')); - } + results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.')); + } - } + } - } - return results; } + return results; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - super.prepare(configurationObject); + super.prepare(configurationObject); - } + } - @Override - public StreamsResultSet readAll() { - return readCurrent(); - } + @Override + public StreamsResultSet readAll() { + return readCurrent(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java index 0833ba0..17b8840 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java @@ -19,8 +19,7 @@ package org.apache.streams.graph; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; + import org.javatuples.Pair; import java.util.Map; @@ -31,6 +30,6 @@ import java.util.Map; */ public interface HttpGraphHelper { - public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters); + public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java index eeacdae..1699aee 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java @@ -18,28 +18,27 @@ package org.apache.streams.graph; -import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; + import org.javatuples.Pair; import java.util.Map; /** * Interface for methods allowing persistance to a graph database which uses a combination - * DSL + * DSL. */ public interface QueryGraphHelper { - public Pair<String, Map<String, Object>> getVertexRequest(String streamsId); + public Pair<String, Map<String, Object>> getVertexRequest(String streamsId); - public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId); + public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId); - public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject); + public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject); - public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject); + public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject); - public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity); + public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java deleted file mode 100644 index 3dc8ffc..0000000 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.graph.neo4j; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.streams.data.util.PropertyUtil; -import org.apache.streams.graph.QueryGraphHelper; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.javatuples.Pair; -import org.javatuples.Quartet; -import org.stringtemplate.v4.ST; - -import java.util.List; -import java.util.Map; - -/** - * Supporting class for interacting with neo4j via rest API - */ -public class BinaryGraphHelper { - - private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) { - - Preconditions.checkNotNull(activityObject.getObjectType()); - - ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); - - Pair<String, Map<String, Object>> queryPlusParameters = new Pair(props.get("id"), props); - - return queryPlusParameters; - } - - public Quartet<String, String, String, Map<String, Object>> createEdgeRequest(Activity activity) { - - ObjectNode object = mapper.convertValue(activity, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); - - Quartet createEdgeRequest = new Quartet( - activity.getActor().getId(), - activity.getObject().getId(), - activity.getId(), - props); - - return createEdgeRequest; - } - - public static String getPropertyValueSetter(Map<String, Object> map, String symbol) { - StringBuilder builder = new StringBuilder(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String)(entry.getValue()); - builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'"); - } - } - return builder.toString(); - } - - public static String getPropertyParamSetter(Map<String, Object> map, String symbol) { - StringBuilder builder = new StringBuilder(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String)(entry.getValue()); - builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'"); - } - } - return builder.toString(); - } - - public static String getPropertyCreater(Map<String, Object> map) { - StringBuilder builder = new StringBuilder(); - builder.append("{"); - List<String> parts = Lists.newArrayList(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String) (entry.getValue()); - parts.add("`"+entry.getKey() + "`:'" + propVal + "'"); - } - } - builder.append(Joiner.on(",").join(parts)); - builder.append("}"); - return builder.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java deleted file mode 100644 index 8028350..0000000 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.graph.neo4j; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.streams.data.util.PropertyUtil; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.stringtemplate.v4.ST; - -import java.util.List; -import java.util.Map; - -/** - * Supporting class for interacting with neo4j via rest API - */ -public class CypherGraphHelper implements org.apache.streams.graph.GraphHelper { - - private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - public final static String statementKey = "statement"; - public final static String paramsKey = "parameters"; - public final static String propsKey = "props"; - - public final static String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v"; - public final static String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v"; - - public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+ - "CREATE UNIQUE (v:<type> { props }) "+ - "ON CREATE SET v <labels> "+ - "RETURN v"; - - public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+ - "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+ - "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+ - "RETURN v"; - - public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+ - "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+ - "RETURN r"; - - public ObjectNode getVertexRequest(String streamsId) { - - ObjectNode request = mapper.createObjectNode(); - - ST getVertex = new ST(getVertexStringIdStatementTemplate); - getVertex.add("id", streamsId); - request.put(statementKey, getVertex.render()); - - return request; - } - - @Override - public ObjectNode getVertexRequest(Long vertexId) { - - ObjectNode request = mapper.createObjectNode(); - - ST getVertex = new ST(getVertexLongIdStatementTemplate); - getVertex.add("id", vertexId); - request.put(statementKey, getVertex.render()); - - return request; - } - - public ObjectNode createVertexRequest(ActivityObject activityObject) { - - Preconditions.checkNotNull(activityObject.getObjectType()); - - ObjectNode request = mapper.createObjectNode(); - - List<String> labels = Lists.newArrayList(); - if( activityObject.getAdditionalProperties().containsKey("labels") ) { - List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels"); - for( String extraLabel : extraLabels ) - labels.add(":"+extraLabel); - } - - ST createVertex = new ST(createVertexStatementTemplate); - createVertex.add("id", activityObject.getId()); - createVertex.add("type", activityObject.getObjectType()); - createVertex.add("labels", Joiner.on(' ').join(labels)); - request.put(statementKey, createVertex.render()); - - ObjectNode params = mapper.createObjectNode(); - ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class); - ObjectNode props = PropertyUtil.flattenToObjectNode(object, '.'); - params.put(propsKey, props); - request.put(paramsKey, params); - - return request; - } - - public ObjectNode mergeVertexRequest(ActivityObject activityObject) { - - Preconditions.checkNotNull(activityObject.getObjectType()); - - ObjectNode request = mapper.createObjectNode(); - - List<String> labels = Lists.newArrayList(); - if( activityObject.getAdditionalProperties().containsKey("labels") ) { - List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels"); - for( String extraLabel : extraLabels ) - labels.add(":"+extraLabel); - } - - ST mergeVertex = new ST(mergeVertexStatementTemplate); - mergeVertex.add("id", activityObject.getId()); - mergeVertex.add("type", activityObject.getObjectType()); - mergeVertex.add("labels", Joiner.on(' ').join(labels)); - - ObjectNode params = mapper.createObjectNode(); - ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class); - ObjectNode props = PropertyUtil.flattenToObjectNode(object, '.'); - params.put(propsKey, props); - request.put(paramsKey, params); - - String statement = mergeVertex.render(); - - request.put(statementKey, statement); - - return request; - } - - public ObjectNode createEdgeRequest(Activity activity, ActivityObject source, ActivityObject destination) { - - ObjectNode request = mapper.createObjectNode(); - - // set the activityObject's and extensions null, because their properties don't need to appear on the relationship - activity.setActor(null); - activity.setObject(null); - activity.setTarget(null); - activity.getAdditionalProperties().put("extensions", null); - - ObjectNode object = mapper.convertValue(activity, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); - - ST mergeEdge = new ST(createEdgeStatementTemplate); - mergeEdge.add("s_id", source.getId()); - mergeEdge.add("s_type", source.getObjectType()); - mergeEdge.add("d_id", destination.getId()); - mergeEdge.add("d_type", destination.getObjectType()); - mergeEdge.add("r_id", activity.getId()); - mergeEdge.add("r_type", activity.getVerb()); - mergeEdge.add("r_props", getPropertyCreater(props)); - - String statement = mergeEdge.render(); - request.put(statementKey, statement); - - return request; - } - - public static String getPropertyValueSetter(Map<String, Object> map, String symbol) { - StringBuilder builder = new StringBuilder(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String)(entry.getValue()); - builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'"); - } - } - return builder.toString(); - } - - public static String getPropertyParamSetter(Map<String, Object> map, String symbol) { - StringBuilder builder = new StringBuilder(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String)(entry.getValue()); - builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'"); - } - } - return builder.toString(); - } - - public static String getPropertyCreater(Map<String, Object> map) { - StringBuilder builder = new StringBuilder(); - builder.append("{"); - List<String> parts = Lists.newArrayList(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String) (entry.getValue()); - parts.add("`"+entry.getKey() + "`:'" + propVal + "'"); - } - } - builder.append(Joiner.on(",").join(parts)); - builder.append("}"); - return builder.toString(); - } - -}
