Repository: incubator-streams Updated Branches: refs/heads/STREAMS-168 a57646091 -> ad5f90cc1
introduced a SimpleHTTPGetProvider reorganized configuration added access_token support added http basic auth support Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d62061de Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d62061de Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d62061de Branch: refs/heads/STREAMS-168 Commit: d62061dec8628484e66d3494e02f1f65efafda41 Parents: a576460 Author: Steve Blackmon <sblack...@w2odigital.com> Authored: Sun Oct 12 17:49:01 2014 -0500 Committer: Steve Blackmon <sblack...@w2odigital.com> Committed: Sun Oct 12 17:49:01 2014 -0500 ---------------------------------------------------------------------- streams-components/pom.xml | 2 +- streams-components/streams-http/README.md | 16 ++ streams-components/streams-http/pom.xml | 159 +++++++++++++ .../components/http/HttpConfigurator.java | 62 +++++ .../http/processor/SimpleHTTPGetProcessor.java | 230 +++++++++++++++++++ .../http/provider/SimpleHTTPGetProvider.java | 215 +++++++++++++++++ .../components/http/HttpConfiguration.json | 50 ++++ .../http/HttpProcessorConfiguration.json | 28 +++ .../http/HttpProviderConfiguration.json | 18 ++ .../streams-processor-http/README.md | 16 -- .../streams-processor-http/pom.xml | 154 ------------- .../components/http/HttpConfigurator.java | 53 ----- .../components/http/SimpleHTTPGetProcessor.java | 218 ------------------ .../HttpProcessorConfiguration.json | 47 ---- 14 files changed, 779 insertions(+), 489 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/pom.xml ---------------------------------------------------------------------- diff --git a/streams-components/pom.xml b/streams-components/pom.xml index 26384b1..9942e14 100644 --- a/streams-components/pom.xml +++ b/streams-components/pom.xml 
@@ -37,7 +37,7 @@ </properties> <modules> - <module>streams-processor-http</module> + <module>streams-http</module> </modules> <dependencyManagement> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/README.md ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/README.md b/streams-components/streams-http/README.md new file mode 100644 index 0000000..62dd4c1 --- /dev/null +++ b/streams-components/streams-http/README.md @@ -0,0 +1,16 @@ +streams-http +===================== + +Hit an http endpoint and place the result in extensions + +Example SimpleHTTPGetProcessor configuration: + + "http": { + "protocol": "http", + "hostname": "urls.api.twitter.com", + "port": 9300, + "resourcePath": "1/urls/count.json" + } + + + http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/pom.xml ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml new file mode 100644 index 0000000..9c2b079 --- /dev/null +++ b/streams-components/streams-http/pom.xml @@ -0,0 +1,159 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. 
See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.streams</groupId> + <artifactId>streams-components</artifactId> + <version>0.1-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>streams-http</artifactId> + + <name>streams-http</name> + + <dependencies> + + <dependency> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-core</artifactId> + <type>jar</type> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + </dependency> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo-extensions</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.3.5</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + 
</dependencies> + + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.8</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jsonschema2pojo</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-source-jaxb2</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jaxb2</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <configuration> + <addCompileSourceRoot>true</addCompileSourceRoot> + <generateBuilders>true</generateBuilders> + <sourcePaths> + <sourcePath>src/main/jsonschema</sourcePath> + </sourcePaths> + <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> + <targetPackage>org.apache.streams.http</targetPackage> + <useLongIntegers>true</useLongIntegers> + <useJodaDates>true</useJodaDates> + <includeJsr303Annotations>true</includeJsr303Annotations> + </configuration> + <executions> + <execution> + <goals> + <goal>generate</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java 
---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java new file mode 100644 index 0000000..900831f --- /dev/null +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.components.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Converts a {@link com.typesafe.config.Config} element into an instance of HttpProviderConfiguration or HttpProcessorConfiguration + */ +public class HttpConfigurator { + + private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class); + + private final static ObjectMapper mapper = new ObjectMapper(); + + public static HttpProviderConfiguration detectProviderConfiguration(Config config) { + + HttpProviderConfiguration httpProviderConfiguration = null; + + try { + httpProviderConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProviderConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse http configuration", e.getMessage()); + } + return httpProviderConfiguration; + } + + public static HttpProcessorConfiguration detectProcessorConfiguration(Config config) { + + HttpProcessorConfiguration httpProcessorConfiguration = null; + + try { + httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse http configuration", e.getMessage()); + } + return httpProcessorConfiguration; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java new file mode 100644 index 0000000..c2bfef6 --- /dev/null +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java @@ -0,0 +1,230 @@ +package org.apache.streams.components.http.processor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.streams.components.http.HttpConfigurator; +import org.apache.streams.components.http.HttpProcessorConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.data.util.ExtensionUtil; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.Validation; +import javax.validation.ValidatorFactory; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; + +/** + * Processor retrieves contents from a known url and stores the resulting object in an extension field + */ +public class SimpleHTTPGetProcessor implements StreamsProcessor { + + private final static String 
STREAMS_ID = "SimpleHTTPGetProcessor"; + + // from root config id + private final static String EXTENSION = "account_type"; + + private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class); + + protected ObjectMapper mapper; + + protected URIBuilder uriBuilder; + + protected CloseableHttpClient httpclient; + + protected HttpProcessorConfiguration configuration; + + protected String authHeader; +// +// // authorized only +// //private PeoplePatternConfiguration peoplePatternConfiguration = null; +// //private String authHeader; +// + public SimpleHTTPGetProcessor() { + this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("http"))); + } + + public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) { + LOGGER.info("creating SimpleHTTPGetProcessor"); + LOGGER.info(processorConfiguration.toString()); + this.configuration = processorConfiguration; + } + + + /** + Override this to store a result other than exact json representation of response + */ + protected ObjectNode prepareExtensionFragment(String entityString) { + + try { + return mapper.readValue(entityString, ObjectNode.class); + } catch (IOException e) { + LOGGER.warn(e.getMessage()); + return null; + } + } + + /** + Override this to place result in non-standard location on document + */ + protected ObjectNode getRootDocument(StreamsDatum datum) { + + try { + String json = datum.getDocument() instanceof String ? 
+ (String) datum.getDocument() : + mapper.writeValueAsString(datum.getDocument()); + return mapper.readValue(json, ObjectNode.class); + } catch (JsonProcessingException e) { + LOGGER.warn(e.getMessage()); + return null; + } catch (IOException e) { + LOGGER.warn(e.getMessage()); + return null; + } + + } + /** + Override this to place result in non-standard location on document + */ + protected ObjectNode getEntityToExtend(ObjectNode rootDocument) { + + if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) + return rootDocument; + else + return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString()); + + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + + List<StreamsDatum> result = Lists.newArrayList(); + + ObjectNode rootDocument = getRootDocument(entry); + + Map<String, String> params = prepareParams(entry); + + URI uri; + for( Map.Entry<String,String> param : params.entrySet()) { + uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue()); + } + try { + uri = uriBuilder.build(); + } catch (URISyntaxException e) { + LOGGER.error("URI error {}", uriBuilder.toString()); + return result; + } + + HttpGet httpget = prepareHttpGet(uri); + + CloseableHttpResponse response = null; + + String entityString = null; + try { + response = httpclient.execute(httpget); + HttpEntity entity = response.getEntity(); + // TODO: handle retry + if (response.getStatusLine().getStatusCode() == 200 && entity != null) { + entityString = EntityUtils.toString(entity); + } + } catch (IOException e) { + LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage()); + return result; + } finally { + try { + response.close(); + } catch (IOException e) {} + try { + httpclient.close(); + } catch (IOException e) {} + } + + if( entityString == null ) + return result; + + LOGGER.debug(entityString); + + ObjectNode extensionFragment = prepareExtensionFragment(entityString); + + ObjectNode 
extensionEntity = getEntityToExtend(rootDocument); + + ExtensionUtil.ensureExtensions(extensionEntity); + + ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment); + + entry.setDocument(rootDocument); + + result.add(entry); + + return result; + + } + + /** + Override this to add parameters to the request + */ + protected Map<String, String> prepareParams(StreamsDatum entry) { + + return Maps.newHashMap(); + } + + + public HttpGet prepareHttpGet(URI uri) { + HttpGet httpget = new HttpGet(uri); + httpget.addHeader("content-type", this.configuration.getContentType()); + if( !Strings.isNullOrEmpty(authHeader)) + httpget.addHeader("Authorization", String.format("Basic %s", authHeader)); + return httpget; + } + + @Override + public void prepare(Object configurationObject) { + + ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); + Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0); + + mapper = StreamsJacksonMapper.getInstance(); + + uriBuilder = new URIBuilder() + .setScheme(this.configuration.getProtocol()) + .setHost(this.configuration.getHostname()) + .setPath(this.configuration.getResourcePath()); + + if( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) + uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); + if( !Strings.isNullOrEmpty(configuration.getUsername()) + && !Strings.isNullOrEmpty(configuration.getPassword())) { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(configuration.getUsername()); + stringBuilder.append(":"); + stringBuilder.append(configuration.getPassword()); + String string = stringBuilder.toString(); + authHeader = Base64.encodeBase64String(string.getBytes()); + } + httpclient = HttpClients.createDefault(); + } + + @Override + public void cleanUp() { + LOGGER.info("shutting down SimpleHTTPGetProcessor"); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java new file mode 100644 index 0000000..622225a --- /dev/null +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java @@ -0,0 +1,215 @@ +package org.apache.streams.components.http.provider; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.streams.components.http.HttpConfigurator; +import org.apache.streams.components.http.HttpProviderConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigInteger; +import 
java.net.URI; +import java.net.URISyntaxException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Provider retrieves contents from a known set of urls and passes all resulting objects downstream + */ +public class SimpleHTTPGetProvider implements StreamsProvider { + + private final static String STREAMS_ID = "SimpleHTTPGetProcessor"; + + // from root config id + private final static String EXTENSION = "account_type"; + + private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProvider.class); + + protected ObjectMapper mapper; + + protected URIBuilder uriBuilder; + + protected CloseableHttpClient httpclient; + + protected HttpProviderConfiguration configuration; + + protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + // // authorized only +// //private PeoplePatternConfiguration peoplePatternConfiguration = null; +// //private String authHeader; +// + public SimpleHTTPGetProvider() { + this(HttpConfigurator.detectProviderConfiguration(StreamsConfigurator.config.getConfig("http"))); + } + + public SimpleHTTPGetProvider(HttpProviderConfiguration providerConfiguration) { + LOGGER.info("creating SimpleHTTPGetProvider"); + LOGGER.info(providerConfiguration.toString()); + this.configuration = providerConfiguration; + } + + /** + Override this to add parameters to the request + */ + protected Map<String, String> prepareParams(StreamsDatum entry) { + + return Maps.newHashMap(); + } + + public HttpGet prepareHttpGet(URI uri) { + HttpGet httpget = new HttpGet(uri); + httpget.addHeader("content-type", this.configuration.getContentType()); + return httpget; + } + + @Override + public void prepare(Object configurationObject) { + +// ValidatorFactory factory = 
Validation.buildDefaultValidatorFactory(); +// Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0); + + mapper = StreamsJacksonMapper.getInstance(); + + uriBuilder = new URIBuilder() + .setScheme(this.configuration.getProtocol()) + .setHost(this.configuration.getHostname()) + .setPath(this.configuration.getResourcePath()); + + httpclient = HttpClients.createDefault(); + } + + @Override + public void cleanUp() { + + LOGGER.info("shutting down SimpleHTTPGetProvider"); + try { + httpclient.close(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + httpclient.close(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + httpclient = null; + } + } + } + + @Override + public void startStream() { + + } + + @Override + public StreamsResultSet readCurrent() { + StreamsResultSet current; + + uriBuilder = uriBuilder.setPath( + Joiner.on("/").skipNulls().join(uriBuilder.getPath(), configuration.getResource(), configuration.getResourcePostfix()) + ); + + URI uri; + try { + uri = uriBuilder.build(); + } catch (URISyntaxException e) { + uri = null; + } + + List<ObjectNode> results = executeGet(uri); + + lock.writeLock().lock(); + + for( ObjectNode item : results ) { + providerQueue.add(new StreamsDatum(item, item.get("id").asText(), new DateTime(item.get("timestamp").asText()))); + } + + LOGGER.debug("Creating new result set for {} items", providerQueue.size()); + current = new StreamsResultSet(providerQueue); + + return current; + } + + protected List<ObjectNode> executeGet(URI uri) { + + Preconditions.checkNotNull(uri); + + List<ObjectNode> results = new ArrayList<>(); + + HttpGet httpget = prepareHttpGet(uri); + + CloseableHttpResponse response = null; + + String entityString = null; + try { + response = httpclient.execute(httpget); + HttpEntity entity = response.getEntity(); + // TODO: handle retry + if (response.getStatusLine().getStatusCode() == 200 && entity != 
null) { + entityString = EntityUtils.toString(entity); + if( !entityString.equals("{}") && !entityString.equals("[]") ) { + JsonNode jsonNode = mapper.readValue(entityString, JsonNode.class); + if (jsonNode != null && jsonNode instanceof ObjectNode ) { + + results.add((ObjectNode) jsonNode); + } else if (jsonNode != null && jsonNode instanceof ArrayNode) { + ArrayNode arrayNode = (ArrayNode) jsonNode; + Iterator<JsonNode> iterator = arrayNode.elements(); + while (iterator.hasNext()) { + ObjectNode element = (ObjectNode) iterator.next(); + + results.add(element); + } + } + } + } + } catch (IOException e) { + LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage()); + } finally { + try { + response.close(); + } catch (IOException e) {} + } + return results; + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public boolean isRunning() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json new file mode 100644 index 0000000..b4dc243 --- /dev/null +++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json @@ -0,0 +1,50 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.components.http.HttpConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "protocol": { + "type": "string", + 
"description": "Protocol", + "default": "http" + }, + "hostname": { + "type": "string", + "description": "Hostname", + "required" : true + }, + "port": { + "type": "integer", + "description": "Port", + "default": 80 + }, + "resourcePath": { + "type": "string", + "description": "Resource Path", + "required" : true + }, + "content-type": { + "type": "string", + "description": "Resource content-type", + "required" : true, + "default": "application/json" + }, + "access_token": { + "type": "string", + "description": "Known Access Token", + "required" : false + }, + "username": { + "type": "string", + "description": "Basic Auth Username", + "required" : false + }, + "password": { + "type": "string", + "description": "Basic Auth Password", + "required" : false + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json new file mode 100644 index 0000000..32e4c23 --- /dev/null +++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json @@ -0,0 +1,28 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { "$ref": "HttpConfiguration.json" }, + "properties": { + "entity": { + "type": "string", + "description": "Entity to extend", + "enum": [ "activity", "actor", "object", "target" ], + "required" : true, + "default": "activity" + }, + "extension": { + 
"type": "string", + "description": "Extension identifier", + "required" : true + }, + "urlField": { + "type": "string", + "description": "Field where url is located", + "required" : true, + "default": "url" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json new file mode 100644 index 0000000..2c135d9 --- /dev/null +++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json @@ -0,0 +1,18 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.components.http.HttpProviderConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { "$ref": "HttpConfiguration.json" }, + "properties": { + "resource": { + "type": "string", + "required" : false + }, + "resourcePostfix": { + "type": "string", + "required" : false + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/README.md ---------------------------------------------------------------------- diff --git a/streams-components/streams-processor-http/README.md b/streams-components/streams-processor-http/README.md deleted file mode 100644 index 62dd4c1..0000000 --- a/streams-components/streams-processor-http/README.md +++ /dev/null @@ -1,16 +0,0 @@ -streams-processor-http -===================== - -Hit an http endpoint and place the result in extensions - -Example SimpleHTTPGetProcessor 
configuration: - - "http": { - "protocol": "http", - "hostname": "urls.api.twitter.com", - "port": 9300, - "resourceUri": "1/urls/count.json" - } - - - http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/pom.xml ---------------------------------------------------------------------- diff --git a/streams-components/streams-processor-http/pom.xml b/streams-components/streams-processor-http/pom.xml deleted file mode 100644 index d9215ad..0000000 --- a/streams-components/streams-processor-http/pom.xml +++ /dev/null @@ -1,154 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. 
- --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.streams</groupId> - <artifactId>streams-components</artifactId> - <version>0.1-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>streams-processor-http</artifactId> - - <name>streams-processor-http</name> - - <dependencies> - - <dependency> - <groupId>org.jsonschema2pojo</groupId> - <artifactId>jsonschema2pojo-core</artifactId> - <type>jar</type> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-config</artifactId> - </dependency> - <dependency> - <groupId>com.typesafe</groupId> - <artifactId>config</artifactId> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-pojo</artifactId> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-pojo-extensions</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>4.3.5</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - - </dependencies> - - <build> - <sourceDirectory>src/main/java</sourceDirectory> - <testSourceDirectory>src/test/java</testSourceDirectory> - <resources> - <resource> - <directory>src/main/resources</directory> - </resource> - 
</resources> - <testResources> - <testResource> - <directory>src/test/resources</directory> - </testResource> - </testResources> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.8</version> - <executions> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>target/generated-sources/jsonschema2pojo</source> - </sources> - </configuration> - </execution> - <execution> - <id>add-source-jaxb2</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>target/generated-sources/jaxb2</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.jsonschema2pojo</groupId> - <artifactId>jsonschema2pojo-maven-plugin</artifactId> - <configuration> - <addCompileSourceRoot>true</addCompileSourceRoot> - <generateBuilders>true</generateBuilders> - <sourcePaths> - <sourcePath>src/main/jsonschema</sourcePath> - </sourcePaths> - <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> - <targetPackage>org.apache.streams.http</targetPackage> - <useLongIntegers>true</useLongIntegers> - <useJodaDates>true</useJodaDates> - <includeJsr303Annotations>true</includeJsr303Annotations> - </configuration> - <executions> - <execution> - <goals> - <goal>generate</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java 
b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java deleted file mode 100644 index 36801b8..0000000 --- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.components.http; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration - */ -public class HttpConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class); - - private final static ObjectMapper mapper = new ObjectMapper(); - - public static HttpProcessorConfiguration detectConfiguration(Config config) { - - HttpProcessorConfiguration httpProcessorConfiguration = null; - - try { - httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse http configuration", e.getMessage()); - } - return httpProcessorConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java deleted file mode 100644 index d74793a..0000000 --- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java +++ /dev/null @@ -1,218 +0,0 @@ -package org.apache.streams.components.http; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import 
com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValue; -import org.apache.commons.io.IOUtils; -import org.apache.http.HttpEntity; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.data.util.ActivityUtil; -import org.apache.streams.data.util.ExtensionUtil; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.Actor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.validation.Validation; -import javax.validation.ValidatorFactory; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -public class SimpleHTTPGetProcessor implements StreamsProcessor { - - private final static String STREAMS_ID = "SimpleHTTPGetProcessor"; - - // from root config id - private final static String EXTENSION = "account_type"; - - private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class); - - protected ObjectMapper mapper; - - protected URIBuilder uriBuilder; - - protected CloseableHttpClient httpclient; - - protected HttpProcessorConfiguration 
configuration; -// -// // authorized only -// //private PeoplePatternConfiguration peoplePatternConfiguration = null; -// //private String authHeader; -// - public SimpleHTTPGetProcessor() { - LOGGER.info("creating SimpleHTTPGetProcessor"); - this.configuration = HttpConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("http")); - } - - public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) { - LOGGER.info("creating SimpleHTTPGetProcessor"); - LOGGER.info(processorConfiguration.toString()); - this.configuration = processorConfiguration; - } - - /** - Override this to add parameters to the request - */ - protected Map<String, String> prepareParams(StreamsDatum entry) { - - return Maps.newHashMap(); - } - - /** - Override this to store a result other than exact json representation of response - */ - protected ObjectNode prepareExtensionFragment(String entityString) { - - try { - return mapper.readValue(entityString, ObjectNode.class); - } catch (IOException e) { - LOGGER.warn(e.getMessage()); - return null; - } - } - - /** - Override this to place result in non-standard location on document - */ - protected ObjectNode getRootDocument(StreamsDatum datum) { - - try { - String json = datum.getDocument() instanceof String ? 
- (String) datum.getDocument() : - mapper.writeValueAsString(datum.getDocument()); - return mapper.readValue(json, ObjectNode.class); - } catch (JsonProcessingException e) { - LOGGER.warn(e.getMessage()); - return null; - } catch (IOException e) { - LOGGER.warn(e.getMessage()); - return null; - } - - } - /** - Override this to place result in non-standard location on document - */ - protected ObjectNode getEntityToExtend(ObjectNode rootDocument) { - - if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) - return rootDocument; - else - return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString()); - - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - List<StreamsDatum> result = Lists.newArrayList(); - - ObjectNode rootDocument = getRootDocument(entry); - - Map<String, String> params = prepareParams(entry); - - URI uri; - for( Map.Entry<String,String> param : params.entrySet()) { - uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue()); - } - try { - uri = uriBuilder.build(); - } catch (URISyntaxException e) { - LOGGER.error("URI error {}", uriBuilder.toString()); - return result; - } - - HttpGet httpget = prepareHttpGet(uri); - - CloseableHttpResponse response = null; - - String entityString = null; - try { - response = httpclient.execute(httpget); - HttpEntity entity = response.getEntity(); - // TODO: handle retry - if (response.getStatusLine().getStatusCode() == 200 && entity != null) { - entityString = EntityUtils.toString(entity); - } - } catch (IOException e) { - LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage()); - return result; - } finally { - try { - response.close(); - } catch (IOException e) {} - try { - httpclient.close(); - } catch (IOException e) {} - } - - if( entityString == null ) - return result; - - LOGGER.debug(entityString); - - ObjectNode extensionFragment = prepareExtensionFragment(entityString); - - ObjectNode 
extensionEntity = getEntityToExtend(rootDocument); - - ExtensionUtil.ensureExtensions(extensionEntity); - - ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment); - - entry.setDocument(rootDocument); - - result.add(entry); - - return result; - - } - - public HttpGet prepareHttpGet(URI uri) { - HttpGet httpget = new HttpGet(uri); - httpget.addHeader("content-type", this.configuration.getContentType()); - return httpget; - } - - @Override - public void prepare(Object configurationObject) { - - ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); - Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0); - - mapper = StreamsJacksonMapper.getInstance(); - - uriBuilder = new URIBuilder() - .setScheme(this.configuration.getProtocol()) - .setHost(this.configuration.getHostname()) - .setPath(this.configuration.getResourceUri()); - - httpclient = HttpClients.createDefault(); - } - - @Override - public void cleanUp() { - LOGGER.info("shutting down SimpleHTTPGetProcessor"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json deleted file mode 100644 index 40c3bcd..0000000 --- a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json +++ /dev/null @@ -1,47 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "id": "#", - "javaType" : 
"org.apache.streams.components.http.HttpProcessorConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "protocol": { - "type": "string", - "description": "Protocol", - "default": "http" - }, - "hostname": { - "type": "string", - "description": "Hostname", - "required" : true - }, - "port": { - "type": "integer", - "description": "Port", - "default": 80 - }, - "resourceUri": { - "type": "string", - "description": "Resource URI", - "required" : true - }, - "content-type": { - "type": "string", - "description": "Resource URI", - "required" : true, - "default": "application/json" - }, - "entity": { - "type": "string", - "description": "Entity to extend", - "enum": [ "activity", "actor", "object", "target" ], - "required" : true, - "default": "activity" - }, - "extension": { - "type": "string", - "description": "Extension identifier", - "required" : true - } - } -} \ No newline at end of file