http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java index d39c087..cdba150 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseDocumentClassifier.java @@ -19,61 +19,72 @@ under the License. package org.apache.streams.converter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.streams.data.DocumentClassifier; import org.apache.streams.data.util.ActivityUtil; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.ArrayList; import java.util.List; /** + * Ensures generic String and ObjectNode documents can be converted to Activity + * + * <p/> * BaseDocumentClassifier is included by default in all * @see org.apache.streams.converter.ActivityConverterProcessor * - * Ensures generic String and ObjectNode documents can be converted to Activity - * */ public class BaseDocumentClassifier implements DocumentClassifier { - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - @Override - @SuppressWarnings("unchecked") - public List<Class> detectClasses(Object document) { - - Activity activity; - ObjectNode node = null; - - List<Class> classes = new ArrayList<>(); - // Soon javax.validation will available in jackson - // That will make this simpler and more powerful - if( document instanceof String ) { - classes.add(String.class); - try { - activity = this.mapper.readValue((String)document, Activity.class); - if(activity != null && ActivityUtil.isValid(activity)) - classes.add(Activity.class); - } catch (IOException e1) { - try { - node = this.mapper.readValue((String)document, ObjectNode.class); - classes.add(ObjectNode.class); - } catch (IOException ignored) { } - } - } else if( document instanceof ObjectNode ){ - classes.add(ObjectNode.class); - activity = this.mapper.convertValue(document, Activity.class); - if(ActivityUtil.isValid(activity)) - classes.add(Activity.class); - } else { - classes.add(document.getClass()); - } + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + private static final Logger LOGGER = LoggerFactory.getLogger(BaseDocumentClassifier.class); + + @Override + @SuppressWarnings("unchecked") + public List<Class> detectClasses(Object document) { - return classes; + Activity activity; + ObjectNode node = null; + List<Class> classes = new ArrayList<>(); + // Soon javax.validation will available in jackson + // That will make this simpler and more powerful + if ( document instanceof String ) { + classes.add(String.class); + try { + activity = this.mapper.readValue((String)document, Activity.class); + if (activity != null && ActivityUtil.isValid(activity)) { + classes.add(Activity.class); + } + } catch (IOException e1) { + try { + node = this.mapper.readValue((String)document, ObjectNode.class); + classes.add(ObjectNode.class); + } catch (IOException ignored) { + LOGGER.trace("ignoring ", ignored); + } + } + } else if ( document instanceof ObjectNode ) { + classes.add(ObjectNode.class); + activity = this.mapper.convertValue(document, Activity.class); + if (ActivityUtil.isValid(activity)) { + classes.add(Activity.class); + } + } else { + classes.add(document.getClass()); } + return classes; + + } + }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java index b6cde29..cb61f0e 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityConverter.java @@ -19,75 +19,88 @@ under the License. package org.apache.streams.converter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivityConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; + +import org.apache.commons.lang.NotImplementedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; /** + * Ensures generic ObjectNode representation of an Activity can be converted to Activity + * + * <p/> * BaseObjectNodeActivityConverter is included by default in all * @see {@link org.apache.streams.converter.ActivityConverterProcessor} * - * Ensures generic ObjectNode representation of an Activity can be converted to Activity * */ public class BaseObjectNodeActivityConverter implements ActivityConverter<ObjectNode> { - public static Class requiredClass = ObjectNode.class; + private static final Logger LOGGER = LoggerFactory.getLogger(BaseObjectNodeActivityConverter.class); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + public static Class requiredClass = ObjectNode.class; - @Override - public Class requiredClass() { - return requiredClass; - } + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - @Override - public String serializationFormat() { - return null; - } + @Override + public Class requiredClass() { + return requiredClass; + } - @Override - public ObjectNode fromActivity(Activity deserialized) throws ActivityConversionException { - try { - return mapper.convertValue(deserialized, ObjectNode.class); - } catch (Exception e) { - throw new ActivityConversionException(); - } - } + @Override + public String serializationFormat() { + return null; + } - @Override - public List<Activity> toActivityList(ObjectNode serialized) throws ActivityConversionException { - List<Activity> activityList = Lists.newArrayList(); - try { - activityList.add(mapper.convertValue(serialized, Activity.class)); - } catch (Exception e) { - throw new ActivityConversionException(); - } finally { - return activityList; - } + @Override + public ObjectNode fromActivity(Activity deserialized) throws ActivityConversionException { + try { + return mapper.convertValue(deserialized, ObjectNode.class); + } catch (Exception ex) { + throw new ActivityConversionException(); } + } + + @Override + public List<ObjectNode> fromActivityList(List<Activity> list) { + throw new NotImplementedException(); + } - @Override - public List<ObjectNode> fromActivityList(List<Activity> list) { - throw new NotImplementedException(); + @Override + public List<Activity> toActivityList(ObjectNode serialized) throws ActivityConversionException { + List<Activity> activityList = Lists.newArrayList(); + try { + activityList.add(mapper.convertValue(serialized, Activity.class)); + } catch (Exception ex) { + throw new ActivityConversionException(); + } finally { + return activityList; } + } - @Override - public List<Activity> toActivityList(List<ObjectNode> list) { - List<Activity> result = Lists.newArrayList(); - for( ObjectNode item : list ) { - try { - result.addAll(toActivityList(item)); - } catch (ActivityConversionException e) {} - } - return result; + @Override + public List<Activity> toActivityList(List<ObjectNode> list) { + List<Activity> result = Lists.newArrayList(); + for ( ObjectNode item : list ) { + try { + result.addAll(toActivityList(item)); + } catch (ActivityConversionException ex) { + LOGGER.trace("ActivityConversionException", ex); + } } + return result; + } + + + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java index cb66414..a38585e 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseObjectNodeActivityObjectConverter.java @@ -19,58 +19,54 @@ under the License. package org.apache.streams.converter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivityConverter; import org.apache.streams.data.ActivityObjectConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; -import java.util.List; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; /** + * Ensures generic ObjectNode representation of an Activity can be converted to Activity. + * + * <p/> * BaseObjectNodeActivityConverter is included by default in all * @see {@link ActivityConverterProcessor} * - * Ensures generic ObjectNode representation of an Activity can be converted to Activity - * */ public class BaseObjectNodeActivityObjectConverter implements ActivityObjectConverter<ObjectNode> { - public static Class requiredClass = ObjectNode.class; + public static Class requiredClass = ObjectNode.class; - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - @Override - public Class requiredClass() { - return requiredClass; - } + @Override + public Class requiredClass() { + return requiredClass; + } - @Override - public String serializationFormat() { - return null; - } + @Override + public String serializationFormat() { + return null; + } - @Override - public ObjectNode fromActivityObject(ActivityObject deserialized) throws ActivityConversionException { - try { - return mapper.convertValue(deserialized, ObjectNode.class); - } catch (Exception e) { - throw new ActivityConversionException(); - } + @Override + public ObjectNode fromActivityObject(ActivityObject deserialized) throws ActivityConversionException { + try { + return mapper.convertValue(deserialized, ObjectNode.class); + } catch (Exception ex) { + throw new ActivityConversionException(); } + } - @Override - public ActivityObject toActivityObject(ObjectNode serialized) throws ActivityConversionException { - try { - return mapper.convertValue(serialized, ActivityObject.class); - } catch (Exception e) { - throw new ActivityConversionException(); - } + @Override + public ActivityObject toActivityObject(ObjectNode serialized) throws ActivityConversionException { + try { + return mapper.convertValue(serialized, ActivityObject.class); + } catch (Exception ex) { + throw new ActivityConversionException(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java index 7438abb..da15dee 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityConverter.java @@ -19,75 +19,86 @@ under the License. package org.apache.streams.converter; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivityConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; + +import org.apache.commons.lang.NotImplementedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; /** + * Ensures generic String Json representation of an Activity can be converted to Activity + * + * <p/> * BaseObjectNodeActivityConverter is included by default in all * @see {@link org.apache.streams.converter.ActivityConverterProcessor} * - * Ensures generic String Json representation of an Activity can be converted to Activity - * */ public class BaseStringActivityConverter implements ActivityConverter<String> { - public static Class requiredClass = String.class; + private static final Logger LOGGER = LoggerFactory.getLogger(BaseObjectNodeActivityConverter.class); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + public static final Class requiredClass = String.class; - @Override - public Class requiredClass() { - return requiredClass; - } + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - @Override - public String serializationFormat() { - return null; - } + @Override + public Class requiredClass() { + return requiredClass; + } - @Override - public String fromActivity(Activity deserialized) throws ActivityConversionException { - try { - return mapper.writeValueAsString(deserialized); - } catch (JsonProcessingException e) { - throw new ActivityConversionException(); - } - } + @Override + public String serializationFormat() { + return null; + } - @Override - public List<Activity> toActivityList(String serialized) throws ActivityConversionException { - List<Activity> activityList = Lists.newArrayList(); - try { - activityList.add(mapper.readValue(serialized, Activity.class)); - } catch (Exception e) { - throw new ActivityConversionException(); - } finally { - return activityList; - } + @Override + public String fromActivity(Activity deserialized) throws ActivityConversionException { + try { + return mapper.writeValueAsString(deserialized); + } catch (JsonProcessingException ex) { + throw new ActivityConversionException(); } + } + + @Override + public List<String> fromActivityList(List<Activity> list) { + throw new NotImplementedException(); + } - @Override - public List<String> fromActivityList(List<Activity> list) { - throw new NotImplementedException(); + @Override + public List<Activity> toActivityList(String serialized) throws ActivityConversionException { + List<Activity> activityList = Lists.newArrayList(); + try { + activityList.add(mapper.readValue(serialized, Activity.class)); + } catch (Exception ex) { + throw new ActivityConversionException(); + } finally { + return activityList; } + } - @Override - public List<Activity> toActivityList(List<String> list) { - List<Activity> result = Lists.newArrayList(); - for( String item : list ) { - try { - result.addAll(toActivityList(item)); - } catch (ActivityConversionException e) {} - } - return result; + @Override + public List<Activity> toActivityList(List<String> list) { + List<Activity> result = Lists.newArrayList(); + for ( String item : list ) { + try { + result.addAll(toActivityList(item)); + } catch (ActivityConversionException ex) { + LOGGER.trace("ActivityConversionException", ex); + } } + return result; + } + + + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java index 3bbbdac..7322fc1 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/BaseStringActivityObjectConverter.java @@ -19,52 +19,53 @@ under the License. package org.apache.streams.converter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.streams.data.ActivityObjectConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.ActivityObject; +import com.fasterxml.jackson.databind.ObjectMapper; + /** + * Ensures generic ObjectNode representation of an Activity can be converted to Activity. + * + * <p/> * BaseObjectNodeActivityConverter is included by default in all * @see {@link ActivityConverterProcessor} * - * Ensures generic ObjectNode representation of an Activity can be converted to Activity - * */ public class BaseStringActivityObjectConverter implements ActivityObjectConverter<String> { - public static Class requiredClass = String.class; + public static Class requiredClass = String.class; - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - @Override - public Class requiredClass() { - return requiredClass; - } + @Override + public Class requiredClass() { + return requiredClass; + } - @Override - public String serializationFormat() { - return null; - } + @Override + public String serializationFormat() { + return null; + } - @Override - public String fromActivityObject(ActivityObject deserialized) throws ActivityConversionException { - try { - return mapper.writeValueAsString(deserialized); - } catch (Exception e) { - throw new ActivityConversionException(); - } + @Override + public String fromActivityObject(ActivityObject deserialized) throws ActivityConversionException { + try { + return mapper.writeValueAsString(deserialized); + } catch (Exception ex) { + throw new ActivityConversionException(); } + } - @Override - public ActivityObject toActivityObject(String serialized) throws ActivityConversionException { - try { - return mapper.readValue(serialized, ActivityObject.class); - } catch (Exception e) { - throw new ActivityConversionException(); - } + @Override + public ActivityObject toActivityObject(String serialized) throws ActivityConversionException { + try { + return mapper.readValue(serialized, ActivityObject.class); + } catch (Exception ex) { + throw new ActivityConversionException(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java index 26dfcb3..3443bd9 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java @@ -19,14 +19,14 @@ package org.apache.streams.converter; /** - * Predefined field symbols + * Predefined field symbols. */ public class FieldConstants { - protected static final String ID = "ID"; - protected static final String SEQ = "SEQ"; - protected static final String TS = "TS"; - protected static final String META = "META"; - protected static final String DOC = "DOC"; + protected static final String ID = "ID"; + protected static final String SEQ = "SEQ"; + protected static final String TS = "TS"; + protected static final String META = "META"; + protected static final String DOC = "DOC"; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java index b3ee72f..44aa56b 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterProcessor.java @@ -19,11 +19,12 @@ under the License. package org.apache.streams.converter; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.core.util.DatumUtils; -import org.apache.streams.pojo.json.Activity; + +import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,53 +34,62 @@ import java.util.List; * HoconConverterProcessor is a utility processor for converting any datum document * with translation rules expressed as HOCON in the classpath or at a URL. * + * <p/> * To use this capability without a dedicated stream processor, just use HoconConverterUtil. */ public class HoconConverterProcessor implements StreamsProcessor { - public static final String STREAMS_ID = "HoconConverterProcessor"; + public static final String STREAMS_ID = "HoconConverterProcessor"; - private final static Logger LOGGER = LoggerFactory.getLogger(HoconConverterProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(HoconConverterProcessor.class); - protected Class outClass; - protected String hocon; - protected String inPath; - protected String outPath; + protected Class outClass; + protected String hocon; + protected String inPath; + protected String outPath; - public HoconConverterProcessor(Class outClass, String hocon, String inPath, String outPath) { - this.outClass = outClass; - this.hocon = hocon; - this.inPath = inPath; - this.outPath = outPath; - } + /** + * HoconConverterProcessor. + * + * @param outClass outClass + * @param hocon hocon + * @param inPath inPath + * @param outPath outPath + */ + public HoconConverterProcessor(Class outClass, String hocon, String inPath, String outPath) { + this.outClass = outClass; + this.hocon = hocon; + this.inPath = inPath; + this.outPath = outPath; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newLinkedList(); - Object document = entry.getDocument(); + List<StreamsDatum> result = Lists.newLinkedList(); + Object document = entry.getDocument(); - Object outDoc = HoconConverterUtil.getInstance().convert(document, outClass, hocon, inPath, outPath); + Object outDoc = HoconConverterUtil.getInstance().convert(document, outClass, hocon, inPath, outPath); - StreamsDatum datum = DatumUtils.cloneDatum(entry); - datum.setDocument(outDoc); - result.add(datum); + StreamsDatum datum = DatumUtils.cloneDatum(entry); + datum.setDocument(outDoc); + result.add(datum); - return result; - } + return result; + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - } + } - @Override - public void cleanUp() { + @Override + public void cleanUp() { - } -}; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java index f8db30c..bac081c 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/HoconConverterUtil.java @@ -18,6 +18,8 @@ package org.apache.streams.converter; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -26,7 +28,7 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigObject; import com.typesafe.config.ConfigRenderOptions; import com.typesafe.config.ConfigValue; -import org.apache.streams.jackson.StreamsJacksonMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,95 +36,104 @@ import java.io.IOException; /** * HoconConverterUtil supports HoconConverterProcessor in converting types via application - * of hocon (https://github.com/typesafehub/config/blob/master/HOCON.md) scripts + * of hocon (https://github.com/typesafehub/config/blob/master/HOCON.md) scripts. */ public class HoconConverterUtil { - private final static Logger LOGGER = LoggerFactory.getLogger(HoconConverterUtil.class); - - private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - private static final HoconConverterUtil INSTANCE = new HoconConverterUtil(); - - public static HoconConverterUtil getInstance(){ - return INSTANCE; - } - - public Object convert(Object object, Class outClass, String hoconResource) { - Config hocon = ConfigFactory.parseResources(hoconResource); - return convert(object, outClass, hocon, null); - } - - public Object convert(Object object, Class outClass, String hoconResource, String outPath) { - Config hocon = ConfigFactory.parseResources(hoconResource); - return convert(object, outClass, hocon, outPath); + private static final Logger LOGGER = LoggerFactory.getLogger(HoconConverterUtil.class); + + private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + private static final HoconConverterUtil INSTANCE = new HoconConverterUtil(); + + public static HoconConverterUtil getInstance() { + return INSTANCE; + } + + public Object convert(Object object, Class outClass, String hoconResource) { + Config hocon = ConfigFactory.parseResources(hoconResource); + return convert(object, outClass, hocon, null); + } + + public Object convert(Object object, Class outClass, String hoconResource, String outPath) { + Config hocon = ConfigFactory.parseResources(hoconResource); + return convert(object, outClass, hocon, outPath); + } + + public Object convert(Object object, Class outClass, String hoconResource, String inPath, String outPath) { + Config hocon = ConfigFactory.parseResources(hoconResource); + return convert(object, outClass, hocon, inPath, outPath); + } + + public Object convert(Object object, Class outClass, Config hocon, String outPath) { + return convert(object, outClass, hocon, null, outPath); + } + + /** + * convert. + * @param object object + * @param outClass outClass + * @param hocon hocon + * @param inPath inPath + * @param outPath outPath + * @return result + */ + public Object convert(Object object, Class outClass, Config hocon, String inPath, String outPath) { + String json = null; + Object outDoc = null; + if ( object instanceof String ) { + json = (String) object; + } else { + try { + json = mapper.writeValueAsString(object); + } catch (JsonProcessingException ex) { + LOGGER.warn("Failed to process input:", object); + return outDoc; + } } - public Object convert(Object object, Class outClass, String hoconResource, String inPath, String outPath) { - Config hocon = ConfigFactory.parseResources(hoconResource); - return convert(object, outClass, hocon, inPath, outPath); + Config base; + if( inPath == null) { + base = ConfigFactory.parseString(json); + } else { + ObjectNode node; + try { + node = mapper.readValue(json, ObjectNode.class); + ObjectNode root = mapper.createObjectNode(); + root.set(inPath, node); + json = mapper.writeValueAsString(root); + base = ConfigFactory.parseString(json); + } catch (Exception ex) { + LOGGER.warn("Failed to process input:", object); + return outDoc; + } } - public Object convert(Object object, Class outClass, Config hocon, String outPath) { - return convert(object, outClass, hocon, null, outPath); + Config converted = hocon.withFallback(base); + + String outJson = null; + try { + if( outPath == null ) { + outJson = converted.resolve().root().render(ConfigRenderOptions.concise()); + } else { + Config resolved = converted.resolve(); + ConfigObject outObject = resolved.withOnlyPath(outPath).root(); + ConfigValue outValue = outObject.get(outPath); + outJson = outValue.render(ConfigRenderOptions.concise()); + } + } catch (Exception ex) { + LOGGER.warn("Failed to convert:", json); + LOGGER.warn(ex.getMessage()); } - - public Object convert(Object object, Class outClass, Config hocon, String inPath, String outPath) { - String json = null; - Object outDoc = null; - if( object instanceof String ) { - json = (String) object; - } else { - try { - json = mapper.writeValueAsString(object); - } catch (JsonProcessingException e) { - LOGGER.warn("Failed to process input:", object); - return outDoc; - } - } - - Config base; - if( inPath == null) - base = ConfigFactory.parseString(json); - else { - ObjectNode node; - try { - node = mapper.readValue(json, ObjectNode.class); - ObjectNode root = mapper.createObjectNode(); - root.set(inPath, node); - json = mapper.writeValueAsString(root); - base = ConfigFactory.parseString(json); - } catch (Exception e) { - LOGGER.warn("Failed to process input:", object); - return outDoc; - } - } - - Config converted = hocon.withFallback(base); - - String outJson = null; - try { - if( outPath == null ) - outJson = converted.resolve().root().render(ConfigRenderOptions.concise()); - else { - Config resolved = converted.resolve(); - ConfigObject outObject = resolved.withOnlyPath(outPath).root(); - ConfigValue outValue = outObject.get(outPath); - outJson = outValue.render(ConfigRenderOptions.concise()); - } - } catch (Exception e) { - LOGGER.warn("Failed to convert:", json); - LOGGER.warn(e.getMessage()); - } - if( outClass == String.class ) - return outJson; - else { - try { - outDoc = mapper.readValue( outJson, outClass ); - } catch (IOException e) { - LOGGER.warn("Failed to convert:", object); - } - } - return outDoc; + if ( outClass == String.class ) + return outJson; + else { + try { + outDoc = mapper.readValue( outJson, outClass ); + } catch (IOException ex) { + LOGGER.warn("Failed to convert:", object); + } } + return outDoc; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java index a38568b..d245c3e 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java @@ -42,185 +42,219 @@ import java.util.Map; */ public class LineReadWriteUtil { - private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class); - - private static Map<LineReadWriteConfiguration, LineReadWriteUtil> INSTANCE_MAP = Maps.newConcurrentMap(); - - private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"); + private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class); + + private static Map<LineReadWriteConfiguration, LineReadWriteUtil> INSTANCE_MAP = Maps.newConcurrentMap(); + + private static final List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"); + + private List<String> fields; + private String fieldDelimiter = "\t"; + private String lineDelimiter = "\n"; + private String encoding = "UTF-8"; + + private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + private LineReadWriteUtil() { + } + + private LineReadWriteUtil(LineReadWriteConfiguration configuration) { + this.fields = configuration.getFields(); + this.fieldDelimiter = configuration.getFieldDelimiter(); + this.lineDelimiter = configuration.getLineDelimiter(); + this.encoding = configuration.getEncoding(); + } + + public static LineReadWriteUtil getInstance() { + return getInstance(new LineReadWriteConfiguration()); + } + + /** + * getInstance. + * @param configuration + * @return result + */ + public static LineReadWriteUtil getInstance(LineReadWriteConfiguration configuration) { + if ( INSTANCE_MAP.containsKey(configuration) + && + INSTANCE_MAP.get(configuration) != null) { + return INSTANCE_MAP.get(configuration); + } else { + INSTANCE_MAP.put(configuration, new LineReadWriteUtil(configuration)); + return INSTANCE_MAP.get(configuration); + } + } + + /** + * processLine + * @param line + * @return result + */ + public StreamsDatum processLine(String line) { + + List<String> expectedFields = fields; + if ( line.endsWith(lineDelimiter)) { + line = trimLineDelimiter(line); + } + String[] parsedFields = line.split(fieldDelimiter); - private List<String> fields; - private String fieldDelimiter = "\t"; - private String lineDelimiter = "\n"; - private String encoding = "UTF-8"; + if ( parsedFields.length == 0) { + return null; + } - private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + String id = null; + DateTime ts = null; + BigInteger seq = null; + Map<String, Object> metadata = null; + String json = null; - private LineReadWriteUtil() { + if ( expectedFields.contains( FieldConstants.DOC ) + && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) { + json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)]; } - private LineReadWriteUtil(LineReadWriteConfiguration configuration) { - this.fields = configuration.getFields(); - this.fieldDelimiter = configuration.getFieldDelimiter(); - this.lineDelimiter = configuration.getLineDelimiter(); - this.encoding = configuration.getEncoding(); + if ( expectedFields.contains( FieldConstants.ID ) + && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) { + id = parsedFields[expectedFields.indexOf(FieldConstants.ID)]; } - - public static LineReadWriteUtil getInstance() { - return getInstance(new LineReadWriteConfiguration()); + if ( expectedFields.contains( FieldConstants.SEQ ) + && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) { + try { + seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]); + } catch ( NumberFormatException nfe ) { + LOGGER.warn("invalid sequence number {}", nfe); + } } - - public static LineReadWriteUtil getInstance(LineReadWriteConfiguration configuration) { - if( INSTANCE_MAP.containsKey(configuration) && - INSTANCE_MAP.get(configuration) != null) - return INSTANCE_MAP.get(configuration); - else { - INSTANCE_MAP.put(configuration, new LineReadWriteUtil(configuration)); - return INSTANCE_MAP.get(configuration); - } + if ( expectedFields.contains( FieldConstants.TS ) + && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) { + ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]); } - - public StreamsDatum processLine(String line) { - - List<String> expectedFields = fields; - if( line.endsWith(lineDelimiter)) line = trimLineDelimiter(line); - String[] parsedFields = line.split(fieldDelimiter); - - if( parsedFields.length == 0) - return null; - - String id = null; - DateTime ts = null; - BigInteger seq = null; - Map<String, Object> metadata = null; - String json = null; - - if( expectedFields.contains( FieldConstants.DOC ) - && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) { - json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)]; - } - - if( expectedFields.contains( FieldConstants.ID ) - && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) { - id = parsedFields[expectedFields.indexOf(FieldConstants.ID)]; - } - if( expectedFields.contains( FieldConstants.SEQ ) - && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) { - try { - seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]); - } catch( NumberFormatException nfe ) - { LOGGER.warn("invalid sequence number {}", nfe); } - } - if( expectedFields.contains( FieldConstants.TS ) - && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) { - ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]); - } - if( expectedFields.contains( FieldConstants.META ) - && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) { - metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]); - } - - StreamsDatum datum = new StreamsDatum(json); - datum.setId(id); - datum.setTimestamp(ts); - datum.setMetadata(metadata); - datum.setSequenceid(seq); - return datum; - + if ( expectedFields.contains( FieldConstants.META ) + && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) { + metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]); } - public String convertResultToString(StreamsDatum entry) { - String metadataJson = null; - try { - metadataJson = MAPPER.writeValueAsString(entry.getMetadata()); - } catch (JsonProcessingException e) { - LOGGER.warn("Error converting metadata to a string", e); - } + StreamsDatum datum = new StreamsDatum(json); + datum.setId(id); + datum.setTimestamp(ts); + datum.setMetadata(metadata); + datum.setSequenceid(seq); + return datum; + + } + + /** + * convertResultToString + * @param entry + * @return result + */ + public String convertResultToString(StreamsDatum entry) { + String metadataJson = null; + try { + metadataJson = MAPPER.writeValueAsString(entry.getMetadata()); + } catch (JsonProcessingException ex) { + LOGGER.warn("Error converting metadata to a string", ex); + } - String documentJson = null; - try { - if( entry.getDocument() instanceof String ) - documentJson = (String)entry.getDocument(); - else - documentJson = MAPPER.writeValueAsString(entry.getDocument()); - } catch (JsonProcessingException e) { - LOGGER.warn("Error converting document to string", e); - } + String documentJson = null; + try { + if ( entry.getDocument() instanceof String ) { + documentJson = (String) entry.getDocument(); + } else { + documentJson = MAPPER.writeValueAsString(entry.getDocument()); + } + } catch (JsonProcessingException ex) { + LOGGER.warn("Error converting document to string", ex); + } - if (Strings.isNullOrEmpty(documentJson)) - return null; - else { - StringBuilder stringBuilder = new StringBuilder(); - Iterator<String> fields = this.fields.iterator(); - List<String> fielddata = Lists.newArrayList(); - Joiner joiner = Joiner.on(fieldDelimiter).useForNull(""); - while( fields.hasNext() ) { - String field = fields.next(); - if( field.equals(FieldConstants.DOC) ) - fielddata.add(documentJson); - else if( field.equals(FieldConstants.ID) ) - fielddata.add(entry.getId()); - else if( field.equals(FieldConstants.SEQ) ) - if( entry.getSequenceid() != null) - fielddata.add(entry.getSequenceid().toString()); - else - fielddata.add("null"); - else if( field.equals(FieldConstants.TS) ) - if( entry.getTimestamp() != null ) - fielddata.add(entry.getTimestamp().toString()); - else - fielddata.add(DateTime.now().toString()); - else if( field.equals(FieldConstants.META) ) - fielddata.add(metadataJson); - else if( entry.getMetadata().containsKey(field)) { - fielddata.add(entry.getMetadata().get(field).toString()); - } else { - fielddata.add(null); - } - - } - joiner.appendTo(stringBuilder, fielddata); - return stringBuilder.toString(); + if (Strings.isNullOrEmpty(documentJson)) { + return null; + } else { + StringBuilder stringBuilder = new StringBuilder(); + Iterator<String> fields = this.fields.iterator(); + List<String> fielddata = Lists.newArrayList(); + Joiner joiner = Joiner.on(fieldDelimiter).useForNull(""); + while( fields.hasNext() ) { + String field = fields.next(); + if ( field.equals(FieldConstants.DOC) ) { + fielddata.add(documentJson); + } else if ( field.equals(FieldConstants.ID) ) { + fielddata.add(entry.getId()); + } else if ( field.equals(FieldConstants.SEQ) ) { + if (entry.getSequenceid() != null) { + fielddata.add(entry.getSequenceid().toString()); + } else { + fielddata.add("null"); + } + } else if ( field.equals(FieldConstants.TS) ) { + if (entry.getTimestamp() != null) { + fielddata.add(entry.getTimestamp().toString()); + } else { + fielddata.add(DateTime.now().toString()); + } + } else if ( field.equals(FieldConstants.META) ) { + fielddata.add(metadataJson); + } else if ( entry.getMetadata().containsKey(field)) { + fielddata.add(entry.getMetadata().get(field).toString()); + } else { + fielddata.add(null); } + } + joiner.appendTo(stringBuilder, fielddata); + return stringBuilder.toString(); } - - public DateTime parseTs(String field) { - - DateTime timestamp = null; + } + + /** + * parseTs + * @param field + * @return + */ + public DateTime parseTs(String field) { + + DateTime timestamp = null; + try { + long longts = Long.parseLong(field); + timestamp = new DateTime(longts); + } catch ( Exception e1 ) { + try { + timestamp = DateTime.parse(field); + } catch ( Exception e2 ) { try { - long longts = Long.parseLong(field); - timestamp = new DateTime(longts); - } catch ( Exception e ) { - try { - timestamp = DateTime.parse(field); - } catch ( Exception e2 ) { - try { - timestamp = MAPPER.readValue(field, DateTime.class); - } catch ( Exception e3 ) { - LOGGER.warn("Could not parse timestamp:{} ", field); - } - } + timestamp = MAPPER.readValue(field, DateTime.class); + } catch ( Exception e3 ) { + LOGGER.warn("Could not parse timestamp:{} ", field); } - - return timestamp; + } } - public Map<String, Object> parseMap(String field) { + return timestamp; + } - Map<String, Object> metadata = null; + /** + * parseMap + * @param field + * @return result + */ + public Map<String, Object> parseMap(String field) { - try { - JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class); - metadata = MAPPER.convertValue(jsonNode, Map.class); - } catch (Exception e) { - LOGGER.warn("failed in parseMap: " + e.getMessage()); - } - return metadata; - } + Map<String, Object> metadata = null; - private String trimLineDelimiter(String str) { - if( !Strings.isNullOrEmpty(str)) - if( str.endsWith(lineDelimiter)) - return str.substring(0,str.length()-1); - return str; + try { + JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class); + metadata = MAPPER.convertValue(jsonNode, Map.class); + } catch (Exception ex) { + LOGGER.warn("failed in parseMap: " + ex.getMessage()); + } + return metadata; + } + + private String trimLineDelimiter(String str) { + if ( !Strings.isNullOrEmpty(str)) { + if (str.endsWith(lineDelimiter)) { + return str.substring(0, str.length() - 1); + } } + return str; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java index edd70f4..a269f4d 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java @@ -16,13 +16,16 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package org.apache.streams.converter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,65 +35,68 @@ import java.util.List; /** * TypeConverterProcessor converts between String json and jackson-compatible POJO objects. * + * <p/> * Activity is one supported jackson-compatible POJO, so JSON String and objects with structual similarities * to Activity can be converted to Activity objects. * + * <p/> * However, conversion to Activity should probably use {@link org.apache.streams.converter.ActivityConverterProcessor} * */ public class TypeConverterProcessor implements StreamsProcessor, Serializable { - public static final String STREAMS_ID = "TypeConverterProcessor"; + public static final String STREAMS_ID = "TypeConverterProcessor"; - private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class); - private List<String> formats = Lists.newArrayList(); + private List<String> formats = Lists.newArrayList(); - protected ObjectMapper mapper; + protected ObjectMapper mapper; - protected Class outClass; + protected Class outClass; - public TypeConverterProcessor(Class outClass) { - this.outClass = outClass; - } - - public TypeConverterProcessor(Class outClass, List<String> formats) { - this(outClass); - this.formats = formats; - } + public TypeConverterProcessor(Class outClass) { + this.outClass = outClass; + } - @Override - public String getId() { - return STREAMS_ID; - } + public TypeConverterProcessor(Class outClass, List<String> formats) { + this(outClass); + this.formats = formats; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + @Override + public String getId() { + return STREAMS_ID; + } - List<StreamsDatum> result = Lists.newLinkedList(); - Object inDoc = entry.getDocument(); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - Object outDoc = TypeConverterUtil.getInstance().convert(inDoc, outClass, mapper); + List<StreamsDatum> result = Lists.newLinkedList(); + Object inDoc = entry.getDocument(); - if( outDoc != null ) { - entry.setDocument(outDoc); - result.add(entry); - } + Object outDoc = TypeConverterUtil.getInstance().convert(inDoc, outClass, mapper); - return result; + if ( outDoc != null ) { + entry.setDocument(outDoc); + result.add(entry); } - @Override - public void prepare(Object configurationObject) { - if( formats.size() > 0 ) - this.mapper = StreamsJacksonMapper.getInstance(formats); - else - this.mapper = StreamsJacksonMapper.getInstance(); - } + return result; + } - @Override - public void cleanUp() { - this.mapper = null; + @Override + public void prepare(Object configurationObject) { + if ( formats.size() > 0 ) { + this.mapper = StreamsJacksonMapper.getInstance(formats); + } else { + this.mapper = StreamsJacksonMapper.getInstance(); } + } + + @Override + public void cleanUp() { + this.mapper = null; + } -}; +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java index 4ace9c4..8843d0e 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java @@ -18,9 +18,11 @@ package org.apache.streams.converter; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.streams.jackson.StreamsJacksonMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,49 +30,56 @@ import java.io.IOException; /** * TypeConverterUtil supports TypeConverterProcessor in converting between String json and - * jackson-compatible POJO objects + * jackson-compatible POJO objects. */ public class TypeConverterUtil { - private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class); - private static final TypeConverterUtil INSTANCE = new TypeConverterUtil(); + private static final TypeConverterUtil INSTANCE = new TypeConverterUtil(); - public static TypeConverterUtil getInstance(){ - return INSTANCE; - } + public static TypeConverterUtil getInstance() { + return INSTANCE; + } + + public Object convert(Object object, Class outClass) { + return TypeConverterUtil.getInstance().convert(object, outClass, StreamsJacksonMapper.getInstance()); + } - public Object convert(Object object, Class outClass) { - return TypeConverterUtil.getInstance().convert(object, outClass, StreamsJacksonMapper.getInstance()); + /** + * convert + * @param object + * @param outClass + * @param mapper + * @return + */ + public Object convert(Object object, Class outClass, ObjectMapper mapper) { + ObjectNode node = null; + Object outDoc = null; + if ( object instanceof String ) { + try { + node = mapper.readValue((String)object, ObjectNode.class); + } catch (IOException ex) { + LOGGER.warn(ex.getMessage()); + LOGGER.warn(object.toString()); + } + } else { + node = mapper.convertValue(object, ObjectNode.class); } - public Object convert(Object object, Class outClass, ObjectMapper mapper) { - ObjectNode node = null; - Object outDoc = null; - if( object instanceof String ) { - try { - node = mapper.readValue((String)object, ObjectNode.class); - } catch (IOException e) { - LOGGER.warn(e.getMessage()); - LOGGER.warn(object.toString()); - } + if(node != null) { + try { + if ( outClass == String.class ) { + outDoc = mapper.writeValueAsString(node); } else { - node = mapper.convertValue(object, ObjectNode.class); + outDoc = mapper.convertValue(node, outClass); } - - if(node != null) { - try { - if( outClass == String.class ) - outDoc = mapper.writeValueAsString(node); - else - outDoc = mapper.convertValue(node, outClass); - - } catch (Throwable e) { - LOGGER.warn(e.getMessage()); - LOGGER.warn(node.toString()); - } - } - - return outDoc; + } catch (Throwable ex) { + LOGGER.warn(ex.getMessage()); + LOGGER.warn(node.toString()); + } } + + return outDoc; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java index e0c7a68..ed40a17 100644 --- a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java +++ b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java @@ -18,15 +18,16 @@ package org.apache.streams.filters; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.pojo.json.Activity; -import org.apache.streams.verbs.ObjectCombination; import org.apache.streams.verbs.VerbDefinition; import org.apache.streams.verbs.VerbDefinitionMatchUtil; import org.apache.streams.verbs.VerbDefinitionResolver; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,57 +40,60 @@ import java.util.Set; */ public class VerbDefinitionDropFilter implements StreamsProcessor { - public static final String STREAMS_ID = "VerbDefinitionDropFilter"; + public static final String STREAMS_ID = "VerbDefinitionDropFilter"; - private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionDropFilter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionDropFilter.class); - protected Set<VerbDefinition> verbDefinitionSet; - protected VerbDefinitionResolver resolver; + protected Set<VerbDefinition> verbDefinitionSet; + protected VerbDefinitionResolver resolver; - public VerbDefinitionDropFilter() { - // get with reflection - } - - public VerbDefinitionDropFilter(Set<VerbDefinition> verbDefinitionSet) { - this(); - this.verbDefinitionSet = verbDefinitionSet; - } + public VerbDefinitionDropFilter() { + // get with reflection + } - @Override - public String getId() { - return STREAMS_ID; - } + public VerbDefinitionDropFilter(Set<VerbDefinition> verbDefinitionSet) { + this(); + this.verbDefinitionSet = verbDefinitionSet; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + @Override + public String getId() { + return STREAMS_ID; + } - List<StreamsDatum> result = Lists.newArrayList(); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass()); + List<StreamsDatum> result = Lists.newArrayList(); - Activity activity; + LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass()); - Preconditions.checkArgument(entry.getDocument() instanceof Activity); + Activity activity; - activity = (Activity) entry.getDocument(); + Preconditions.checkArgument(entry.getDocument() instanceof Activity); - if( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == false ) - result.add(entry); + activity = (Activity) entry.getDocument(); - return result; + if ( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == false ) { + result.add(entry); } - @Override - public void prepare(Object o) { - if( verbDefinitionSet != null) - resolver = new VerbDefinitionResolver(verbDefinitionSet); - else resolver = new VerbDefinitionResolver(); - Preconditions.checkNotNull(resolver); - } + return result; + } - @Override - public void cleanUp() { - // noOp + @Override + public void prepare(Object configuration) { + if ( verbDefinitionSet != null) { + resolver = new VerbDefinitionResolver(verbDefinitionSet); + } else { + resolver = new VerbDefinitionResolver(); } + Preconditions.checkNotNull(resolver); + } + + @Override + public void cleanUp() { + // noOp + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java index 82e8c99..7562905 100644 --- a/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java +++ b/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java @@ -18,19 +18,21 @@ package org.apache.streams.filters; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.pojo.json.Activity; -import org.apache.streams.verbs.ObjectCombination; import org.apache.streams.verbs.VerbDefinition; import org.apache.streams.verbs.VerbDefinitionMatchUtil; import org.apache.streams.verbs.VerbDefinitionResolver; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.List; +import java.util.Set; /** * Checks one or more verb definitions against a stream of Activity documents, and drops any activities @@ -38,57 +40,60 @@ import java.util.*; */ public class VerbDefinitionKeepFilter implements StreamsProcessor { - public static final String STREAMS_ID = "VerbDefinitionKeepFilter"; + public static final String STREAMS_ID = "VerbDefinitionKeepFilter"; - private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionKeepFilter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(VerbDefinitionKeepFilter.class); - protected Set<VerbDefinition> verbDefinitionSet; - protected VerbDefinitionResolver resolver; + protected Set<VerbDefinition> verbDefinitionSet; + protected VerbDefinitionResolver resolver; - public VerbDefinitionKeepFilter() { - // get with reflection - } - - public VerbDefinitionKeepFilter(Set<VerbDefinition> verbDefinitionSet) { - this(); - this.verbDefinitionSet = verbDefinitionSet; - } + public VerbDefinitionKeepFilter() { + // get with reflection + } - @Override - public String getId() { - return STREAMS_ID; - } + public VerbDefinitionKeepFilter(Set<VerbDefinition> verbDefinitionSet) { + this(); + this.verbDefinitionSet = verbDefinitionSet; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + @Override + public String getId() { + return STREAMS_ID; + } - List<StreamsDatum> result = Lists.newArrayList(); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass()); + List<StreamsDatum> result = Lists.newArrayList(); - Activity activity; + LOGGER.debug("{} filtering {}", STREAMS_ID, entry.getDocument().getClass()); - Preconditions.checkArgument(entry.getDocument() instanceof Activity); + Activity activity; - activity = (Activity) entry.getDocument(); + Preconditions.checkArgument(entry.getDocument() instanceof Activity); - if( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == true ) - result.add(entry); + activity = (Activity) entry.getDocument(); - return result; + if ( VerbDefinitionMatchUtil.match(activity, this.verbDefinitionSet) == true ) { + result.add(entry); } - @Override - public void prepare(Object o) { - if( verbDefinitionSet != null) - resolver = new VerbDefinitionResolver(verbDefinitionSet); - else resolver = new VerbDefinitionResolver(); - Preconditions.checkNotNull(resolver); - } + return result; + } - @Override - public void cleanUp() { - // noOp + @Override + public void prepare(Object configuration) { + if ( verbDefinitionSet != null ) { + resolver = new VerbDefinitionResolver(verbDefinitionSet); + } else { + resolver = new VerbDefinitionResolver(); } + Preconditions.checkNotNull(resolver); + } + + @Override + public void cleanUp() { + // noOp + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java index d8309d9..8cacf1f 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java @@ -18,11 +18,19 @@ package org.apache.streams.components.http.persist; +import org.apache.streams.components.http.HttpPersistWriterConfiguration; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import com.google.common.base.Strings; + import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; @@ -33,12 +41,6 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpPersistWriterConfiguration; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,183 +53,189 @@ import java.util.Map; public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter { - private final static String STREAMS_ID = "SimpleHTTPPostPersistWriter"; + private static final String STREAMS_ID = "SimpleHTTPPostPersistWriter"; - private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostPersistWriter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostPersistWriter.class); - protected ObjectMapper mapper; + protected ObjectMapper mapper; - protected URIBuilder uriBuilder; + protected URIBuilder uriBuilder; - protected CloseableHttpClient httpclient; + protected CloseableHttpClient httpclient; - protected HttpPersistWriterConfiguration configuration; + protected HttpPersistWriterConfiguration configuration; - protected String authHeader; + protected String authHeader; - public SimpleHTTPPostPersistWriter() { - this(new ComponentConfigurator<>(HttpPersistWriterConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); - } + public SimpleHTTPPostPersistWriter() { + this(new ComponentConfigurator<>(HttpPersistWriterConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); + } - public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration configuration) { - this.configuration = configuration; - } + public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration configuration) { + this.configuration = configuration; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public void write(StreamsDatum entry) { + @Override + public void write(StreamsDatum entry) { - ObjectNode payload; - try { - payload = preparePayload(entry); - } catch( Exception e ) { - LOGGER.warn("Exception preparing payload, using empty payload"); - payload = mapper.createObjectNode(); - } + ObjectNode payload; + try { + payload = preparePayload(entry); + } catch ( Exception ex ) { + LOGGER.warn("Exception preparing payload, using empty payload"); + payload = mapper.createObjectNode(); + } - Map<String, String> params = prepareParams(entry); + Map<String, String> params = prepareParams(entry); - URI uri = prepareURI(params); + URI uri = prepareURI(params); - HttpPost httppost = prepareHttpPost(uri, payload); + HttpPost httppost = prepareHttpPost(uri, payload); - ObjectNode result = executePost(httppost); + ObjectNode result = executePost(httppost); - try { - LOGGER.debug(mapper.writeValueAsString(result)); - } catch (JsonProcessingException e) { - LOGGER.warn("Non-json response", e.getMessage()); - } + try { + LOGGER.debug(mapper.writeValueAsString(result)); + } catch (JsonProcessingException ex) { + LOGGER.warn("Non-json response", ex.getMessage()); } - - /** - Override this to alter request URI - */ - protected URI prepareURI(Map<String, String> params) { - URI uri = null; - for( Map.Entry<String,String> param : params.entrySet()) { - uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue()); - } - try { - uri = uriBuilder.build(); - } catch (URISyntaxException e) { - LOGGER.error("URI error {}", uriBuilder.toString()); - } - return uri; + } + + /** + Override this to alter request URI. + */ + protected URI prepareURI(Map<String, String> params) { + URI uri = null; + for ( Map.Entry<String,String> param : params.entrySet()) { + uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue()); } - - /** - Override this to add parameters to the request - */ - protected Map<String, String> prepareParams(StreamsDatum entry) { - return new HashMap<>(); + try { + uri = uriBuilder.build(); + } catch (URISyntaxException ex) { + LOGGER.error("URI error {}", uriBuilder.toString()); } - - /** - Override this to alter json payload on to the request - */ - protected ObjectNode preparePayload(StreamsDatum entry) throws Exception { - - if( entry.getDocument() != null ) { - if( entry.getDocument() instanceof ObjectNode ) - return (ObjectNode) entry.getDocument(); - else return mapper.convertValue(entry.getDocument(), ObjectNode.class); - } - else return null; + return uri; + } + + /** + Override this to add parameters to the request. + */ + protected Map<String, String> prepareParams(StreamsDatum entry) { + return new HashMap<>(); + } + + /** + Override this to alter json payload on to the request. + */ + protected ObjectNode preparePayload(StreamsDatum entry) throws Exception { + + if ( entry.getDocument() != null ) { + if ( entry.getDocument() instanceof ObjectNode ) { + return (ObjectNode) entry.getDocument(); + } else { + return mapper.convertValue(entry.getDocument(), ObjectNode.class); + } + } else { + return null; } - - /** - Override this to add headers to the request - */ - public HttpPost prepareHttpPost(URI uri, ObjectNode payload) { - HttpPost httppost = new HttpPost(uri); - httppost.addHeader("content-type", this.configuration.getContentType()); - httppost.addHeader("accept-charset", "UTF-8"); - if( !Strings.isNullOrEmpty(authHeader)) - httppost.addHeader("Authorization", "Basic " + authHeader); - try { - String entity = mapper.writeValueAsString(payload); - httppost.setEntity(new StringEntity(entity)); - } catch (JsonProcessingException | UnsupportedEncodingException e) { - LOGGER.warn(e.getMessage()); - } - return httppost; + } + + /** + Override this to add headers to the request. + */ + public HttpPost prepareHttpPost(URI uri, ObjectNode payload) { + HttpPost httppost = new HttpPost(uri); + httppost.addHeader("content-type", this.configuration.getContentType()); + httppost.addHeader("accept-charset", "UTF-8"); + if ( !Strings.isNullOrEmpty(authHeader)) { + httppost.addHeader("Authorization", "Basic " + authHeader); } - - protected ObjectNode executePost(HttpPost httpPost) { - - Preconditions.checkNotNull(httpPost); - - ObjectNode result = null; - - CloseableHttpResponse response = null; - - String entityString; - try { - response = httpclient.execute(httpPost); - HttpEntity entity = response.getEntity(); - // TODO: handle retry - if (response.getStatusLine() != null && response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK && entity != null) { - entityString = EntityUtils.toString(entity); - result = mapper.readValue(entityString, ObjectNode.class); - } - } catch (IOException e) { - LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage()); - } finally { - try { - if (response != null) { - response.close(); - } - } catch (IOException ignored) {} + try { + String entity = mapper.writeValueAsString(payload); + httppost.setEntity(new StringEntity(entity)); + } catch (JsonProcessingException | UnsupportedEncodingException ex) { + LOGGER.warn(ex.getMessage()); + } + return httppost; + } + + protected ObjectNode executePost(HttpPost httpPost) { + + Preconditions.checkNotNull(httpPost); + + ObjectNode result = null; + + CloseableHttpResponse response = null; + + String entityString; + try { + response = httpclient.execute(httpPost); + HttpEntity entity = response.getEntity(); + // TODO: handle retry + if (response.getStatusLine() != null && response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK && entity != null) { + entityString = EntityUtils.toString(entity); + result = mapper.readValue(entityString, ObjectNode.class); + } + } catch (IOException ex) { + LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage()); + } finally { + try { + if (response != null) { + response.close(); } - return result; + } catch (IOException ignored) { + LOGGER.trace("IOException", ignored); + } } + return result; + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); + mapper = StreamsJacksonMapper.getInstance(); - uriBuilder = new URIBuilder() - .setScheme(this.configuration.getProtocol()) - .setHost(this.configuration.getHostname()) - .setPort(this.configuration.getPort().intValue()) - .setPath(this.configuration.getResourcePath()); - - if( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) - uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); - if( !Strings.isNullOrEmpty(configuration.getUsername()) - && !Strings.isNullOrEmpty(configuration.getPassword())) { - String string = configuration.getUsername() + ":" + configuration.getPassword(); - authHeader = Base64.encodeBase64String(string.getBytes()); - } - - httpclient = HttpClients.createDefault(); + uriBuilder = new URIBuilder() + .setScheme(this.configuration.getProtocol()) + .setHost(this.configuration.getHostname()) + .setPort(this.configuration.getPort().intValue()) + .setPath(this.configuration.getResourcePath()); + if ( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) { + uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); + } + if ( !Strings.isNullOrEmpty(configuration.getUsername()) && !Strings.isNullOrEmpty(configuration.getPassword())) { + String string = configuration.getUsername() + ":" + configuration.getPassword(); + authHeader = Base64.encodeBase64String(string.getBytes()); } - @Override - public void cleanUp() { - - LOGGER.info("shutting down SimpleHTTPPostPersistWriter"); - try { - httpclient.close(); - } catch (IOException e) { - LOGGER.error(e.getMessage()); - } finally { - try { - httpclient.close(); - } catch (IOException e) { - LOGGER.error(e.getMessage()); - } finally { - httpclient = null; - } - } + httpclient = HttpClients.createDefault(); + + } + + @Override + public void cleanUp() { + + LOGGER.info("shutting down SimpleHTTPPostPersistWriter"); + try { + httpclient.close(); + } catch (IOException ex) { + LOGGER.error(ex.getMessage()); + } finally { + try { + httpclient.close(); + } catch (IOException e2) { + LOGGER.error(e2.getMessage()); + } finally { + httpclient = null; + } } + } }
