Repository: incubator-streams-examples
Updated Branches:
  refs/heads/master 27f63c1d1 -> b43035722


switch to bolt protocol


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/b4303572
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/b4303572
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/b4303572

Branch: refs/heads/master
Commit: b430357222faacfa27f5db37dee198ed5115ad3d
Parents: 27f63c1
Author: Steve Blackmon <sblack...@apache.org>
Authored: Mon Jan 2 19:44:09 2017 -0600
Committer: Steve Blackmon <sblack...@apache.org>
Committed: Mon Jan 2 19:44:09 2017 -0600

----------------------------------------------------------------------
 local/twitter-follow-neo4j/pom.xml              |  4 +-
 .../streams/example/TwitterFollowNeo4j.java     | 18 +++---
 .../TwitterFollowNeo4jConfiguration.json        |  2 +-
 .../example/test/TwitterFollowNeo4jIT.java      | 62 ++++++++++++++++++--
 .../test/resources/TwitterFollowNeo4jIT.conf    |  9 ++-
 pom.xml                                         |  4 +-
 6 files changed, 77 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/pom.xml b/local/twitter-follow-neo4j/pom.xml
index 3fd9dd1..def7ae7 100644
--- a/local/twitter-follow-neo4j/pom.xml
+++ b/local/twitter-follow-neo4j/pom.xml
@@ -78,7 +78,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
-            <artifactId>streams-persist-graph</artifactId>
+            <artifactId>streams-persist-neo4j</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
@@ -175,7 +175,7 @@
                 <dependencies>
                     <dependency>
                         <groupId>org.apache.streams</groupId>
-                        <artifactId>streams-persist-graph</artifactId>
+                        <artifactId>streams-persist-neo4j</artifactId>
                         <version>${project.version}</version>
                     </dependency>
                     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
index e00e4a0..5098061 100644
--- a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
+++ b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
@@ -26,11 +26,11 @@ import org.apache.streams.converter.TypeConverterProcessor;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.data.DocumentClassifier;
-import org.apache.streams.graph.GraphHttpConfiguration;
-import org.apache.streams.graph.GraphHttpPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
 import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
 import org.apache.streams.twitter.converter.TwitterFollowActivityConverter;
@@ -39,6 +39,7 @@ import org.apache.streams.twitter.provider.TwitterFollowingProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -66,14 +67,17 @@ public class TwitterFollowNeo4j implements Runnable {
     TwitterFollowingProvider followingProvider = new 
TwitterFollowingProvider(twitterFollowingConfiguration);
     TypeConverterProcessor converter = new 
TypeConverterProcessor(String.class);
 
+    List<DocumentClassifier> classifiers = Stream.of((DocumentClassifier) new 
TwitterDocumentClassifier()).collect(Collectors.toList());
+    List<ActivityConverter> converters = Stream.of((ActivityConverter) new 
TwitterFollowActivityConverter()).collect(Collectors.toList());
     ActivityConverterProcessorConfiguration 
activityConverterProcessorConfiguration =
         new ActivityConverterProcessorConfiguration()
-            .withClassifiers(Stream.of((DocumentClassifier) new 
TwitterDocumentClassifier()).collect(Collectors.toList()))
-            .withConverters(Stream.of((ActivityConverter) new 
TwitterFollowActivityConverter()).collect(Collectors.toList()));
+            .withClassifiers(classifiers)
+            .withConverters(converters);
     ActivityConverterProcessor activity = new 
ActivityConverterProcessor(activityConverterProcessorConfiguration);
 
-    GraphHttpConfiguration graphWriterConfiguration = config.getGraph();
-    GraphHttpPersistWriter graphPersistWriter = new 
GraphHttpPersistWriter(graphWriterConfiguration);
+    Neo4jConfiguration neo4jConfiguration = config.getNeo4j();
+    Neo4jBoltPersistWriter graphPersistWriter = new 
Neo4jBoltPersistWriter(neo4jConfiguration);
+    graphPersistWriter.prepare(neo4jConfiguration);
 
     LocalRuntimeConfiguration localRuntimeConfiguration =
         
StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(),
 LocalRuntimeConfiguration.class);
@@ -82,7 +86,7 @@ public class TwitterFollowNeo4j implements Runnable {
     
builder.newPerpetualStream(TwitterFollowingProvider.class.getCanonicalName(), 
followingProvider);
     
builder.addStreamsProcessor(TypeConverterProcessor.class.getCanonicalName(), 
converter, 1, TwitterFollowingProvider.class.getCanonicalName());
     
builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(),
 activity, 1, TypeConverterProcessor.class.getCanonicalName());
-    
builder.addStreamsPersistWriter(GraphHttpPersistWriter.class.getCanonicalName(),
 graphPersistWriter, 1, ActivityConverterProcessor.class.getCanonicalName());
+    
builder.addStreamsPersistWriter(Neo4jBoltPersistWriter.class.getCanonicalName(),
 graphPersistWriter, 1, ActivityConverterProcessor.class.getCanonicalName());
 
     builder.start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
index ffbd39d..e2b3386 100644
--- a/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
+++ b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
@@ -8,6 +8,6 @@
   "javaInterfaces": ["java.io.Serializable"],
   "properties": {
     "twitter": { "javaType": 
"org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", 
"required": true },
-    "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", 
"type": "object", "required": true }
+    "neo4j": { "javaType": "org.apache.streams.neo4j.Neo4jConfiguration", 
"type": "object", "required": true }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
index ac9362e..75e56bb 100644
--- a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
+++ b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
@@ -19,18 +19,31 @@
 package org.apache.streams.example.test;
 
 import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.example.TwitterFollowNeo4j;
 import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.neo4j.bolt.Neo4jBoltClient;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistReader;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
+
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.Transaction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.io.File;
+import java.io.IOException;
+
+import static org.testng.Assert.assertTrue;
 
 /**
  * TwitterFollowNeo4jIT is an integration test for TwitterFollowNeo4j.
@@ -39,20 +52,31 @@ public class TwitterFollowNeo4jIT {
 
   private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterFollowNeo4jIT.class);
 
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+
   protected TwitterFollowNeo4jConfiguration testConfiguration;
 
   private int count = 0;
 
+  private Neo4jBoltClient testClient;
+
   @BeforeClass
-  public void prepareTest() throws Exception {
+  public void prepareTest() throws IOException {
 
     Config reference  = ConfigFactory.load();
-    File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf");
-    assert(conf_file.exists());
-    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+    File conf = new File("target/test-classes/TwitterFollowNeo4jIT.conf");
+    assertTrue(conf.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf, 
ConfigParseOptions.defaults().setAllowMissing(false));
     Config typesafe  = testResourceConfig.withFallback(reference).resolve();
     testConfiguration = new 
ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe);
-
+    testClient = Neo4jBoltClient.getInstance(testConfiguration.getNeo4j());
+
+    Session session = testClient.client().session();
+    Transaction transaction = session.beginTransaction();
+    transaction.run("MATCH ()-[r]-() DELETE r");
+    transaction.run("MATCH (n) DETACH DELETE n");
+    transaction.success();
+    session.close();
   }
 
   @Test
@@ -62,6 +86,34 @@ public class TwitterFollowNeo4jIT {
 
     stream.run();
 
+    Neo4jReaderConfiguration vertexReaderConfiguration= 
MAPPER.convertValue(testConfiguration.getNeo4j(), 
Neo4jReaderConfiguration.class);
+    vertexReaderConfiguration.setQuery("MATCH (v) return v");
+    Neo4jBoltPersistReader vertexReader = new 
Neo4jBoltPersistReader(vertexReaderConfiguration);
+    vertexReader.prepare(null);
+    StreamsResultSet vertexResultSet = vertexReader.readAll();
+    LOGGER.info("Total Read: {}", vertexResultSet.size() );
+    assertTrue(vertexResultSet.size() > 100);
+
+    Neo4jReaderConfiguration edgeReaderConfiguration= 
MAPPER.convertValue(testConfiguration.getNeo4j(), 
Neo4jReaderConfiguration.class);
+    edgeReaderConfiguration.setQuery("MATCH (s)-[r]->(d) return r");
+    Neo4jBoltPersistReader edgeReader = new 
Neo4jBoltPersistReader(edgeReaderConfiguration);
+    edgeReader.prepare(null);
+    StreamsResultSet edgeResultSet = edgeReader.readAll();
+    LOGGER.info("Total Read: {}", edgeResultSet.size() );
+    assertTrue(edgeResultSet.size() == vertexResultSet.size()-1);
+
   }
 
+
+  @AfterClass
+  public void cleanup() throws Exception {
+    Session session = testClient.client().session();
+    Transaction transaction = session.beginTransaction();
+    transaction.run("MATCH ()-[r]-() DELETE r");
+    transaction.run("MATCH (n) DETACH DELETE n");
+    transaction.success();
+    session.close();
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
index 346b111..b5e8fed 100644
--- a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
+++ b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
@@ -20,10 +20,9 @@ twitter {
   ]
   twitter.max_items = 1000
 }
-graph {
-  hostname = ${neo4j.http.host}
-  port = ${neo4j.http.port}
-  type = "neo4j"
-  graph = "data"
+neo4j {
+  scheme = "tcp"
+  hosts += ${neo4j.tcp.host}
+  port = ${neo4j.tcp.port}
 }
 taskTimeoutMs = 60000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/b4303572/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e721622..ac7d7f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,7 +263,7 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>
-                <artifactId>streams-persist-graph</artifactId>
+                <artifactId>streams-persist-neo4j</artifactId>
                 <version>0.5-incubating-SNAPSHOT</version>
             </dependency>
             <dependency>
@@ -574,7 +574,7 @@
                                     </watch>
                                 </image>
                                 <image>
-                                    <name>neo4j</name>
+                                    <name>neo4j:3.0.6</name>
                                     <alias>graph</alias>
                                     <run>
                                         <env>

Reply via email to