Configs and untested writer

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4b596083
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4b596083
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4b596083

Branch: refs/heads/blueprints
Commit: 4b59608350c292053dae6c063c1c414c38f99861
Parents: 7b21084
Author: sblackmon <[email protected]>
Authored: Fri Oct 24 14:15:21 2014 -0500
Committer: sblackmon <[email protected]>
Committed: Fri Oct 24 14:15:21 2014 -0500

----------------------------------------------------------------------
 .../blueprints/BlueprintsConfigurator.java      |  62 ++++++
 .../blueprints/BlueprintsPersistWriter.java     | 202 +++++++++++++++++++
 .../blueprints/BlueprintsConfiguration.json     |  26 +++
 3 files changed, 290 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4b596083/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsConfigurator.java
 
b/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsConfigurator.java
new file mode 100644
index 0000000..a9d2dd8
--- /dev/null
+++ 
b/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsConfigurator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.blueprints;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class BlueprintsConfigurator {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(BlueprintsConfigurator.class);
+
+    private final static ObjectMapper mapper = new ObjectMapper();
+
+    public static BlueprintsConfiguration detectConfiguration(Config 
blueprints) {
+
+        BlueprintsConfiguration blueprintsConfiguration = new 
BlueprintsConfiguration();
+
+        try {
+            blueprintsConfiguration = 
mapper.readValue(blueprints.root().render(ConfigRenderOptions.concise()), 
BlueprintsConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse BlueprintsConfiguration");
+        }
+        return blueprintsConfiguration;
+    }
+
+    public static BlueprintsWriterConfiguration 
detectWriterConfiguration(Config blueprints) {
+
+        BlueprintsWriterConfiguration blueprintsConfiguration = new 
BlueprintsWriterConfiguration();
+
+        try {
+            blueprintsConfiguration = 
mapper.readValue(blueprints.root().render(ConfigRenderOptions.concise()), 
BlueprintsWriterConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse BlueprintsConfiguration");
+        }
+        return blueprintsConfiguration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4b596083/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java
 
b/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java
new file mode 100644
index 0000000..4a68afc
--- /dev/null
+++ 
b/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.blueprints;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Element;
+import com.tinkerpop.blueprints.Graph;
+import com.tinkerpop.blueprints.Vertex;
+import com.tinkerpop.blueprints.impls.rexster.RexsterGraph;
+import com.tinkerpop.rexster.client.RexsterClient;
+import com.tinkerpop.rexster.client.RexsterClientFactory;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class BlueprintsPersistWriter implements StreamsPersistWriter {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(BlueprintsPersistWriter.class);
+    private final static long MAX_WRITE_LATENCY = 1000;
+
+    private BlueprintsWriterConfiguration configuration;
+
+    protected RexsterClient client;
+    protected RexsterGraph graph;
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private volatile AtomicLong lastWrite = new 
AtomicLong(System.currentTimeMillis());
+    private ScheduledExecutorService backgroundFlushTask = 
Executors.newSingleThreadScheduledExecutor();
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public BlueprintsPersistWriter() {
+        
this(BlueprintsConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("blueprints")));
+    }
+
+    public BlueprintsPersistWriter(BlueprintsWriterConfiguration 
configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void write(StreamsDatum streamsDatum) {
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        connectToGraph();
+
+        Preconditions.checkNotNull(client);
+
+        Preconditions.checkNotNull(graph);
+        Preconditions.checkNotNull(graph.getGraphURI());
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+        graph.shutdown();
+
+        try {
+            client.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            client = null;
+        }
+
+    }
+
+    protected List<Element> persistElements(StreamsDatum streamsDatum) {
+        List<Element> elements = Lists.newArrayList();
+        Activity activity = null;
+        if (streamsDatum.getDocument() instanceof Activity) {
+            activity = (Activity) streamsDatum.getDocument();
+        } else if (streamsDatum.getDocument() instanceof ObjectNode) {
+            activity = mapper.convertValue(streamsDatum.getDocument(), 
Activity.class);
+        } else if (streamsDatum.getDocument() instanceof String) {
+            try {
+                activity = mapper.readValue((String) 
streamsDatum.getDocument(), Activity.class);
+            } catch (Throwable e) {
+                LOGGER.warn(e.getMessage());
+                return elements;
+            }
+        } else {
+            return elements;
+        }
+
+        // what gets persisted is configurable
+        // we may add vertices and/or edges
+
+        // always add vertices first
+        // what types of verbs are relevant for adding vertices?
+        if( 
configuration.getVertices().getVerbs().contains(activity.getVerb().toString())) 
{
+
+            // what objectTypes are relevant for adding vertices?
+            if( 
configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType()))
 {
+                elements.add(persistVertex(activity.getActor()));
+            }
+
+            if( 
configuration.getVertices().getObjectTypes().contains(activity.getObject().getObjectType()))
 {
+                elements.add(persistVertex(activity.getObject()));
+            }
+
+        }
+
+        // always add edges last
+        // what types of verbs are relevant for adding edges?
+        if( 
configuration.getEdges().getVerbs().contains(activity.getVerb().toString())) {
+
+            // what objectTypes are relevant for adding edges?
+            if( 
configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())
+                &&
+                
configuration.getEdges().getObjectTypes().contains(activity.getObject().getObjectType()))
 {
+                elements.add(persistEdge(activity));
+            }
+
+        }
+        return elements;
+    }
+
+    private synchronized void connectToGraph() {
+
+        if( 
configuration.getType().equals(BlueprintsConfiguration.Type.REXSTER)) {
+            try {
+                client = RexsterClientFactory.open(
+                        configuration.getHost(),
+                        configuration.getGraph());
+                StringBuilder uri = new StringBuilder()
+                        .append("http://";)
+                        .append(configuration.getHost())
+                        .append(":")
+                        .append(configuration.getPort())
+                        .append("/graphs/")
+                        .append(configuration.getGraph());
+                graph = new RexsterGraph(uri.toString());
+            } catch (Exception e) {
+                LOGGER.error("ERROR: ", e.getMessage());
+            }
+            return;
+
+        }
+    }
+
+    protected Vertex persistVertex(ActivityObject object) {
+        Iterator<Vertex> existing = graph.query().limit(1).has("id", 
object.getId()).vertices().iterator();
+        if( !existing.hasNext()) {
+            Vertex vertex = graph.addVertex(object);
+            return vertex;
+        } else {
+            return existing.next();
+        }
+    }
+
+    protected Edge persistEdge(Activity activity) {
+        Iterator<Edge> existing = graph.query().limit(1).has("id", 
activity.getId()).edges().iterator();
+        if( !existing.hasNext()) {
+            Vertex s = persistVertex(activity.getActor());
+            Vertex d = persistVertex(activity.getObject());
+            Edge edge = graph.addEdge(activity, s, d, activity.getVerb());
+            return edge;
+        } else {
+            return existing.next();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4b596083/streams-contrib/streams-persist-blueprints/src/main/jsonschema/org/apache/streams/blueprints/BlueprintsConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-blueprints/src/main/jsonschema/org/apache/streams/blueprints/BlueprintsConfiguration.json
 
b/streams-contrib/streams-persist-blueprints/src/main/jsonschema/org/apache/streams/blueprints/BlueprintsConfiguration.json
new file mode 100644
index 0000000..fa60a78
--- /dev/null
+++ 
b/streams-contrib/streams-persist-blueprints/src/main/jsonschema/org/apache/streams/blueprints/BlueprintsConfiguration.json
@@ -0,0 +1,26 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema";,
+    "id": "#",
+    "javaType" : "org.apache.streams.blueprints.BlueprintsConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "type": {
+            "type": "string",
+            "description": "Blueprint DB type",
+            "enum" : ["tinkergraph", "rexster"]
+        },
+        "host": {
+            "type": "string",
+            "description": "Blueprint DB host"
+        },
+        "port": {
+            "type": "integer",
+            "description": "Blueprint DB Port"
+        },
+        "graph": {
+            "type": "string",
+            "description": "Blueprint DB Graph ID"
+        }
+    }
+}
\ No newline at end of file

Reply via email to