Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-168 a57646091 -> ad5f90cc1


introduced a SimpleHTTPGetProvider
reorganized configuration
added access_token support
added http basic auth support


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d62061de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d62061de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d62061de

Branch: refs/heads/STREAMS-168
Commit: d62061dec8628484e66d3494e02f1f65efafda41
Parents: a576460
Author: Steve Blackmon <sblack...@w2odigital.com>
Authored: Sun Oct 12 17:49:01 2014 -0500
Committer: Steve Blackmon <sblack...@w2odigital.com>
Committed: Sun Oct 12 17:49:01 2014 -0500

----------------------------------------------------------------------
 streams-components/pom.xml                      |   2 +-
 streams-components/streams-http/README.md       |  16 ++
 streams-components/streams-http/pom.xml         | 159 +++++++++++++
 .../components/http/HttpConfigurator.java       |  62 +++++
 .../http/processor/SimpleHTTPGetProcessor.java  | 230 +++++++++++++++++++
 .../http/provider/SimpleHTTPGetProvider.java    | 215 +++++++++++++++++
 .../components/http/HttpConfiguration.json      |  50 ++++
 .../http/HttpProcessorConfiguration.json        |  28 +++
 .../http/HttpProviderConfiguration.json         |  18 ++
 .../streams-processor-http/README.md            |  16 --
 .../streams-processor-http/pom.xml              | 154 -------------
 .../components/http/HttpConfigurator.java       |  53 -----
 .../components/http/SimpleHTTPGetProcessor.java | 218 ------------------
 .../HttpProcessorConfiguration.json             |  47 ----
 14 files changed, 779 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/pom.xml b/streams-components/pom.xml
index 26384b1..9942e14 100644
--- a/streams-components/pom.xml
+++ b/streams-components/pom.xml
@@ -37,7 +37,7 @@
     </properties>
 
     <modules>
-        <module>streams-processor-http</module>
+        <module>streams-http</module>
     </modules>
 
     <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/README.md
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/README.md 
b/streams-components/streams-http/README.md
new file mode 100644
index 0000000..62dd4c1
--- /dev/null
+++ b/streams-components/streams-http/README.md
@@ -0,0 +1,16 @@
+streams-http
+============
+
+Hit an http endpoint and place the result in extensions
+
+Example SimpleHTTPGetProcessor configuration:
+
+    "http": {
+        "protocol": "http",
+        "hostname": "urls.api.twitter.com",
+        "port": 9300,
+        "resourcePath": "1/urls/count.json"
+    }
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/pom.xml 
b/streams-components/streams-http/pom.xml
new file mode 100644
index 0000000..9c2b079
--- /dev/null
+++ b/streams-components/streams-http/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-components</artifactId>
+        <version>0.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>streams-http</artifactId>
+
+    <name>streams-http</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+            <type>jar</type>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.validation</groupId>
+                    <artifactId>validation-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo-extensions</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.3.5</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>add-source-jaxb2</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jaxb2</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.http</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                    <includeJsr303Annotations>true</includeJsr303Annotations>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
new file mode 100644
index 0000000..900831f
--- /dev/null
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.components.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts a {@link com.typesafe.config.Config} element into an instance of
+ * {@link HttpProviderConfiguration} or {@link HttpProcessorConfiguration}.
+ */
+public class HttpConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class);
+
+    private final static ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Binds the supplied config element to a {@link HttpProviderConfiguration}.
+     *
+     * @param config config element holding the http settings
+     * @return the parsed configuration, or null when the element could not be parsed
+     */
+    public static HttpProviderConfiguration detectProviderConfiguration(Config config) {
+        return detectConfiguration(config, HttpProviderConfiguration.class);
+    }
+
+    /**
+     * Binds the supplied config element to a {@link HttpProcessorConfiguration}.
+     *
+     * @param config config element holding the http settings
+     * @return the parsed configuration, or null when the element could not be parsed
+     */
+    public static HttpProcessorConfiguration detectProcessorConfiguration(Config config) {
+        return detectConfiguration(config, HttpProcessorConfiguration.class);
+    }
+
+    /**
+     * Renders the config element as concise JSON and maps it onto the requested class.
+     * Any failure is logged (with its stack trace) and reported to the caller as null.
+     */
+    private static <T> T detectConfiguration(Config config, Class<T> configurationClass) {
+        try {
+            return mapper.readValue(config.root().render(ConfigRenderOptions.concise()), configurationClass);
+        } catch (Exception e) {
+            // pass the exception itself: a trailing Throwable is logged with its stack trace,
+            // whereas an extra argument without a {} placeholder is silently discarded
+            LOGGER.warn("Could not parse http configuration", e);
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
new file mode 100644
index 0000000..c2bfef6
--- /dev/null
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -0,0 +1,230 @@
+package org.apache.streams.components.http.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.Validation;
+import javax.validation.ValidatorFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Processor retrieves contents from an known url and stores the resulting 
object in an extension field
+ */
+public class SimpleHTTPGetProcessor implements StreamsProcessor {
+
+    private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+
+    // from root config id
+    private final static String EXTENSION = "account_type";
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
+
+    protected ObjectMapper mapper;
+
+    protected URIBuilder uriBuilder;
+
+    protected CloseableHttpClient httpclient;
+
+    protected HttpProcessorConfiguration configuration;
+
+    protected String authHeader;
+//
+//    // authorized only
+//    //private PeoplePatternConfiguration peoplePatternConfiguration = null;
+//    //private String authHeader;
+//
+    public SimpleHTTPGetProcessor() {
+        
this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("http")));
+    }
+
+    public SimpleHTTPGetProcessor(HttpProcessorConfiguration 
processorConfiguration) {
+        LOGGER.info("creating SimpleHTTPGetProcessor");
+        LOGGER.info(processorConfiguration.toString());
+        this.configuration = processorConfiguration;
+    }
+
+
+    /**
+     Override this to store a result other than exact json representation of 
response
+     */
+    protected ObjectNode prepareExtensionFragment(String entityString) {
+
+        try {
+            return mapper.readValue(entityString, ObjectNode.class);
+        } catch (IOException e) {
+            LOGGER.warn(e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     Override this to place result in non-standard location on document
+     */
+    protected ObjectNode getRootDocument(StreamsDatum datum) {
+
+        try {
+            String json = datum.getDocument() instanceof String ?
+                    (String) datum.getDocument() :
+                    mapper.writeValueAsString(datum.getDocument());
+            return mapper.readValue(json, ObjectNode.class);
+        } catch (JsonProcessingException e) {
+            LOGGER.warn(e.getMessage());
+            return null;
+        } catch (IOException e) {
+            LOGGER.warn(e.getMessage());
+            return null;
+        }
+
+    }
+        /**
+         Override this to place result in non-standard location on document
+         */
+    protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+
+        if( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+            return rootDocument;
+        else
+            return (ObjectNode) 
rootDocument.get(this.configuration.getEntity().toString());
+
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        ObjectNode rootDocument = getRootDocument(entry);
+
+        Map<String, String> params = prepareParams(entry);
+
+        URI uri;
+        for( Map.Entry<String,String> param : params.entrySet()) {
+            uriBuilder = uriBuilder.setParameter(param.getKey(), 
param.getValue());
+        }
+        try {
+            uri = uriBuilder.build();
+        } catch (URISyntaxException e) {
+            LOGGER.error("URI error {}", uriBuilder.toString());
+            return result;
+        }
+
+        HttpGet httpget = prepareHttpGet(uri);
+
+        CloseableHttpResponse response = null;
+
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpget);
+            HttpEntity entity = response.getEntity();
+            // TODO: handle retry
+            if (response.getStatusLine().getStatusCode() == 200 && entity != 
null) {
+                entityString = EntityUtils.toString(entity);
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, 
e.getMessage());
+            return result;
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {}
+            try {
+                httpclient.close();
+            } catch (IOException e) {}
+        }
+
+        if( entityString == null )
+            return result;
+
+        LOGGER.debug(entityString);
+
+        ObjectNode extensionFragment = prepareExtensionFragment(entityString);
+
+        ObjectNode extensionEntity = getEntityToExtend(rootDocument);
+
+        ExtensionUtil.ensureExtensions(extensionEntity);
+
+        ExtensionUtil.addExtension(extensionEntity, 
this.configuration.getExtension(), extensionFragment);
+
+        entry.setDocument(rootDocument);
+
+        result.add(entry);
+
+        return result;
+
+    }
+
+    /**
+     Override this to add parameters to the request
+     */
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+        return Maps.newHashMap();
+    }
+
+
+    public HttpGet prepareHttpGet(URI uri) {
+        HttpGet httpget = new HttpGet(uri);
+        httpget.addHeader("content-type", this.configuration.getContentType());
+        if( !Strings.isNullOrEmpty(authHeader))
+            httpget.addHeader("Authorization", String.format("Basic %s", 
authHeader));
+        return httpget;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+        
Preconditions.checkArgument(factory.getValidator().validate(this.configuration, 
HttpProcessorConfiguration.class).size() == 0);
+
+        mapper = StreamsJacksonMapper.getInstance();
+
+        uriBuilder = new URIBuilder()
+            .setScheme(this.configuration.getProtocol())
+            .setHost(this.configuration.getHostname())
+            .setPath(this.configuration.getResourcePath());
+
+        if( !Strings.isNullOrEmpty(configuration.getAccessToken()) )
+            uriBuilder = uriBuilder.addParameter("access_token", 
configuration.getAccessToken());
+        if( !Strings.isNullOrEmpty(configuration.getUsername())
+            && !Strings.isNullOrEmpty(configuration.getPassword())) {
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(configuration.getUsername());
+            stringBuilder.append(":");
+            stringBuilder.append(configuration.getPassword());
+            String string = stringBuilder.toString();
+            authHeader = Base64.encodeBase64String(string.getBytes());
+        }
+        httpclient = HttpClients.createDefault();
+    }
+
+    @Override
+    public void cleanUp() {
+        LOGGER.info("shutting down SimpleHTTPGetProcessor");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
new file mode 100644
index 0000000..622225a
--- /dev/null
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
@@ -0,0 +1,215 @@
+package org.apache.streams.components.http.provider;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProviderConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Provider retrieves contents from an known set of urls and passes all 
resulting objects downstream
+ */
+public class SimpleHTTPGetProvider implements StreamsProvider {
+
+    private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+
+    // from root config id
+    private final static String EXTENSION = "account_type";
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(SimpleHTTPGetProvider.class);
+
+    protected ObjectMapper mapper;
+
+    protected URIBuilder uriBuilder;
+
+    protected CloseableHttpClient httpclient;
+
+    protected HttpProviderConfiguration configuration;
+
+    protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<StreamsDatum>();
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    //    // authorized only
+//    //private PeoplePatternConfiguration peoplePatternConfiguration = null;
+//    //private String authHeader;
+//
+    public SimpleHTTPGetProvider() {
+        
this(HttpConfigurator.detectProviderConfiguration(StreamsConfigurator.config.getConfig("http")));
+    }
+
+    public SimpleHTTPGetProvider(HttpProviderConfiguration 
providerConfiguration) {
+        LOGGER.info("creating SimpleHTTPGetProvider");
+        LOGGER.info(providerConfiguration.toString());
+        this.configuration = providerConfiguration;
+    }
+
+    /**
+      Override this to add parameters to the request
+     */
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+        return Maps.newHashMap();
+    }
+
+    public HttpGet prepareHttpGet(URI uri) {
+        HttpGet httpget = new HttpGet(uri);
+        httpget.addHeader("content-type", this.configuration.getContentType());
+        return httpget;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+//        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+//        
Preconditions.checkArgument(factory.getValidator().validate(this.configuration, 
HttpProcessorConfiguration.class).size() == 0);
+
+        mapper = StreamsJacksonMapper.getInstance();
+
+        uriBuilder = new URIBuilder()
+            .setScheme(this.configuration.getProtocol())
+            .setHost(this.configuration.getHostname())
+            .setPath(this.configuration.getResourcePath());
+
+        httpclient = HttpClients.createDefault();
+    }
+
+    @Override
+    public void cleanUp() {
+
+        LOGGER.info("shutting down SimpleHTTPGetProvider");
+        try {
+            httpclient.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                httpclient.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            } finally {
+                httpclient = null;
+            }
+        }
+    }
+
+    @Override
+    public void startStream() {
+
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        StreamsResultSet current;
+
+        uriBuilder = uriBuilder.setPath(
+            Joiner.on("/").skipNulls().join(uriBuilder.getPath(), 
configuration.getResource(), configuration.getResourcePostfix())
+        );
+
+        URI uri;
+        try {
+            uri = uriBuilder.build();
+        } catch (URISyntaxException e) {
+            uri = null;
+        }
+
+        List<ObjectNode> results = executeGet(uri);
+
+        lock.writeLock().lock();
+
+        for( ObjectNode item : results ) {
+            providerQueue.add(new StreamsDatum(item, item.get("id").asText(), 
new DateTime(item.get("timestamp").asText())));
+        }
+
+        LOGGER.debug("Creating new result set for {} items", 
providerQueue.size());
+        current = new StreamsResultSet(providerQueue);
+
+        return current;
+    }
+
+    protected List<ObjectNode> executeGet(URI uri) {
+
+        Preconditions.checkNotNull(uri);
+
+        List<ObjectNode> results = new ArrayList<>();
+
+        HttpGet httpget = prepareHttpGet(uri);
+
+        CloseableHttpResponse response = null;
+
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpget);
+            HttpEntity entity = response.getEntity();
+            // TODO: handle retry
+            if (response.getStatusLine().getStatusCode() == 200 && entity != 
null) {
+                entityString = EntityUtils.toString(entity);
+                if( !entityString.equals("{}") && !entityString.equals("[]") ) 
{
+                    JsonNode jsonNode = mapper.readValue(entityString, 
JsonNode.class);
+                    if (jsonNode != null && jsonNode instanceof ObjectNode ) {
+
+                        results.add((ObjectNode) jsonNode);
+                    } else if (jsonNode != null && jsonNode instanceof 
ArrayNode) {
+                        ArrayNode arrayNode = (ArrayNode) jsonNode;
+                        Iterator<JsonNode> iterator = arrayNode.elements();
+                        while (iterator.hasNext()) {
+                            ObjectNode element = (ObjectNode) iterator.next();
+
+                            results.add(element);
+                        }
+                    }
+                }
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, 
e.getMessage());
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {}
+        }
+        return results;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
 
b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
new file mode 100644
index 0000000..b4dc243
--- /dev/null
+++ 
b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
@@ -0,0 +1,50 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.components.http.HttpConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "Protocol",
+            "default": "http"
+        },
+        "hostname": {
+            "type": "string",
+            "description": "Hostname",
+            "required" : true
+        },
+        "port": {
+            "type": "integer",
+            "description": "Port",
+            "default": 80
+        },
+        "resourcePath": {
+            "type": "string",
+            "description": "Resource Path",
+            "required" : true
+        },
+        "content-type": {
+            "type": "string",
+            "description": "Resource content-type",
+            "required" : true,
+            "default": "application/json"
+        },
+        "access_token": {
+            "type": "string",
+            "description": "Known Access Token",
+            "required" : false
+        },
+        "username": {
+            "type": "string",
+            "description": "Basic Auth Username",
+            "required" : false
+        },
+        "password": {
+            "type": "string",
+            "description": "Basic Auth Password",
+            "required" : false
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
 
b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
new file mode 100644
index 0000000..32e4c23
--- /dev/null
+++ 
b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
@@ -0,0 +1,28 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "extends": { "$ref": "HttpConfiguration.json" },
+    "properties": {
+        "entity": {
+            "type": "string",
+            "description": "Entity to extend",
+            "enum": [ "activity", "actor", "object", "target" ],
+            "required" : true,
+            "default": "activity"
+        },
+        "extension": {
+            "type": "string",
+            "description": "Extension identifier",
+            "required" : true
+        },
+        "urlField": {
+            "type": "string",
+            "description": "Field where url is located",
+            "required" : true,
+            "default": "url"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
 
b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
new file mode 100644
index 0000000..2c135d9
--- /dev/null
+++ 
b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
@@ -0,0 +1,18 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.components.http.HttpProviderConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "extends": { "$ref": "HttpConfiguration.json" },
+    "properties": {
+        "resource": {
+            "type": "string",
+            "required" : false
+        },
+        "resourcePostfix": {
+            "type": "string",
+            "required" : false
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/README.md
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/README.md 
b/streams-components/streams-processor-http/README.md
deleted file mode 100644
index 62dd4c1..0000000
--- a/streams-components/streams-processor-http/README.md
+++ /dev/null
@@ -1,16 +0,0 @@
-streams-processor-http
-=====================
-
-Hit an http endpoint and place the result in extensions
-
-Example SimpleHTTPGetProcessor configuration:
-
-    "http": {
-        "protocol": "http",
-        "hostname": "urls.api.twitter.com",
-        "port": 9300,
-        "resourceUri": "1/urls/count.json"
-    }
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/pom.xml 
b/streams-components/streams-processor-http/pom.xml
deleted file mode 100644
index d9215ad..0000000
--- a/streams-components/streams-processor-http/pom.xml
+++ /dev/null
@@ -1,154 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.streams</groupId>
-        <artifactId>streams-components</artifactId>
-        <version>0.1-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>streams-processor-http</artifactId>
-
-    <name>streams-processor-http</name>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.jsonschema2pojo</groupId>
-            <artifactId>jsonschema2pojo-core</artifactId>
-            <type>jar</type>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-config</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-pojo</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-pojo-extensions</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <version>4.3.5</version>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <sourceDirectory>src/main/java</sourceDirectory>
-        <testSourceDirectory>src/test/java</testSourceDirectory>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-            </resource>
-        </resources>
-        <testResources>
-            <testResource>
-                <directory>src/test/resources</directory>
-            </testResource>
-        </testResources>
-        <plugins>
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <version>1.8</version>
-                <executions>
-                    <execution>
-                        <id>add-source</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                
<source>target/generated-sources/jsonschema2pojo</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>add-source-jaxb2</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>target/generated-sources/jaxb2</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.jsonschema2pojo</groupId>
-                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
-                <configuration>
-                    <addCompileSourceRoot>true</addCompileSourceRoot>
-                    <generateBuilders>true</generateBuilders>
-                    <sourcePaths>
-                        <sourcePath>src/main/jsonschema</sourcePath>
-                    </sourcePaths>
-                    
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
-                    <targetPackage>org.apache.streams.http</targetPackage>
-                    <useLongIntegers>true</useLongIntegers>
-                    <useJodaDates>true</useJodaDates>
-                    <includeJsr303Annotations>true</includeJsr303Annotations>
-                </configuration>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>generate</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
 
b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
deleted file mode 100644
index 36801b8..0000000
--- 
a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.components.http;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Converts a {@link com.typesafe.config.Config} element into an instance of 
ElasticSearchConfiguration
- */
-public class HttpConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(HttpConfigurator.class);
-
-    private final static ObjectMapper mapper = new ObjectMapper();
-
-    public static HttpProcessorConfiguration detectConfiguration(Config 
config) {
-
-        HttpProcessorConfiguration httpProcessorConfiguration = null;
-
-        try {
-            httpProcessorConfiguration = 
mapper.readValue(config.root().render(ConfigRenderOptions.concise()), 
HttpProcessorConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse http configuration", e.getMessage());
-        }
-        return httpProcessorConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
 
b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
deleted file mode 100644
index d74793a..0000000
--- 
a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
+++ /dev/null
@@ -1,218 +0,0 @@
-package org.apache.streams.components.http;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.data.util.ExtensionUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.validation.Validation;
-import javax.validation.ValidatorFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class SimpleHTTPGetProcessor implements StreamsProcessor {
-
-    private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
-
-    // from root config id
-    private final static String EXTENSION = "account_type";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
-
-    protected ObjectMapper mapper;
-
-    protected URIBuilder uriBuilder;
-
-    protected CloseableHttpClient httpclient;
-
-    protected HttpProcessorConfiguration configuration;
-//
-//    // authorized only
-//    //private PeoplePatternConfiguration peoplePatternConfiguration = null;
-//    //private String authHeader;
-//
-    public SimpleHTTPGetProcessor() {
-        LOGGER.info("creating SimpleHTTPGetProcessor");
-        this.configuration = 
HttpConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("http"));
-    }
-
-    public SimpleHTTPGetProcessor(HttpProcessorConfiguration 
processorConfiguration) {
-        LOGGER.info("creating SimpleHTTPGetProcessor");
-        LOGGER.info(processorConfiguration.toString());
-        this.configuration = processorConfiguration;
-    }
-
-    /**
-      Override this to add parameters to the request
-     */
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
-
-        return Maps.newHashMap();
-    }
-
-    /**
-     Override this to store a result other than exact json representation of 
response
-     */
-    protected ObjectNode prepareExtensionFragment(String entityString) {
-
-        try {
-            return mapper.readValue(entityString, ObjectNode.class);
-        } catch (IOException e) {
-            LOGGER.warn(e.getMessage());
-            return null;
-        }
-    }
-
-    /**
-     Override this to place result in non-standard location on document
-     */
-    protected ObjectNode getRootDocument(StreamsDatum datum) {
-
-        try {
-            String json = datum.getDocument() instanceof String ?
-                    (String) datum.getDocument() :
-                    mapper.writeValueAsString(datum.getDocument());
-            return mapper.readValue(json, ObjectNode.class);
-        } catch (JsonProcessingException e) {
-            LOGGER.warn(e.getMessage());
-            return null;
-        } catch (IOException e) {
-            LOGGER.warn(e.getMessage());
-            return null;
-        }
-
-    }
-        /**
-         Override this to place result in non-standard location on document
-         */
-    protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
-
-        if( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
-            return rootDocument;
-        else
-            return (ObjectNode) 
rootDocument.get(this.configuration.getEntity().toString());
-
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        List<StreamsDatum> result = Lists.newArrayList();
-
-        ObjectNode rootDocument = getRootDocument(entry);
-
-        Map<String, String> params = prepareParams(entry);
-
-        URI uri;
-        for( Map.Entry<String,String> param : params.entrySet()) {
-            uriBuilder = uriBuilder.setParameter(param.getKey(), 
param.getValue());
-        }
-        try {
-            uri = uriBuilder.build();
-        } catch (URISyntaxException e) {
-            LOGGER.error("URI error {}", uriBuilder.toString());
-            return result;
-        }
-
-        HttpGet httpget = prepareHttpGet(uri);
-
-        CloseableHttpResponse response = null;
-
-        String entityString = null;
-        try {
-            response = httpclient.execute(httpget);
-            HttpEntity entity = response.getEntity();
-            // TODO: handle retry
-            if (response.getStatusLine().getStatusCode() == 200 && entity != 
null) {
-                entityString = EntityUtils.toString(entity);
-            }
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, 
e.getMessage());
-            return result;
-        } finally {
-            try {
-                response.close();
-            } catch (IOException e) {}
-            try {
-                httpclient.close();
-            } catch (IOException e) {}
-        }
-
-        if( entityString == null )
-            return result;
-
-        LOGGER.debug(entityString);
-
-        ObjectNode extensionFragment = prepareExtensionFragment(entityString);
-
-        ObjectNode extensionEntity = getEntityToExtend(rootDocument);
-
-        ExtensionUtil.ensureExtensions(extensionEntity);
-
-        ExtensionUtil.addExtension(extensionEntity, 
this.configuration.getExtension(), extensionFragment);
-
-        entry.setDocument(rootDocument);
-
-        result.add(entry);
-
-        return result;
-
-    }
-
-    public HttpGet prepareHttpGet(URI uri) {
-        HttpGet httpget = new HttpGet(uri);
-        httpget.addHeader("content-type", this.configuration.getContentType());
-        return httpget;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-        
Preconditions.checkArgument(factory.getValidator().validate(this.configuration, 
HttpProcessorConfiguration.class).size() == 0);
-
-        mapper = StreamsJacksonMapper.getInstance();
-
-        uriBuilder = new URIBuilder()
-            .setScheme(this.configuration.getProtocol())
-            .setHost(this.configuration.getHostname())
-            .setPath(this.configuration.getResourceUri());
-
-        httpclient = HttpClients.createDefault();
-    }
-
-    @Override
-    public void cleanUp() {
-        LOGGER.info("shutting down SimpleHTTPGetProcessor");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
 
b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
deleted file mode 100644
index 40c3bcd..0000000
--- 
a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
+++ /dev/null
@@ -1,47 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "protocol": {
-            "type": "string",
-            "description": "Protocol",
-            "default": "http"
-        },
-        "hostname": {
-            "type": "string",
-            "description": "Hostname",
-            "required" : true
-        },
-        "port": {
-            "type": "integer",
-            "description": "Port",
-            "default": 80
-        },
-        "resourceUri": {
-            "type": "string",
-            "description": "Resource URI",
-            "required" : true
-        },
-        "content-type": {
-            "type": "string",
-            "description": "Resource URI",
-            "required" : true,
-            "default": "application/json"
-        },
-        "entity": {
-            "type": "string",
-            "description": "Entity to extend",
-            "enum": [ "activity", "actor", "object", "target" ],
-            "required" : true,
-            "default": "activity"
-        },
-        "extension": {
-            "type": "string",
-            "description": "Extension identifier",
-            "required" : true
-        }
-    }
-}
\ No newline at end of file

Reply via email to