key classes and interfaces for STREAMS-218

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4d41eac0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4d41eac0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4d41eac0

Branch: refs/heads/master
Commit: 4d41eac017a56337bac304616da68a73cd81946f
Parents: 5b04248
Author: sblackmon <[email protected]>
Authored: Tue Dec 2 10:16:19 2014 -0600
Committer: sblackmon <[email protected]>
Committed: Tue Dec 2 10:16:19 2014 -0600

----------------------------------------------------------------------
 .../converter/ActivityConverterProcessor.java   | 242 +++++++++++++++++++
 .../converter/BaseDocumentClassifier.java       |  64 +++++
 .../BaseObjectNodeActivityConverter.java        |  74 ++++++
 .../converter/BaseStringActivityConverter.java  |  74 ++++++
 ...ActivityConverterProcessorConfiguration.json |  23 ++
 .../test/ActivityConverterProcessorTest.java    |  90 +++++++
 .../apache/streams/core/util/DatumUtils.java    |  12 +
 .../apache/streams/data/ActivityConverter.java  |  87 +++++++
 .../apache/streams/data/DocumentClassifier.java |  39 +++
 .../exceptions/ActivityConversionException.java |  42 ++++
 10 files changed, 747 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
new file mode 100644
index 0000000..ff09a86
--- /dev/null
+++ 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
@@ -0,0 +1,242 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.converter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.DocumentClassifier;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.exceptions.ActivityConversionException;
+import org.apache.streams.pojo.json.Activity;
+import org.reflections.Reflections;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * ActivityConverterProcessor is a utility processor for converting any datum 
document
+ * to an Activity.
+ *
+ * By default it will handle string json and objectnode representation of 
existing Activities.
+ *
+ * Implementations can add DocumentClassifiers and ActivityConverterResolvers 
to the processor
+ * to ensure additional ActivityConverters will be resolved and applied.
+ *
+ * A DocumentClassifier's reponsibility is to recognize document formats and 
label them, using
+ * a jackson-compatible POJO class.
+ *
+ * An ActivityConverterResolver's reponsibility is to identify 
ActivityConverter implementations
+ * capable of converting a raw document associated with that POJO class into 
an activity.
+ *
+ */
+public class ActivityConverterProcessor implements StreamsProcessor {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(ActivityConverterProcessor.class);
+
+    private List<DocumentClassifier> classifiers;
+    private List<ActivityConverter> converters;
+
+    private ActivityConverterProcessorConfiguration configuration;
+
+    public ActivityConverterProcessor() {
+        this.classifiers = Lists.newArrayList();
+        this.converters = Lists.newArrayList();
+    }
+
+    public ActivityConverterProcessor(ActivityConverterProcessorConfiguration 
configuration) {
+        this.classifiers = Lists.newArrayList();
+        this.converters = Lists.newArrayList();
+        this.configuration = configuration;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newLinkedList();
+        Object document = entry.getDocument();
+
+        try {
+
+            // first determine which classes this document might actually be
+            List<Class> detectedClasses = detectClasses(document);
+
+            if( detectedClasses.size() == 0 ) {
+                LOGGER.warn("Unable to classify");
+                return null;
+            } else {
+                LOGGER.debug("Classified document as " + detectedClasses);
+            }
+
+            // for each of these classes:
+            //   use TypeUtil to switch the document to
+            Map<Class, Object> typedDocs = 
convertToDetectedClasses(detectedClasses, document);
+
+            if( typedDocs.size() == 0 ) {
+                LOGGER.warn("Unable to convert to any detected Class");
+                return result;
+            }
+            else {
+                LOGGER.debug("Document has " + typedDocs.size() + " 
representations: " + typedDocs.toString());
+            }
+
+            for( ActivityConverter converter : converters ) {
+
+                Object typedDoc = typedDocs.get(converter.requiredClass());
+
+                if( typedDoc != null ) {
+
+                    StreamsDatum datum = DatumUtils.cloneDatum(entry);
+
+                    List<Activity> activities = convertToActivity(converter, 
document);
+
+                    for( Activity activity : activities ) {
+
+                        if (activity != null) {
+
+                            if( ActivityUtil.isValid(activity)) {
+                                datum.setDocument(activity);
+                                datum.setId(activity.getId());
+                                result.add(datum);
+                            } else {
+                                
LOGGER.debug(converter.getClass().getCanonicalName() + " produced invalid 
Activity converting " + 
converter.requiredClass().getClass().getCanonicalName());
+                            }
+
+                        } else {
+                            
LOGGER.debug(converter.getClass().getCanonicalName() + " returned null 
converting " + converter.requiredClass().getClass().getCanonicalName() + " to 
Activity");
+                        }
+
+                    }
+
+                }
+
+            }
+
+        } catch( Exception e ) {
+            LOGGER.warn("Unable to fromActivity!  " + e.getMessage());
+            e.printStackTrace();
+        } finally {
+            return result;
+        }
+
+    }
+
+    protected List<Activity> convertToActivity(ActivityConverter converter, 
Object document) {
+
+        List<Activity> activities = Lists.newArrayList();
+        try {
+            activities = converter.toActivityList(document);
+        } catch (ActivityConversionException e1) {
+            LOGGER.debug(converter.getClass().getCanonicalName() + " unable to 
convert " + converter.requiredClass().getClass().getCanonicalName() + " to 
Activity");
+        }
+        return activities;
+
+    }
+
+    protected List<Class> detectClasses(Object document) {
+
+        Set<Class> detectedClasses = Sets.newConcurrentHashSet();
+        for( DocumentClassifier classifier : classifiers ) {
+            List<Class> detected = classifier.detectClasses(document);
+            if( detected != null && detected.size() > 0)
+                detectedClasses.addAll(detected);
+        }
+
+        return Lists.newArrayList(detectedClasses);
+    }
+
+    private Map<Class, Object> convertToDetectedClasses(List<Class> 
datumClasses, Object document) {
+
+        Map<Class, Object> convertedDocuments = Maps.newHashMap();
+        for( Class detectedClass : datumClasses ) {
+
+            Object typedDoc;
+            if (detectedClass.isInstance(document))
+                typedDoc = document;
+            else
+                typedDoc = TypeConverterUtil.convert(document, detectedClass);
+
+            if( typedDoc != null )
+                convertedDocuments.put(detectedClass, typedDoc);
+        }
+
+        return convertedDocuments;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+//        Preconditions.checkArgument(configurationObject instanceof 
ActivityConverterProcessorConfiguration);
+//        ActivityConverterProcessorConfiguration configuration = 
(ActivityConverterProcessorConfiguration) configurationObject;
+        Reflections reflections = new Reflections(new 
ConfigurationBuilder().setUrls(ClasspathHelper.forManifest()));
+        if (configuration.getClassifiers().size() > 0) {
+            for( DocumentClassifier classifier : 
configuration.getClassifiers()) {
+                try {
+                    this.classifiers.add(classifier);
+                } catch (Exception e) {
+                    LOGGER.warn("Exception adding " + classifier);
+                }
+            }
+        } else {
+            Set<Class<? extends DocumentClassifier>> classifierClasses = 
reflections.getSubTypesOf(DocumentClassifier.class);
+            for (Class classifierClass : classifierClasses) {
+                try {
+                    this.classifiers.add((DocumentClassifier) 
classifierClass.newInstance());
+                } catch (Exception e) {
+                    LOGGER.warn("Exception instantiating " + classifierClass);
+                }
+            }
+        }
+        Preconditions.checkArgument(this.classifiers.size() > 0);
+        if (configuration.getConverters().size() > 0) {
+            for( ActivityConverter converter : configuration.getConverters()) {
+                try {
+                    this.converters.add(converter);
+                } catch (Exception e) {
+                    LOGGER.warn("Exception adding " + converter);
+                }
+            }
+        } else {
+            Set<Class<? extends ActivityConverter>> converterClasses = 
reflections.getSubTypesOf(ActivityConverter.class);
+            for (Class converterClass : converterClasses) {
+                try {
+                    this.converters.add((ActivityConverter) 
converterClass.newInstance());
+                } catch (Exception e) {
+                    LOGGER.warn("Exception instantiating " + converterClass);
+                }
+            }
+        }
+        Preconditions.checkArgument(this.converters.size() > 0);
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
new file mode 100644
index 0000000..ca22cd4
--- /dev/null
+++ 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java
@@ -0,0 +1,64 @@
+package org.apache.streams.converter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.DocumentClassifier;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * BaseDocumentClassifier is included by default in all
+ * @see {@link org.apache.streams.converter.ActivityConverterProcessor}
+ *
+ * Ensures generic String and ObjectNode documents can be converted to Activity
+ *
+ */
+public class BaseDocumentClassifier implements DocumentClassifier {
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<Class> detectClasses(Object document) {
+        Preconditions.checkArgument(
+                document instanceof String
+             || document instanceof ObjectNode);
+
+        Activity activity = null;
+        ObjectNode node = null;
+
+        List<Class> classes = Lists.newArrayList();
+        // Soon javax.validation will available in jackson
+        //   That will make this simpler and more powerful
+        if( document instanceof String ) {
+            classes.add(String.class);
+            try {
+                activity = this.mapper.readValue((String)document, 
Activity.class);
+                if(activity != null && ActivityUtil.isValid(activity))
+                    classes.add(Activity.class);
+            } catch (IOException e1) {
+                try {
+                    node = this.mapper.readValue((String)document, 
ObjectNode.class);
+                    classes.add(ObjectNode.class);
+                } catch (IOException e2) { }
+            }
+        } else if( document instanceof ObjectNode ){
+            classes.add(ObjectNode.class);
+            activity = this.mapper.convertValue((ObjectNode)document, 
Activity.class);
+            if(ActivityUtil.isValid(activity))
+                classes.add(Activity.class);
+        } else {
+            classes.add(document.getClass());
+        }
+
+        return classes;
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
new file mode 100644
index 0000000..3bedb98
--- /dev/null
+++ 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java
@@ -0,0 +1,74 @@
+package org.apache.streams.converter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivityConversionException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import java.util.List;
+
+/**
+ * BaseObjectNodeActivityConverter is included by default in all
+ * @see {@link org.apache.streams.converter.ActivityConverterProcessor}
+ *
+ * Ensures generic ObjectNode representation of an Activity can be converted 
to Activity
+ *
+ */
+public class BaseObjectNodeActivityConverter implements 
ActivityConverter<ObjectNode> {
+
+    public static Class requiredClass = ObjectNode.class;
+
+    private ObjectMapper mapper = new StreamsJacksonMapper();
+
+    @Override
+    public Class requiredClass() {
+        return requiredClass;
+    }
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public ObjectNode fromActivity(Activity deserialized) throws 
ActivityConversionException {
+        try {
+           return mapper.convertValue(deserialized, ObjectNode.class);
+        } catch (Exception e) {
+            throw new ActivityConversionException();
+        }
+    }
+
+    @Override
+    public List<Activity> toActivityList(ObjectNode serialized) throws 
ActivityConversionException {
+        List<Activity> activityList = Lists.newArrayList();
+        try {
+            activityList.add(mapper.convertValue(serialized, Activity.class));
+        } catch (Exception e) {
+            throw new ActivityConversionException();
+        } finally {
+            return activityList;
+        }
+    }
+
+    @Override
+    public List<ObjectNode> fromActivityList(List<Activity> list) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public List<Activity> toActivityList(List<ObjectNode> list) {
+        List<Activity> result = Lists.newArrayList();
+        for( ObjectNode item : list ) {
+            try {
+                result.addAll(toActivityList(item));
+            } catch (ActivityConversionException e) {}
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
new file mode 100644
index 0000000..55f7cc1
--- /dev/null
+++ 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java
@@ -0,0 +1,74 @@
+package org.apache.streams.converter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivityConversionException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import java.util.List;
+
+/**
+ * BaseObjectNodeActivityConverter is included by default in all
+ * @see {@link org.apache.streams.converter.ActivityConverterProcessor}
+ *
+ * Ensures generic String Json representation of an Activity can be converted 
to Activity
+ *
+ */
+public class BaseStringActivityConverter implements ActivityConverter<String> {
+
+    public static Class requiredClass = String.class;
+
+    private ObjectMapper mapper = new StreamsJacksonMapper();
+
+    @Override
+    public Class requiredClass() {
+        return requiredClass;
+    }
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public String fromActivity(Activity deserialized) throws 
ActivityConversionException {
+        try {
+            return mapper.writeValueAsString(deserialized);
+        } catch (JsonProcessingException e) {
+            throw new ActivityConversionException();
+        }
+    }
+
+    @Override
+    public List<Activity> toActivityList(String serialized) throws 
ActivityConversionException {
+        List<Activity> activityList = Lists.newArrayList();
+        try {
+            activityList.add(mapper.readValue(serialized, Activity.class));
+        } catch (Exception e) {
+            throw new ActivityConversionException();
+        } finally {
+            return activityList;
+        }
+    }
+
+    @Override
+    public List<String> fromActivityList(List<Activity> list) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public List<Activity> toActivityList(List<String> list) {
+        List<Activity> result = Lists.newArrayList();
+        for( String item : list ) {
+            try {
+                result.addAll(toActivityList(item));
+            } catch (ActivityConversionException e) {}
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/ActivityConverterProcessorConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/ActivityConverterProcessorConfiguration.json
 
b/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/ActivityConverterProcessorConfiguration.json
new file mode 100644
index 0000000..95d9dbd
--- /dev/null
+++ 
b/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/ActivityConverterProcessorConfiguration.json
@@ -0,0 +1,23 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema";,
+    "id": "#",
+    "javaType" : 
"org.apache.streams.converter.ActivityConverterProcessorConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "classifiers": {
+            "type": "array",
+            "items": {
+                "type": "object",
+                "javaType": "org.apache.streams.data.DocumentClassifier"
+            }
+        },
+        "converters": {
+            "type": "array",
+            "items": {
+                "type": "object",
+                "javaType": "org.apache.streams.data.ActivityConverter"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/ActivityConverterProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/ActivityConverterProcessorTest.java
 
b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/ActivityConverterProcessorTest.java
new file mode 100644
index 0000000..e15ba29
--- /dev/null
+++ 
b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/ActivityConverterProcessorTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.converter.ActivityConverterProcessorConfiguration;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static junit.framework.Assert.*;
+
+/**
+ * Test for
+ * @see {@link org.apache.streams.converter.ActivityConverterProcessor}
+ */
+public class ActivityConverterProcessorTest {
+
+    private static final ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private static final String ACTIVITY_JSON = 
"{\"id\":\"id\",\"published\":\"Tue Jan 17 21:21:46 Z 
2012\",\"verb\":\"post\",\"provider\":{\"id\":\"providerid\"}}";
+
+    ActivityConverterProcessor processor;
+
+    @Before
+    public void setup() {
+        processor = new ActivityConverterProcessor(new 
ActivityConverterProcessorConfiguration());
+        processor.prepare(new ActivityConverterProcessorConfiguration());
+    }
+
+    @Test
+    public void testBaseActivitySerializerProcessorInvalid() {
+        String INVALID_DOCUMENT = " 38Xs}";
+        StreamsDatum datum = new StreamsDatum(INVALID_DOCUMENT);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(0, result.size());
+    }
+
+    @Test
+    public void testActivityConverterProcessorString() {
+        StreamsDatum datum = new StreamsDatum(ACTIVITY_JSON);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertNotNull(resultDatum.getDocument());
+        assertTrue(resultDatum.getDocument() instanceof Activity);
+        
assertTrue(((Activity)resultDatum.getDocument()).getVerb().equals("post"));
+    }
+
+    @Test
+    public void testBaseActivitySerializerProcessorObject() throws IOException 
{
+        ObjectNode OBJECT_DOCUMENT = mapper.readValue(ACTIVITY_JSON, 
ObjectNode.class);
+        StreamsDatum datum = new StreamsDatum(OBJECT_DOCUMENT);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertNotNull(resultDatum.getDocument());
+        assertTrue(resultDatum.getDocument() instanceof Activity);
+        
assertTrue(((Activity)resultDatum.getDocument()).getVerb().equals("post"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java 
b/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java
index eedbb07..b751b2a 100644
--- a/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java
+++ b/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java
@@ -22,7 +22,10 @@ package org.apache.streams.core.util;
 import com.google.common.collect.Maps;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsOperation;
+import org.joda.time.DateTime;
 
+import java.math.BigInteger;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -46,4 +49,13 @@ public class DatumUtils {
         Map<String, Throwable> errors = (Map)datum.getMetadata().get("errors");
         errors.put(operationClass.getCanonicalName(), e);
     }
+
+    public static StreamsDatum cloneDatum(StreamsDatum datum) {
+        StreamsDatum clone = new StreamsDatum(datum.getDocument());
+        clone.setId(datum.getId() == null ? null : new String(datum.getId()));
+        clone.setTimestamp(datum.getTimestamp() == null ? null : new 
DateTime(datum.getTimestamp()));
+        clone.setSequenceid(datum.getSequenceid() == null ? null : 
datum.getSequenceid());
+        clone.setMetadata(datum.getMetadata() == null ? null : new 
HashMap<>(datum.getMetadata()));
+        return clone;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-pojo/src/main/java/org/apache/streams/data/ActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-pojo/src/main/java/org/apache/streams/data/ActivityConverter.java 
b/streams-pojo/src/main/java/org/apache/streams/data/ActivityConverter.java
new file mode 100644
index 0000000..4d6d759
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/data/ActivityConverter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import org.apache.streams.exceptions.ActivityConversionException;
+import org.apache.streams.pojo.json.Activity;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Converts non-Activity documents to Activities and back.
+ *
+ * Each converter may one, several, or zero activities.
+ *
+ * The recommended approach for deriving multiple activities from a source 
document is:
+ *
+ *   1) Return one activity for each occurance of a verb, from the same 
ActivityConverter, if the activities are of like type.
+ *
+ *      For example, BlogShareConverter would convert a blog containing two 
shares into two Activities with verb: share
+ *
+ *   2) Create multiple ActivityConverters, if the activities are not of like 
type.
+ *
+ *      For example, a blog post that is both a post and a share should be 
transformed by two seperate Converters, individually
+ *      or simultaneously applied.
+ */
+public interface ActivityConverter<T> extends Serializable {
+
+    /**
+     * What class does this ActivityConverter require?
+     *
+     * @return The class the ActivityConverter requires.  Should always return 
the templated class.
+     */
+    Class requiredClass();
+
+    /**
+     * Gets the supported content type that can be deserialized/serialized
+     *
+     * @return A string representing the format name.  Can be an IETF MIME 
type or other
+     */
+    String serializationFormat();
+
+    /**
+     * Converts the activity to a POJO representation.
+     *
+     * @param deserialized the string
+     * @return a fully populated Activity object
+     */
+    T fromActivity(Activity deserialized) throws ActivityConversionException;
+
+    /**
+     * Converts a POJO into one or more Activities
+     * @param serialized the string representation
+     * @return a fully populated Activity object
+     */
+    List<Activity> toActivityList(T serialized) throws 
ActivityConversionException;
+
+    /**
+     * Converts multiple Activities into a list of source documents
+     * @param list a typed List of documents
+     * @return a list of source documents
+     */
+    List<T> fromActivityList(List<Activity> list) throws 
ActivityConversionException;
+
+    /**
+     * Converts multiple documents into a list of Activity objects
+     * @param list a typed List of documents
+     * @return a list of fully populated activities
+     */
+    List<Activity> toActivityList(List<T> list) throws 
ActivityConversionException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-pojo/src/main/java/org/apache/streams/data/DocumentClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-pojo/src/main/java/org/apache/streams/data/DocumentClassifier.java 
b/streams-pojo/src/main/java/org/apache/streams/data/DocumentClassifier.java
new file mode 100644
index 0000000..bcb14b7
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/data/DocumentClassifier.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * DocumentClassifier assists with ActivityConversion, by determining whether 
a document may be
+ * parseable into a POJO for which an ActivityConverter exists.
+ */
+public interface DocumentClassifier extends Serializable {
+
+    /**
+     * Assess the structure of the document, and identify whether the provided 
document is
+     * a structural match for one or more typed forms.
+     *
+     * @param document the document
+     * @return a serializable pojo class this document matches
+     */
+    List<Class> detectClasses(Object document);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4d41eac0/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityConversionException.java
----------------------------------------------------------------------
diff --git 
a/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityConversionException.java
 
b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityConversionException.java
new file mode 100644
index 0000000..a03ec87
--- /dev/null
+++ 
b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityConversionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.exceptions;
+
+/**
+ *  ActivityConversionException is a typed exception appropriate when a valid 
Activity
+ *  cannot be created from a given document.
+ */
+public class ActivityConversionException extends Exception {
+
+    public ActivityConversionException() {
+    }
+
+    public ActivityConversionException(String message) {
+        super(message);
+    }
+
+    public ActivityConversionException(Throwable cause) {
+        super(cause);
+    }
+
+    public ActivityConversionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

Reply via email to