Repository: incubator-streams Updated Branches: refs/heads/master 7810361d2 -> 4bd22317e
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java new file mode 100644 index 0000000..171dde4 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.neo4j.bolt; + +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.neo4j.Neo4jReaderConfiguration; +import org.apache.streams.util.PropertyUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Queues; + +import org.joda.time.DateTime; +import org.neo4j.driver.internal.value.NodeValue; +import org.neo4j.driver.internal.value.RelationshipValue; +import org.neo4j.driver.internal.value.StringValue; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.types.Node; +import org.neo4j.driver.v1.types.Relationship; +import org.neo4j.driver.v1.util.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.annotation.Nullable; + + +/** + * Neo4jBoltPersistReader reads documents from neo4j. 
+ */ +public class Neo4jBoltPersistReader implements StreamsPersistReader { + + public static final String STREAMS_ID = "CassandraPersistReader"; + + public static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistReader.class); + + protected volatile Queue<StreamsDatum> persistQueue; + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + private ExecutorService executor; + private CompletableFuture<Boolean> readerTaskFuture = new CompletableFuture<>(); + + private Neo4jReaderConfiguration config; + + protected Neo4jBoltClient client; + +// protected Cluster cluster; +// protected Session session; +// +// protected String keyspace; +// protected String table; + protected StatementResult statementResult; + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Neo4jBoltPersistReader constructor - resolves Neo4jReaderConfiguration from JVM 'neo4j'. + */ + public Neo4jBoltPersistReader() { + this.config = new ComponentConfigurator<>(Neo4jReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("neo4j")); + } + + /** + * Neo4jBoltPersistReader constructor - uses supplied Neo4jReaderConfiguration. + * @param config config + */ + public Neo4jBoltPersistReader(Neo4jReaderConfiguration config) { + this.config = config; + } + + /** + * Neo4jBoltPersistReader constructor - uses supplied persistQueue. 
+ * @param persistQueue persistQueue + */ + public Neo4jBoltPersistReader(Queue<StreamsDatum> persistQueue) { + this(); + this.persistQueue = persistQueue; + } + + public void setPersistQueue(Queue<StreamsDatum> persistQueue) { + this.persistQueue = persistQueue; + } + + public Queue<StreamsDatum> getPersistQueue() { + return persistQueue; + } + + public void stop() { + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void prepare(Object configurationObject) { + if( configurationObject instanceof Neo4jReaderConfiguration ) { + this.config = (Neo4jReaderConfiguration) configurationObject; + } + this.client = Neo4jBoltClient.getInstance(this.config); + + persistQueue = constructQueue(); + + executor = Executors.newSingleThreadExecutor(); + } + + @Override + public void cleanUp() { + stop(); + } + + protected Optional<StreamsDatum> buildDatum(Record record) { + ObjectNode objectNode; + + if( record != null ) { + ObjectNode valueJson = null; + Map<String, ObjectNode> valueJsons = record.asMap(neo4jObjectNodeFunction); + if( valueJsons.size() == 1) { + valueJson = valueJsons.get(valueJsons.keySet().iterator().next()); + } + objectNode = PropertyUtil.unflattenObjectNode(valueJson, '.'); + return Optional.of(new StreamsDatum(objectNode)); + } + + return Optional.empty(); + } + + @Override + public StreamsResultSet readAll() { + + Session session = null; + + String query = config.getQuery(); + Map<String, Object> params = mapper.convertValue(config.getParams(), Map.class); + + try { + session = client.client().session(); + Transaction transaction = session.beginTransaction(); + + this.statementResult = client.client().session().beginTransaction().run(query, params); + + while( statementResult.hasNext()) { + Record record = statementResult.next(); + Optional<StreamsDatum> datum = buildDatum(record); + if( datum.isPresent()) { + write(datum.get()); + } + } + + } catch(Exception ex) { + LOGGER.warn("Exception", ex); + } finally { + if( 
session != null ) { + session.close(); + } + } + return readCurrent(); + + } + + @Override + public void startStream() { + LOGGER.debug("startStream"); + Neo4jBoltPersistReaderTask readerTask = new Neo4jBoltPersistReaderTask(this); + + CompletableFuture.runAsync(readerTask, executor); + + try { + if (readerTaskFuture.get()) { + executor.shutdown(); + } + } catch (InterruptedException ex) { + LOGGER.trace("Interrupt", ex); + } catch (ExecutionException ex) { + LOGGER.trace("Execution exception", ex); + } + } + + @Override + public StreamsResultSet readCurrent() { + + StreamsResultSet current; + + try { + lock.writeLock().lock(); + current = new StreamsResultSet(persistQueue); + current.setCounter(new DatumStatusCounter()); + persistQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); + } + + return current; + } + + protected void write(StreamsDatum entry) { + boolean success; + do { + try { + lock.readLock().lock(); + success = persistQueue.offer(entry); + Thread.yield(); + } finally { + lock.readLock().unlock(); + } + } + while (!success); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public boolean isRunning() { + return !executor.isTerminated() || !executor.isShutdown(); + } + + private Queue<StreamsDatum> constructQueue() { + return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); + } + + private static String readAllStatement() { + return "MATCH (v:streams)"; + } + + public class Neo4jBoltPersistReaderTask implements Runnable { + + private Neo4jBoltPersistReader reader; + + public Neo4jBoltPersistReaderTask(Neo4jBoltPersistReader reader) { + this.reader = reader; + } + + @Override + public void run() { + try { + while (reader.statementResult.hasNext()) { + Record record = statementResult.next(); + Optional<StreamsDatum> datum = reader.buildDatum(record); + if( 
datum.isPresent() ) { + reader.write(datum.get()); + } + } + } finally { + readerTaskFuture.complete(true); + } + } + } + + Function<Value, ObjectNode> neo4jObjectNodeFunction = new Function<Value, ObjectNode>() { + + @Nullable + @Override + public ObjectNode apply(@Nullable Value value) { + ObjectNode resultNode = null; + if (value instanceof StringValue) { + StringValue stringValue = (StringValue) value; + String string = stringValue.asLiteralString(); + try { + resultNode = mapper.readValue(string, ObjectNode.class); + } catch (IOException ex) { + LOGGER.error("IOException", ex); + } + } else if ( value instanceof NodeValue) { + NodeValue nodeValue = (NodeValue) value; + Node node = nodeValue.asNode(); + Map<String, Object> nodeMap = node.asMap(); + resultNode = PropertyUtil.unflattenMap(nodeMap, '.'); + } else if (value instanceof RelationshipValue) { + RelationshipValue relationshipValue = (RelationshipValue) value; + Relationship relationship = relationshipValue.asRelationship(); + Map<String, Object> relationshipMap = relationship.asMap(); + resultNode = PropertyUtil.unflattenMap(relationshipMap, '.'); + } + return resultNode; + } + }; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java new file mode 100644 index 0000000..3c752d6 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java @@ -0,0 +1,77 @@ +package org.apache.streams.neo4j.bolt; + +import org.apache.streams.core.StreamsDatum; +import 
org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.neo4j.Neo4jConfiguration; +import org.apache.streams.neo4j.Neo4jPersistUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.javatuples.Pair; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * Created by sblackmon on 12/16/16. + */ +public class Neo4jBoltPersistWriter implements StreamsPersistWriter { + + private Neo4jConfiguration config; + + Neo4jBoltClient client; + + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistWriter.class); + + private static ObjectMapper mapper; + + public Neo4jBoltPersistWriter(Neo4jConfiguration config) { + this.config = config; + + } + + @Override + public String getId() { + return Neo4jBoltPersistWriter.class.getSimpleName(); + } + + @Override + public void prepare(Object configurationObject) { + client = Neo4jBoltClient.getInstance(config); + } + + @Override + public void cleanUp() { + // + } + + @Override + public void write(StreamsDatum entry) { + + List<Pair<String, Map<String, Object>>> statements; + Session session = null; + try { + statements = Neo4jPersistUtil.prepareStatements(entry); + session = client.client().session(); + Transaction transaction = session.beginTransaction(); + for( Pair<String, Map<String, Object>> statement : statements ) { + StatementResult statementResult = transaction.run( statement.getValue0(), statement.getValue1() ); + LOGGER.debug("StatementResult", statementResult.single()); + } + transaction.success(); + } catch( Exception ex ) { + LOGGER.error("Exception", ex); + } finally { + if( session != null ) { + session.close(); + } + } + } + + +} 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java new file mode 100644 index 0000000..da8c01e --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java @@ -0,0 +1,74 @@ +package org.apache.streams.neo4j.http; + +import org.apache.streams.neo4j.Neo4jConfiguration; + +import org.apache.http.client.HttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import static org.hamcrest.MatcherAssert.assertThat; + +public class Neo4jHttpClient { + + private static final Logger LOGGER = LoggerFactory + .getLogger(Neo4jHttpClient.class); + + public Neo4jConfiguration config; + + private HttpClient client; + + private Neo4jHttpClient(Neo4jConfiguration neo4jConfiguration) { + this.config = neo4jConfiguration; + try { + this.start(); + } catch (Exception e) { + e.printStackTrace(); + this.client = null; + } + } + + private static Map<Neo4jConfiguration, Neo4jHttpClient> INSTANCE_MAP = new ConcurrentHashMap<Neo4jConfiguration, Neo4jHttpClient>(); + + public static Neo4jHttpClient getInstance(Neo4jConfiguration neo4jConfiguration) { + if ( INSTANCE_MAP != null && + INSTANCE_MAP.size() > 0 && + INSTANCE_MAP.containsKey(neo4jConfiguration)) { + return INSTANCE_MAP.get(neo4jConfiguration); + } else { + Neo4jHttpClient instance = new Neo4jHttpClient(neo4jConfiguration); + if( instance != null && instance.client != null ) { + INSTANCE_MAP.put(neo4jConfiguration, instance); + return instance; + } 
else { + return null; + } + } + } + + public void start() throws Exception { + + Objects.nonNull(config); + assertThat("config.getScheme().startsWith(\"http\")", config.getScheme().startsWith("http")); + + LOGGER.info("Neo4jConfiguration.start {}", config); + + Objects.nonNull(client); + + } + + public void stop() throws Exception { + this.client = null; + } + + public Neo4jConfiguration config() { + return config; + } + + public HttpClient client() { + return client; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java new file mode 100644 index 0000000..4c126b8 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.neo4j.http; + +import org.apache.streams.graph.HttpGraphHelper; +import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.javatuples.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Objects; + +/** + * Supporting class for interacting with neo4j via rest API. + */ +public class Neo4jHttpGraphHelper implements HttpGraphHelper { + + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class); + + private static final String statementKey = "statement"; + private static final String queryKey = "query"; + private static final String paramsKey = "parameters"; + private static final String propsKey = "props"; + + /** + * readDataStatement neo4j rest json read data payload. + * + * @param queryPlusParameters (query, parameter map) + * @return ObjectNode + */ + public ObjectNode readData(Pair<String, Map<String, Object>> queryPlusParameters) { + + LOGGER.debug("readData: ", queryPlusParameters); + + Objects.requireNonNull(queryPlusParameters); + Objects.requireNonNull(queryPlusParameters.getValue0()); + + ObjectNode request = MAPPER.createObjectNode(); + + request.put(queryKey, queryPlusParameters.getValue0()); + + if( queryPlusParameters.getValue1() != null && queryPlusParameters.getValue1().size() > 0) { + ObjectNode params = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class); + request.put(paramsKey, params); + } + + LOGGER.debug("readData: ", request); + + return request; + } + + /** + * writeDataStatement neo4j rest json write data payload. 
+ * + * @param queryPlusParameters (query, parameter map) + * @return ObjectNode + */ + public ObjectNode writeData(Pair<String, Map<String, Object>> queryPlusParameters) { + + LOGGER.debug("writeData: ", queryPlusParameters); + + Objects.requireNonNull(queryPlusParameters); + Objects.requireNonNull(queryPlusParameters.getValue0()); + + ObjectNode request = MAPPER.createObjectNode(); + + request.put(statementKey, queryPlusParameters.getValue0()); + + if( queryPlusParameters.getValue1() != null && queryPlusParameters.getValue1().size() > 0) { + ObjectNode params = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class); + request.put(paramsKey, params); + } else { + request.put(paramsKey, MAPPER.createObjectNode()); + } + + LOGGER.debug("writeData: ", request); + + return request; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java new file mode 100644 index 0000000..86a9da2 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.neo4j.http; + +import org.apache.streams.components.http.HttpConfiguration; +import org.apache.streams.components.http.HttpProviderConfiguration; +import org.apache.streams.components.http.provider.SimpleHttpProvider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.graph.HttpGraphHelper; +import org.apache.streams.graph.QueryGraphHelper; +import org.apache.streams.neo4j.CypherQueryGraphHelper; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.neo4j.CypherQueryResponse; +import org.apache.streams.neo4j.ItemData; +import org.apache.streams.neo4j.ItemMetadata; +import org.apache.streams.neo4j.Neo4jReaderConfiguration; +import org.apache.streams.util.PropertyUtil; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.javatuples.Pair; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Reads a stream of activityobjects from vertices in a graph database with + * an http rest endpoint (such as neo4j). + */ +public class Neo4jHttpPersistReader extends SimpleHttpProvider implements StreamsPersistReader { + + public static final String STREAMS_ID = Neo4jHttpPersistReader.class.getCanonicalName(); + + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpPersistReader.class); + + private Neo4jReaderConfiguration config; + + private QueryGraphHelper queryGraphHelper; + private HttpGraphHelper httpGraphHelper; + + private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + /** + * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 'graph'. + */ + public Neo4jHttpPersistReader() { + this(new ComponentConfigurator<>(Neo4jReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("neo4j"))); + } + + /** + * GraphVertexReader constructor - use supplied GraphReaderConfiguration. 
+ * @param configuration GraphReaderConfiguration + */ + public Neo4jHttpPersistReader(Neo4jReaderConfiguration configuration) { + super((HttpProviderConfiguration)StreamsJacksonMapper.getInstance().convertValue(configuration, HttpProviderConfiguration.class).withHostname(configuration.getHosts().get(0))); + super.configuration.setRequestMethod(HttpConfiguration.RequestMethod.POST); + super.configuration.setResourcePath("/db"); + super.configuration.setResource("data"); + super.configuration.setResourcePostfix("cypher"); + this.config = configuration; + } + + /** + * prepareHttpRequest + * @param uri uri + * @return result + */ + public HttpRequestBase prepareHttpRequest(URI uri) { + HttpRequestBase baseRequest = super.prepareHttpRequest(uri); + HttpPost post = (HttpPost) baseRequest; + String query = config.getQuery(); + Map<String, Object> params = mapper.convertValue(config.getParams(), Map.class); + Pair<String, Map<String, Object>> queryPlusParams = new Pair(query, params); + ObjectNode queryNode = httpGraphHelper.readData(queryPlusParams); + try { + String queryJsonString = mapper.writeValueAsString(queryNode); + HttpEntity entity = new StringEntity(queryJsonString, ContentType.create("application/json")); + post.setEntity(entity); + } catch (JsonProcessingException ex) { + LOGGER.error("JsonProcessingException", ex); + return null; + } + return post; + + } + /** + * Neo API query returns something like this: + * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { "data": { props }, etc... 
} ] ] } + * + * @param jsonNode jsonNode + * @return result + */ + public List<ObjectNode> parse(JsonNode jsonNode) { + List<ObjectNode> results = new ArrayList<>(); + + ObjectNode root = (ObjectNode) jsonNode; + + CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, CypherQueryResponse.class); + + for ( List<List<ItemMetadata>> dataWrapper : cypherQueryResponse.getData()) { + + for (List<ItemMetadata> itemMetadatas : dataWrapper) { + + for (ItemMetadata itemMetadata : itemMetadatas) { + + ItemData itemData = itemMetadata.getData(); + + LOGGER.debug("itemData: " + itemData); + + results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.')); + } + + } + + } + return results; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void prepare(Object configurationObject) { + + super.prepare(configurationObject); + mapper = StreamsJacksonMapper.getInstance(); + + queryGraphHelper = new CypherQueryGraphHelper(); + httpGraphHelper = new Neo4jHttpGraphHelper(); + + Objects.requireNonNull(queryGraphHelper); + Objects.requireNonNull(httpGraphHelper); + } + + @Override + public StreamsResultSet readAll() { + return readCurrent(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java new file mode 100644 index 0000000..e05a252 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.neo4j.http; + +import org.apache.streams.components.http.HttpPersistWriterConfiguration; +import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.graph.HttpGraphHelper; +import org.apache.streams.graph.QueryGraphHelper; +import org.apache.streams.neo4j.CypherQueryGraphHelper; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.neo4j.Neo4jConfiguration; +import org.apache.streams.neo4j.Neo4jPersistUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.util.EntityUtils; +import org.javatuples.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + 
+/** + * Adds activityobjects as vertices and activities as edges to a graph database with + * an http rest endpoint (such as neo4j). + */ +public class Neo4jHttpPersistWriter extends SimpleHTTPPostPersistWriter { + + public static final String STREAMS_ID = Neo4jHttpPersistWriter.class.getCanonicalName(); + + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpPersistWriter.class); + private static final long MAX_WRITE_LATENCY = 1000; + + private Neo4jConfiguration configuration; + + private QueryGraphHelper queryGraphHelper; + private HttpGraphHelper httpGraphHelper; + + private static ObjectMapper mapper; + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from JVM 'graph'. + */ + public Neo4jHttpPersistWriter() { + this(new ComponentConfigurator<>(Neo4jConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("neo4j"))); + } + + /** + * GraphHttpPersistWriter constructor - use supplied GraphHttpConfiguration. 
+ * @param configuration GraphHttpConfiguration + */ + public Neo4jHttpPersistWriter(Neo4jConfiguration configuration) { + super((HttpPersistWriterConfiguration)StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class).withHostname(configuration.getHosts().get(0))); + super.configuration.setResourcePath("/db/data/transaction/commit/"); + this.configuration = configuration; + } + + @Override + protected ObjectNode preparePayload(StreamsDatum entry) throws Exception { + + List<Pair<String, Map<String, Object>>> statements = Neo4jPersistUtil.prepareStatements(entry); + + ObjectNode requestNode = mapper.createObjectNode(); + ArrayNode statementsArray = mapper.createArrayNode(); + + for( Pair<String, Map<String, Object>> statement : statements ) { + statementsArray.add(httpGraphHelper.writeData(statement)); + } + + requestNode.put("statements", statementsArray); + return requestNode; + + } + + @Override + protected ObjectNode executePost(HttpPost httpPost) { + + Objects.requireNonNull(httpPost); + + ObjectNode result = null; + + CloseableHttpResponse response = null; + + String entityString = null; + try { + response = httpclient.execute(httpPost); + HttpEntity entity = response.getEntity(); + if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) { + entityString = EntityUtils.toString(entity); + result = mapper.readValue(entityString, ObjectNode.class); + } + LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString); + if ( result == null + || ( + result.get("errors") != null + && result.get("errors").isArray() + && result.get("errors").iterator().hasNext() + ) + ) { + LOGGER.error("Write Error: " + result.get("errors")); + } else { + LOGGER.debug("Write Success"); + } + } catch (IOException ex) { + LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage()); + } catch 
(Exception ex) { + LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage()); + } finally { + try { + if ( response != null) { + response.close(); + } + } catch (IOException ignored) { + LOGGER.trace("ignored IOException", ignored); + } + } + return result; + } + + @Override + public void prepare(Object configurationObject) { + + super.prepare(null); + mapper = StreamsJacksonMapper.getInstance(); + + queryGraphHelper = new CypherQueryGraphHelper(); + httpGraphHelper = new Neo4jHttpGraphHelper(); + + Objects.requireNonNull(queryGraphHelper); + Objects.requireNonNull(httpGraphHelper); + } + + @Override + public void cleanUp() { + + LOGGER.info("exiting"); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json new file mode 100644 index 0000000..4e80eb4 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json @@ -0,0 +1,43 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "javaType" : "org.apache.streams.neo4j.CypherQueryResponse", + "properties": { + "columns": { + "type": "array", + "id": "http://jsonschema.net/columns", + "required": false, + "items": { + "type": "string", + "id": "http://jsonschema.net/columns/0", + "required": false + } + }, + "data": { + "type": "array", + "required": false, + "items": { + "type": "array", + "required": false, + "items": { + "type": "array", + "required": false, + "items": { + "type": 
"object", + "javaType" : "org.apache.streams.neo4j.ItemMetadata", + "properties": { + "data": { + "type": "object", + "javaType" : "org.apache.streams.neo4j.ItemData" + } + } + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json new file mode 100644 index 0000000..abd2391 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json @@ -0,0 +1,27 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "type": "object", + "javaType" : "org.apache.streams.neo4j.Neo4jConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "scheme": { + "type": "string" + }, + "hosts": { + "type": "array", + "items": { + "type": "string" + } + }, + "port": { + "type": "integer" + }, + "username": { + "type": "string" + }, + "password": { + "type": "string" + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json new file mode 100644 index 0000000..62c348f --- /dev/null +++ 
b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json
@@ -0,0 +1,17 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.neo4j.Neo4jReaderConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends" : {"$ref":"Neo4jConfiguration.json"},
+  "properties": {
+    "query": {
+      "type": "string",
+      "required": true
+    },
+    "params": {
+      "type": "object"
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
new file mode 100644
index 0000000..c45d975
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.neo4j.bolt.Neo4jBoltClient;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistReader;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for Neo4jBoltPersist.
+ *
+ * Test that graph db responses can be converted to streams data.
+ */
+public class Neo4jBoltPersistIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistIT.class);
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private Neo4jBoltClient testClient;
+
+  private Neo4jConfiguration testConfiguration;
+
+  /**
+   * Loads the test configuration, obtains the bolt client and empties the graph.
+   */
+  @BeforeClass
+  public void prepareTest() throws IOException {
+
+    Config reference = ConfigFactory.load();
+    File conf = new File("target/test-classes/Neo4jBoltPersistIT.conf");
+    assertTrue(conf.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(Neo4jConfiguration.class).detectConfiguration(typesafe, "neo4j");
+    testClient = Neo4jBoltClient.getInstance(testConfiguration);
+
+    clearGraph();
+  }
+
+  /**
+   * Writes every activity fixture, then reads vertices and edges back and checks the counts.
+   */
+  @Test
+  public void testNeo4jBoltPersist() throws Exception {
+
+    Neo4jBoltPersistWriter testPersistWriter = new Neo4jBoltPersistWriter(testConfiguration);
+    testPersistWriter.prepare(testConfiguration);
+
+    // Each line read from the "activities" resource is treated as a fixture file name.
+    List<String> files;
+    try (InputStream testActivityFolderStream = Neo4jBoltPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities")) {
+      files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
+    }
+
+    int count = 0;
+    for (String file : files) {
+      LOGGER.info("File: {}", file);
+      Activity activity;
+      try (InputStream testActivityFileStream = Neo4jBoltPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file)) {
+        activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+      }
+      populateMissingIds(activity);
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      testPersistWriter.write(datum);
+      LOGGER.info("Wrote: {}", activity.getVerb());
+      count++;
+    }
+
+    testPersistWriter.cleanUp();
+
+    LOGGER.info("Total Written: {}", count);
+    Assert.assertEquals(count, 89);
+
+    Neo4jReaderConfiguration vertexReaderConfiguration = MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    vertexReaderConfiguration.setQuery("MATCH (v) return v");
+    Neo4jBoltPersistReader vertexReader = new Neo4jBoltPersistReader(vertexReaderConfiguration);
+    vertexReader.prepare(null);
+    StreamsResultSet vertexResultSet = vertexReader.readAll();
+    LOGGER.info("Total Read: {}", vertexResultSet.size());
+    Assert.assertEquals(vertexResultSet.size(), 24);
+
+    Neo4jReaderConfiguration edgeReaderConfiguration = MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    edgeReaderConfiguration.setQuery("MATCH (s)-[r]->(d) return r");
+    Neo4jBoltPersistReader edgeReader = new Neo4jBoltPersistReader(edgeReaderConfiguration);
+    edgeReader.prepare(null);
+    StreamsResultSet edgeResultSet = edgeReader.readAll();
+    LOGGER.info("Total Read: {}", edgeResultSet.size());
+    Assert.assertEquals(edgeResultSet.size(), 100);
+
+  }
+
+  /**
+   * Empties the graph after the test class has run.
+   */
+  @AfterClass
+  public void cleanup() throws Exception {
+    clearGraph();
+  }
+
+  /**
+   * Deletes every relationship and node. Session and transaction are closed even when a statement fails.
+   */
+  private void clearGraph() {
+    try (Session session = testClient.client().session();
+         Transaction transaction = session.beginTransaction()) {
+      transaction.run("MATCH ()-[r]-() DELETE r");
+      transaction.run("MATCH (n) DETACH DELETE n");
+      transaction.success();
+    }
+  }
+
+  /**
+   * Gives the activity and its actor, object and target a fallback id when none is set:
+   * the objectType for the nested objects, the verb for the activity itself.
+   */
+  private static void populateMissingIds(Activity activity) {
+    if (activity.getActor() != null && activity.getActor().getId() == null && activity.getActor().getObjectType() != null) {
+      activity.getActor().setId(activity.getActor().getObjectType());
+    }
+    if (activity.getObject() != null && activity.getObject().getId() == null && activity.getObject().getObjectType() != null) {
+      activity.getObject().setId(activity.getObject().getObjectType());
+    }
+    if (activity.getTarget() != null && activity.getTarget().getId() == null && activity.getTarget().getObjectType() != null) {
+      activity.getTarget().setId(activity.getTarget().getObjectType());
+    }
+    if (activity.getId() == null && activity.getVerb() != null) {
+      activity.setId(activity.getVerb());
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
new file mode 100644
index 0000000..a5b0d30
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.neo4j.http.Neo4jHttpClient;
+import org.apache.streams.neo4j.http.Neo4jHttpPersistReader;
+import org.apache.streams.neo4j.http.Neo4jHttpPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for Neo4jHttpPersist.
+ *
+ * Test that graph db responses can be converted to streams data.
+ */
+public class Neo4jHttpPersistIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpPersistIT.class);
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private Neo4jHttpClient testClient;
+
+  private Neo4jConfiguration testConfiguration;
+
+  /**
+   * Loads the neo4j test configuration from target/test-classes/Neo4jHttpPersistIT.conf.
+   */
+  @BeforeClass
+  public void prepareTest() throws IOException {
+
+    Config reference = ConfigFactory.load();
+    File conf = new File("target/test-classes/Neo4jHttpPersistIT.conf");
+    assertTrue(conf.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(Neo4jConfiguration.class).detectConfiguration(typesafe, "neo4j");
+
+  }
+
+  /**
+   * Writes every activity fixture, then reads vertices and edges back and checks the counts.
+   */
+  @Test
+  public void testNeo4jHttpPersist() throws Exception {
+
+    Neo4jHttpPersistWriter testPersistWriter = new Neo4jHttpPersistWriter(testConfiguration);
+    testPersistWriter.prepare(null);
+
+    // Each line read from the "activities" resource is treated as a fixture file name.
+    List<String> files;
+    try (InputStream testActivityFolderStream = Neo4jHttpPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities")) {
+      files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
+    }
+
+    // write data
+
+    int count = 0;
+    for (String file : files) {
+      LOGGER.info("File: {}", file);
+      Activity activity;
+      try (InputStream testActivityFileStream = Neo4jHttpPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file)) {
+        activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+      }
+      populateMissingIds(activity);
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      testPersistWriter.write(datum);
+      LOGGER.info("Wrote: {}", activity.getVerb());
+      count++;
+    }
+
+    testPersistWriter.cleanUp();
+
+    LOGGER.info("Total Written: {}", count);
+    Assert.assertEquals(count, 89);
+
+    Neo4jReaderConfiguration vertexReaderConfiguration = MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    vertexReaderConfiguration.setQuery("MATCH (v) return v");
+    Neo4jHttpPersistReader vertexReader = new Neo4jHttpPersistReader(vertexReaderConfiguration);
+    vertexReader.prepare(null);
+    StreamsResultSet vertexResultSet = vertexReader.readAll();
+    LOGGER.info("Total Read: {}", vertexResultSet.size());
+    Assert.assertEquals(vertexResultSet.size(), 24);
+
+    Neo4jReaderConfiguration edgeReaderConfiguration = MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    edgeReaderConfiguration.setQuery("MATCH (s)-[r]->(d) return r");
+    Neo4jHttpPersistReader edgeReader = new Neo4jHttpPersistReader(edgeReaderConfiguration);
+    edgeReader.prepare(null);
+    StreamsResultSet edgeResultSet = edgeReader.readAll();
+    LOGGER.info("Total Read: {}", edgeResultSet.size());
+    Assert.assertEquals(edgeResultSet.size(), 100);
+
+  }
+
+  /**
+   * Gives the activity and its actor, object and target a fallback id when none is set:
+   * the objectType for the nested objects, the verb for the activity itself.
+   */
+  private static void populateMissingIds(Activity activity) {
+    if (activity.getActor() != null && activity.getActor().getId() == null && activity.getActor().getObjectType() != null) {
+      activity.getActor().setId(activity.getActor().getObjectType());
+    }
+    if (activity.getObject() != null && activity.getObject().getId() == null && activity.getObject().getObjectType() != null) {
+      activity.getObject().setId(activity.getObject().getObjectType());
+    }
+    if (activity.getTarget() != null && activity.getTarget().getId() == null && activity.getTarget().getObjectType() != null) {
+      activity.getTarget().setId(activity.getTarget().getObjectType());
+    }
+    if (activity.getId() == null && activity.getVerb() != null) {
+      activity.setId(activity.getVerb());
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
new file mode 100644
index 0000000..12f3306
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.test;
+
+import org.apache.streams.neo4j.CypherQueryGraphHelper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import org.javatuples.Pair;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * TestCypherQueryGraphHelper tests
+ * @see org.apache.streams.neo4j.CypherQueryGraphHelper
+ */
+public class TestCypherQueryGraphHelper {
+
+  CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
+
+  @Test
+  public void getVertexRequestIdTest() throws Exception {
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest("id");
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+
+  }
+
+  @Test
+  public void getVertexRequestLongTest() throws Exception {
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest(Long.valueOf(1L));
+
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+
+  }
+
+  @Test
+  public void createVertexRequestTest() throws Exception {
+
+    ActivityObject activityObject = new ActivityObject();
+    activityObject.setId("id");
+    activityObject.setObjectType("type");
+    activityObject.setContent("content");
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.createVertexRequest(activityObject);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+
+  @Test
+  public void mergeVertexRequestTest() throws Exception {
+
+    ActivityObject activityObject = new ActivityObject();
+    activityObject.setId("id");
+    activityObject.setObjectType("type");
+    activityObject.setContent("content");
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.mergeVertexRequest(activityObject);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+
+  @Test
+  public void createActorObjectEdgeRequestTest() throws Exception {
+
+    ActivityObject actor = new ActivityObject();
+    actor.setId("actor");
+    actor.setObjectType("type");
+    actor.setContent("content");
+
+    ActivityObject object = new ActivityObject();
+    object.setId("object");
+    object.setObjectType("type");
+    object.setContent("content");
+
+    Activity activity = new Activity();
+    activity.setId("activity");
+    activity.setVerb("verb");
+    activity.setContent("content");
+
+    activity.setActor(actor);
+    activity.setObject(object);
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.createActorObjectEdge(activity);
+
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+
+  @Test
+  public void createEdgeRequestTest() throws Exception {
+
+    ActivityObject actor = new ActivityObject();
+    actor.setId("actor");
+    actor.setObjectType("type");
+    actor.setContent("content");
+
+    ActivityObject object = new ActivityObject();
+    object.setId("object");
+    object.setObjectType("type");
+    object.setContent("content");
+
+    // The actor-target edge request is built from this activity, so give it a populated target.
+    ActivityObject target = new ActivityObject();
+    target.setId("target");
+    target.setObjectType("type");
+
+    Activity activity = new Activity();
+    activity.setId("activity");
+    activity.setVerb("verb");
+    activity.setContent("content");
+
+    activity.setActor(actor);
+    activity.setObject(object);
+    activity.setTarget(target);
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.createActorTargetEdge(activity);
+
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf
b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf new file mode 100644 index 0000000..1d5ec35 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +neo4j { + scheme = "tcp" + hosts += ${neo4j.tcp.host} + port = ${neo4j.tcp.port} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf new file mode 100644 index 0000000..929b3ed --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +neo4j { + scheme = "http" + hosts += ${neo4j.http.host} + port = ${neo4j.http.port} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java ---------------------------------------------------------------------- diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java deleted file mode 100644 index 981de44..0000000 --- a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.data.util; - -import org.apache.streams.jackson.StreamsJacksonMapper; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.ValueNode; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * Class transforms nested properties of activities, actors, objects, etc... - */ -public class PropertyUtil { - - private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - public static Map<String, Object> flattenToMap(ObjectNode object) { - Map<String, Object> flatObject = new HashMap<>(); - addKeys("", object, flatObject, '.'); - return flatObject; - } - - public static Map<String, Object> flattenToMap(ObjectNode object, char seperator) { - Map<String, Object> flatObject = new HashMap<>(); - addKeys("", object, flatObject, seperator); - return flatObject; - } - - public static ObjectNode flattenToObjectNode(ObjectNode object) { - Map<String, Object> flatObject = flattenToMap(object, '.'); - addKeys("", object, flatObject, '.'); - return mapper.convertValue(flatObject, ObjectNode.class); - } - - public static ObjectNode flattenToObjectNode(ObjectNode object, char seperator) { - Map<String, Object> flatObject = flattenToMap(object, seperator); - addKeys("", object, flatObject, seperator); - return mapper.convertValue(flatObject, ObjectNode.class); - } - - private static void addKeys(String currentPath, JsonNode jsonNode, Map<String, Object> map, char seperator) { - if (jsonNode.isObject()) { - ObjectNode objectNode = (ObjectNode) jsonNode; - Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields(); - String pathPrefix = currentPath.isEmpty() 
? "" : currentPath + seperator; - - while (iter.hasNext()) { - Map.Entry<String, JsonNode> entry = iter.next(); - addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator); - } - } else if (jsonNode.isArray()) { - ArrayNode arrayNode = (ArrayNode) jsonNode; - map.put(currentPath, arrayNode); - } else if (jsonNode.isValueNode()) { - ValueNode valueNode = (ValueNode) jsonNode; - if ( valueNode.isTextual() ) { - map.put(currentPath, valueNode.asText()); - } else if ( valueNode.isNumber() ) { - map.put(currentPath, valueNode); - } - } - } - - public static ObjectNode unflattenMap(Map<String, Object> object, char seperator) { - return unflattenObjectNode(mapper.convertValue(object, ObjectNode.class), seperator); - } - - public static ObjectNode unflattenObjectNode(ObjectNode flatObject, char seperator) { - ObjectNode root = mapper.createObjectNode(); - Iterator<Map.Entry<String, JsonNode>> iter = flatObject.fields(); - while (iter.hasNext()) { - Map.Entry<String, JsonNode> item = iter.next(); - String fullKey = item.getKey(); - if ( !fullKey.contains(Character.valueOf(seperator).toString())) { - root.put(item.getKey(), item.getValue()); - } else { - ObjectNode currentNode = root; - List<String> keyParts = new ArrayList<>(); - Iterables.addAll(keyParts, Splitter.on(seperator).split(item.getKey())); - for (String part : Iterables.limit(Splitter.on(seperator).split(item.getKey()), keyParts.size() - 1)) { - if (currentNode.has(part) && currentNode.get(part).isObject()) { - currentNode = (ObjectNode) currentNode.get(part); - } else { - ObjectNode newNode = mapper.createObjectNode(); - currentNode.put(part, newNode); - currentNode = newNode; - } - } - currentNode.put(keyParts.get(keyParts.size() - 1), item.getValue()); - - } - } - return root; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java ---------------------------------------------------------------------- 
diff --git a/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java b/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java
new file mode 100644
index 0000000..1671174
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.ValueNode;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class transforms nested properties of activities, actors, objects, etc...
+ */
+public class PropertyUtil {
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * Flattens a nested object into a single-level map whose keys are '.'-joined paths.
+   */
+  public static Map<String, Object> flattenToMap(ObjectNode object) {
+    return flattenToMap(object, '.');
+  }
+
+  /**
+   * Flattens a nested object with '.' as separator and returns the result as an ObjectNode.
+   */
+  public static ObjectNode flattenToObjectNode(ObjectNode object) {
+    return flattenToObjectNode(object, '.');
+  }
+
+  /**
+   * Flattens a nested object into a single-level map.
+   *
+   * @param object nested object to flatten
+   * @param seperator character placed between the path segments of each key
+   * @return map from joined path to leaf value
+   */
+  public static Map<String, Object> flattenToMap(ObjectNode object, char seperator) {
+    Map<String, Object> flatObject = new HashMap<>();
+    addKeys("", object, flatObject, seperator);
+    return flatObject;
+  }
+
+  /**
+   * Flattens a nested object and returns the result as an ObjectNode.
+   *
+   * @param object nested object to flatten
+   * @param seperator character placed between the path segments of each key
+   * @return flat ObjectNode with one field per recorded leaf
+   */
+  public static ObjectNode flattenToObjectNode(ObjectNode object, char seperator) {
+    return mapper.convertValue(flattenToMap(object, seperator), ObjectNode.class);
+  }
+
+  // Walks the tree depth-first and records each supported leaf under its joined path.
+  // Only textual and numeric values, and non-empty arrays made up entirely of one of those,
+  // are recorded; every other node type is skipped.
+  private static void addKeys(String currentPath, JsonNode jsonNode, Map<String, Object> map, char seperator) {
+    if (jsonNode.isObject()) {
+      ObjectNode objectNode = (ObjectNode) jsonNode;
+      Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
+      String pathPrefix = currentPath.isEmpty() ? "" : currentPath + seperator;
+
+      while (iter.hasNext()) {
+        Map.Entry<String, JsonNode> entry = iter.next();
+        addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator);
+      }
+    } else if (jsonNode.isArray()) {
+      ArrayNode arrayNode = (ArrayNode) jsonNode;
+      // isTextual()/isNumber() describe the node itself and are never true for an ArrayNode,
+      // so the element types have to be inspected instead.
+      if (isTextOrNumberArray(arrayNode)) {
+        List<?> list = mapper.convertValue(arrayNode, List.class);
+        map.put(currentPath, list);
+      }
+    } else if (jsonNode.isValueNode()) {
+      ValueNode valueNode = (ValueNode) jsonNode;
+      if (valueNode.isTextual()) {
+        map.put(currentPath, valueNode.asText());
+      } else if (valueNode.isNumber()) {
+        map.put(currentPath, valueNode);
+      }
+    }
+  }
+
+  // True when the array is non-empty and its elements are all textual or all numeric.
+  private static boolean isTextOrNumberArray(ArrayNode arrayNode) {
+    if (arrayNode.size() == 0) {
+      return false;
+    }
+    boolean allTextual = true;
+    boolean allNumeric = true;
+    for (JsonNode element : arrayNode) {
+      allTextual &= element.isTextual();
+      allNumeric &= element.isNumber();
+    }
+    return allTextual || allNumeric;
+  }
+
+  /**
+   * Converts the flat map to an ObjectNode and rebuilds the nested object from it.
+   */
+  public static ObjectNode unflattenMap(Map<String, Object> object, char seperator) {
+    return unflattenObjectNode(mapper.convertValue(object, ObjectNode.class), seperator);
+  }
+
+  /**
+   * Rebuilds a nested object from a flat one by splitting every key on the separator.
+   *
+   * @param flatObject object whose field names are separator-joined paths
+   * @param seperator character that separates the path segments
+   * @return nested object; the value stored under key a.b ends up as field b of object a
+   */
+  public static ObjectNode unflattenObjectNode(ObjectNode flatObject, char seperator) {
+    ObjectNode root = mapper.createObjectNode();
+    Iterator<Map.Entry<String, JsonNode>> iter = flatObject.fields();
+    while (iter.hasNext()) {
+      Map.Entry<String, JsonNode> item = iter.next();
+      String fullKey = item.getKey();
+      List<String> keyParts = new ArrayList<>(Arrays.asList(StringUtils.split(fullKey, seperator)));
+      if (fullKey.indexOf(seperator) < 0 || keyParts.isEmpty()) {
+        // Nothing to nest (or the key consists of separators only): keep the field at the root.
+        root.set(fullKey, item.getValue());
+        continue;
+      }
+      // Take the leaf name off before walking, so it is still known once the parents exist.
+      String leafKey = keyParts.remove(keyParts.size() - 1);
+      ObjectNode currentNode = root;
+      for (String part : keyParts) {
+        JsonNode child = currentNode.get(part);
+        if (child != null && child.isObject()) {
+          currentNode = (ObjectNode) child;
+        } else {
+          ObjectNode newNode = mapper.createObjectNode();
+          currentNode.set(part, newNode);
+          currentNode = newNode;
+        }
+      }
+      currentNode.set(leafKey, item.getValue());
+    }
+    return root;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java b/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
new file mode 100644
index 0000000..233a431
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
@@ -0,0 +1,31 @@
+package org.apache.streams.util.schema.test;
+
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Created by sblackmon on 1/8/17.
+ */
+public class PropertyUtilTest {
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  String flatJson = "{\"a.a\": \"aa\", \"a.b\": \"ab\", \"b.a\": \"ba\", \"b.b\": \"bb\"}";
+
+  @Test
+  public void testUnflattenObjectNode() throws Exception {
+    ObjectNode flatNode = mapper.readValue(flatJson, ObjectNode.class);
+    ObjectNode unflattenedNode = PropertyUtil.unflattenObjectNode(flatNode, '.');
+    Assert.assertEquals(unflattenedNode.size(), 2);
+    // every leaf must land under its own name, not under its parent's name
+    Assert.assertEquals(unflattenedNode.get("a").get("a").asText(), "aa");
+    Assert.assertEquals(unflattenedNode.get("a").get("b").asText(), "ab");
+    Assert.assertEquals(unflattenedNode.get("b").get("a").asText(), "ba");
+    Assert.assertEquals(unflattenedNode.get("b").get("b").asText(), "bb");
+  }
+}
+