http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java index 86ab72f..e322990 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java @@ -18,17 +18,19 @@ package org.apache.streams.graph.neo4j; +import org.apache.streams.data.util.PropertyUtil; +import org.apache.streams.graph.QueryGraphHelper; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.streams.data.util.PropertyUtil; -import org.apache.streams.graph.QueryGraphHelper; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; + import org.javatuples.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,178 +44,196 @@ import java.util.Map; */ public class CypherQueryGraphHelper implements QueryGraphHelper { - private final static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private final static Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class); - public final static String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v"; - public final static String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v"; + public static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v"; + public static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v"; - public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+ - "CREATE UNIQUE (v:<type> { props }) "+ - "ON CREATE SET v <labels> "+ - "RETURN v"; + public static final String createVertexStatementTemplate = + "MATCH (x {id: '<id>'}) " + + "CREATE UNIQUE (v:<type> { props }) " + + "ON CREATE SET v <labels> " + + "RETURN v"; - public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+ - "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+ - "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+ - "RETURN v"; - public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+ - "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+ - "RETURN r"; - public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) { + public static final String mergeVertexStatementTemplate = + "MERGE (v:<type> {id: '<id>'}) " + + "ON 
CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() " + + "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() " + + "RETURN v"; - ST getVertex = new ST(getVertexStringIdStatementTemplate); - getVertex.add("id", streamsId); + public static final String createEdgeStatementTemplate = + "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) " + + "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) " + + "RETURN r"; - Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null); + /** + * getVertexRequest. + * @param streamsId streamsId + * @return pair (streamsId, parameterMap) + */ + public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) { - LOGGER.debug("getVertexRequest", queryPlusParameters.toString()); + ST getVertex = new ST(getVertexStringIdStatementTemplate); + getVertex.add("id", streamsId); - return queryPlusParameters; - } - - @Override - public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) { + Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null); - ST getVertex = new ST(getVertexLongIdStatementTemplate); - getVertex.add("id", vertexId); + LOGGER.debug("getVertexRequest", queryPlusParameters.toString()); - Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null); + return queryPlusParameters; + } - LOGGER.debug("getVertexRequest", queryPlusParameters.toString()); + /** + * getVertexRequest. + * @param vertexId numericId + * @return pair (streamsId, parameterMap) + */ + @Override + public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) { - return queryPlusParameters; + ST getVertex = new ST(getVertexLongIdStatementTemplate); + getVertex.add("id", vertexId); - } + Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null); - public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) { + LOGGER.debug("getVertexRequest", queryPlusParameters.toString()); - Preconditions.checkNotNull(activityObject.getObjectType()); + return queryPlusParameters; - List<String> labels = getLabels(activityObject); + } - ST createVertex = new ST(createVertexStatementTemplate); - createVertex.add("id", activityObject.getId()); - createVertex.add("type", activityObject.getObjectType()); - if( labels.size() > 0) - createVertex.add("labels", Joiner.on(' ').join(labels)); - String query = createVertex.render(); + /** + * createVertexRequest. 
+ * @param activityObject activityObject + * @return pair (query, parameterMap) + */ + public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) { - ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); + Preconditions.checkNotNull(activityObject.getObjectType()); - Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props); + List<String> labels = getLabels(activityObject); - LOGGER.debug("createVertexRequest: ({},{})", query, props); + ST createVertex = new ST(createVertexStatementTemplate); + createVertex.add("id", activityObject.getId()); + createVertex.add("type", activityObject.getObjectType()); - return queryPlusParameters; + if ( labels.size() > 0 ) { + createVertex.add("labels", Joiner.on(' ').join(labels)); } - public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) { + String query = createVertex.render(); - Preconditions.checkNotNull(activityObject.getObjectType()); + ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class); + Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); - Pair queryPlusParameters = new Pair(null, Maps.newHashMap()); + Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props); - List<String> labels = getLabels(activityObject); + LOGGER.debug("createVertexRequest: ({},{})", query, props); - ST mergeVertex = new ST(mergeVertexStatementTemplate); - mergeVertex.add("id", activityObject.getId()); - mergeVertex.add("type", activityObject.getObjectType()); - if( labels.size() > 0) - mergeVertex.add("labels", Joiner.on(' ').join(labels)); - String query = mergeVertex.render(); + return queryPlusParameters; + } - ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); + /** + * mergeVertexRequest. 
+ * @param activityObject activityObject + * @return pair (query, parameterMap) + */ + public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) { - LOGGER.debug("mergeVertexRequest: ({},{})", query, props); + Preconditions.checkNotNull(activityObject.getObjectType()); - queryPlusParameters = queryPlusParameters.setAt0(query); - queryPlusParameters = queryPlusParameters.setAt1(props); + Pair queryPlusParameters = new Pair(null, Maps.newHashMap()); - return queryPlusParameters; - } - - public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) { - - Pair queryPlusParameters = new Pair(null, Maps.newHashMap()); - - ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); - - ST mergeEdge = new ST(createEdgeStatementTemplate); - mergeEdge.add("s_id", activity.getActor().getId()); - mergeEdge.add("s_type", activity.getActor().getObjectType()); - mergeEdge.add("d_id", activity.getObject().getId()); - mergeEdge.add("d_type", activity.getObject().getObjectType()); - mergeEdge.add("r_id", activity.getId()); - mergeEdge.add("r_type", activity.getVerb()); - mergeEdge.add("r_props", getPropertyCreater(props)); - - // set the activityObject's and extensions null, because their properties don't need to appear on the relationship - activity.setActor(null); - activity.setObject(null); - activity.setTarget(null); - activity.getAdditionalProperties().put("extensions", null); - - String statement = mergeEdge.render(); - queryPlusParameters = queryPlusParameters.setAt0(statement); - queryPlusParameters = queryPlusParameters.setAt1(props); + List<String> labels = getLabels(activityObject); - LOGGER.debug("createEdgeRequest: ({},{})", statement, props); - - return queryPlusParameters; - } - - public static String getPropertyValueSetter(Map<String, Object> map, String symbol) { - StringBuilder builder = new StringBuilder(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String)(entry.getValue()); - builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'"); - } - } - return builder.toString(); + ST mergeVertex = new ST(mergeVertexStatementTemplate); + mergeVertex.add("id", activityObject.getId()); + mergeVertex.add("type", activityObject.getObjectType()); + if ( labels.size() > 0 ) { + mergeVertex.add("labels", Joiner.on(' ').join(labels)); } - - public static String getPropertyParamSetter(Map<String, Object> map, String symbol) { - StringBuilder builder = new StringBuilder(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String)(entry.getValue()); - builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'"); - } - } - return builder.toString(); + String query = mergeVertex.render(); + + ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class); + Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); + + LOGGER.debug("mergeVertexRequest: ({},{})", query, props); + + queryPlusParameters = queryPlusParameters.setAt0(query); + queryPlusParameters = queryPlusParameters.setAt1(props); + + return queryPlusParameters; + } + + /** + * createEdgeRequest. 
+ * @param activity activity + * @return pair (query, parameterMap) + */ + public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) { + + Pair queryPlusParameters = new Pair(null, Maps.newHashMap()); + + ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class); + Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); + + ST mergeEdge = new ST(createEdgeStatementTemplate); + mergeEdge.add("s_id", activity.getActor().getId()); + mergeEdge.add("s_type", activity.getActor().getObjectType()); + mergeEdge.add("d_id", activity.getObject().getId()); + mergeEdge.add("d_type", activity.getObject().getObjectType()); + mergeEdge.add("r_id", activity.getId()); + mergeEdge.add("r_type", activity.getVerb()); + mergeEdge.add("r_props", getPropertyCreater(props)); + + // set the activityObject's and extensions null, because their properties don't need to appear on the relationship + activity.setActor(null); + activity.setObject(null); + activity.setTarget(null); + activity.getAdditionalProperties().put("extensions", null); + + String statement = mergeEdge.render(); + queryPlusParameters = queryPlusParameters.setAt0(statement); + queryPlusParameters = queryPlusParameters.setAt1(props); + + LOGGER.debug("createEdgeRequest: ({},{})", statement, props); + + return queryPlusParameters; + } + + /** + * getPropertyCreater. + * @param map paramMap + * @return PropertyCreater string + */ + public static String getPropertyCreater(Map<String, Object> map) { + StringBuilder builder = new StringBuilder(); + builder.append("{"); + List<String> parts = Lists.newArrayList(); + for ( Map.Entry<String, Object> entry : map.entrySet()) { + if ( entry.getValue() instanceof String ) { + String propVal = (String) (entry.getValue()); + parts.add("`" + entry.getKey() + "`:'" + propVal + "'"); + } } - - public static String getPropertyCreater(Map<String, Object> map) { - StringBuilder builder = new StringBuilder(); - builder.append("{"); - List<String> parts = Lists.newArrayList(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String) (entry.getValue()); - parts.add("`"+entry.getKey() + "`:'" + propVal + "'"); - } - } - builder.append(Joiner.on(",").join(parts)); - builder.append("}"); - return builder.toString(); - } - - private List<String> getLabels(ActivityObject activityObject) { - List<String> labels = Lists.newArrayList(":streams"); - if( activityObject.getAdditionalProperties().containsKey("labels") ) { - List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels"); - for( String extraLabel : extraLabels ) - labels.add(":"+extraLabel); - } - return labels; + builder.append(Joiner.on(",").join(parts)); + builder.append("}"); + return builder.toString(); + } + + private List<String> getLabels(ActivityObject activityObject) { + List<String> labels = Lists.newArrayList(":streams"); + if ( activityObject.getAdditionalProperties().containsKey("labels") ) { + List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels"); + for ( String extraLabel : extraLabels ) { + labels.add(":" + extraLabel); + } } + return labels; + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java index 48fe62d..72e668f 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java @@ -18,58 +18,59 @@ package org.apache.streams.graph.neo4j; +import org.apache.streams.graph.HttpGraphHelper; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.streams.data.util.PropertyUtil; -import org.apache.streams.graph.HttpGraphHelper; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; + import org.javatuples.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.stringtemplate.v4.ST; -import java.util.List; import java.util.Map; /** - * Supporting class for interacting with neo4j via rest API + * Supporting class for interacting with neo4j via rest API. */ public class Neo4jHttpGraphHelper implements HttpGraphHelper { - private final static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private final static Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class); - public final static String statementKey = "statement"; - public final static String paramsKey = "parameters"; - public final static String propsKey = "props"; + public static final String statementKey = "statement"; + public static final String paramsKey = "parameters"; + public static final String propsKey = "props"; - public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters) { + /** + * createHttpRequest neo4j rest json payload. 
+ * + * @param queryPlusParameters (query, parameter map) + * @return ObjectNode + */ + public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters) { - LOGGER.debug("createHttpRequest: ", queryPlusParameters); + LOGGER.debug("createHttpRequest: ", queryPlusParameters); - Preconditions.checkNotNull(queryPlusParameters); - Preconditions.checkNotNull(queryPlusParameters.getValue0()); - Preconditions.checkNotNull(queryPlusParameters.getValue1()); + Preconditions.checkNotNull(queryPlusParameters); + Preconditions.checkNotNull(queryPlusParameters.getValue0()); + Preconditions.checkNotNull(queryPlusParameters.getValue1()); - ObjectNode request = MAPPER.createObjectNode(); + ObjectNode request = MAPPER.createObjectNode(); - request.put(statementKey, queryPlusParameters.getValue0()); + request.put(statementKey, queryPlusParameters.getValue0()); - ObjectNode params = MAPPER.createObjectNode(); - ObjectNode props = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class); + ObjectNode params = MAPPER.createObjectNode(); + ObjectNode props = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class); - params.put(propsKey, props); - request.put(paramsKey, params); + params.put(propsKey, props); + request.put(paramsKey, params); - LOGGER.debug("createHttpRequest: ", request); + LOGGER.debug("createHttpRequest: ", request); - return request; - } + return request; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java index 3f889aa..c29c8b7 100644 --- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java @@ -16,98 +16,101 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.graph.test; import org.apache.streams.graph.neo4j.CypherQueryGraphHelper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; + import org.javatuples.Pair; import org.junit.Test; import java.util.Map; /** - * Created by sblackmon on 6/24/15. 
+ * TestCypherQueryGraphHelper tests + * @see org.apache.streams.graph.neo4j.CypherQueryGraphHelper */ public class TestCypherQueryGraphHelper { - CypherQueryGraphHelper helper = new CypherQueryGraphHelper(); + CypherQueryGraphHelper helper = new CypherQueryGraphHelper(); - @Test - public void getVertexRequestIdTest() throws Exception { + @Test + public void getVertexRequestIdTest() throws Exception { - Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest("id"); - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); + Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest("id"); + assert(queryAndParams != null); + assert(queryAndParams.getValue0() != null); - } + } - @Test - public void getVertexRequestLongTest() throws Exception { + @Test + public void getVertexRequestLongTest() throws Exception { - Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest(new Long(1)); + Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest(new Long(1)); - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); + assert(queryAndParams != null); + assert(queryAndParams.getValue0() != null); - } + } - @Test - public void createVertexRequestTest() throws Exception { + @Test + public void createVertexRequestTest() throws Exception { - ActivityObject activityObject = new ActivityObject(); - activityObject.setId("id"); - activityObject.setObjectType("type"); - activityObject.setContent("content"); + ActivityObject activityObject = new ActivityObject(); + activityObject.setId("id"); + activityObject.setObjectType("type"); + activityObject.setContent("content"); - Pair<String, Map<String, Object>> queryAndParams = helper.createVertexRequest(activityObject); - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); - assert(queryAndParams.getValue1() != null); + Pair<String, Map<String, Object>> queryAndParams = helper.createVertexRequest(activityObject); + assert(queryAndParams != null); + assert(queryAndParams.getValue0() != null); + assert(queryAndParams.getValue1() != null); - } + } - @Test - public void mergeVertexRequestTest() throws Exception { + @Test + public void mergeVertexRequestTest() throws Exception { - ActivityObject activityObject = new ActivityObject(); - activityObject.setId("id"); - activityObject.setObjectType("type"); - activityObject.setContent("content"); + ActivityObject activityObject = new ActivityObject(); + activityObject.setId("id"); + activityObject.setObjectType("type"); + activityObject.setContent("content"); - Pair<String, Map<String, Object>> queryAndParams = helper.mergeVertexRequest(activityObject); - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); - assert(queryAndParams.getValue1() != null); + Pair<String, Map<String, Object>> queryAndParams = helper.mergeVertexRequest(activityObject); + assert(queryAndParams != null); + assert(queryAndParams.getValue0() != null); + assert(queryAndParams.getValue1() != null); - } + } - @Test - public void createEdgeRequestTest() throws Exception { + @Test + public void createEdgeRequestTest() throws Exception { - ActivityObject actor = new ActivityObject(); - actor.setId("actor"); - actor.setObjectType("type"); - actor.setContent("content"); + ActivityObject actor = new ActivityObject(); + actor.setId("actor"); + actor.setObjectType("type"); + actor.setContent("content"); - ActivityObject object = new ActivityObject(); - object.setId("object"); - 
object.setObjectType("type"); - object.setContent("content"); + ActivityObject object = new ActivityObject(); + object.setId("object"); + object.setObjectType("type"); + object.setContent("content"); - Activity activity = new Activity(); - activity.setId("activity"); - activity.setVerb("verb"); - activity.setContent("content"); + Activity activity = new Activity(); + activity.setId("activity"); + activity.setVerb("verb"); + activity.setContent("content"); - activity.setActor(actor); - activity.setObject(object); + activity.setActor(actor); + activity.setObject(object); - Pair<String, Map<String, Object>> queryAndParams = helper.createEdgeRequest(activity); + Pair<String, Map<String, Object>> queryAndParams = helper.createEdgeRequest(activity); - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); - assert(queryAndParams.getValue1() != null); + assert(queryAndParams != null); + assert(queryAndParams.getValue0() != null); + assert(queryAndParams.getValue1() != null); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java index eb7ce96..673b402 100644 --- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java +++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java @@ -18,14 +18,16 @@ package org.apache.streams.graph.test; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.io.IOUtils; import org.apache.streams.graph.GraphHttpConfiguration; import org.apache.streams.graph.GraphReaderConfiguration; import org.apache.streams.graph.GraphVertexReader; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import org.apache.commons.io.IOUtils; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -39,43 +41,43 @@ import java.util.List; * Unit test for * @see {@link org.apache.streams.graph.GraphVertexReader} * - * Test that graph db responses can be converted to streams data + * Test that graph db responses can be converted to streams data. 
*/ public class TestNeo4jHttpVertexReader { - private final static Logger LOGGER = LoggerFactory.getLogger(TestNeo4jHttpVertexReader.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TestNeo4jHttpVertexReader.class); - private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private JsonNode sampleReaderResult; + private JsonNode sampleReaderResult; - private GraphReaderConfiguration testConfiguration; + private GraphReaderConfiguration testConfiguration; - private GraphVertexReader graphPersistReader; + private GraphVertexReader graphPersistReader; - @Before - public void prepareTest() throws IOException { + @Before + public void prepareTest() throws IOException { - testConfiguration = new GraphReaderConfiguration(); - testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J); + testConfiguration = new GraphReaderConfiguration(); + testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J); - graphPersistReader = new GraphVertexReader(testConfiguration); - InputStream testActivityFileStream = TestNeo4jHttpVertexReader.class.getClassLoader() - .getResourceAsStream("sampleReaderResult.json"); - String sampleText = IOUtils.toString(testActivityFileStream, "utf-8"); - sampleReaderResult = mapper.readValue(sampleText, JsonNode.class); + graphPersistReader = new GraphVertexReader(testConfiguration); + InputStream testActivityFileStream = TestNeo4jHttpVertexReader.class.getClassLoader() + .getResourceAsStream("sampleReaderResult.json"); + String sampleText = IOUtils.toString(testActivityFileStream, "utf-8"); + sampleReaderResult = mapper.readValue(sampleText, JsonNode.class); - } + } - @Test - public void testParseNeoResult() throws IOException { + @Test + public void testParseNeoResult() throws IOException { - List<ObjectNode> result = graphPersistReader.parse(sampleReaderResult); + List<ObjectNode> result = graphPersistReader.parse(sampleReaderResult); - assert( result.size() == 10); + assert( result.size() == 10); - for( int i = 0 ; i < 10; i++ ) - assert( result.get(i).get("extensions").size() == 5); + for( int i = 0 ; i < 10; i++ ) + assert( result.get(i).get("extensions").size() == 5); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java index 1e066fb..c5a06fc 100644 --- a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java +++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java @@ -18,13 +18,15 @@ package org.apache.streams.hbase; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.util.GuidUtils; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import java.io.Closeable; -import java.io.Flushable; -import java.io.IOException; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; + 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HConnection; @@ -33,187 +35,195 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; -import org.apache.streams.util.GuidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Closeable -{ - public final static String STREAMS_ID = "HbasePersistWriter"; - - private final static Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriter.class); - - protected HConnection connection; - protected HTablePool pool; - protected HTableInterface table; - protected HTableDescriptor descriptor; - - protected volatile Queue<StreamsDatum> persistQueue; - - private ObjectMapper mapper = new ObjectMapper(); - - private HbaseConfiguration config; +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; - public HbasePersistWriter() { - this.config = new ComponentConfigurator<>(HbaseConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase")); - this.persistQueue = new ConcurrentLinkedQueue<>(); +/** + * HbasePersistWriter writes to hbase. + */ +public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Closeable { + + public static final String STREAMS_ID = "HbasePersistWriter"; + + private static final Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriter.class); + + protected HConnection connection; + protected HTablePool pool; + protected HTableInterface table; + protected HTableDescriptor descriptor; + + protected volatile Queue<StreamsDatum> persistQueue; + + private ObjectMapper mapper = new ObjectMapper(); + + private HbaseConfiguration config; + + /** + * HbasePersistWriter constructor - resolve HbaseConfiguration from JVM 'hbase'. + */ + public HbasePersistWriter() { + this.config = new ComponentConfigurator<>(HbaseConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase")); + this.persistQueue = new ConcurrentLinkedQueue<>(); + } + + /** + * HbasePersistWriter constructor - use supplied persistQueue. 
+ * @param persistQueue persistQueue + */ + // TODO: refactor this to use HbaseConfiguration + public HbasePersistWriter(Queue<StreamsDatum> persistQueue) { + this.config = new ComponentConfigurator<>(HbaseConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase")); + this.persistQueue = persistQueue; + } + + private synchronized void connectToHbase() { + + // TODO: refactor this to resolve this stuff from typesafe + Configuration configuration = new Configuration(); + configuration.set("hbase.rootdir", config.getRootdir()); + configuration.set("zookeeper.znode.parent", config.getParent()); + configuration.set("zookeeper.znode.rootserver", config.getRootserver()); + //configuration.set("hbase.master", config.getRootserver()); + //configuration.set("hbase.cluster.distributed", "false"); + configuration.set("hbase.zookeeper.quorum", config.getQuorum()); + configuration.set("hbase.zookeeper.property.clientPort", Long.toString(config.getClientPort())); + configuration.setInt("zookeeper.session.timeout", 1000); + + configuration.setInt("timeout", 1000); + + //pool = new HTablePool(configuration, 10); + try { + connection = HConnectionManager.createConnection(configuration); + } catch (Exception ex) { + ex.printStackTrace(); + return; } - public HbasePersistWriter(Queue<StreamsDatum> persistQueue) { - this.config = new ComponentConfigurator<>(HbaseConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase")); - this.persistQueue = persistQueue; + try { + // table = new HTable(configuration, config.getTable()); + // table = (HTable) pool.getTable(config.getTable()); + table = new HTable(configuration, config.getTable().getBytes()); + table.setAutoFlush(true); + } catch (Exception ex) { + ex.printStackTrace(); + return; } + // - private synchronized void connectToHbase() - { - Configuration configuration = new Configuration(); - configuration.set("hbase.rootdir", config.getRootdir()); - configuration.set("zookeeper.znode.parent", config.getParent()); - configuration.set("zookeeper.znode.rootserver", config.getRootserver()); - //configuration.set("hbase.master", config.getRootserver()); - //configuration.set("hbase.cluster.distributed", "false"); - configuration.set("hbase.zookeeper.quorum", config.getQuorum()); - configuration.set("hbase.zookeeper.property.clientPort", Long.toString(config.getClientPort())); - configuration.setInt("zookeeper.session.timeout", 1000); - - configuration.setInt("timeout", 1000); - - //pool = new HTablePool(configuration, 10); - try { - connection = HConnectionManager.createConnection(configuration); - } catch (Exception e) { - e.printStackTrace(); - return; - } - - try { - // table = new HTable(configuration, config.getTable()); - // table = (HTable) pool.getTable(config.getTable()); - table = new HTable(configuration, config.getTable().getBytes()); - table.setAutoFlush(true); - } catch (Exception e) { - e.printStackTrace(); - return; - } - // - - try { - descriptor = table.getTableDescriptor(); - } catch (Exception e) { - e.printStackTrace(); - return; - } - - try - { - LOGGER.info("Table : {}", descriptor); - } - catch (Exception e) - { - LOGGER.error("There was an error connecting to HBase, please check your settings and try again"); - e.printStackTrace(); - } + try { + descriptor = table.getTableDescriptor(); + } catch (Exception ex) { + ex.printStackTrace(); + return; } - @Override - public String getId() { - return STREAMS_ID; + try { + LOGGER.info("Table : {}", descriptor); + } catch 
(Exception ex) { + LOGGER.error("There was an error connecting to HBase, please check your settings and try again"); + ex.printStackTrace(); } - - @Override - public void write(StreamsDatum streamsDatum) { - - ObjectNode node; - Put put = new Put(); - if( streamsDatum.getDocument() instanceof String ) { - try { - node = mapper.readValue((String)streamsDatum.getDocument(), ObjectNode.class); - } catch (IOException e) { - e.printStackTrace(); - LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString()); - return; - } - put.setId(GuidUtils.generateGuid(node.toString())); - try { - byte[] value = node.binaryValue(); - put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value); - } catch (IOException e) { - e.printStackTrace(); - LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString()); - return; - } - } else { - try { - node = mapper.valueToTree(streamsDatum.getDocument()); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString()); - return; - } - put.setId(GuidUtils.generateGuid(node.toString())); - try { - byte[] value = node.binaryValue(); - put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value); - } catch (IOException e) { - e.printStackTrace(); - LOGGER.warn("Failure preparing put: {}", streamsDatum.getDocument().toString()); - return; - } - } - try { - table.put(put); - } catch (IOException e) { - e.printStackTrace(); - LOGGER.warn("Failure executin put: {}", streamsDatum.getDocument().toString()); - } - + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void write(StreamsDatum streamsDatum) { + + ObjectNode node; + Put put = new Put(); + if ( streamsDatum.getDocument() instanceof String ) { + try { + node = mapper.readValue((String)streamsDatum.getDocument(), ObjectNode.class); + } catch (IOException ex) { + ex.printStackTrace(); + LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString()); + return; + } + put.setId(GuidUtils.generateGuid(node.toString())); + try { + byte[] value = node.binaryValue(); + put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value); + } catch (IOException ex) { + ex.printStackTrace(); + LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString()); + return; + } + } else { + try { + node = mapper.valueToTree(streamsDatum.getDocument()); + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString()); + return; + } + put.setId(GuidUtils.generateGuid(node.toString())); + try { + byte[] value = node.binaryValue(); + put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value); + } catch (IOException ex) { + ex.printStackTrace(); + LOGGER.warn("Failure preparing put: {}", streamsDatum.getDocument().toString()); + return; + } } - - public void flush() throws IOException - { - table.flushCommits(); + try { + table.put(put); + } catch (IOException ex) { + ex.printStackTrace(); + LOGGER.warn("Failure executin put: {}", streamsDatum.getDocument().toString()); } - public synchronized void close() throws IOException - { - table.close(); - } + } + + public void flush() throws IOException { + table.flushCommits(); + } - @Override - public void prepare(Object configurationObject) { + public synchronized void close() throws IOException { + table.close(); + } - connectToHbase(); + @Override + public void prepare(Object configurationObject) { - Thread task = new 
Thread(new HbasePersistWriterTask(this)); - task.start(); + connectToHbase(); - try { - task.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } + Thread task = new Thread(new HbasePersistWriterTask(this)); + task.start(); + try { + task.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); } - @Override - public void cleanUp() { + } - try { - flush(); - } catch (IOException e) { - e.printStackTrace(); - } - try { - close(); - } catch (IOException e) { - e.printStackTrace(); - } + @Override + public void cleanUp() { + try { + flush(); + } catch (IOException ex) { + ex.printStackTrace(); } + try { + close(); + } catch (IOException ex) { + ex.printStackTrace(); + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java index 19a398d..eef7004 100644 --- a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java +++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java @@ -19,38 +19,45 @@ package org.apache.streams.hbase; import org.apache.streams.core.StreamsDatum; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Random; +/** + * HbasePersistWriterTask writes to hbase on behalf of + * @see org.apache.streams.hbase.HbasePersistWriter + */ public class HbasePersistWriterTask implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriterTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriterTask.class); - private HbasePersistWriter writer; + private HbasePersistWriter writer; - public HbasePersistWriterTask(HbasePersistWriter writer) { - this.writer = writer; - } + public HbasePersistWriterTask(HbasePersistWriter writer) { + this.writer = writer; + } - @Override - public void run() { - - while(true) { - if( writer.persistQueue.peek() != null ) { - try { - StreamsDatum entry = writer.persistQueue.remove(); - writer.write(entry); - } catch (Exception e) { - e.printStackTrace(); - } - } - try { - Thread.sleep(new Random().nextInt(1)); - } catch (InterruptedException ignored) {} - } + @Override + public void run() { + while (true) { + if ( writer.persistQueue.peek() != null ) { + try { + StreamsDatum entry = writer.persistQueue.remove(); + writer.write(entry); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + try { + Thread.sleep(new Random().nextInt(1)); + } catch (InterruptedException ignored) { + LOGGER.trace("ignored InterruptedException", ignored); + } } + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java index e96cc8d..b6660ab 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java +++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java @@ -19,13 +19,13 @@ package org.apache.streams.hdfs; /** - * Predefined field symbols for streams-persist-hdfs + * Predefined field symbols for streams-persist-hdfs. */ public class HdfsConstants { - protected static final String ID = "ID"; - protected static final String TS = "TS"; - protected static final String META = "META"; - protected static final String DOC = "DOC"; + protected static final String ID = "ID"; + protected static final String TS = "TS"; + protected static final String META = "META"; + protected static final String DOC = "DOC"; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index 24c9737..d254abb 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -18,16 +18,6 @@ package org.apache.streams.hdfs; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Queues; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; @@ -38,6 +28,18 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistReader; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,220 +58,244 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; /** - * Created by sblackmon on 2/28/14. + * WebHdfsPersistReader reads from hdfs. 
*/ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable { - public final static String STREAMS_ID = "WebHdfsPersistReader"; - - private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReader.class); - - protected final static char DELIMITER = '\t'; - - protected FileSystem client; - protected Path path; - protected FileStatus[] status; - - protected volatile Queue<StreamsDatum> persistQueue; - - protected ObjectMapper mapper; - protected LineReadWriteUtil lineReaderUtil; - - protected HdfsReaderConfiguration hdfsConfiguration; - protected StreamsConfiguration streamsConfiguration; - - private ExecutorService executor; - - protected DatumStatusCounter countersTotal = new DatumStatusCounter(); - protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); - private Future<?> task; - - public WebHdfsPersistReader() { - this(new ComponentConfigurator<>(HdfsReaderConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs"))); + public static final String STREAMS_ID = "WebHdfsPersistReader"; + + private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReader.class); + + protected static final char DELIMITER = '\t'; + + protected FileSystem client; + protected Path path; + protected FileStatus[] status; + + protected volatile Queue<StreamsDatum> persistQueue; + + protected ObjectMapper mapper; + protected LineReadWriteUtil lineReaderUtil; + + protected HdfsReaderConfiguration hdfsConfiguration; + protected StreamsConfiguration streamsConfiguration; + + private ExecutorService executor; + + protected DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); + private Future<?> task; + + /** + * WebHdfsPersistReader constructor - resolves HdfsReaderConfiguration from JVM 'hdfs'. + */ + public WebHdfsPersistReader() { + this(new ComponentConfigurator<>(HdfsReaderConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs"))); + } + + /** + * WebHdfsPersistReader constructor - uses supplied HdfsReaderConfiguration. + * @param hdfsConfiguration hdfsConfiguration + */ + public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) { + this.hdfsConfiguration = hdfsConfiguration; + } + + /** + * getURI from hdfsConfiguration. 
+ * @return URI + * @throws URISyntaxException URISyntaxException + */ + public URI getURI() throws URISyntaxException { + StringBuilder uriBuilder = new StringBuilder(); + uriBuilder.append(hdfsConfiguration.getScheme()); + uriBuilder.append("://"); + if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) { + uriBuilder.append(hdfsConfiguration.getHost()); + if (hdfsConfiguration.getPort() != null) { + uriBuilder.append(":" + hdfsConfiguration.getPort()); + } + } else { + uriBuilder.append("/"); } - - public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) { - this.hdfsConfiguration = hdfsConfiguration; - } - - public URI getURI() throws URISyntaxException { - StringBuilder uriBuilder = new StringBuilder(); - uriBuilder.append(hdfsConfiguration.getScheme()); - uriBuilder.append("://"); - if( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) { - uriBuilder.append(hdfsConfiguration.getHost()); - if (hdfsConfiguration.getPort() != null) - uriBuilder.append(":" + hdfsConfiguration.getPort()); - } else { - uriBuilder.append("/"); - } - return new URI(uriBuilder.toString()); + return new URI(uriBuilder.toString()); + } + + /** + * isConnected. + * @return true if connected, false otherwise + */ + public boolean isConnected() { + return (client != null); + } + + /** + * getFileSystem. + * @return FileSystem + */ + public final synchronized FileSystem getFileSystem() { + // Check to see if we are connected. + if (!isConnected()) { + connectToWebHDFS(); } - - public boolean isConnected() { return (client != null); } - - public final synchronized FileSystem getFileSystem() - { - // Check to see if we are connected. - if(!isConnected()) - connectToWebHDFS(); - return this.client; - } - - private synchronized void connectToWebHDFS() - { - try - { - LOGGER.info("User : {}", this.hdfsConfiguration.getUser()); - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser()); - ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE); - - ugi.doAs(new PrivilegedExceptionAction<Void>() { - public Void run() throws Exception { - Configuration conf = new Configuration(); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); - conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - LOGGER.info("WebURI : {}", getURI().toString()); - client = FileSystem.get(getURI(), conf); - LOGGER.info("Connected to WebHDFS"); - - /* - * ************************************************************************************************ - * This code is an example of how you would work with HDFS and you weren't going over - * the webHDFS protocol. 
- * - * Smashew: 2013-10-01 - * ************************************************************************************************ - conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName); - conf.set("namenode.host","0.0.0.0"); - conf.set("hadoop.job.ugi", userName); - conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner"); - fileSystem.createNewFile(new Path("/user/"+ userName + "/test")); - FileStatus[] status = fs.listStatus(new Path("/user/" + userName)); - for(int i=0;i<status.length;i++) - { - LOGGER.info("Directory: {}", status[i].getPath()); - } - */ - return null; - } - }); + return this.client; + } + + // TODO: combine with WebHdfsPersistReader.connectToWebHDFS + private synchronized void connectToWebHDFS() { + try { + LOGGER.info("User : {}", this.hdfsConfiguration.getUser()); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser()); + ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + LOGGER.info("WebURI : {}", getURI().toString()); + client = FileSystem.get(getURI(), conf); + LOGGER.info("Connected to WebHDFS"); + + /* + * ************************************************************************************************ + * This code is an example of how you would work with HDFS and you weren't going over + * the webHDFS protocol. + * + * Smashew: 2013-10-01 + * ************************************************************************************************ + conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName); + conf.set("namenode.host","0.0.0.0"); + conf.set("hadoop.job.ugi", userName); + conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner"); + fileSystem.createNewFile(new Path("/user/"+ userName + "/test")); + FileStatus[] status = fs.listStatus(new Path("/user/" + userName)); + for(int i=0;i<status.length;i++) + { + LOGGER.info("Directory: {}", status[i].getPath()); + } + */ + return null; } - catch (Exception e) - { - LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again"); - e.printStackTrace(); - } - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public void prepare(Object configurationObject) { - LOGGER.debug("Prepare"); - lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration); - connectToWebHDFS(); - String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath(); - LOGGER.info("Path : {}", pathString); - path = new Path(pathString); - try { - if( client.isFile(path)) { - LOGGER.info("Found File"); - FileStatus fileStatus = client.getFileStatus(path); - status = new FileStatus[1]; - status[0] = fileStatus; - } else if( client.isDirectory(path)){ - status = client.listStatus(path); - List<FileStatus> statusList = Lists.newArrayList(status); - Collections.sort(statusList); - status = statusList.toArray(new FileStatus[0]); - LOGGER.info("Found Directory : {} files", status.length); - } else { - LOGGER.error("Neither file nor directory, wtf"); - } - } catch (IOException e) { - LOGGER.error("IOException", e); - } - streamsConfiguration = 
StreamsConfigurator.detectConfiguration(); - persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getBatchSize().intValue())); - //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue()); - executor = Executors.newSingleThreadExecutor(); - mapper = StreamsJacksonMapper.getInstance(); - } - - @Override - public void cleanUp() { - + }); + } catch (Exception ex) { + LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again"); + ex.printStackTrace(); } - - @Override - public StreamsResultSet readAll() { - WebHdfsPersistReaderTask readerTask = new WebHdfsPersistReaderTask(this); - Thread readerThread = new Thread(readerTask); - readerThread.start(); - try { - readerThread.join(); - } catch (InterruptedException e) {} - return new StreamsResultSet(persistQueue); + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void prepare(Object configurationObject) { + LOGGER.debug("Prepare"); + lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration); + connectToWebHDFS(); + String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath(); + LOGGER.info("Path : {}", pathString); + path = new Path(pathString); + try { + if ( client.isFile(path)) { + LOGGER.info("Found File"); + FileStatus fileStatus = client.getFileStatus(path); + status = new FileStatus[1]; + status[0] = fileStatus; + } else if ( client.isDirectory(path)) { + status = client.listStatus(path); + List<FileStatus> statusList = Lists.newArrayList(status); + Collections.sort(statusList); + status = statusList.toArray(new FileStatus[0]); + LOGGER.info("Found Directory : {} files", status.length); + } else { + LOGGER.error("Neither file nor directory, wtf"); + } + } catch (IOException ex) { + LOGGER.error("IOException", ex); } - - @Override - public void startStream() { - LOGGER.debug("startStream"); - task = executor.submit(new WebHdfsPersistReaderTask(this)); + streamsConfiguration = StreamsConfigurator.detectConfiguration(); + persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getBatchSize().intValue())); + //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue()); + executor = Executors.newSingleThreadExecutor(); + mapper = StreamsJacksonMapper.getInstance(); + } + + @Override + public void cleanUp() { + + } + + @Override + public StreamsResultSet readAll() { + WebHdfsPersistReaderTask readerTask = new WebHdfsPersistReaderTask(this); + Thread readerThread = new Thread(readerTask); + readerThread.start(); + try { + readerThread.join(); + } catch (InterruptedException ignored) { + LOGGER.trace("ignored InterruptedException", ignored); } - - @Override - public StreamsResultSet readCurrent() { - - StreamsResultSet current; - - synchronized( WebHdfsPersistReader.class ) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); - current.setCounter(new DatumStatusCounter()); - current.getCounter().add(countersCurrent); - countersTotal.add(countersCurrent); - countersCurrent = new DatumStatusCounter(); - persistQueue.clear(); - } - - return current; + return new StreamsResultSet(persistQueue); + } + + @Override + public void startStream() { + LOGGER.debug("startStream"); + task = executor.submit(new WebHdfsPersistReaderTask(this)); + } + + @Override + public StreamsResultSet readCurrent() { + + StreamsResultSet current; + + synchronized ( WebHdfsPersistReader.class ) { + current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + persistQueue.clear(); } - protected void write( StreamsDatum entry ) { - boolean success; - do { - synchronized( WebHdfsPersistReader.class ) { - success = persistQueue.offer(entry); - } - Thread.yield(); - } - while( !success ); - } + return current; + } - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; + protected void write( StreamsDatum entry ) { + boolean success; + do { + synchronized ( WebHdfsPersistReader.class ) { + success = persistQueue.offer(entry); + } + Thread.yield(); } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; + while ( !success ); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public boolean isRunning() { + if ( task != null) { + return !task.isDone() && !task.isCancelled(); + } else { + return true; } + } - @Override - public boolean isRunning() { - if( task != null) - return !task.isDone() && !task.isCancelled(); - else return true; - } - - @Override - public DatumStatusCounter getDatumStatusCounter() { - return countersTotal; - } + @Override + public DatumStatusCounter getDatumStatusCounter() { + return countersTotal; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java index c5c1ffe..5bff080 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java @@ -18,98 +18,100 @@ package org.apache.streams.hdfs; -import com.fasterxml.jackson.databind.JsonNode; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.StreamsDatum; + import com.google.common.base.Strings; -import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; + import org.apache.hadoop.fs.FileStatus; -import org.apache.streams.converter.LineReadWriteUtil; -import org.apache.streams.core.DatumStatus; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; -import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; +/** + * WebHdfsPersistReaderTask reads from hdfs on behalf of + * @see org.apache.streams.hdfs.WebHdfsPersistReader + */ public class WebHdfsPersistReaderTask implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReaderTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReaderTask.class); - private 
WebHdfsPersistReader reader; + private WebHdfsPersistReader reader; - public WebHdfsPersistReaderTask(WebHdfsPersistReader reader) { - this.reader = reader; - } + public WebHdfsPersistReaderTask(WebHdfsPersistReader reader) { + this.reader = reader; + } + + @Override + public void run() { - @Override - public void run() { + LOGGER.info("WebHdfsPersistReaderTask: files to process"); - LOGGER.info("WebHdfsPersistReaderTask: files to process"); + for ( FileStatus fileStatus : reader.status ) { + LOGGER.info(" " + fileStatus.getPath().getName()); + } - for( FileStatus fileStatus : reader.status ) { - LOGGER.info(" " + fileStatus.getPath().getName()); + for ( FileStatus fileStatus : reader.status ) { + InputStream inputStream; + InputStreamReader inputStreamReader; + BufferedReader bufferedReader; + if ( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) { + HdfsWriterConfiguration.Compression compression = HdfsWriterConfiguration.Compression.NONE; + if ( fileStatus.getPath().getName().endsWith(".gz")) { + compression = HdfsWriterConfiguration.Compression.GZIP; + } + LOGGER.info("Started Processing: {} Encoding: {} Compression: {}", fileStatus.getPath().getName(), reader.hdfsConfiguration.getEncoding(), compression.toString()); + try { + inputStream = reader.client.open(fileStatus.getPath()); + if ( compression.equals(HdfsWriterConfiguration.Compression.GZIP)) { + inputStream = new GZIPInputStream(inputStream); + } + inputStreamReader = new InputStreamReader(inputStream, reader.hdfsConfiguration.getEncoding()); + bufferedReader = new BufferedReader(inputStreamReader); + } catch (Exception ex) { + LOGGER.error("Exception Opening " + fileStatus.getPath(), ex.getMessage()); + return; } - for( FileStatus fileStatus : reader.status ) { - InputStream inputStream; - InputStreamReader inputStreamReader; - BufferedReader bufferedReader; - if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) { - HdfsWriterConfiguration.Compression compression = HdfsWriterConfiguration.Compression.NONE; - if( fileStatus.getPath().getName().endsWith(".gz")) - compression = HdfsWriterConfiguration.Compression.GZIP; - LOGGER.info("Started Processing: {} Encoding: {} Compression: {}", fileStatus.getPath().getName(), reader.hdfsConfiguration.getEncoding(), compression.toString()); - try { - inputStream = reader.client.open(fileStatus.getPath()); - if( compression.equals(HdfsWriterConfiguration.Compression.GZIP)) - inputStream = new GZIPInputStream(inputStream); - inputStreamReader = new InputStreamReader(inputStream, reader.hdfsConfiguration.getEncoding()); - bufferedReader = new BufferedReader(inputStreamReader); - } catch (Exception e) { - LOGGER.error("Exception Opening " + fileStatus.getPath(), e.getMessage()); - return; - } - - String line = ""; - do{ - try { - line = bufferedReader.readLine(); - if( !Strings.isNullOrEmpty(line) ) { - reader.countersCurrent.incrementAttempt(); - StreamsDatum entry = reader.lineReaderUtil.processLine(line); - if( entry != null ) { - reader.write(entry); - reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); - } else { - LOGGER.warn("processLine failed"); - reader.countersCurrent.incrementStatus(DatumStatus.FAIL); - } - } - } catch (Exception e) { - LOGGER.warn("WebHdfsPersistReader readLine Exception: {}", e); - reader.countersCurrent.incrementStatus(DatumStatus.FAIL); - } - } while( !Strings.isNullOrEmpty(line) ); - LOGGER.info("Finished Processing " + fileStatus.getPath().getName()); - try { - bufferedReader.close(); - } catch 
(Exception e) { - LOGGER.error("WebHdfsPersistReader Exception: {}", e); - } + String line = ""; + do { + try { + line = bufferedReader.readLine(); + if ( !Strings.isNullOrEmpty(line) ) { + reader.countersCurrent.incrementAttempt(); + StreamsDatum entry = reader.lineReaderUtil.processLine(line); + if ( entry != null ) { + reader.write(entry); + reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); + } else { + LOGGER.warn("processLine failed"); + reader.countersCurrent.incrementStatus(DatumStatus.FAIL); + } } + } catch (Exception ex) { + LOGGER.warn("WebHdfsPersistReader readLine Exception: {}", ex); + reader.countersCurrent.incrementStatus(DatumStatus.FAIL); + } } + while ( !Strings.isNullOrEmpty(line) ); + LOGGER.info("Finished Processing " + fileStatus.getPath().getName()); + try { + bufferedReader.close(); + } catch (Exception ex) { + LOGGER.error("WebHdfsPersistReader Exception: {}", ex); + } + } + } - LOGGER.info("WebHdfsPersistReaderTask Finished"); + LOGGER.info("WebHdfsPersistReaderTask Finished"); - Uninterruptibles.sleepUninterruptibly(15, TimeUnit.SECONDS); - } + Uninterruptibles.sleepUninterruptibly(15, TimeUnit.SECONDS); + } }
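
[Editor's note] The core of WebHdfsPersistReaderTask above is a compression-aware, encoding-aware line loop over files in an HDFS directory: skip bookkeeping files beginning with "_", wrap the stream in a GZIPInputStream when the name ends in ".gz", then read line by line. The following standalone sketch shows the same pattern using only the standard Hadoop FileSystem API; the URI, directory path, and encoding are placeholder values, not taken from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.zip.GZIPInputStream;

public class HdfsLineReadSketch {

  public static void main(String[] args) throws Exception {
    // Placeholder URI and directory; substitute your own cluster settings.
    FileSystem client = FileSystem.get(new URI("webhdfs://localhost:50070"), new Configuration());
    Path dir = new Path("/data/streams");

    for (FileStatus fileStatus : client.listStatus(dir)) {
      // Skip directories and Hadoop bookkeeping files such as _SUCCESS, as the task above does.
      if (!fileStatus.isFile() || fileStatus.getPath().getName().startsWith("_")) {
        continue;
      }
      InputStream inputStream = client.open(fileStatus.getPath());
      // Detect gzip by file suffix and wrap the stream before decoding characters.
      if (fileStatus.getPath().getName().endsWith(".gz")) {
        inputStream = new GZIPInputStream(inputStream);
      }
      try (BufferedReader bufferedReader =
               new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))) {
        String line;
        while ((line = bufferedReader.readLine()) != null) {
          // Hand each non-empty line to whatever per-line processing is needed.
          if (!line.isEmpty()) {
            System.out.println(line);
          }
        }
      }
    }
    client.close();
  }
}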

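[Editor's note] On the reader side, the queue handling is unchanged in substance by this reformatting: write() spins on offer() against a bounded queue, yielding between attempts, so the reader task is held back when consumers fall behind, while readCurrent() snapshots the queue, rolls the per-batch counters, and clears it under a lock. Below is a minimal sketch of that hand-off pattern in plain java.util.concurrent terms; the class and method names are hypothetical and this is an illustration of the pattern, not the project's implementation.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedHandoffSketch<T> {

  // Bounded queue: offer() returns false when full, which drives the backpressure loop below.
  private final Queue<T> persistQueue = new LinkedBlockingQueue<>(100);

  /** Spin until the bounded queue accepts the entry, yielding so a consumer can drain it. */
  public void write(T entry) {
    boolean success;
    do {
      // Lock mirrors the original's coarse synchronization around queue access.
      synchronized (BoundedHandoffSketch.class) {
        success = persistQueue.offer(entry);
      }
      Thread.yield();
    } while (!success);
  }

  /** Snapshot the current contents and clear the queue so the next batch starts empty. */
  public Queue<T> readCurrent() {
    synchronized (BoundedHandoffSketch.class) {
      Queue<T> snapshot = new ConcurrentLinkedQueue<>(persistQueue);
      persistQueue.clear();
      return snapshot;
    }
  }
}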