[
https://issues.apache.org/jira/browse/STREAMS-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13997331#comment-13997331
]
ASF GitHub Bot commented on STREAMS-60:
---------------------------------------
Github user smashew commented on a diff in the pull request:
https://github.com/apache/incubator-streams/pull/12#discussion_r12622946
--- Diff:
streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
---
@@ -0,0 +1,149 @@
+package org.apache.streams.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.jayway.jsonpath.JsonPath;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class JsonPathFilter implements StreamsProcessor {
+
+ public JsonPathFilter() {
+ System.out.println("creating JsonPathFilter");
+ }
+
+ private final static String STREAMS_ID = "JsonPathFilter";
+
+ private final static Logger LOGGER =
LoggerFactory.getLogger(JsonPathFilter.class);
+
+ private ObjectMapper mapper = new StreamsJacksonMapper();
+
+ private String pathExpression;
+ private JsonPath jsonPath;
+ private String destNodeName;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ String json = null;
+
+ ObjectNode document = null;
+
+ LOGGER.debug("{} processing {}", STREAMS_ID);
+
+ if( entry.getDocument() instanceof ObjectNode ) {
+ document = (ObjectNode) entry.getDocument();
+ try {
+ json = mapper.writeValueAsString(document);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ } else if( entry.getDocument() instanceof String ) {
+ json = (String) entry.getDocument();
+ try {
+ document = mapper.readValue(json, ObjectNode.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ Preconditions.checkNotNull(document);
+
+ if( StringUtils.isNotEmpty(json)) {
+
+ Object srcResult = null;
+ try {
+ srcResult = jsonPath.read(json);
+
+ } catch( Exception e ) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ }
+
+ Preconditions.checkNotNull(srcResult);
+
+ String[] path = StringUtils.split(pathExpression, '.');
+ ObjectNode node = document;
+ for (int i = 1; i < path.length-1; i++) {
+ node = (ObjectNode) document.get(path[i]);
+ }
+
+ Preconditions.checkNotNull(node);
+
+ if( srcResult instanceof JSONArray ) {
+ try {
+ ArrayNode jsonNode = mapper.convertValue(srcResult,
ArrayNode.class);
+ if( jsonNode.size() == 1 ) {
--- End diff --
Is there any way to split this into two functions for clarity, in case the other
use case is ever needed?
> WebHdfsReader should set timestamp when reading documents
> ---------------------------------------------------------
>
> Key: STREAMS-60
> URL: https://issues.apache.org/jira/browse/STREAMS-60
> Project: Streams
> Issue Type: Improvement
> Reporter: Steve Blackmon
>
> WebHdfsReader should set timestamp when reading documents
--
This message was sent by Atlassian JIRA
(v6.2#6252)