Repository: incubator-streams-examples Updated Branches: refs/heads/master 27f63c1d1 -> b43035722
switch to bolt protocol Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/b4303572 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/b4303572 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/b4303572 Branch: refs/heads/master Commit: b430357222faacfa27f5db37dee198ed5115ad3d Parents: 27f63c1 Author: Steve Blackmon <sblack...@apache.org> Authored: Mon Jan 2 19:44:09 2017 -0600 Committer: Steve Blackmon <sblack...@apache.org> Committed: Mon Jan 2 19:44:09 2017 -0600 ---------------------------------------------------------------------- local/twitter-follow-neo4j/pom.xml | 4 +- .../streams/example/TwitterFollowNeo4j.java | 18 +++--- .../TwitterFollowNeo4jConfiguration.json | 2 +- .../example/test/TwitterFollowNeo4jIT.java | 62 ++++++++++++++++++-- .../test/resources/TwitterFollowNeo4jIT.conf | 9 ++- pom.xml | 4 +- 6 files changed, 77 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/pom.xml ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/pom.xml b/local/twitter-follow-neo4j/pom.xml index 3fd9dd1..def7ae7 100644 --- a/local/twitter-follow-neo4j/pom.xml +++ b/local/twitter-follow-neo4j/pom.xml @@ -78,7 +78,7 @@ </dependency> <dependency> <groupId>org.apache.streams</groupId> - <artifactId>streams-persist-graph</artifactId> + <artifactId>streams-persist-neo4j</artifactId> </dependency> <dependency> <groupId>org.apache.streams</groupId> @@ -175,7 +175,7 @@ <dependencies> <dependency> <groupId>org.apache.streams</groupId> - <artifactId>streams-persist-graph</artifactId> + <artifactId>streams-persist-neo4j</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java index e00e4a0..5098061 100644 --- a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java +++ b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java @@ -26,11 +26,11 @@ import org.apache.streams.converter.TypeConverterProcessor; import org.apache.streams.core.StreamBuilder; import org.apache.streams.data.ActivityConverter; import org.apache.streams.data.DocumentClassifier; -import org.apache.streams.graph.GraphHttpConfiguration; -import org.apache.streams.graph.GraphHttpPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.local.LocalRuntimeConfiguration; import org.apache.streams.local.builders.LocalStreamBuilder; +import org.apache.streams.neo4j.Neo4jConfiguration; +import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter; import org.apache.streams.twitter.TwitterFollowingConfiguration; import org.apache.streams.twitter.converter.TwitterDocumentClassifier; import org.apache.streams.twitter.converter.TwitterFollowActivityConverter; @@ -39,6 +39,7 @@ import org.apache.streams.twitter.provider.TwitterFollowingProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -66,14 +67,17 @@ public class TwitterFollowNeo4j implements Runnable { TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration); TypeConverterProcessor converter = new TypeConverterProcessor(String.class); + List<DocumentClassifier> classifiers = Stream.of((DocumentClassifier) new TwitterDocumentClassifier()).collect(Collectors.toList()); + List<ActivityConverter> converters = Stream.of((ActivityConverter) new TwitterFollowActivityConverter()).collect(Collectors.toList()); ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration = new ActivityConverterProcessorConfiguration() - .withClassifiers(Stream.of((DocumentClassifier) new TwitterDocumentClassifier()).collect(Collectors.toList())) - .withConverters(Stream.of((ActivityConverter) new TwitterFollowActivityConverter()).collect(Collectors.toList())); + .withClassifiers(classifiers) + .withConverters(converters); ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration); - GraphHttpConfiguration graphWriterConfiguration = config.getGraph(); - GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration); + Neo4jConfiguration neo4jConfiguration = config.getNeo4j(); + Neo4jBoltPersistWriter graphPersistWriter = new Neo4jBoltPersistWriter(neo4jConfiguration); + graphPersistWriter.prepare(neo4jConfiguration); LocalRuntimeConfiguration localRuntimeConfiguration = StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class); @@ -82,7 +86,7 @@ public class TwitterFollowNeo4j implements Runnable { builder.newPerpetualStream(TwitterFollowingProvider.class.getCanonicalName(), followingProvider); builder.addStreamsProcessor(TypeConverterProcessor.class.getCanonicalName(), converter, 1, TwitterFollowingProvider.class.getCanonicalName()); builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), activity, 1, TypeConverterProcessor.class.getCanonicalName()); - builder.addStreamsPersistWriter(GraphHttpPersistWriter.class.getCanonicalName(), graphPersistWriter, 1, ActivityConverterProcessor.class.getCanonicalName()); + builder.addStreamsPersistWriter(Neo4jBoltPersistWriter.class.getCanonicalName(), graphPersistWriter, 1, ActivityConverterProcessor.class.getCanonicalName()); builder.start(); } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json index ffbd39d..e2b3386 100644 --- a/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json +++ b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json @@ -8,6 +8,6 @@ "javaInterfaces": ["java.io.Serializable"], "properties": { "twitter": { "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", "required": true }, - "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", "type": "object", "required": true } + "neo4j": { "javaType": "org.apache.streams.neo4j.Neo4jConfiguration", "type": "object", "required": true } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java index ac9362e..75e56bb 100644 --- a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java +++ b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java @@ -19,18 +19,31 @@ package org.apache.streams.example.test; import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.core.StreamsResultSet; import org.apache.streams.example.TwitterFollowNeo4j; import org.apache.streams.example.TwitterFollowNeo4jConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.neo4j.Neo4jReaderConfiguration; +import org.apache.streams.neo4j.bolt.Neo4jBoltClient; +import org.apache.streams.neo4j.bolt.Neo4jBoltPersistReader; +import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; + +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.Transaction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.File; +import java.io.IOException; + +import static org.testng.Assert.assertTrue; /** * TwitterFollowNeo4jIT is an integration test for TwitterFollowNeo4j. @@ -39,20 +52,31 @@ public class TwitterFollowNeo4jIT { private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class); + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + protected TwitterFollowNeo4jConfiguration testConfiguration; private int count = 0; + private Neo4jBoltClient testClient; + @BeforeClass - public void prepareTest() throws Exception { + public void prepareTest() throws IOException { Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + File conf = new File("target/test-classes/TwitterFollowNeo4jIT.conf"); + assertTrue(conf.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf, ConfigParseOptions.defaults().setAllowMissing(false)); Config typesafe = testResourceConfig.withFallback(reference).resolve(); testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe); - + testClient = Neo4jBoltClient.getInstance(testConfiguration.getNeo4j()); + + Session session = testClient.client().session(); + Transaction transaction = session.beginTransaction(); + transaction.run("MATCH ()-[r]-() DELETE r"); + transaction.run("MATCH (n) DETACH DELETE n"); + transaction.success(); + session.close(); } @Test @@ -62,6 +86,34 @@ public class TwitterFollowNeo4jIT { stream.run(); + Neo4jReaderConfiguration vertexReaderConfiguration= MAPPER.convertValue(testConfiguration.getNeo4j(), Neo4jReaderConfiguration.class); + vertexReaderConfiguration.setQuery("MATCH (v) return v"); + Neo4jBoltPersistReader vertexReader = new Neo4jBoltPersistReader(vertexReaderConfiguration); + vertexReader.prepare(null); + StreamsResultSet vertexResultSet = vertexReader.readAll(); + LOGGER.info("Total Read: {}", vertexResultSet.size() ); + assertTrue(vertexResultSet.size() > 100); + + Neo4jReaderConfiguration edgeReaderConfiguration= MAPPER.convertValue(testConfiguration.getNeo4j(), Neo4jReaderConfiguration.class); + edgeReaderConfiguration.setQuery("MATCH (s)-[r]->(d) return r"); + Neo4jBoltPersistReader edgeReader = new Neo4jBoltPersistReader(edgeReaderConfiguration); + edgeReader.prepare(null); + StreamsResultSet edgeResultSet = edgeReader.readAll(); + LOGGER.info("Total Read: {}", edgeResultSet.size() ); + assertTrue(edgeResultSet.size() == vertexResultSet.size()-1); + } + + @AfterClass + public void cleanup() throws Exception { + Session session = testClient.client().session(); + Transaction transaction = session.beginTransaction(); + transaction.run("MATCH ()-[r]-() DELETE r"); + transaction.run("MATCH (n) DETACH DELETE n"); + transaction.success(); + session.close(); + } + + } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf index 346b111..b5e8fed 100644 --- a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf +++ b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf @@ -20,10 +20,9 @@ twitter { ] twitter.max_items = 1000 } -graph { - hostname = ${neo4j.http.host} - port = ${neo4j.http.port} - type = "neo4j" - graph = "data" +neo4j { + scheme = "tcp" + hosts += ${neo4j.tcp.host} + port = ${neo4j.tcp.port} } taskTimeoutMs = 60000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e721622..ac7d7f3 100644 --- a/pom.xml +++ b/pom.xml @@ -263,7 +263,7 @@ </dependency> <dependency> <groupId>org.apache.streams</groupId> - <artifactId>streams-persist-graph</artifactId> + <artifactId>streams-persist-neo4j</artifactId> <version>0.5-incubating-SNAPSHOT</version> </dependency> <dependency> @@ -574,7 +574,7 @@ </watch> </image> <image> - <name>neo4j</name> + <name>neo4j:3.0.6</name> <alias>graph</alias> <run> <env>