Repository: incubator-streams
Updated Branches:
  refs/heads/master 7810361d2 -> 4bd22317e


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java
 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java
new file mode 100644
index 0000000..171dde4
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.bolt;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Queues;
+
+import org.joda.time.DateTime;
+import org.neo4j.driver.internal.value.NodeValue;
+import org.neo4j.driver.internal.value.RelationshipValue;
+import org.neo4j.driver.internal.value.StringValue;
+import org.neo4j.driver.v1.Record;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.Transaction;
+import org.neo4j.driver.v1.Value;
+import org.neo4j.driver.v1.types.Node;
+import org.neo4j.driver.v1.types.Relationship;
+import org.neo4j.driver.v1.util.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Neo4jBoltPersistReader reads documents from neo4j.
+ */
+public class Neo4jBoltPersistReader implements StreamsPersistReader {
+
+  public static final String STREAMS_ID = "CassandraPersistReader";
+
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jBoltPersistReader.class);
+
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  private ExecutorService executor;
+  private CompletableFuture<Boolean> readerTaskFuture = new 
CompletableFuture<>();
+
+  private Neo4jReaderConfiguration config;
+
+  protected Neo4jBoltClient client;
+
+//  protected Cluster cluster;
+//  protected Session session;
+//
+//  protected String keyspace;
+//  protected String table;
+  protected StatementResult statementResult;
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  /**
+   * Neo4jBoltPersistReader constructor - resolves Neo4jReaderConfiguration 
from JVM 'neo4j'.
+   */
+  public Neo4jBoltPersistReader() {
+    this.config = new ComponentConfigurator<>(Neo4jReaderConfiguration.class)
+      .detectConfiguration(StreamsConfigurator.getConfig().getConfig("neo4j"));
+  }
+
+  /**
+   * Neo4jBoltPersistReader constructor - uses supplied 
Neo4jReaderConfiguration.
+   * @param config config
+   */
+  public Neo4jBoltPersistReader(Neo4jReaderConfiguration config) {
+    this.config = config;
+  }
+
+  /**
+   * Neo4jBoltPersistReader constructor - uses supplied persistQueue.
+   * @param persistQueue persistQueue
+   */
+  public Neo4jBoltPersistReader(Queue<StreamsDatum> persistQueue) {
+    this();
+    this.persistQueue = persistQueue;
+  }
+
+  public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+    this.persistQueue = persistQueue;
+  }
+
+  public Queue<StreamsDatum> getPersistQueue() {
+    return persistQueue;
+  }
+
+  public void stop() {
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    if( configurationObject instanceof Neo4jReaderConfiguration ) {
+      this.config = (Neo4jReaderConfiguration) configurationObject;
+    }
+    this.client = Neo4jBoltClient.getInstance(this.config);
+
+    persistQueue = constructQueue();
+
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  @Override
+  public void cleanUp() {
+    stop();
+  }
+
+  protected Optional<StreamsDatum> buildDatum(Record record) {
+    ObjectNode objectNode;
+
+    if( record != null ) {
+      ObjectNode valueJson = null;
+      Map<String, ObjectNode> valueJsons = 
record.asMap(neo4jObjectNodeFunction);
+      if( valueJsons.size() == 1) {
+        valueJson = valueJsons.get(valueJsons.keySet().iterator().next());
+      }
+      objectNode = PropertyUtil.unflattenObjectNode(valueJson, '.');
+      return Optional.of(new StreamsDatum(objectNode));
+    }
+
+    return Optional.empty();
+  }
+
+  @Override
+  public StreamsResultSet readAll() {
+
+    Session session = null;
+
+    String query = config.getQuery();
+    Map<String, Object> params = mapper.convertValue(config.getParams(), 
Map.class);
+
+    try {
+      session = client.client().session();
+      Transaction transaction = session.beginTransaction();
+
+      this.statementResult = 
client.client().session().beginTransaction().run(query, params);
+
+      while( statementResult.hasNext()) {
+        Record record = statementResult.next();
+        Optional<StreamsDatum> datum = buildDatum(record);
+        if( datum.isPresent()) {
+          write(datum.get());
+        }
+      }
+
+    } catch(Exception ex) {
+      LOGGER.warn("Exception", ex);
+    } finally {
+      if( session != null ) {
+        session.close();
+      }
+    }
+    return readCurrent();
+
+  }
+
+  @Override
+  public void startStream() {
+    LOGGER.debug("startStream");
+    Neo4jBoltPersistReaderTask readerTask = new 
Neo4jBoltPersistReaderTask(this);
+
+    CompletableFuture.runAsync(readerTask, executor);
+
+    try {
+      if (readerTaskFuture.get()) {
+        executor.shutdown();
+      }
+    } catch (InterruptedException ex) {
+      LOGGER.trace("Interrupt", ex);
+    } catch (ExecutionException ex) {
+      LOGGER.trace("Execution exception", ex);
+    }
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+
+    StreamsResultSet current;
+
+    try {
+      lock.writeLock().lock();
+      current = new StreamsResultSet(persistQueue);
+      current.setCounter(new DatumStatusCounter());
+      persistQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    return current;
+  }
+
+  protected void write(StreamsDatum entry) {
+    boolean success;
+    do {
+      try {
+        lock.readLock().lock();
+        success = persistQueue.offer(entry);
+        Thread.yield();
+      } finally {
+        lock.readLock().unlock();
+      }
+    }
+    while (!success);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return !executor.isTerminated() || !executor.isShutdown();
+  }
+
+  private Queue<StreamsDatum> constructQueue() {
+    return Queues.synchronizedQueue(new 
LinkedBlockingQueue<StreamsDatum>(10000));
+  }
+
+  private static String readAllStatement() {
+    return "MATCH (v:streams)";
+  }
+
+  public class Neo4jBoltPersistReaderTask implements Runnable {
+
+    private Neo4jBoltPersistReader reader;
+
+    public Neo4jBoltPersistReaderTask(Neo4jBoltPersistReader reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (reader.statementResult.hasNext()) {
+          Record record = statementResult.next();
+          Optional<StreamsDatum> datum = reader.buildDatum(record);
+          if( datum.isPresent() ) {
+            reader.write(datum.get());
+          }
+        }
+      } finally {
+        readerTaskFuture.complete(true);
+      }
+    }
+  }
+
+  Function<Value, ObjectNode> neo4jObjectNodeFunction = new Function<Value, 
ObjectNode>() {
+
+    @Nullable
+    @Override
+    public ObjectNode apply(@Nullable Value value) {
+      ObjectNode resultNode = null;
+      if (value instanceof StringValue) {
+        StringValue stringValue = (StringValue) value;
+        String string = stringValue.asLiteralString();
+        try {
+          resultNode = mapper.readValue(string, ObjectNode.class);
+        } catch (IOException ex) {
+          LOGGER.error("IOException", ex);
+        }
+      } else if ( value instanceof NodeValue) {
+        NodeValue nodeValue = (NodeValue) value;
+        Node node = nodeValue.asNode();
+        Map<String, Object> nodeMap = node.asMap();
+        resultNode = PropertyUtil.unflattenMap(nodeMap, '.');
+      } else if (value instanceof RelationshipValue) {
+        RelationshipValue relationshipValue = (RelationshipValue) value;
+        Relationship relationship = relationshipValue.asRelationship();
+        Map<String, Object> relationshipMap = relationship.asMap();
+        resultNode = PropertyUtil.unflattenMap(relationshipMap, '.');
+      }
+      return resultNode;
+    }
+  };
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java
 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java
new file mode 100644
index 0000000..3c752d6
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java
@@ -0,0 +1,77 @@
+package org.apache.streams.neo4j.bolt;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jPersistUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.javatuples.Pair;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 12/16/16.
+ */
+public class Neo4jBoltPersistWriter implements StreamsPersistWriter {
+
+  private Neo4jConfiguration config;
+
+  Neo4jBoltClient client;
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
+
+  private static ObjectMapper mapper;
+
+  public Neo4jBoltPersistWriter(Neo4jConfiguration config) {
+    this.config = config;
+
+  }
+
+  @Override
+  public String getId() {
+    return Neo4jBoltPersistWriter.class.getSimpleName();
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    client = Neo4jBoltClient.getInstance(config);
+  }
+
+  @Override
+  public void cleanUp() {
+    //
+  }
+
+  @Override
+  public void write(StreamsDatum entry) {
+
+    List<Pair<String, Map<String, Object>>> statements;
+    Session session = null;
+    try {
+      statements = Neo4jPersistUtil.prepareStatements(entry);
+      session = client.client().session();
+      Transaction transaction = session.beginTransaction();
+      for( Pair<String, Map<String, Object>> statement : statements ) {
+        StatementResult statementResult = transaction.run( 
statement.getValue0(), statement.getValue1() );
+        LOGGER.debug("StatementResult", statementResult.single());
+      }
+      transaction.success();
+    } catch( Exception ex ) {
+      LOGGER.error("Exception", ex);
+    } finally {
+      if( session != null ) {
+        session.close();
+      }
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java
 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java
new file mode 100644
index 0000000..da8c01e
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java
@@ -0,0 +1,74 @@
+package org.apache.streams.neo4j.http;
+
+import org.apache.streams.neo4j.Neo4jConfiguration;
+
+import org.apache.http.client.HttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class Neo4jHttpClient {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(Neo4jHttpClient.class);
+
+    public Neo4jConfiguration config;
+
+    private HttpClient client;
+
+    private Neo4jHttpClient(Neo4jConfiguration neo4jConfiguration) {
+        this.config = neo4jConfiguration;
+        try {
+            this.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+            this.client = null;
+        }
+    }
+
+    private static Map<Neo4jConfiguration, Neo4jHttpClient> INSTANCE_MAP = new 
ConcurrentHashMap<Neo4jConfiguration, Neo4jHttpClient>();
+
+    public static Neo4jHttpClient getInstance(Neo4jConfiguration 
neo4jConfiguration) {
+        if ( INSTANCE_MAP != null &&
+             INSTANCE_MAP.size() > 0 &&
+             INSTANCE_MAP.containsKey(neo4jConfiguration)) {
+            return INSTANCE_MAP.get(neo4jConfiguration);
+        } else {
+            Neo4jHttpClient instance = new Neo4jHttpClient(neo4jConfiguration);
+            if( instance != null && instance.client != null ) {
+                INSTANCE_MAP.put(neo4jConfiguration, instance);
+                return instance;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    public void start() throws Exception {
+
+        Objects.nonNull(config);
+        assertThat("config.getScheme().startsWith(\"http\")", 
config.getScheme().startsWith("http"));
+
+        LOGGER.info("Neo4jConfiguration.start {}", config);
+
+        Objects.nonNull(client);
+
+    }
+
+    public void stop() throws Exception {
+        this.client = null;
+    }
+
+    public Neo4jConfiguration config() {
+        return config;
+    }
+
+    public HttpClient client() {
+        return client;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java
 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java
new file mode 100644
index 0000000..4c126b8
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.http;
+
+import org.apache.streams.graph.HttpGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Supporting class for interacting with neo4j via rest API.
+ */
+public class Neo4jHttpGraphHelper implements HttpGraphHelper {
+
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
+
+  private static final String statementKey = "statement";
+  private static final String queryKey = "query";
+  private static final String paramsKey = "parameters";
+  private static final String propsKey = "props";
+
+  /**
+   * readDataStatement neo4j rest json read data payload.
+   *
+   * @param queryPlusParameters (query, parameter map)
+   * @return ObjectNode
+   */
+  public ObjectNode readData(Pair<String, Map<String, Object>> 
queryPlusParameters) {
+
+    LOGGER.debug("readData: ", queryPlusParameters);
+
+    Objects.requireNonNull(queryPlusParameters);
+    Objects.requireNonNull(queryPlusParameters.getValue0());
+
+    ObjectNode request = MAPPER.createObjectNode();
+
+    request.put(queryKey, queryPlusParameters.getValue0());
+
+    if( queryPlusParameters.getValue1() != null && 
queryPlusParameters.getValue1().size() > 0) {
+      ObjectNode params = MAPPER.convertValue(queryPlusParameters.getValue1(), 
ObjectNode.class);
+      request.put(paramsKey, params);
+    }
+
+    LOGGER.debug("readData: ", request);
+
+    return request;
+  }
+
+  /**
+   * writeDataStatement neo4j rest json write data payload.
+   *
+   * @param queryPlusParameters (query, parameter map)
+   * @return ObjectNode
+   */
+  public ObjectNode writeData(Pair<String, Map<String, Object>> 
queryPlusParameters) {
+
+    LOGGER.debug("writeData: ", queryPlusParameters);
+
+    Objects.requireNonNull(queryPlusParameters);
+    Objects.requireNonNull(queryPlusParameters.getValue0());
+
+    ObjectNode request = MAPPER.createObjectNode();
+
+    request.put(statementKey, queryPlusParameters.getValue0());
+
+    if( queryPlusParameters.getValue1() != null && 
queryPlusParameters.getValue1().size() > 0) {
+      ObjectNode params = MAPPER.convertValue(queryPlusParameters.getValue1(), 
ObjectNode.class);
+      request.put(paramsKey, params);
+    } else {
+      request.put(paramsKey, MAPPER.createObjectNode());
+    }
+
+    LOGGER.debug("writeData: ", request);
+
+    return request;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java
 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java
new file mode 100644
index 0000000..86a9da2
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.http;
+
+import org.apache.streams.components.http.HttpConfiguration;
+import org.apache.streams.components.http.HttpProviderConfiguration;
+import org.apache.streams.components.http.provider.SimpleHttpProvider;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.graph.HttpGraphHelper;
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.neo4j.CypherQueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.CypherQueryResponse;
+import org.apache.streams.neo4j.ItemData;
+import org.apache.streams.neo4j.ItemMetadata;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Reads a stream of activityobjects from vertices in a graph database with
+ * an http rest endpoint (such as neo4j).
+ */
+public class Neo4jHttpPersistReader extends SimpleHttpProvider implements 
StreamsPersistReader {
+
+  public static final String STREAMS_ID = 
Neo4jHttpPersistReader.class.getCanonicalName();
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jHttpPersistReader.class);
+
+  private Neo4jReaderConfiguration config;
+
+  private QueryGraphHelper queryGraphHelper;
+  private HttpGraphHelper httpGraphHelper;
+
+  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  /**
+   * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 
'graph'.
+   */
+  public Neo4jHttpPersistReader() {
+    this(new 
ComponentConfigurator<>(Neo4jReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("neo4j")));
+  }
+
+  /**
+   * GraphVertexReader constructor - use supplied GraphReaderConfiguration.
+   * @param configuration GraphReaderConfiguration
+   */
+  public Neo4jHttpPersistReader(Neo4jReaderConfiguration configuration) {
+    
super((HttpProviderConfiguration)StreamsJacksonMapper.getInstance().convertValue(configuration,
 
HttpProviderConfiguration.class).withHostname(configuration.getHosts().get(0)));
+    super.configuration.setRequestMethod(HttpConfiguration.RequestMethod.POST);
+    super.configuration.setResourcePath("/db");
+    super.configuration.setResource("data");
+    super.configuration.setResourcePostfix("cypher");
+    this.config = configuration;
+  }
+
+  /**
+   * prepareHttpRequest
+   * @param uri uri
+   * @return result
+   */
+  public HttpRequestBase prepareHttpRequest(URI uri) {
+    HttpRequestBase baseRequest = super.prepareHttpRequest(uri);
+    HttpPost post = (HttpPost) baseRequest;
+    String query = config.getQuery();
+    Map<String, Object> params = mapper.convertValue(config.getParams(), 
Map.class);
+    Pair<String, Map<String, Object>> queryPlusParams = new Pair(query, 
params);
+    ObjectNode queryNode = httpGraphHelper.readData(queryPlusParams);
+    try {
+      String queryJsonString = mapper.writeValueAsString(queryNode);
+      HttpEntity entity = new StringEntity(queryJsonString, 
ContentType.create("application/json"));
+      post.setEntity(entity);
+    } catch (JsonProcessingException ex) {
+      LOGGER.error("JsonProcessingException", ex);
+      return null;
+    }
+    return post;
+
+  }
+  /**
+   * Neo API query returns something like this:
+   * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { 
"data": { props }, etc... } ] ] }
+   *
+   * @param jsonNode jsonNode
+   * @return result
+   */
+  public List<ObjectNode> parse(JsonNode jsonNode) {
+    List<ObjectNode> results = new ArrayList<>();
+
+    ObjectNode root = (ObjectNode) jsonNode;
+
+    CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, 
CypherQueryResponse.class);
+
+    for ( List<List<ItemMetadata>> dataWrapper : 
cypherQueryResponse.getData()) {
+
+      for (List<ItemMetadata> itemMetadatas : dataWrapper) {
+
+        for (ItemMetadata itemMetadata : itemMetadatas) {
+
+          ItemData itemData = itemMetadata.getData();
+
+          LOGGER.debug("itemData: " + itemData);
+
+          
results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.'));
+        }
+
+      }
+
+    }
+    return results;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+
+    super.prepare(configurationObject);
+    mapper = StreamsJacksonMapper.getInstance();
+
+    queryGraphHelper = new CypherQueryGraphHelper();
+    httpGraphHelper = new Neo4jHttpGraphHelper();
+
+    Objects.requireNonNull(queryGraphHelper);
+    Objects.requireNonNull(httpGraphHelper);
+  }
+
+  @Override
+  public StreamsResultSet readAll() {
+    return readCurrent();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java
 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java
new file mode 100644
index 0000000..e05a252
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.http;
+
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.HttpGraphHelper;
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.neo4j.CypherQueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jPersistUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.util.EntityUtils;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Adds activityobjects as vertices and activities as edges to a graph 
database with
+ * an http rest endpoint (such as neo4j).
+ */
+public class Neo4jHttpPersistWriter extends SimpleHTTPPostPersistWriter {
+
+  public static final String STREAMS_ID = 
Neo4jHttpPersistWriter.class.getCanonicalName();
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jHttpPersistWriter.class);
+  private static final long MAX_WRITE_LATENCY = 1000;
+
+  private Neo4jConfiguration configuration;
+
+  private QueryGraphHelper queryGraphHelper;
+  private HttpGraphHelper httpGraphHelper;
+
+  private static ObjectMapper mapper;
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  /**
+   * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from 
JVM 'graph'.
+   */
+  public Neo4jHttpPersistWriter() {
+    this(new 
ComponentConfigurator<>(Neo4jConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("neo4j")));
+  }
+
+  /**
+   * GraphHttpPersistWriter constructor - use supplied GraphHttpConfiguration.
+   * @param configuration GraphHttpConfiguration
+   */
+  public Neo4jHttpPersistWriter(Neo4jConfiguration configuration) {
+    
super((HttpPersistWriterConfiguration)StreamsJacksonMapper.getInstance().convertValue(configuration,
 
HttpPersistWriterConfiguration.class).withHostname(configuration.getHosts().get(0)));
+    super.configuration.setResourcePath("/db/data/transaction/commit/");
+    this.configuration = configuration;
+  }
+
+  @Override
+  protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
+
+    List<Pair<String, Map<String, Object>>> statements = 
Neo4jPersistUtil.prepareStatements(entry);
+
+    ObjectNode requestNode = mapper.createObjectNode();
+    ArrayNode statementsArray = mapper.createArrayNode();
+
+    for( Pair<String, Map<String, Object>> statement : statements ) {
+      statementsArray.add(httpGraphHelper.writeData(statement));
+    }
+
+    requestNode.put("statements", statementsArray);
+    return requestNode;
+
+  }
+
+  @Override
+  protected ObjectNode executePost(HttpPost httpPost) {
+
+    Objects.requireNonNull(httpPost);
+
+    ObjectNode result = null;
+
+    CloseableHttpResponse response = null;
+
+    String entityString = null;
+    try {
+      response = httpclient.execute(httpPost);
+      HttpEntity entity = response.getEntity();
+      if (response.getStatusLine().getStatusCode() == 200 || 
response.getStatusLine().getStatusCode() == 201 && entity != null) {
+        entityString = EntityUtils.toString(entity);
+        result = mapper.readValue(entityString, ObjectNode.class);
+      }
+      LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), 
response.getStatusLine().getStatusCode(), entityString);
+      if ( result == null
+           || (
+              result.get("errors") != null
+                  && result.get("errors").isArray()
+                  && result.get("errors").iterator().hasNext()
+              )
+          ) {
+        LOGGER.error("Write Error: " + result.get("errors"));
+      } else {
+        LOGGER.debug("Write Success");
+      }
+    } catch (IOException ex) {
+      LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, 
ex.getMessage());
+    } catch (Exception ex) {
+      LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), 
response, ex.getMessage());
+    } finally {
+      try {
+        if ( response != null) {
+          response.close();
+        }
+      } catch (IOException ignored) {
+        LOGGER.trace("ignored IOException", ignored);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+
+    super.prepare(null);
+    mapper = StreamsJacksonMapper.getInstance();
+
+    queryGraphHelper = new CypherQueryGraphHelper();
+    httpGraphHelper = new Neo4jHttpGraphHelper();
+
+    Objects.requireNonNull(queryGraphHelper);
+    Objects.requireNonNull(httpGraphHelper);
+  }
+
+  @Override
+  public void cleanUp() {
+
+    LOGGER.info("exiting");
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json
 
b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json
new file mode 100644
index 0000000..4e80eb4
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json
@@ -0,0 +1,43 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema";,
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0";
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.neo4j.CypherQueryResponse",
+    "properties": {
+        "columns": {
+            "type": "array",
+            "id": "http://jsonschema.net/columns";,
+            "required": false,
+            "items": {
+                "type": "string",
+                "id": "http://jsonschema.net/columns/0";,
+                "required": false
+            }
+        },
+        "data": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "array",
+                "required": false,
+                "items": {
+                    "type": "array",
+                    "required": false,
+                    "items": {
+                        "type": "object",
+                        "javaType" : "org.apache.streams.neo4j.ItemMetadata",
+                        "properties": {
+                            "data": {
+                                "type": "object",
+                                "javaType" : 
"org.apache.streams.neo4j.ItemData"
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json
 
b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json
new file mode 100644
index 0000000..abd2391
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json
@@ -0,0 +1,27 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema";,
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.neo4j.Neo4jConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "scheme": {
+      "type": "string"
+    },
+    "hosts": {
+      "type": "array",
+      "items": {
+        "type": "string"
+      }
+    },
+    "port": {
+      "type": "integer"
+    },
+    "username": {
+      "type": "string"
+    },
+    "password": {
+      "type": "string"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json
 
b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json
new file mode 100644
index 0000000..62c348f
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json
@@ -0,0 +1,17 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema";,
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.neo4j.Neo4jReaderConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends" : {"$ref":"Neo4jConfiguration.json"},
+  "properties": {
+    "query": {
+      "type": "string",
+      "required": "true"
+    },
+    "params": {
+      "type": "object"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
 
b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
new file mode 100644
index 0000000..c45d975
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.neo4j.bolt.Neo4jBoltClient;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistReader;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for Neo4jBoltPersist.
+ *
+ * Test that graph db responses can be converted to streams data.
+ */
+public class Neo4jBoltPersistIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jBoltPersistIT.class);
+
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+
+  private Neo4jBoltClient testClient;
+
+  private Neo4jConfiguration testConfiguration;
+
+  @BeforeClass
+  public void prepareTest() throws IOException {
+
+    Config reference  = ConfigFactory.load();
+    File conf = new File("target/test-classes/Neo4jBoltPersistIT.conf");
+    assertTrue(conf.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf, 
ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new 
ComponentConfigurator<>(Neo4jConfiguration.class).detectConfiguration(typesafe, 
"neo4j");
+    testClient = Neo4jBoltClient.getInstance(testConfiguration);
+
+    Session session = testClient.client().session();
+    Transaction transaction = session.beginTransaction();
+    transaction.run("MATCH ()-[r]-() DELETE r");
+    transaction.run("MATCH (n) DETACH DELETE n");
+    transaction.success();
+    session.close();
+  }
+
+  @Test
+  public void testNeo4jBoltPersist() throws Exception {
+
+    Neo4jBoltPersistWriter testPersistWriter = new 
Neo4jBoltPersistWriter(testConfiguration);
+    testPersistWriter.prepare(testConfiguration);
+
+    InputStream testActivityFolderStream = 
Neo4jBoltPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, 
StandardCharsets.UTF_8);
+
+    int count = 0;
+    for( String file : files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = 
Neo4jBoltPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
+      if( activity.getActor() != null && activity.getActor().getId() == null 
&& activity.getActor().getObjectType() != null) {
+        activity.getActor().setId(activity.getActor().getObjectType());
+      }
+      if( activity.getObject() != null && activity.getObject().getId() == null 
&& activity.getObject().getObjectType() != null) {
+        activity.getObject().setId(activity.getObject().getObjectType());
+      }
+      if( activity.getTarget() != null && activity.getTarget().getId() == null 
&& activity.getTarget().getObjectType() != null) {
+        activity.getTarget().setId(activity.getTarget().getObjectType());
+      }
+      if( activity.getId() == null && activity.getVerb() != null) {
+        activity.setId(activity.getVerb());
+      }
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      testPersistWriter.write( datum );
+      LOGGER.info("Wrote: " + activity.getVerb() );
+      count++;
+    }
+
+    testPersistWriter.cleanUp();
+
+    LOGGER.info("Total Written: {}", count );
+    Assert.assertEquals(count, 89);
+
+    Neo4jReaderConfiguration vertexReaderConfiguration= 
MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    vertexReaderConfiguration.setQuery("MATCH (v) return v");
+    Neo4jBoltPersistReader vertexReader = new 
Neo4jBoltPersistReader(vertexReaderConfiguration);
+    vertexReader.prepare(null);
+    StreamsResultSet vertexResultSet = vertexReader.readAll();
+    LOGGER.info("Total Read: {}", vertexResultSet.size() );
+    Assert.assertEquals(vertexResultSet.size(), 24);
+
+    Neo4jReaderConfiguration edgeReaderConfiguration= 
MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    edgeReaderConfiguration.setQuery("MATCH (s)-[r]->(d) return r");
+    Neo4jBoltPersistReader edgeReader = new 
Neo4jBoltPersistReader(edgeReaderConfiguration);
+    edgeReader.prepare(null);
+    StreamsResultSet edgeResultSet = edgeReader.readAll();
+    LOGGER.info("Total Read: {}", edgeResultSet.size() );
+    Assert.assertEquals(edgeResultSet.size(), 100);
+
+  }
+
+  @AfterClass
+  public void cleanup() throws Exception {
+    Session session = testClient.client().session();
+    Transaction transaction = session.beginTransaction();
+    transaction.run("MATCH ()-[r]-() DELETE r");
+    transaction.run("MATCH (n) DETACH DELETE n");
+    transaction.success();
+    session.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
 
b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
new file mode 100644
index 0000000..a5b0d30
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.neo4j.http.Neo4jHttpClient;
+import org.apache.streams.neo4j.http.Neo4jHttpPersistReader;
+import org.apache.streams.neo4j.http.Neo4jHttpPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for Neo4jHttpPersist.
+ *
+ * Test that graph db responses can be converted to streams data.
+ */
+public class Neo4jHttpPersistIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jHttpPersistIT.class);
+
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+
+  private Neo4jHttpClient testClient;
+
+  private Neo4jConfiguration testConfiguration;
+
+  @BeforeClass
+  public void prepareTest() throws IOException {
+
+    Config reference  = ConfigFactory.load();
+    File conf = new File("target/test-classes/Neo4jHttpPersistIT.conf");
+    assertTrue(conf.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf, 
ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new 
ComponentConfigurator<>(Neo4jConfiguration.class).detectConfiguration(typesafe, 
"neo4j");
+
+  }
+
+  @Test
+  public void testNeo4jHttpPersist() throws Exception {
+
+    Neo4jHttpPersistWriter testPersistWriter = new 
Neo4jHttpPersistWriter(testConfiguration);
+    testPersistWriter.prepare(null);
+
+    InputStream testActivityFolderStream = 
Neo4jHttpPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, 
StandardCharsets.UTF_8);
+
+    // write data
+
+    int count = 0;
+    for( String file : files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = 
Neo4jHttpPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
+      if( activity.getActor() != null && activity.getActor().getId() == null 
&& activity.getActor().getObjectType() != null) {
+        activity.getActor().setId(activity.getActor().getObjectType());
+      }
+      if( activity.getObject() != null && activity.getObject().getId() == null 
&& activity.getObject().getObjectType() != null) {
+        activity.getObject().setId(activity.getObject().getObjectType());
+      }
+      if( activity.getTarget() != null && activity.getTarget().getId() == null 
&& activity.getTarget().getObjectType() != null) {
+        activity.getTarget().setId(activity.getTarget().getObjectType());
+      }
+      if( activity.getId() == null && activity.getVerb() != null) {
+        activity.setId(activity.getVerb());
+      }
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      testPersistWriter.write( datum );
+      LOGGER.info("Wrote: " + activity.getVerb() );
+      count++;
+    }
+
+    testPersistWriter.cleanUp();
+
+    LOGGER.info("Total Written: {}", count );
+    Assert.assertEquals(count, 89);
+
+    Neo4jReaderConfiguration vertexReaderConfiguration= 
MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    vertexReaderConfiguration.setQuery("MATCH (v) return v");
+    Neo4jHttpPersistReader vertexReader = new 
Neo4jHttpPersistReader(vertexReaderConfiguration);
+    vertexReader.prepare(null);
+    StreamsResultSet vertexResultSet = vertexReader.readAll();
+    LOGGER.info("Total Read: {}", vertexResultSet.size() );
+    Assert.assertEquals(vertexResultSet.size(), 24);
+
+    Neo4jReaderConfiguration edgeReaderConfiguration= 
MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    edgeReaderConfiguration.setQuery("MATCH (s)-[r]->(d) return r");
+    Neo4jHttpPersistReader edgeReader = new 
Neo4jHttpPersistReader(edgeReaderConfiguration);
+    edgeReader.prepare(null);
+    StreamsResultSet edgeResultSet = edgeReader.readAll();
+    LOGGER.info("Total Read: {}", edgeResultSet.size() );
+    Assert.assertEquals(edgeResultSet.size(), 100);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
 
b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
new file mode 100644
index 0000000..12f3306
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.test;
+
+import org.apache.streams.neo4j.CypherQueryGraphHelper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import org.javatuples.Pair;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * TestCypherQueryGraphHelper tests
+ * @see org.apache.streams.graph.neo4j.CypherQueryGraphHelper
+ */
+public class TestCypherQueryGraphHelper {
+
+  CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
+
+  @Test
+  public void getVertexRequestIdTest() throws Exception {
+
+    Pair<String, Map<String, Object>> queryAndParams = 
helper.getVertexRequest("id");
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+
+  }
+
+  @Test
+  public void getVertexRequestLongTest() throws Exception {
+
+    Pair<String, Map<String, Object>> queryAndParams = 
helper.getVertexRequest(new Long(1));
+
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+
+  }
+
+  @Test
+  public void createVertexRequestTest() throws Exception {
+
+    ActivityObject activityObject = new ActivityObject();
+    activityObject.setId("id");
+    activityObject.setObjectType("type");
+    activityObject.setContent("content");
+
+    Pair<String, Map<String, Object>> queryAndParams = 
helper.createVertexRequest(activityObject);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+
+  @Test
+  public void mergeVertexRequestTest() throws Exception {
+
+    ActivityObject activityObject = new ActivityObject();
+    activityObject.setId("id");
+    activityObject.setObjectType("type");
+    activityObject.setContent("content");
+
+    Pair<String, Map<String, Object>> queryAndParams = 
helper.mergeVertexRequest(activityObject);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+
+  @Test
+  public void createActorObjectEdgeRequestTest() throws Exception {
+
+    ActivityObject actor = new ActivityObject();
+    actor.setId("actor");
+    actor.setObjectType("type");
+    actor.setContent("content");
+
+    ActivityObject object = new ActivityObject();
+    object.setId("object");
+    object.setObjectType("type");
+    object.setContent("content");
+
+    Activity activity = new Activity();
+    activity.setId("activity");
+    activity.setVerb("verb");
+    activity.setContent("content");
+
+    activity.setActor(actor);
+    activity.setObject(object);
+
+    Pair<String, Map<String, Object>> queryAndParams = 
helper.createActorObjectEdge(activity);
+
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+
+  @Test
+  public void createEdgeRequestTest() throws Exception {
+
+    ActivityObject actor = new ActivityObject();
+    actor.setId("actor");
+    actor.setObjectType("type");
+    actor.setContent("content");
+
+    ActivityObject object = new ActivityObject();
+    object.setId("object");
+    object.setObjectType("type");
+    object.setContent("content");
+
+    ActivityObject target = new ActivityObject();
+    object.setId("target");
+    object.setObjectType("type");
+
+    Activity activity = new Activity();
+    activity.setId("activity");
+    activity.setVerb("verb");
+    activity.setContent("content");
+
+    activity.setActor(actor);
+    activity.setObject(object);
+    activity.setObject(target);
+
+    Pair<String, Map<String, Object>> queryAndParams = 
helper.createActorTargetEdge(activity);
+
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf
 
b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf
new file mode 100644
index 0000000..1d5ec35
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+neo4j {
+  scheme = "tcp"
+  hosts += ${neo4j.tcp.host}
+  port = ${neo4j.tcp.port}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf
 
b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf
new file mode 100644
index 0000000..929b3ed
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+neo4j {
+  scheme = "http"
+  hosts += ${neo4j.http.host}
+  port = ${neo4j.http.port}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
 
b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
deleted file mode 100644
index 981de44..0000000
--- 
a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.util;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.node.ValueNode;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- *  Class transforms nested properties of activities, actors, objects, etc...
- */
-public class PropertyUtil {
-
-  private static final ObjectMapper mapper = 
StreamsJacksonMapper.getInstance();
-
-  public static Map<String, Object> flattenToMap(ObjectNode object) {
-    Map<String, Object> flatObject = new HashMap<>();
-    addKeys("", object, flatObject, '.');
-    return flatObject;
-  }
-
-  public static Map<String, Object> flattenToMap(ObjectNode object, char 
seperator) {
-    Map<String, Object> flatObject = new HashMap<>();
-    addKeys("", object, flatObject, seperator);
-    return flatObject;
-  }
-
-  public static ObjectNode flattenToObjectNode(ObjectNode object) {
-    Map<String, Object> flatObject = flattenToMap(object, '.');
-    addKeys("", object, flatObject, '.');
-    return mapper.convertValue(flatObject, ObjectNode.class);
-  }
-
-  public static ObjectNode flattenToObjectNode(ObjectNode object, char 
seperator) {
-    Map<String, Object> flatObject = flattenToMap(object, seperator);
-    addKeys("", object, flatObject, seperator);
-    return mapper.convertValue(flatObject, ObjectNode.class);
-  }
-
-  private static void addKeys(String currentPath, JsonNode jsonNode, 
Map<String, Object> map, char seperator) {
-    if (jsonNode.isObject()) {
-      ObjectNode objectNode = (ObjectNode) jsonNode;
-      Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
-      String pathPrefix = currentPath.isEmpty() ? "" : currentPath + seperator;
-
-      while (iter.hasNext()) {
-        Map.Entry<String, JsonNode> entry = iter.next();
-        addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator);
-      }
-    } else if (jsonNode.isArray()) {
-      ArrayNode arrayNode = (ArrayNode) jsonNode;
-      map.put(currentPath, arrayNode);
-    } else if (jsonNode.isValueNode()) {
-      ValueNode valueNode = (ValueNode) jsonNode;
-      if ( valueNode.isTextual() ) {
-        map.put(currentPath, valueNode.asText());
-      } else if ( valueNode.isNumber() ) {
-        map.put(currentPath, valueNode);
-      }
-    }
-  }
-
-  public static ObjectNode unflattenMap(Map<String, Object> object, char 
seperator) {
-    return unflattenObjectNode(mapper.convertValue(object, ObjectNode.class), 
seperator);
-  }
-
-  public static ObjectNode unflattenObjectNode(ObjectNode flatObject, char 
seperator) {
-    ObjectNode root = mapper.createObjectNode();
-    Iterator<Map.Entry<String, JsonNode>> iter = flatObject.fields();
-    while (iter.hasNext()) {
-      Map.Entry<String, JsonNode> item = iter.next();
-      String fullKey = item.getKey();
-      if ( !fullKey.contains(Character.valueOf(seperator).toString())) {
-        root.put(item.getKey(), item.getValue());
-      } else {
-        ObjectNode currentNode = root;
-        List<String> keyParts = new ArrayList<>();
-        Iterables.addAll(keyParts, 
Splitter.on(seperator).split(item.getKey()));
-        for (String part : 
Iterables.limit(Splitter.on(seperator).split(item.getKey()), keyParts.size() - 
1)) {
-          if (currentNode.has(part) && currentNode.get(part).isObject()) {
-            currentNode = (ObjectNode) currentNode.get(part);
-          } else {
-            ObjectNode newNode = mapper.createObjectNode();
-            currentNode.put(part, newNode);
-            currentNode = newNode;
-          }
-        }
-        currentNode.put(keyParts.get(keyParts.size() - 1), item.getValue());
-
-      }
-    }
-    return root;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java 
b/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java
new file mode 100644
index 0000000..1671174
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.ValueNode;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *  Class transforms nested properties of activities, actors, objects, etc...
+ */
+public class PropertyUtil {
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  public static Map<String, Object> flattenToMap(ObjectNode object) {
+    Map<String, Object> flatObject = new HashMap<>();
+    addKeys(new String(), object, flatObject, '.');
+    return flatObject;
+  }
+
+  public static ObjectNode flattenToObjectNode(ObjectNode object) {
+    Map<String, Object> flatObject = flattenToMap(object, '.');
+    addKeys(new String(), object, flatObject, '.');
+    return mapper.convertValue(flatObject, ObjectNode.class);
+  }
+
+  public static Map<String, Object> flattenToMap(ObjectNode object, char 
seperator) {
+    Map<String, Object> flatObject = new HashMap<>();
+    addKeys(new String(), object, flatObject, seperator);
+    return flatObject;
+  }
+
+  public static ObjectNode flattenToObjectNode(ObjectNode object, char 
seperator) {
+    Map<String, Object> flatObject = flattenToMap(object, seperator);
+    addKeys(new String(), object, flatObject, seperator);
+    return mapper.convertValue(flatObject, ObjectNode.class);
+  }
+
+  private static void addKeys(String currentPath, JsonNode jsonNode, 
Map<String, Object> map, char seperator) {
+    if (jsonNode.isObject()) {
+      ObjectNode objectNode = (ObjectNode) jsonNode;
+      Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
+      String pathPrefix = currentPath.isEmpty() ? "" : currentPath + seperator;
+
+      while (iter.hasNext()) {
+        Map.Entry<String, JsonNode> entry = iter.next();
+        addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator);
+      }
+    } else if (jsonNode.isArray()) {
+      ArrayNode arrayNode = (ArrayNode) jsonNode;
+      if( arrayNode.isTextual()) {
+        List<String> list = mapper.convertValue(arrayNode, List.class);
+        map.put(currentPath, list);
+      }
+      if( arrayNode.isNumber()) {
+        List<String> list = mapper.convertValue(arrayNode, List.class);
+        map.put(currentPath, list);
+      }
+    } else if (jsonNode.isValueNode()) {
+      ValueNode valueNode = (ValueNode) jsonNode;
+      if( valueNode.isTextual() )
+        map.put(currentPath, valueNode.asText());
+      else if ( valueNode.isNumber() )
+        map.put(currentPath, valueNode);
+    }
+  }
+
+  public static ObjectNode unflattenMap(Map<String, Object> object, char 
seperator) {
+    return unflattenObjectNode(mapper.convertValue(object, ObjectNode.class), 
seperator);
+  }
+
+  public static ObjectNode unflattenObjectNode(ObjectNode flatObject, char 
seperator) {
+    ObjectNode root = mapper.createObjectNode();
+    Iterator<Map.Entry<String, JsonNode>> iter = flatObject.fields();
+    while (iter.hasNext()) {
+      Map.Entry<String, JsonNode> item = iter.next();
+      String fullKey = item.getKey();
+      if( !fullKey.contains(Character.valueOf(seperator).toString())) {
+        root.put(item.getKey(), item.getValue());
+      } else {
+        ObjectNode currentNode = root;
+        List<String> keyParts = new 
ArrayList<>(Arrays.asList(StringUtils.split(item.getKey(), seperator)));
+        keyParts.remove(keyParts.size()-1);
+        Iterator<String> keyPartIterator = keyParts.iterator();
+        while( keyPartIterator.hasNext()) {
+          String part = keyPartIterator.next();
+          if( currentNode.has(part) && currentNode.get(part).isObject() ) {
+            currentNode = (ObjectNode) currentNode.get(part);
+          } else {
+            ObjectNode newNode = mapper.createObjectNode();
+            currentNode.put(part, newNode);
+            currentNode = newNode;
+          }
+        };
+        currentNode.put(keyParts.get(keyParts.size()-1), item.getValue());
+
+      }
+    }
+    return root;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
----------------------------------------------------------------------
diff --git 
a/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
 
b/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
new file mode 100644
index 0000000..233a431
--- /dev/null
+++ 
b/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
@@ -0,0 +1,25 @@
+package org.apache.streams.util.schema.test;
+
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.testng.annotations.Test;
+
+/**
+ * Created by sblackmon on 1/8/17.
+ */
+public class PropertyUtilTest {
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  String flatJson = "{\"a.a\": \"aa\", \"a.b\": \"ab\", \"b.a\": \"ba\", 
\"b.b\": \"bb\"}";
+
+  @Test
+  public void testUnflattenObjectNode() throws Exception {
+    ObjectNode flatNode = mapper.readValue(flatJson, ObjectNode.class);
+    ObjectNode unflattenedNode = PropertyUtil.unflattenObjectNode(flatNode, 
'.');
+    assert(unflattenedNode.size() == 2);
+  }
+}
+

Reply via email to