http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java index 5868ba6..871a08b 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java @@ -18,10 +18,20 @@ package org.apache.streams.components.http.processor; +import org.apache.streams.components.http.HttpProcessorConfiguration; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.extensions.ExtensionUtil; +import org.apache.streams.pojo.json.ActivityObject; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Strings; + import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; @@ -30,14 +40,6 @@ import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpProcessorConfiguration; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.extensions.ExtensionUtil; -import org.apache.streams.pojo.json.ActivityObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,230 +52,249 @@ import java.util.List; import java.util.Map; /** - * Processor retrieves contents from an known url and stores the resulting object in an extension field + * Processor retrieves contents from an known url and stores the resulting object in an extension field. */ public class SimpleHTTPGetProcessor implements StreamsProcessor { - private final static String STREAMS_ID = "SimpleHTTPGetProcessor"; + private static final String STREAMS_ID = "SimpleHTTPGetProcessor"; - // from root config id - private final static String EXTENSION = "account_type"; + // from root config id + private static final String EXTENSION = "account_type"; - private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class); - protected ObjectMapper mapper; + protected ObjectMapper mapper; - protected URIBuilder uriBuilder; + protected URIBuilder uriBuilder; - protected CloseableHttpClient httpclient; + protected CloseableHttpClient httpclient; - protected HttpProcessorConfiguration configuration; + protected HttpProcessorConfiguration configuration; - protected String authHeader; - public SimpleHTTPGetProcessor() { - this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); - } + protected String authHeader; - public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) { - LOGGER.info("creating SimpleHTTPGetProcessor"); - LOGGER.info(processorConfiguration.toString()); - this.configuration = processorConfiguration; - } + /** + * SimpleHTTPGetProcessor constructor - resolves HttpProcessorConfiguration from JVM 'http'. + */ + public SimpleHTTPGetProcessor() { + this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); + } - @Override - public String getId() { - return STREAMS_ID; - } + /** + * SimpleHTTPGetProcessor constructor - uses provided HttpProcessorConfiguration. + */ + public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) { + LOGGER.info("creating SimpleHTTPGetProcessor"); + LOGGER.info(processorConfiguration.toString()); + this.configuration = processorConfiguration; + } - /** - Override this to store a result other than exact json representation of response - */ - protected ObjectNode prepareExtensionFragment(String entityString) { + @Override + public String getId() { + return STREAMS_ID; + } - try { - return mapper.readValue(entityString, ObjectNode.class); - } catch (IOException e) { - LOGGER.warn(e.getMessage()); - return null; - } - } - - /** - Override this to place result in non-standard location on document - */ - protected ObjectNode getRootDocument(StreamsDatum datum) { - - try { - String json = datum.getDocument() instanceof String ? - (String) datum.getDocument() : - mapper.writeValueAsString(datum.getDocument()); - return mapper.readValue(json, ObjectNode.class); - } catch (JsonProcessingException e) { - LOGGER.warn(e.getMessage()); - return null; - } catch (IOException e) { - LOGGER.warn(e.getMessage()); - return null; - } + /** + Override this to store a result other than exact json representation of response. + */ + protected ObjectNode prepareExtensionFragment(String entityString) { + try { + return mapper.readValue(entityString, ObjectNode.class); + } catch (IOException ex) { + LOGGER.warn(ex.getMessage()); + return null; + } + } + + /** + Override this to place result in non-standard location on document. + */ + protected ObjectNode getRootDocument(StreamsDatum datum) { + + try { + String json = datum.getDocument() instanceof String + ? (String) datum.getDocument() + : mapper.writeValueAsString(datum.getDocument()); + return mapper.readValue(json, ObjectNode.class); + } catch (JsonProcessingException ex) { + LOGGER.warn(ex.getMessage()); + return null; + } catch (IOException ex) { + LOGGER.warn(ex.getMessage()); + return null; } - /** - Override this to place result in non-standard location on document - */ - protected ActivityObject getEntityToExtend(ObjectNode rootDocument) { + } - if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) - return mapper.convertValue(rootDocument, ActivityObject.class); - else - return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()), ActivityObject.class); + /** + Override this to place result in non-standard location on document. + */ + protected ActivityObject getEntityToExtend(ObjectNode rootDocument) { + if ( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) { + return mapper.convertValue(rootDocument, ActivityObject.class); + } else { + return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()), ActivityObject.class); } + } - /** - Override this to place result in non-standard location on document - */ - protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject) { + /** + Override this to place result in non-standard location on document. + */ + protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject) { - if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) - return mapper.convertValue(activityObject, ObjectNode.class); - else - rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject, ObjectNode.class)); + if ( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) { + return mapper.convertValue(activityObject, ObjectNode.class); + } else { + rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject, ObjectNode.class)); + } - return rootDocument; + return rootDocument; - } + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = new ArrayList<>(); + List<StreamsDatum> result = new ArrayList<>(); - ObjectNode rootDocument = getRootDocument(entry); + ObjectNode rootDocument = getRootDocument(entry); - Map<String, String> params = prepareParams(entry); + Map<String, String> params = prepareParams(entry); - URI uri = prepareURI(params); + URI uri = prepareURI(params); - HttpGet httpget = prepareHttpGet(uri); + HttpGet httpget = prepareHttpGet(uri); - CloseableHttpResponse response = null; + CloseableHttpResponse response = null; - String entityString = null; - try { - response = httpclient.execute(httpget); - HttpEntity entity = response.getEntity(); - // TODO: handle retry - if (response.getStatusLine().getStatusCode() == 200 && entity != null) { - entityString = EntityUtils.toString(entity); - } - } catch (IOException e) { - LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage()); - return result; - } finally { - try { - if (response != null) { - response.close(); - } - } catch (IOException ignored) {} + String entityString = null; + try { + response = httpclient.execute(httpget); + HttpEntity entity = response.getEntity(); + // TODO: handle retry + if (response.getStatusLine().getStatusCode() == 200 && entity != null) { + entityString = EntityUtils.toString(entity); + } + } catch (IOException ex) { + LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, ex.getMessage()); + return result; + } finally { + try { + if (response != null) { + response.close(); } + } catch (IOException ignored) { + LOGGER.trace("IOException", ignored); + } + } - if( entityString == null ) - return result; - - LOGGER.debug(entityString); + if( entityString == null ) { + return result; + } - ObjectNode extensionFragment = prepareExtensionFragment(entityString); + LOGGER.debug(entityString); - ActivityObject extensionEntity = getEntityToExtend(rootDocument); + ObjectNode extensionFragment = prepareExtensionFragment(entityString); - ExtensionUtil.getInstance().addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment); + ActivityObject extensionEntity = getEntityToExtend(rootDocument); - rootDocument = setEntityToExtend(rootDocument, extensionEntity); + ExtensionUtil.getInstance().addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment); - entry.setDocument(rootDocument); + rootDocument = setEntityToExtend(rootDocument, extensionEntity); - result.add(entry); + entry.setDocument(rootDocument); - return result; + result.add(entry); - } + return result; - /** - Override this to alter request URI - */ - protected URI prepareURI(Map<String, String> params) { + } - URI uri = null; - for( Map.Entry<String,String> param : params.entrySet()) { - uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue()); - } - try { - uri = uriBuilder.build(); - } catch (URISyntaxException e) { - LOGGER.error("URI error {}", uriBuilder.toString()); - } - return uri; - } + /** + Override this to alter request URI. + */ + protected URI prepareURI(Map<String, String> params) { - /** - Override this to add parameters to the request - */ - protected Map<String, String> prepareParams(StreamsDatum entry) { - return new HashMap<>(); + URI uri = null; + for ( Map.Entry<String,String> param : params.entrySet()) { + uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue()); } - - /** - Override this to set a payload on the request - */ - protected ObjectNode preparePayload(StreamsDatum entry) { - return null; + try { + uri = uriBuilder.build(); + } catch (URISyntaxException ex) { + LOGGER.error("URI error {}", uriBuilder.toString()); } - - public HttpGet prepareHttpGet(URI uri) { - HttpGet httpget = new HttpGet(uri); - httpget.addHeader("content-type", this.configuration.getContentType()); - if( !Strings.isNullOrEmpty(authHeader)) - httpget.addHeader("Authorization", String.format("Basic %s", authHeader)); - return httpget; + return uri; + } + + /** + Override this to add parameters to the request. + */ + protected Map<String, String> prepareParams(StreamsDatum entry) { + return new HashMap<>(); + } + + /** + Override this to set a payload on the request. + */ + protected ObjectNode preparePayload(StreamsDatum entry) { + return null; + } + + /** + * Override this to set the URI for the request or modify headers. + * @param uri uri + * @return result + */ + public HttpGet prepareHttpGet(URI uri) { + HttpGet httpget = new HttpGet(uri); + httpget.addHeader("content-type", this.configuration.getContentType()); + if ( !Strings.isNullOrEmpty(authHeader)) { + httpget.addHeader("Authorization", String.format("Basic %s", authHeader)); } + return httpget; + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); + mapper = StreamsJacksonMapper.getInstance(); - uriBuilder = new URIBuilder() - .setScheme(this.configuration.getProtocol()) - .setHost(this.configuration.getHostname()) - .setPath(this.configuration.getResourcePath()); + uriBuilder = new URIBuilder() + .setScheme(this.configuration.getProtocol()) + .setHost(this.configuration.getHostname()) + .setPath(this.configuration.getResourcePath()); - if( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) - uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); - if( !Strings.isNullOrEmpty(configuration.getUsername()) - && !Strings.isNullOrEmpty(configuration.getPassword())) { - String string = configuration.getUsername() + ":" + configuration.getPassword(); - authHeader = Base64.encodeBase64String(string.getBytes()); - } - httpclient = HttpClients.createDefault(); + if ( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) { + uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); } - - @Override - public void cleanUp() { - LOGGER.info("shutting down SimpleHTTPGetProcessor"); - try { - httpclient.close(); - } catch (IOException e) { - e.printStackTrace(); - } finally { - try { - httpclient.close(); - } catch (IOException e) { - e.printStackTrace(); - } finally { - httpclient = null; - } - } + if ( !Strings.isNullOrEmpty(configuration.getUsername()) + && + !Strings.isNullOrEmpty(configuration.getPassword())) { + String string = configuration.getUsername() + ":" + configuration.getPassword(); + authHeader = Base64.encodeBase64String(string.getBytes()); + } + httpclient = HttpClients.createDefault(); + } + + @Override + public void cleanUp() { + LOGGER.info("shutting down SimpleHTTPGetProcessor"); + try { + httpclient.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + try { + httpclient.close(); + } catch (IOException e2) { + e2.printStackTrace(); + } finally { + httpclient = null; + } } + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java index f6089f6..1d52b5c 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java @@ -52,225 +52,241 @@ import java.util.List; import java.util.Map; /** - * Processor retrieves contents from an known url and stores the resulting object in an extension field + * Processor retrieves contents from an known url and stores the resulting object in an extension field. */ public class SimpleHTTPPostProcessor implements StreamsProcessor { - private final static String STREAMS_ID = "SimpleHTTPPostProcessor"; + private static final String STREAMS_ID = "SimpleHTTPPostProcessor"; - private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostProcessor.class); - protected ObjectMapper mapper; + protected ObjectMapper mapper; - protected URIBuilder uriBuilder; + protected URIBuilder uriBuilder; - protected CloseableHttpClient httpclient; + protected CloseableHttpClient httpclient; - protected HttpProcessorConfiguration configuration; + protected HttpProcessorConfiguration configuration; - protected String authHeader; + protected String authHeader; - public SimpleHTTPPostProcessor() { - this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); - } + /** + * SimpleHTTPPostProcessor constructor - resolves HttpProcessorConfiguration from JVM 'http'. + */ + public SimpleHTTPPostProcessor() { + this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); + } - public SimpleHTTPPostProcessor(HttpProcessorConfiguration processorConfiguration) { - LOGGER.info("creating SimpleHTTPPostProcessor"); - LOGGER.info(processorConfiguration.toString()); - this.configuration = processorConfiguration; - } + /** + * SimpleHTTPPostProcessor constructor - uses provided HttpProcessorConfiguration. + */ + public SimpleHTTPPostProcessor(HttpProcessorConfiguration processorConfiguration) { + LOGGER.info("creating SimpleHTTPPostProcessor"); + LOGGER.info(processorConfiguration.toString()); + this.configuration = processorConfiguration; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - /** - Override this to store a result other than exact json representation of response - */ - protected ObjectNode prepareExtensionFragment(String entityString) { + /** + Override this to store a result other than exact json representation of response. + */ + protected ObjectNode prepareExtensionFragment(String entityString) { - try { - return mapper.readValue(entityString, ObjectNode.class); - } catch (IOException e) { - LOGGER.warn("IOException", e); - return null; - } + try { + return mapper.readValue(entityString, ObjectNode.class); + } catch (IOException ex) { + LOGGER.warn("IOException", ex); + return null; } - - /** - Override this to place result in non-standard location on document - */ - protected ObjectNode getRootDocument(StreamsDatum datum) { - - try { - String json = datum.getDocument() instanceof String ? - (String) datum.getDocument() : - mapper.writeValueAsString(datum.getDocument()); - return mapper.readValue(json, ObjectNode.class); - } catch (JsonProcessingException e) { - LOGGER.warn("JsonProcessingException", e); - return null; - } catch (IOException e) { - LOGGER.warn("IOException", e); - return null; - } - + } + + /** + Override this to place result in non-standard location on document. + */ + protected ObjectNode getRootDocument(StreamsDatum datum) { + + try { + String json = datum.getDocument() instanceof String + ? (String) datum.getDocument() + : mapper.writeValueAsString(datum.getDocument()); + return mapper.readValue(json, ObjectNode.class); + } catch (JsonProcessingException ex) { + LOGGER.warn("JsonProcessingException", ex); + return null; + } catch (IOException ex) { + LOGGER.warn("IOException", ex); + return null; } - /** - Override this to place result in non-standard location on document - */ - protected ActivityObject getEntityToExtend(ObjectNode rootDocument) { + } - if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) - return mapper.convertValue(rootDocument, ActivityObject.class); - else - return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()), ActivityObject.class); + /** + Override this to place result in non-standard location on document. + */ + protected ActivityObject getEntityToExtend(ObjectNode rootDocument) { + if ( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) { + return mapper.convertValue(rootDocument, ActivityObject.class); + } else { + return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()), ActivityObject.class); } + } - /** - Override this to place result in non-standard location on document - */ - protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject) { - - if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) - return mapper.convertValue(activityObject, ObjectNode.class); - else - rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject, ObjectNode.class)); - - return rootDocument; + /** + Override this to place result in non-standard location on document. + */ + protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject) { + if ( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) { + return mapper.convertValue(activityObject, ObjectNode.class); + } else { + rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject, ObjectNode.class)); } + return rootDocument; - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + } - List<StreamsDatum> result = new ArrayList<>(); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - ObjectNode rootDocument = getRootDocument(entry); + List<StreamsDatum> result = new ArrayList<>(); - Map<String, String> params = prepareParams(entry); + ObjectNode rootDocument = getRootDocument(entry); - URI uri; - for( Map.Entry<String,String> param : params.entrySet()) { - uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue()); - } - try { - uri = uriBuilder.build(); - } catch (URISyntaxException e) { - LOGGER.error("URI error {}", uriBuilder.toString(), e); - return result; - } + Map<String, String> params = prepareParams(entry); - HttpEntity payload = preparePayload(entry); - - HttpPost httpPost = prepareHttpPost(uri, payload); - - CloseableHttpResponse response = null; - - String entityString = null; - try { - response = httpclient.execute(httpPost); - HttpEntity entity = response.getEntity(); - // TODO: handle retry - if (response.getStatusLine().getStatusCode() == 200 && entity != null) { - entityString = EntityUtils.toString(entity); - } - } catch (IOException e) { - LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e); - return result; - } finally { - try { - if (response != null) { - response.close(); - } - } catch (IOException ignored) {} - } + URI uri; + for ( Map.Entry<String,String> param : params.entrySet() ) { + uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue()); + } + try { + uri = uriBuilder.build(); + } catch (URISyntaxException ex) { + LOGGER.error("URI error {}", uriBuilder.toString(), ex); + return result; + } - if( entityString == null ) - return result; + HttpEntity payload = preparePayload(entry); + + HttpPost httpPost = prepareHttpPost(uri, payload); + + CloseableHttpResponse response = null; + + String entityString = null; + try { + response = httpclient.execute(httpPost); + HttpEntity entity = response.getEntity(); + // TODO: handle retry + if (response.getStatusLine().getStatusCode() == 200 && entity != null) { + entityString = EntityUtils.toString(entity); + } + } catch (IOException ex) { + LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, ex); + return result; + } finally { + try { + if (response != null) { + response.close(); + } + } catch (IOException ignored) { + LOGGER.trace("IOException", ignored); + } + } - LOGGER.debug(entityString); + if ( entityString == null ) { + return result; + } - ObjectNode extensionFragment = prepareExtensionFragment(entityString); + LOGGER.debug(entityString); - ActivityObject extensionEntity = getEntityToExtend(rootDocument); + ObjectNode extensionFragment = prepareExtensionFragment(entityString); - ExtensionUtil.getInstance().addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment); + ActivityObject extensionEntity = getEntityToExtend(rootDocument); - rootDocument = setEntityToExtend(rootDocument, extensionEntity); + ExtensionUtil.getInstance().addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment); - entry.setDocument(rootDocument); + rootDocument = setEntityToExtend(rootDocument, extensionEntity); - result.add(entry); + entry.setDocument(rootDocument); - return result; + result.add(entry); - } + return result; - /** - Override this to add parameters to the request - */ - protected Map<String, String> prepareParams(StreamsDatum entry) { - return new HashMap<>(); - } + } - /** - Override this to add parameters to the request - */ - protected HttpEntity preparePayload(StreamsDatum entry) { - return new StringEntity("{}", - ContentType.create("application/json")); - } + /** + Override this to add parameters to the request. + */ + protected Map<String, String> prepareParams(StreamsDatum entry) { + return new HashMap<>(); + } + /** + Override this to add parameters to the request. + */ + protected HttpEntity preparePayload(StreamsDatum entry) { + return new StringEntity("{}", + ContentType.create("application/json")); + } - public HttpPost prepareHttpPost(URI uri, HttpEntity entity) { - HttpPost httpPost = new HttpPost(uri); - httpPost.addHeader("content-type", this.configuration.getContentType()); - if( !Strings.isNullOrEmpty(authHeader)) - httpPost.addHeader("Authorization", String.format("Basic %s", authHeader)); - httpPost.setEntity(entity); - return httpPost; + /** + * Override this to set the URI / entity for the request or modify headers. + * @param uri uri + * @param entity entity + * @return result + */ + public HttpPost prepareHttpPost(URI uri, HttpEntity entity) { + HttpPost httpPost = new HttpPost(uri); + httpPost.addHeader("content-type", this.configuration.getContentType()); + if ( !Strings.isNullOrEmpty(authHeader)) { + httpPost.addHeader("Authorization", String.format("Basic %s", authHeader)); } + httpPost.setEntity(entity); + return httpPost; + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); + mapper = StreamsJacksonMapper.getInstance(); - uriBuilder = new URIBuilder() - .setScheme(this.configuration.getProtocol()) - .setHost(this.configuration.getHostname()) - .setPath(this.configuration.getResourcePath()); + uriBuilder = new URIBuilder() + .setScheme(this.configuration.getProtocol()) + .setHost(this.configuration.getHostname()) + .setPath(this.configuration.getResourcePath()); - if( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) - uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); - if( !Strings.isNullOrEmpty(configuration.getUsername()) - && !Strings.isNullOrEmpty(configuration.getPassword())) { - String string = configuration.getUsername() + ":" + configuration.getPassword(); - authHeader = Base64.encodeBase64String(string.getBytes()); - } - httpclient = HttpClients.createDefault(); + if ( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) { + uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); } - - @Override - public void cleanUp() { - LOGGER.info("shutting down SimpleHTTPPostProcessor"); - try { - httpclient.close(); - } catch (IOException e) { - e.printStackTrace(); - } finally { - try { - httpclient.close(); - } catch (IOException e) { - LOGGER.error("IOException", e); - } finally { - httpclient = null; - } - } + if ( !Strings.isNullOrEmpty(configuration.getUsername()) + && !Strings.isNullOrEmpty(configuration.getPassword())) { + String string = configuration.getUsername() + ":" + configuration.getPassword(); + authHeader = Base64.encodeBase64String(string.getBytes()); + } + httpclient = HttpClients.createDefault(); + } + + @Override + public void cleanUp() { + LOGGER.info("shutting down SimpleHTTPPostProcessor"); + try { + httpclient.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + try { + httpclient.close(); + } catch (IOException e2) { + LOGGER.error("IOException", e2); + } finally { + httpclient = null; + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java index 2078647..ab11a68 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java @@ -69,269 +69,287 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * Provider retrieves contents from an known set of urls and passes all resulting objects downstream + * Provider retrieves contents from an known set of urls and passes all resulting objects downstream. */ public class SimpleHttpProvider implements StreamsProvider { - private final static String STREAMS_ID = "SimpleHttpProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHttpProvider.class); - - protected ObjectMapper mapper; - - protected URIBuilder uriBuilder; - - protected CloseableHttpClient httpclient; + private static final String STREAMS_ID = "SimpleHttpProvider"; + + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHttpProvider.class); + + protected ObjectMapper mapper; + + protected URIBuilder uriBuilder; + + protected CloseableHttpClient httpclient; + + protected HttpProviderConfiguration configuration; + + protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private ExecutorService executor; + + /** + * SimpleHttpProvider constructor - resolves HttpProcessorConfiguration from JVM 'http'. + */ + public SimpleHttpProvider() { + this(new ComponentConfigurator<>(HttpProviderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); + } + + /** + * SimpleHttpProvider constructor - uses provided HttpProviderConfiguration. + */ + public SimpleHttpProvider(HttpProviderConfiguration providerConfiguration) { + LOGGER.info("creating SimpleHttpProvider"); + LOGGER.info(providerConfiguration.toString()); + this.configuration = providerConfiguration; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + /** + Override this to add parameters to the request. + */ + protected Map<String, String> prepareParams(StreamsDatum entry) { + return new HashMap<>(); + } + + /** + * prepareHttpRequest + * @param uri uri + * @return result + */ + public HttpRequestBase prepareHttpRequest(URI uri) { + HttpRequestBase request; + if ( configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.GET)) { + request = new HttpGet(uri); + } else if ( configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.POST)) { + request = new HttpPost(uri); + } else { + // this shouldn't happen because of the default + request = new HttpGet(uri); + } - protected HttpProviderConfiguration configuration; + request.addHeader("content-type", this.configuration.getContentType()); - protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); + return request; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + } - private ExecutorService executor; + @Override + public void prepare(Object configurationObject) { - public SimpleHttpProvider() { - this(new ComponentConfigurator<>(HttpProviderConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); - } + mapper = StreamsJacksonMapper.getInstance(); - public SimpleHttpProvider(HttpProviderConfiguration providerConfiguration) { - LOGGER.info("creating SimpleHttpProvider"); - LOGGER.info(providerConfiguration.toString()); - this.configuration = providerConfiguration; - } + uriBuilder = new URIBuilder() + .setScheme(this.configuration.getProtocol()) + .setHost(this.configuration.getHostname()) + .setPort(this.configuration.getPort().intValue()) + .setPath(this.configuration.getResourcePath()); - @Override - public String getId() { - return STREAMS_ID; + SSLContextBuilder builder = new SSLContextBuilder(); + SSLConnectionSocketFactory sslsf = null; + try { + builder.loadTrustMaterial(null, new TrustSelfSignedStrategy()); + sslsf = new SSLConnectionSocketFactory( + builder.build(), SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException ex) { + LOGGER.warn(ex.getMessage()); } - /** - Override this to add parameters to the request - */ - protected Map<String, String> prepareParams(StreamsDatum entry) { - return new HashMap<>(); + httpclient = HttpClients.custom().setSSLSocketFactory( + sslsf).build(); + + executor = Executors.newSingleThreadExecutor(); + + } + + @Override + public void cleanUp() { + + LOGGER.info("shutting down SimpleHttpProvider"); + this.shutdownAndAwaitTermination(executor); + try { + httpclient.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + try { + httpclient.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + httpclient = null; + } } + } - public HttpRequestBase prepareHttpRequest(URI uri) { - HttpRequestBase request; - if( configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.GET)) { - request = new HttpGet(uri); - } else if( configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.POST)) { - request = new HttpPost(uri); - } else { - // this shouldn't happen because of the default - request = new HttpGet(uri); - } - - request.addHeader("content-type", this.configuration.getContentType()); + @Override + public void startStream() { - return request; + executor.execute(new Runnable() { + @Override + public void run() { - } + readCurrent(); - @Override - public void prepare(Object configurationObject) { - - mapper = StreamsJacksonMapper.getInstance(); - - uriBuilder = new URIBuilder() - .setScheme(this.configuration.getProtocol()) - .setHost(this.configuration.getHostname()) - .setPort(this.configuration.getPort().intValue()) - .setPath(this.configuration.getResourcePath()); - - SSLContextBuilder builder = new SSLContextBuilder(); - SSLConnectionSocketFactory sslsf = null; - try { - builder.loadTrustMaterial(null, new TrustSelfSignedStrategy()); - sslsf = new SSLConnectionSocketFactory( - builder.build(), SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); - } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) { - LOGGER.warn(e.getMessage()); - } + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); - httpclient = HttpClients.custom().setSSLSocketFactory( - sslsf).build(); + } + }); + } - executor = Executors.newSingleThreadExecutor(); + @Override + public StreamsResultSet readCurrent() { + StreamsResultSet current; - } + uriBuilder = uriBuilder.setPath( + Joiner.on("/").skipNulls().join(uriBuilder.getPath(), configuration.getResource(), configuration.getResourcePostfix()) + ); - @Override - public void cleanUp() { - - LOGGER.info("shutting down SimpleHttpProvider"); - this.shutdownAndAwaitTermination(executor); - try { - httpclient.close(); - } catch (IOException e) { - e.printStackTrace(); - } finally { - try { - httpclient.close(); - } catch (IOException e) { - e.printStackTrace(); - } finally { - httpclient = null; - } - } + URI uri; + try { + uri = uriBuilder.build(); + } catch (URISyntaxException ex) { + uri = null; } - @Override - public void startStream() { - - executor.execute(new Runnable() { - @Override - public void run() { + List<ObjectNode> results = execute(uri); - readCurrent(); + lock.writeLock().lock(); - Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); - - } - }); + for ( ObjectNode item : results ) { + providerQueue.add(newDatum(item)); } - @Override - public StreamsResultSet readCurrent() { - StreamsResultSet current; - - uriBuilder = uriBuilder.setPath( - Joiner.on("/").skipNulls().join(uriBuilder.getPath(), configuration.getResource(), configuration.getResourcePostfix()) - ); + LOGGER.debug("Creating new result set for {} items", providerQueue.size()); + current = new StreamsResultSet(providerQueue); - URI uri; - try { - uri = uriBuilder.build(); - } catch (URISyntaxException e) { - uri = null; - } - - List<ObjectNode> results = execute(uri); - - lock.writeLock().lock(); - - for( ObjectNode item : results ) { - providerQueue.add(newDatum(item)); - } - - LOGGER.debug("Creating new result set for {} items", providerQueue.size()); - current = new StreamsResultSet(providerQueue); + return current; + } - return current; - } + protected List<ObjectNode> execute(URI uri) { - protected List<ObjectNode> execute(URI uri) { - - Preconditions.checkNotNull(uri); - - List<ObjectNode> results = new ArrayList<>(); - - HttpRequestBase httpRequest = prepareHttpRequest(uri); - - CloseableHttpResponse response = null; - - String entityString; - try { - response = httpclient.execute(httpRequest); - HttpEntity entity = response.getEntity(); - // TODO: handle retry - if (response.getStatusLine().getStatusCode() == 200 && entity != null) { - entityString = EntityUtils.toString(entity); - if( !entityString.equals("{}") && !entityString.equals("[]") ) { - JsonNode jsonNode = mapper.readValue(entityString, JsonNode.class); - results = parse(jsonNode); - } - } - } catch (IOException e) { - LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage()); - } finally { - try { - if (response != null) { - response.close(); - } - } catch (IOException ignored) {} - } - return results; - } + Preconditions.checkNotNull(uri); - /** - Override this to change how entity gets converted to objects - */ - protected List<ObjectNode> parse(JsonNode jsonNode) { + List<ObjectNode> results = new ArrayList<>(); - List<ObjectNode> results = new ArrayList<>(); + HttpRequestBase httpRequest = prepareHttpRequest(uri); - if (jsonNode != null && jsonNode instanceof ObjectNode ) { - results.add((ObjectNode) jsonNode); - } else if (jsonNode != null && jsonNode instanceof ArrayNode) { - ArrayNode arrayNode = (ArrayNode) jsonNode; - Iterator<JsonNode> iterator = arrayNode.elements(); - while (iterator.hasNext()) { - ObjectNode element = (ObjectNode) iterator.next(); + CloseableHttpResponse response = null; - results.add(element); - } + String entityString; + try { + response = httpclient.execute(httpRequest); + HttpEntity entity = response.getEntity(); + // TODO: handle retry + if (response.getStatusLine().getStatusCode() == 200 && entity != null) { + entityString = EntityUtils.toString(entity); + if ( !entityString.equals("{}") && !entityString.equals("[]") ) { + JsonNode jsonNode = mapper.readValue(entityString, JsonNode.class); + results = parse(jsonNode); } - - return results; - } - - /** - Override this to change how metadata is derived from object - */ - protected StreamsDatum newDatum(ObjectNode item) { - try { - String id = null; - if( item.get("id") != null ) - id = item.get("id").asText(); - DateTime timestamp = null; - if( item.get("timestamp") != null ) - timestamp = new DateTime(item.get("timestamp").asText()); - if( id != null && timestamp != null ) - return new StreamsDatum(item, id, timestamp); - else if( id != null ) - return new StreamsDatum(item, id); - else if( timestamp != null ) - return new StreamsDatum(item, null, timestamp); - else return new StreamsDatum(item); - } catch( Exception e ) { - return new StreamsDatum(item); + } + } catch (IOException ex) { + LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, ex.getMessage()); + } finally { + try { + if (response != null) { + response.close(); } + } catch (IOException ignored) { + LOGGER.trace("IOException", ignored); + } } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; + return results; + } + + /** + Override this to change how entity gets converted to objects. + */ + protected List<ObjectNode> parse(JsonNode jsonNode) { + + List<ObjectNode> results = new ArrayList<>(); + + if (jsonNode != null && jsonNode instanceof ObjectNode ) { + results.add((ObjectNode) jsonNode); + } else if (jsonNode != null && jsonNode instanceof ArrayNode) { + ArrayNode arrayNode = (ArrayNode) jsonNode; + Iterator<JsonNode> iterator = arrayNode.elements(); + while (iterator.hasNext()) { + ObjectNode element = (ObjectNode) iterator.next(); + + results.add(element); + } } - @Override - public boolean isRunning() { - return true; + return results; + } + + /** + Override this to change how metadata is derived from object. + */ + protected StreamsDatum newDatum(ObjectNode item) { + try { + String id = null; + if ( item.get("id") != null ) { + id = item.get("id").asText(); + } + DateTime timestamp = null; + if ( item.get("timestamp") != null ) { + timestamp = new DateTime(item.get("timestamp").asText()); + } + if ( id != null && timestamp != null ) { + return new StreamsDatum(item, id, timestamp); + } else if ( id != null ) { + return new StreamsDatum(item, id); + } else if ( timestamp != null ) { + return new StreamsDatum(item, null, timestamp); + } else { + return new StreamsDatum(item); + } + } catch ( Exception ex ) { + return new StreamsDatum(item); } - - protected void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - LOGGER.error("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public boolean isRunning() { + return true; + } + + protected void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.error("Pool did not terminate"); } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java b/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java index 55e338d..2333c4b 100644 --- a/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java +++ b/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java @@ -16,16 +16,18 @@ * under the License. */ +import org.apache.streams.components.http.HttpPersistWriterConfiguration; +import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; -import org.apache.streams.components.http.HttpPersistWriterConfiguration; -import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,87 +49,91 @@ import static org.mockito.Matchers.any; @PrepareForTest({HttpClients.class, CloseableHttpResponse.class, CloseableHttpResponse.class}) public class SimpleHTTPPostPersistWriterTest { - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - /** - * test port. - */ - private static final int PORT = 18080; - - /** - * test hosts. - */ - private static final String HOSTNAME = "localhost"; - - /** - * test protocol. - */ - private static final String PROTOCOL = "http"; - - /** - * CloseableHttpClient mock. - */ - private CloseableHttpClient client; - - /** - * CloseableHttpClient mock. - */ - private CloseableHttpResponse response = Mockito.mock(CloseableHttpResponse.class); - - /** - * Our output. - */ - private ByteArrayOutputStream output; - - /** - * Our input. - */ - private ByteArrayInputStream input; - - @Before - public void setUp() throws Exception - { - /* - HttpClients mock. + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + /** + * test port. + */ + private static final int PORT = 18080; + + /** + * test hosts. + */ + private static final String HOSTNAME = "localhost"; + + /** + * test protocol. + */ + private static final String PROTOCOL = "http"; + + /** + * CloseableHttpClient mock. + */ + private CloseableHttpClient client; + + /** + * CloseableHttpClient mock. + */ + private CloseableHttpResponse response = Mockito.mock(CloseableHttpResponse.class); + + /** + * Our output. + */ + private ByteArrayOutputStream output; + + /** + * Our input. + */ + private ByteArrayInputStream input; + + @Before + public void setUp() throws Exception + { + /* + HttpClients mock. */ - this.client = PowerMockito.mock(CloseableHttpClient.class); + this.client = PowerMockito.mock(CloseableHttpClient.class); - PowerMockito.mockStatic(HttpClients.class); + PowerMockito.mockStatic(HttpClients.class); - PowerMockito.when(HttpClients.createDefault()) - .thenReturn(client); + PowerMockito.when(HttpClients.createDefault()) + .thenReturn(client); - PowerMockito.when(client.execute(any(HttpUriRequest.class))) - .thenReturn(response); + PowerMockito.when(client.execute(any(HttpUriRequest.class))) + .thenReturn(response); - Mockito.when(response.getEntity()).thenReturn(null); - Mockito.doNothing().when(response).close(); + Mockito.when(response.getEntity()).thenReturn(null); + Mockito.doNothing().when(response).close(); - } + } - @Test - public void testPersist() throws Exception - { - HttpPersistWriterConfiguration configuration = new HttpPersistWriterConfiguration(); - configuration.setProtocol(PROTOCOL); - configuration.setHostname(HOSTNAME); - configuration.setPort((long) PORT); - configuration.setResourcePath("/"); + /** + * testPersist. + * @throws Exception + */ + @Test + public void testPersist() throws Exception + { + HttpPersistWriterConfiguration configuration = new HttpPersistWriterConfiguration(); + configuration.setProtocol(PROTOCOL); + configuration.setHostname(HOSTNAME); + configuration.setPort((long) PORT); + configuration.setResourcePath("/"); - /* - Instance under tests. + /* + Instance under tests. */ - SimpleHTTPPostPersistWriter writer = new SimpleHTTPPostPersistWriter(configuration); + SimpleHTTPPostPersistWriter writer = new SimpleHTTPPostPersistWriter(configuration); - writer.prepare(null); + writer.prepare(null); - StreamsDatum testDatum = new StreamsDatum(mapper.readValue("{\"message\":\"ping\"}", ObjectNode.class)); + StreamsDatum testDatum = new StreamsDatum(mapper.readValue("{\"message\":\"ping\"}", ObjectNode.class)); - writer.write(testDatum); + writer.write(testDatum); - Mockito.verify(this.client).execute(any(HttpUriRequest.class)); + Mockito.verify(this.client).execute(any(HttpUriRequest.class)); - Mockito.verify(this.response).close(); + Mockito.verify(this.response).close(); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java b/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java index 42b70a6..5eea60e 100644 --- a/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java +++ b/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java @@ -19,9 +19,9 @@ package org.apache.streams.config; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.reflect.TypeToken; import com.typesafe.config.Config; import com.typesafe.config.ConfigRenderOptions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,44 +30,64 @@ import java.io.Serializable; /** * ComponentConfigurator supplies serializable configuration beans derived from a specified typesafe path or object. * + * <p/> * Typically a component will select a 'default' typesafe path to be used if no other path or object is provided. * + * <p/> * For example, streams-persist-elasticsearch will use 'elasticsearch' by default, but an implementation - * such as github.com/w2ogroup/elasticsearch-reindex can resolve a reader from elasticsearch.source - * and a writer from elasticsearch.destination + * such as github.com/apache/streams-examples/local/elasticsearch-reindex + * can resolve a reader from elasticsearch.source + * and a writer from elasticsearch.destination. * */ public class ComponentConfigurator<T extends Serializable> { - private Class<T> configClass; - public ComponentConfigurator(Class<T> configClass) { - this.configClass = configClass; - } + private Class<T> configClass; - private final static Logger LOGGER = LoggerFactory.getLogger(ComponentConfigurator.class); + public ComponentConfigurator(Class<T> configClass) { + this.configClass = configClass; + } - private final static ObjectMapper mapper = new ObjectMapper(); + private static final Logger LOGGER = LoggerFactory.getLogger(ComponentConfigurator.class); - public T detectConfiguration(Config typesafeConfig) { + private static final ObjectMapper mapper = new ObjectMapper(); - T pojoConfig = null; + /** + * resolve a serializable configuration pojo from a given typesafe config object. + * @param typesafeConfig typesafeConfig + * @return result + */ + public T detectConfiguration(Config typesafeConfig) { - try { - pojoConfig = mapper.readValue(typesafeConfig.root().render(ConfigRenderOptions.concise()), configClass); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse:", typesafeConfig); - } + T pojoConfig = null; - return pojoConfig; + try { + pojoConfig = mapper.readValue(typesafeConfig.root().render(ConfigRenderOptions.concise()), configClass); + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.warn("Could not parse:", typesafeConfig); } - public T detectConfiguration(String subConfig) { - Config streamsConfig = StreamsConfigurator.getConfig(); - return detectConfiguration( streamsConfig.getConfig(subConfig)); - } + return pojoConfig; + } - public T detectConfiguration(Config typesafeConfig, String subConfig) { - return detectConfiguration( typesafeConfig.getConfig(subConfig)); - } + /** + * resolve a serializable configuration pojo from a portion of the JVM config object. + * @param subConfig subConfig + * @return result + */ + public T detectConfiguration(String subConfig) { + Config streamsConfig = StreamsConfigurator.getConfig(); + return detectConfiguration( streamsConfig.getConfig(subConfig)); + } + + /** + * resolve a serializable configuration pojo from a portion of a given typesafe config object. + * @param typesafeConfig typesafeConfig + * @param subConfig subConfig + * @return result + */ + public T detectConfiguration(Config typesafeConfig, String subConfig) { + return detectConfiguration( typesafeConfig.getConfig(subConfig)); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java index 6a8fb1d..319b32a 100644 --- a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java +++ b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigRenderOptions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,60 +37,60 @@ import java.net.URL; */ public class StreamsConfigurator { - private final static Logger LOGGER = LoggerFactory.getLogger(ComponentConfigurator.class); - - private final static ObjectMapper mapper = new ObjectMapper(); + private static final Logger LOGGER = LoggerFactory.getLogger(ComponentConfigurator.class); - /* - Pull all configuration files from the classpath, system properties, and environment variables - */ - public static Config config = ConfigFactory.load(); + private static final ObjectMapper mapper = new ObjectMapper(); - public static Config getConfig() { - return config; - } + /* + Pull all configuration files from the classpath, system properties, and environment variables + */ + public static Config config = ConfigFactory.load(); - public static Config resolveConfig(String configUrl) throws MalformedURLException { - URL url = new URL(configUrl); - Config urlConfig = ConfigFactory.parseURL(url); - urlConfig.resolve(); - config = urlConfig; - return config; - } + public static Config getConfig() { + return config; + } + public static Config resolveConfig(String configUrl) throws MalformedURLException { + URL url = new URL(configUrl); + Config urlConfig = ConfigFactory.parseURL(url); + urlConfig.resolve(); + config = urlConfig; + return config; + } - public static StreamsConfiguration detectConfiguration() { - return detectConfiguration(config); - } - public static StreamsConfiguration detectConfiguration(Config typesafeConfig) { + public static StreamsConfiguration detectConfiguration() { + return detectConfiguration(config); + } - StreamsConfiguration pojoConfig = null; + public static StreamsConfiguration detectConfiguration(Config typesafeConfig) { - try { - pojoConfig = mapper.readValue(typesafeConfig.root().render(ConfigRenderOptions.concise()), StreamsConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse:", typesafeConfig); - } + StreamsConfiguration pojoConfig = null; - return pojoConfig; + try { + pojoConfig = mapper.readValue(typesafeConfig.root().render(ConfigRenderOptions.concise()), StreamsConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse:", typesafeConfig); } - public static StreamsConfiguration mergeConfigurations(Config base, Config delta) { + return pojoConfig; + } - Config merged = delta.withFallback(base); + public static StreamsConfiguration mergeConfigurations(Config base, Config delta) { - StreamsConfiguration pojoConfig = null; + Config merged = delta.withFallback(base); - try { - pojoConfig = mapper.readValue(merged.root().render(ConfigRenderOptions.concise()), StreamsConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Failed to merge."); - } + StreamsConfiguration pojoConfig = null; - return pojoConfig; + try { + pojoConfig = mapper.readValue(merged.root().render(ConfigRenderOptions.concise()), StreamsConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Failed to merge."); } + + return pojoConfig; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java ---------------------------------------------------------------------- diff --git a/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java b/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java index eddfb53..82cc6bc 100644 --- a/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java +++ b/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java @@ -40,7 +40,7 @@ import org.powermock.modules.junit4.PowerMockRunner; @PrepareForTest(StreamsConfigurator.class) public class ComponentConfiguratorTest { - private final static ObjectMapper mapper = new ObjectMapper(); + private static final ObjectMapper mapper = new ObjectMapper(); @Test public void testDetectDefaults() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java ---------------------------------------------------------------------- diff --git a/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java b/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java index 65dbd75..a29d8c7 100644 --- a/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java +++ b/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java @@ -44,7 +44,7 @@ import java.util.Scanner; */ public class StreamsConfiguratorTest { - private final static ObjectMapper mapper = new ObjectMapper(); + private static final ObjectMapper mapper = new ObjectMapper(); @Test public void testDetectConfiguration() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java index fc00321..e3bfe70 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java @@ -18,6 +18,12 @@ package org.apache.streams.amazon.kinesis; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; + import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentials; @@ -27,150 +33,150 @@ import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.s3.AmazonS3Client; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.common.collect.Queues; import com.typesafe.config.Config; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistReader; -import org.apache.streams.core.StreamsResultSet; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.streams.amazon.kinesis.KinesisConfiguration; - import java.io.Serializable; import java.math.BigInteger; import java.util.List; -import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +/** + * KinesisPersistReader reads documents from kinesis. + */ public class KinesisPersistReader implements StreamsPersistReader, Serializable { - public final static String STREAMS_ID = "KinesisPersistReader"; + public static final String STREAMS_ID = "KinesisPersistReader"; - private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReader.class); + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReader.class); - protected volatile Queue<StreamsDatum> persistQueue; + protected volatile Queue<StreamsDatum> persistQueue; - private ObjectMapper mapper = new ObjectMapper(); + private ObjectMapper mapper = new ObjectMapper(); - private KinesisReaderConfiguration config; + private KinesisReaderConfiguration config; - protected Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs(); + protected Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs(); - private List<String> streamNames; + private List<String> streamNames; - private ExecutorService executor; + private ExecutorService executor; - protected AmazonKinesisClient client; + protected AmazonKinesisClient client; - public KinesisPersistReader() { - Config config = StreamsConfigurator.config.getConfig("kinesis"); - this.config = new ComponentConfigurator<>(KinesisReaderConfiguration.class).detectConfiguration(config); - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - } + /** + * KinesisPersistReader constructor - resolves KinesisReaderConfiguration from JVM 'kinesis'. + */ + public KinesisPersistReader() { + Config config = StreamsConfigurator.config.getConfig("kinesis"); + this.config = new ComponentConfigurator<>(KinesisReaderConfiguration.class).detectConfiguration(config); + this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + } - public KinesisPersistReader(KinesisReaderConfiguration config) { - this.config = config; - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - } + /** + * KinesisPersistReader constructor - uses provided KinesisReaderConfiguration. + */ + public KinesisPersistReader(KinesisReaderConfiguration config) { + this.config = config; + this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + } - public void setConfig(KinesisReaderConfiguration config) { - this.config = config; - } - - @Override - public String getId() { - return STREAMS_ID; - } + public void setConfig(KinesisReaderConfiguration config) { + this.config = config; + } - @Override - public void startStream() { + @Override + public String getId() { + return STREAMS_ID; + } - this.streamNames = this.config.getStreams(); + @Override + public void startStream() { - for (final String stream : streamNames) { + this.streamNames = this.config.getStreams(); - DescribeStreamResult describeStreamResult = client.describeStream(stream); + for (final String stream : streamNames) { - if( "ACTIVE".equals(describeStreamResult.getStreamDescription().getStreamStatus())) { + DescribeStreamResult describeStreamResult = client.describeStream(stream); - List<Shard> shardList = describeStreamResult.getStreamDescription().getShards(); + if( "ACTIVE".equals(describeStreamResult.getStreamDescription().getStreamStatus())) { - for( Shard shard : shardList ) { - executor.submit(new KinesisPersistReaderTask(this, stream, shard.getShardId())); - } - } + List<Shard> shardList = describeStreamResult.getStreamDescription().getShards(); + for( Shard shard : shardList ) { + executor.submit(new KinesisPersistReaderTask(this, stream, shard.getShardId())); } + } } - @Override - public StreamsResultSet readAll() { - return readCurrent(); - } + } - public StreamsResultSet readCurrent() { + @Override + public StreamsResultSet readAll() { + return readCurrent(); + } - StreamsResultSet current; - synchronized( KinesisPersistReader.class ) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); - persistQueue.clear(); - } - return current; - } + public StreamsResultSet readCurrent() { - @Override - public StreamsResultSet readNew(BigInteger bigInteger) { - return null; + StreamsResultSet current; + synchronized( KinesisPersistReader.class ) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + persistQueue.clear(); } - - @Override - public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { - return null; + return current; + } + + @Override + public StreamsResultSet readNew(BigInteger bigInteger) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { + return null; + } + + @Override + public boolean isRunning() { + return !executor.isShutdown() && !executor.isTerminated(); + } + + @Override + public void prepare(Object configurationObject) { + // Connect to Kinesis + synchronized (this) { + // Create the credentials Object + AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey()); + + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString())); + + this.client = new AmazonKinesisClient(credentials, clientConfig); + if (!Strings.isNullOrEmpty(config.getRegion())) + this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion()))); } + streamNames = this.config.getStreams(); + executor = Executors.newFixedThreadPool(streamNames.size()); + } - @Override - public boolean isRunning() { - return !executor.isShutdown() && !executor.isTerminated(); - } + @Override + public void cleanUp() { - @Override - public void prepare(Object configurationObject) { - // Connect to Kinesis - synchronized (this) { - // Create the credentials Object - AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey()); - - ClientConfiguration clientConfig = new ClientConfiguration(); - clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString())); - - this.client = new AmazonKinesisClient(credentials, clientConfig); - if (!Strings.isNullOrEmpty(config.getRegion())) - this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion()))); - } - streamNames = this.config.getStreams(); - executor = Executors.newFixedThreadPool(streamNames.size()); - } - - @Override - public void cleanUp() { - - while( !executor.isTerminated()) { - try { - executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) {} - } + while( !executor.isTerminated()) { + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) {} } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java index 7753031..a93fda8 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java @@ -18,94 +18,102 @@ package org.apache.streams.amazon.kinesis; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; + import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.util.Base64; import com.google.common.collect.Maps; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigInteger; import java.nio.charset.Charset; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; +/** + * KinesisPersistReaderTask reads documents from kinesis on behalf of + * @see {@link KinesisPersistReader}. + */ public class KinesisPersistReaderTask implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReaderTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReaderTask.class); - private KinesisPersistReader reader; - private String streamName; - private String shardId; + private KinesisPersistReader reader; + private String streamName; + private String shardId; - private String shardIteratorId; + private String shardIteratorId; - private Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs(); + private Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs(); - public KinesisPersistReaderTask(KinesisPersistReader reader, String streamName, String shardId) { - this.reader = reader; - this.streamName = streamName; - this.shardId = shardId; - } + /** + * KinesisPersistReaderTask constructor. + */ + public KinesisPersistReaderTask(KinesisPersistReader reader, String streamName, String shardId) { + this.reader = reader; + this.streamName = streamName; + this.shardId = shardId; + } - @Override - public void run() { + @Override + public void run() { - GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest() - .withStreamName(this.streamName) - .withShardId(shardId) - .withShardIteratorType("TRIM_HORIZON"); + GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest() + .withStreamName(this.streamName) + .withShardId(shardId) + .withShardIteratorType("TRIM_HORIZON"); - GetShardIteratorResult shardIteratorResult = reader.client.getShardIterator(shardIteratorRequest); + GetShardIteratorResult shardIteratorResult = reader.client.getShardIterator(shardIteratorRequest); - shardIteratorId = shardIteratorResult.getShardIterator(); + shardIteratorId = shardIteratorResult.getShardIterator(); - Map<String,Object> metadata = Maps.newHashMap(); - metadata.put("streamName", streamName); - metadata.put("shardId", shardId); + Map<String,Object> metadata = Maps.newHashMap(); + metadata.put("streamName", streamName); + metadata.put("shardId", shardId); - while(true) { + while (true) { - GetRecordsRequest recordsRequest = new GetRecordsRequest() - .withShardIterator(shardIteratorId); + GetRecordsRequest recordsRequest = new GetRecordsRequest() + .withShardIterator(shardIteratorId); - GetRecordsResult recordsResult = reader.client.getRecords(recordsRequest); + GetRecordsResult recordsResult = reader.client.getRecords(recordsRequest); - LOGGER.info("{} records {} millis behind {}:{}:{} ", recordsResult.getRecords().size(), recordsResult.getMillisBehindLatest(), streamName, shardId, shardIteratorId); + LOGGER.info("{} records {} millis behind {}:{}:{} ", recordsResult.getRecords().size(), recordsResult.getMillisBehindLatest(), streamName, shardId, shardIteratorId); - shardIteratorId = recordsResult.getNextShardIterator(); + shardIteratorId = recordsResult.getNextShardIterator(); - List<Record> recordList = recordsResult.getRecords(); + List<Record> recordList = recordsResult.getRecords(); - for (Record record : recordList) { - try { - byte[] byteArray = record.getData().array(); - //byte[] decoded = Base64.decode(byteArray); - String message = new String(byteArray, Charset.forName("UTF-8")); - reader.persistQueue.add( - new StreamsDatum( - message, - record.getPartitionKey(), - new DateTime(), - new BigInteger(record.getSequenceNumber()), - metadata)); - } catch( Exception e ) { - LOGGER.warn("Exception processing record {}: {}", record, e); - } - } - try { - Thread.sleep(reader.pollInterval); - } catch (InterruptedException e) {} + for (Record record : recordList) { + try { + byte[] byteArray = record.getData().array(); + //byte[] decoded = Base64.decode(byteArray); + String message = new String(byteArray, Charset.forName("UTF-8")); + reader.persistQueue.add( + new StreamsDatum( + message, + record.getPartitionKey(), + new DateTime(), + new BigInteger(record.getSequenceNumber()), + metadata)); + } catch ( Exception ex ) { + LOGGER.warn("Exception processing record {}: {}", record, ex); } - + } + try { + Thread.sleep(reader.pollInterval); + } catch (InterruptedException ex) { + LOGGER.trace("InterruptedException", ex); + } } + } + }
