STREAMS-186: Delete unneeded Configurator classes, this closes apache/incubator-streams#304
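
This change replaces the deleted per-component Configurator helpers (HttpConfigurator, S3Configurator, ElasticsearchConfigurator, FileBufferConfigurator, and the rest) with the generic ComponentConfigurator from streams-config. A minimal sketch of the before/after pattern, distilled from the SimpleHTTPPostPersistWriter hunk below (the wrapper class and method here are illustrative only, not part of the commit):

    // Illustrative wrapper, not part of the commit; the calls below are taken
    // verbatim from the diff of SimpleHTTPPostPersistWriter.
    import org.apache.streams.components.http.HttpPersistWriterConfiguration;
    import org.apache.streams.config.ComponentConfigurator;
    import org.apache.streams.config.StreamsConfigurator;

    public class ConfiguratorMigrationSketch {

        public static HttpPersistWriterConfiguration httpWriterConfig() {
            // Before (HttpConfigurator is deleted by this commit):
            //   HttpConfigurator.detectPersistWriterConfiguration(
            //       StreamsConfigurator.config.getConfig("http"));
            // After: the generic ComponentConfigurator binds the "http" subtree of the
            // typesafe config to the target configuration bean, so each component no
            // longer needs its own Configurator class.
            return new ComponentConfigurator<>(HttpPersistWriterConfiguration.class)
                    .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"));
        }
    }

The same substitution is applied across the other modified components in the diffstat below.
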
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b273496c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b273496c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b273496c

Branch: refs/heads/master
Commit: b273496c966235fae6a57e38bd47bf811a8b50f4
Parents: 48d54c2
Author: smarthi <[email protected]>
Authored: Fri Oct 21 14:55:00 2016 -0400
Committer: smarthi <[email protected]>
Committed: Fri Oct 21 14:55:00 2016 -0400

----------------------------------------------------------------------
 .../components/http/HttpConfigurator.java | 75 -----
 .../persist/SimpleHTTPPostPersistWriter.java | 31 +-
 .../http/processor/SimpleHTTPGetProcessor.java | 28 +-
 .../http/processor/SimpleHTTPPostProcessor.java | 34 +-
 .../http/provider/SimpleHTTPGetProvider.java | 64 ----
 .../http/provider/SimpleHttpProvider.java | 32 +-
 .../streams/config/StreamsConfigurator.java | 2 -
 .../org/apache/streams/s3/S3Configurator.java | 76 -----
 .../ElasticsearchConfigurator.java | 77 -----
 .../ElasticsearchPersistWriter.java | 37 ++-
 .../elasticsearch/ElasticsearchQuery.java | 17 +-
 .../DatumFromMetadataAsDocumentProcessor.java | 31 +-
 .../processor/DatumFromMetadataProcessor.java | 17 +-
 .../processor/DocumentToMetadataProcessor.java | 18 +-
 .../filebuffer/FileBufferConfigurator.java | 53 ----
 .../filebuffer/FileBufferPersistReader.java | 7 +-
 .../filebuffer/FileBufferPersistWriter.java | 11 +-
 .../apache/streams/hbase/HbaseConfigurator.java | 61 ----
 .../streams/hbase/HbasePersistWriter.java | 31 +-
 .../streams/hbase/HbasePersistWriterTask.java | 2 +-
 .../apache/streams/hdfs/HdfsConfigurator.java | 59 ----
 .../apache/streams/kafka/KafkaConfigurator.java | 49 ---
 .../streams/kafka/KafkaPersistReader.java | 19 +-
 .../streams/kafka/KafkaPersistWriter.java | 16 +-
 .../apache/streams/mongo/MongoConfigurator.java | 49 ---
 .../streams/mongo/MongoPersistReader.java | 66 ++--
 .../streams/mongo/MongoPersistWriter.java | 24 +-
 .../peoplepattern/AccountTypeProcessor.java | 11 +-
 .../peoplepattern/DemographicsProcessor.java | 11 +-
 .../provider/FacebookStreamConfigurator.java | 66 ----
 .../com/google/gmail/GMailConfigurator.java | 45 ---
 .../google/gmail/provider/GMailProvider.java | 43 +--
 .../gplus/provider/AbstractGPlusProvider.java | 21 +-
 .../gplus/provider/GPlusConfigurator.java | 54 ----
 .../instagram/InstagramConfigurator.java | 65 ----
 .../provider/InstagramAbstractProvider.java | 26 +-
 .../InstagramRecentMediaProvider.java | 21 +-
 .../data/moreover/MoreoverConfigurator.java | 58 ----
 .../rss/provider/RssStreamConfigurator.java | 48 ---
 .../streams/rss/provider/RssStreamProvider.java | 47 ++-
 .../serializer/SyndEntryActivitySerializer.java | 11 +-
 .../sysomos/config/SysomosConfigurator.java | 39 ---
 .../FetchAndReplaceTwitterProcessor.java | 9 +-
 .../twitter/provider/TwitterStreamProvider.java | 10 +-
 .../youtube/provider/YoutubeConfigurator.java | 50 ---
 .../com/youtube/provider/YoutubeProvider.java | 18 +-
 .../queues/ThroughputQueueMulitThreadTest.java | 315 -------------------
 .../queues/ThroughputQueueMultiThreadTest.java | 315 +++++++++++++++++++
 48 files changed, 609 insertions(+), 1660 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java deleted file mode 100644 index ae17dbe..0000000 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.components.http; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Converts a {@link com.typesafe.config.Config} element into an instance of HttpConfiguration - */ -public class HttpConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class); - - private final static ObjectMapper mapper = new ObjectMapper(); - - public static HttpProviderConfiguration detectProviderConfiguration(Config config) { - - HttpProviderConfiguration httpProviderConfiguration = null; - - try { - httpProviderConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProviderConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse http configuration", e.getMessage()); - } - return httpProviderConfiguration; - } - - public static HttpProcessorConfiguration detectProcessorConfiguration(Config config) { - - HttpProcessorConfiguration httpProcessorConfiguration = null; - - try { - httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse http configuration", e.getMessage()); - } - return httpProcessorConfiguration; - } - - public static HttpPersistWriterConfiguration detectPersistWriterConfiguration(Config config) { - - HttpPersistWriterConfiguration httpPersistWriterConfiguration = null; - - try { - httpPersistWriterConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpPersistWriterConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse http configuration", e.getMessage()); - } - return httpPersistWriterConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java ---------------------------------------------------------------------- diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java index c7a0dd9..d8309d9 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.Maps; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; @@ -34,8 +33,8 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpConfigurator; import org.apache.streams.components.http.HttpPersistWriterConfiguration; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; @@ -47,11 +46,9 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; import java.util.Map; -/** - * Created by steve on 11/12/14. - */ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter { private final static String STREAMS_ID = "SimpleHTTPPostPersistWriter"; @@ -69,7 +66,8 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter { protected String authHeader; public SimpleHTTPPostPersistWriter() { - this(HttpConfigurator.detectPersistWriterConfiguration(StreamsConfigurator.config.getConfig("http"))); + this(new ComponentConfigurator<>(HttpPersistWriterConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); } public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration configuration) { @@ -128,8 +126,7 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter { Override this to add parameters to the request */ protected Map<String, String> prepareParams(StreamsDatum entry) { - - return Maps.newHashMap(); + return new HashMap<>(); } /** @@ -157,9 +154,7 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter { try { String entity = mapper.writeValueAsString(payload); httppost.setEntity(new StringEntity(entity)); - } catch (JsonProcessingException e) { - LOGGER.warn(e.getMessage()); - } catch (UnsupportedEncodingException e) { + } catch (JsonProcessingException | UnsupportedEncodingException e) { LOGGER.warn(e.getMessage()); } return httppost; @@ -173,7 +168,7 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter { CloseableHttpResponse response = null; - String entityString = null; + String entityString; try { response = httpclient.execute(httpPost); HttpEntity entity = response.getEntity(); @@ -186,8 +181,10 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter { LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage()); } finally { try { - response.close(); - } catch 
(IOException e) {} + if (response != null) { + response.close(); + } + } catch (IOException ignored) {} } return result; } @@ -207,11 +204,7 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter { uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); if( !Strings.isNullOrEmpty(configuration.getUsername()) && !Strings.isNullOrEmpty(configuration.getPassword())) { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(configuration.getUsername()); - stringBuilder.append(":"); - stringBuilder.append(configuration.getPassword()); - String string = stringBuilder.toString(); + String string = configuration.getUsername() + ":" + configuration.getPassword(); authHeader = Base64.encodeBase64String(string.getBytes()); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java index 0d6747e..5868ba6 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java @@ -22,8 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; @@ -32,13 +30,13 @@ import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpConfigurator; import org.apache.streams.components.http.HttpProcessorConfiguration; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.ActivityObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +44,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,7 +71,8 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor { protected String authHeader; public SimpleHTTPGetProcessor() { - this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("http"))); + this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); } public SimpleHTTPGetProcessor(HttpProcessorConfiguration 
processorConfiguration) { @@ -147,7 +148,7 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor { @Override public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newArrayList(); + List<StreamsDatum> result = new ArrayList<>(); ObjectNode rootDocument = getRootDocument(entry); @@ -172,8 +173,10 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor { return result; } finally { try { - response.close(); - } catch (IOException e) {} + if (response != null) { + response.close(); + } + } catch (IOException ignored) {} } if( entityString == null ) @@ -218,8 +221,7 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor { Override this to add parameters to the request */ protected Map<String, String> prepareParams(StreamsDatum entry) { - - return Maps.newHashMap(); + return new HashMap<>(); } /** @@ -251,11 +253,7 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor { uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); if( !Strings.isNullOrEmpty(configuration.getUsername()) && !Strings.isNullOrEmpty(configuration.getPassword())) { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(configuration.getUsername()); - stringBuilder.append(":"); - stringBuilder.append(configuration.getPassword()); - String string = stringBuilder.toString(); + String string = configuration.getUsername() + ":" + configuration.getPassword(); authHeader = Base64.encodeBase64String(string.getBytes()); } httpclient = HttpClients.createDefault(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java index 528e50c..f6089f6 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java @@ -22,12 +22,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.entity.ContentType; @@ -35,8 +32,8 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpConfigurator; import org.apache.streams.components.http.HttpProcessorConfiguration; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import 
org.apache.streams.core.StreamsProcessor; @@ -49,6 +46,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -72,7 +71,8 @@ public class SimpleHTTPPostProcessor implements StreamsProcessor { protected String authHeader; public SimpleHTTPPostProcessor() { - this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("http"))); + this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); } public SimpleHTTPPostProcessor(HttpProcessorConfiguration processorConfiguration) { @@ -148,7 +148,7 @@ public class SimpleHTTPPostProcessor implements StreamsProcessor { @Override public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newArrayList(); + List<StreamsDatum> result = new ArrayList<>(); ObjectNode rootDocument = getRootDocument(entry); @@ -184,8 +184,10 @@ public class SimpleHTTPPostProcessor implements StreamsProcessor { return result; } finally { try { - response.close(); - } catch (IOException e) {} + if (response != null) { + response.close(); + } + } catch (IOException ignored) {} } if( entityString == null ) @@ -213,19 +215,15 @@ public class SimpleHTTPPostProcessor implements StreamsProcessor { Override this to add parameters to the request */ protected Map<String, String> prepareParams(StreamsDatum entry) { - - return Maps.newHashMap(); + return new HashMap<>(); } /** Override this to add parameters to the request */ protected HttpEntity preparePayload(StreamsDatum entry) { - - HttpEntity entity = (new StringEntity("{}", - ContentType.create("application/json"))); - - return entity; + return new StringEntity("{}", + ContentType.create("application/json")); } @@ -252,11 +250,7 @@ public class SimpleHTTPPostProcessor implements StreamsProcessor { uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); if( !Strings.isNullOrEmpty(configuration.getUsername()) && !Strings.isNullOrEmpty(configuration.getPassword())) { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(configuration.getUsername()); - stringBuilder.append(":"); - stringBuilder.append(configuration.getPassword()); - String string = stringBuilder.toString(); + String string = configuration.getUsername() + ":" + configuration.getPassword(); authHeader = Base64.encodeBase64String(string.getBytes()); } httpclient = HttpClients.createDefault(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java deleted file mode 100644 index fae01cc..0000000 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.components.http.provider; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpConfigurator; -import org.apache.streams.components.http.HttpProviderConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.math.BigInteger; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * @Deprecated - * Replaced by SimpleHttpProvider, which can use GET or POST - */ -@Deprecated -public class SimpleHTTPGetProvider extends SimpleHttpProvider { - - private final static String STREAMS_ID = "SimpleHTTPGetProcessor"; - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java index 46d1cca..2078647 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java @@ -24,9 +24,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.commons.lang.NotImplementedException; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import 
org.apache.http.client.methods.HttpGet; @@ -39,8 +37,8 @@ import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpConfigurator; import org.apache.streams.components.http.HttpProviderConfiguration; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; @@ -58,19 +56,15 @@ import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -91,14 +85,15 @@ public class SimpleHttpProvider implements StreamsProvider { protected HttpProviderConfiguration configuration; - protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); protected final ReadWriteLock lock = new ReentrantReadWriteLock(); private ExecutorService executor; public SimpleHttpProvider() { - this(HttpConfigurator.detectProviderConfiguration(StreamsConfigurator.config.getConfig("http"))); + this(new ComponentConfigurator<>(HttpProviderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("http"))); } public SimpleHttpProvider(HttpProviderConfiguration providerConfiguration) { @@ -116,8 +111,7 @@ public class SimpleHttpProvider implements StreamsProvider { Override this to add parameters to the request */ protected Map<String, String> prepareParams(StreamsDatum entry) { - - return Maps.newHashMap(); + return new HashMap<>(); } public HttpRequestBase prepareHttpRequest(URI uri) { @@ -154,11 +148,7 @@ public class SimpleHttpProvider implements StreamsProvider { builder.loadTrustMaterial(null, new TrustSelfSignedStrategy()); sslsf = new SSLConnectionSocketFactory( builder.build(), SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); - } catch (NoSuchAlgorithmException e) { - LOGGER.warn(e.getMessage()); - } catch (KeyManagementException e) { - LOGGER.warn(e.getMessage()); - } catch (KeyStoreException e) { + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) { LOGGER.warn(e.getMessage()); } @@ -243,7 +233,7 @@ public class SimpleHttpProvider implements StreamsProvider { CloseableHttpResponse response = null; - String entityString = null; + String entityString; try { response = httpclient.execute(httpRequest); HttpEntity entity = response.getEntity(); @@ -259,8 +249,10 @@ public class SimpleHttpProvider implements StreamsProvider { LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage()); } finally { try { - response.close(); - } catch (IOException e) {} + if (response != null) { + response.close(); 
+ } + } catch (IOException ignored) {} } return results; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java index 9fa20b6..6a8fb1d 100644 --- a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java +++ b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java @@ -19,14 +19,12 @@ package org.apache.streams.config; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.reflect.TypeToken; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigRenderOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; import java.net.MalformedURLException; import java.net.URL; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java deleted file mode 100644 index de32898..0000000 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.streams.s3; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class S3Configurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(S3Configurator.class); - - private final static ObjectMapper mapper = new ObjectMapper(); - - public static S3Configuration detectConfiguration(Config s3) { - - S3Configuration s3Configuration = null; - - try { - s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3Configuration.class); - } catch (Exception e) { - LOGGER.warn("Could not parse S3Configuration", e); - } - - return s3Configuration; - } - - public static S3ReaderConfiguration detectReaderConfiguration(Config s3) { - - S3ReaderConfiguration s3Configuration = null; - - try { - s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3ReaderConfiguration.class); - } catch (Exception e) { - LOGGER.warn("Could not parse S3Configuration", e); - } - - return s3Configuration; - } - - public static S3WriterConfiguration detectWriterConfiguration(Config s3) { - - S3WriterConfiguration s3Configuration = null; - - try { - s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3WriterConfiguration.class); - - if(!s3Configuration.getWriterPath().endsWith("/")) { - s3Configuration.setWriterPath(s3Configuration.getWriterPath() + "/"); - } - } catch (Exception e) { - LOGGER.warn("Could not parse S3Configuration", e); - } - - return s3Configuration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java deleted file mode 100644 index 439b5de..0000000 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.elasticsearch; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration - */ -public class ElasticsearchConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfigurator.class); - - private final static ObjectMapper mapper = new ObjectMapper(); - - public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) { - - ElasticsearchConfiguration elasticsearchConfiguration = null; - - try { - elasticsearchConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse elasticsearchconfiguration"); - } - - return elasticsearchConfiguration; - } - - public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) { - - ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = null; - - try { - elasticsearchReaderConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchReaderConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse elasticsearchconfiguration"); - } - - return elasticsearchReaderConfiguration; - } - - public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config elasticsearch) { - - ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = null; - - try { - elasticsearchWriterConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchWriterConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse elasticsearchwriterconfiguration"); - } - return elasticsearchWriterConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index b268fae..49523f8 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -22,9 +22,13 @@ package org.apache.streams.elasticsearch; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import 
org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -45,8 +49,12 @@ import java.io.IOException; import java.io.Serializable; import java.text.DecimalFormat; import java.text.NumberFormat; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -57,7 +65,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class); private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##"); private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###"); - private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l; + private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5L * 1024L * 1024L; private static final int DEFAULT_BATCH_SIZE = 100; //ES defaults its bulk index queue to 50 items. We want to be under this on our backoff so set this to 1/2 ES default //at a batch size as configured here. @@ -67,7 +75,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt protected static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance(); - protected final List<String> affectedIndexes = new ArrayList<String>(); + protected final List<String> affectedIndexes = new ArrayList<>(); protected final ElasticsearchClientManager manager; protected final ElasticsearchWriterConfiguration config; @@ -96,7 +104,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt private final AtomicLong totalSizeInBytes = new AtomicLong(0); public ElasticsearchPersistWriter() { - this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch"))); + this(new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"))); } public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) { @@ -170,10 +179,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt } protected String docAsJson(Object streamsDocument) throws IOException { - - String docAsJson = (streamsDocument instanceof String) ? streamsDocument.toString() : OBJECT_MAPPER.writeValueAsString(streamsDocument); - - return docAsJson; + return (streamsDocument instanceof String) ? 
streamsDocument.toString() : OBJECT_MAPPER.writeValueAsString(streamsDocument); } protected StreamsDatum appendMetadata(StreamsDatum streamsDatum) throws IOException { @@ -210,7 +216,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt LOGGER.warn("This is unexpected: {}", e); } finally { - if( veryLargeBulk == true ) { + if(veryLargeBulk) { resetRefreshInterval(); } @@ -219,7 +225,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt LOGGER.debug("refreshIndexes completed"); } - LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding()); + LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", + this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding()); timer.cancel(); LOGGER.debug("cleanUp completed"); @@ -275,7 +282,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt return; // wait for one minute to catch up if it needs to - waitToCatchUp(5, 1 * 60 * 1000); + waitToCatchUp(5, 60 * 1000); // call the flush command. flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get()); @@ -306,7 +313,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt private void checkForBackOff() { try { if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) { - /**************************************************************************** + /* * Author: * Smashew * http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java index 03f40d6..d9e9273 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java @@ -19,19 +19,21 @@ package org.apache.streams.elasticsearch; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.collect.Lists; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.jackson.StreamsJacksonMapper; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortBuilders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -42,10 +44,9 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH private ElasticsearchClientManager elasticsearchClientManager; private ElasticsearchReaderConfiguration config; - private List<String> indexes = Lists.newArrayList(); - private List<String> types = Lists.newArrayList(); + private List<String> indexes = new ArrayList<>(); + private 
List<String> types = new ArrayList<>(); private int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil - private boolean random = false; private int batchSize = 100; private String scrollTimeout = "5m"; private org.elasticsearch.index.query.QueryBuilder queryBuilder; @@ -59,7 +60,8 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(); public ElasticsearchQuery() { - this(ElasticsearchConfigurator.detectReaderConfiguration(StreamsConfigurator.config.getConfig("elasticsearch"))); + this(new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"))); } public ElasticsearchQuery(ElasticsearchReaderConfiguration config) { @@ -141,8 +143,9 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH search = search.setTypes(types.toArray(new String[0])); // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll. - if (this.random) - search = search.addSort(SortBuilders.scriptSort("random()", "number")); + boolean random = false; + if (random) + search = search.addSort(SortBuilders.scriptSort(new Script("random()"), "number")); } // We don't have a scroll, we need to create a scroll http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java index 355c471..26012ef 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java @@ -18,18 +18,20 @@ package org.apache.streams.elasticsearch.processor; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.typesafe.config.Config; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.elasticsearch.ElasticsearchConfigurator; import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; @@ -37,18 +39,12 @@ import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.joda.time.DateTime; -import 
java.io.IOException; -import java.io.Serializable; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - /** * Uses index and type in metadata map stored in datum document to populate current document into datums */ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, Serializable { - public final static String STREAMS_ID = "DatumFromMetadataProcessor"; + private final static String STREAMS_ID = "DatumFromMetadataProcessor"; private ElasticsearchClientManager elasticsearchClientManager; private ElasticsearchReaderConfiguration config; @@ -56,12 +52,13 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S private ObjectMapper mapper; public DatumFromMetadataAsDocumentProcessor() { - Config config = StreamsConfigurator.config.getConfig("elasticsearch"); - this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); + this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); } public DatumFromMetadataAsDocumentProcessor(Config config) { - this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); + this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); } public DatumFromMetadataAsDocumentProcessor(ElasticsearchReaderConfiguration config) { @@ -75,7 +72,7 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S @Override public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newArrayList(); + List<StreamsDatum> result = new ArrayList<>(); ObjectNode metadataObjectNode; try { @@ -86,7 +83,7 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S Map<String, Object> metadata = ElasticsearchMetadataUtil.asMap(metadataObjectNode); - if(entry == null || entry.getMetadata() == null) + if(entry.getMetadata() == null) return result; String index = ElasticsearchMetadataUtil.getIndex(metadata, config); @@ -98,7 +95,7 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S getRequestBuilder.setFetchSource(true); GetResponse getResponse = getRequestBuilder.get(); - if( getResponse == null || getResponse.isExists() == false || getResponse.isSourceEmpty() == true ) + if( getResponse == null || !getResponse.isExists() || getResponse.isSourceEmpty()) return result; entry.setDocument(getResponse.getSource()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java index 512e191..f1b7bfc 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java @@ -18,13 +18,12 @@ package org.apache.streams.elasticsearch.processor; -import 
com.google.common.collect.Lists; import com.typesafe.config.Config; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.elasticsearch.ElasticsearchConfigurator; import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.elasticsearch.action.get.GetRequestBuilder; @@ -32,6 +31,7 @@ import org.elasticsearch.action.get.GetResponse; import org.joda.time.DateTime; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -40,18 +40,19 @@ import java.util.Map; */ public class DatumFromMetadataProcessor implements StreamsProcessor, Serializable { - public final static String STREAMS_ID = "DatumFromMetadataProcessor"; + private final static String STREAMS_ID = "DatumFromMetadataProcessor"; private ElasticsearchClientManager elasticsearchClientManager; private ElasticsearchReaderConfiguration config; public DatumFromMetadataProcessor() { - Config config = StreamsConfigurator.config.getConfig("elasticsearch"); - this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); + this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); } public DatumFromMetadataProcessor(Config config) { - this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); + this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); } public DatumFromMetadataProcessor(ElasticsearchReaderConfiguration config) { @@ -65,7 +66,7 @@ public class DatumFromMetadataProcessor implements StreamsProcessor, Serializabl @Override public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newArrayList(); + List<StreamsDatum> result = new ArrayList<>(); if(entry == null || entry.getMetadata() == null) return result; @@ -81,7 +82,7 @@ public class DatumFromMetadataProcessor implements StreamsProcessor, Serializabl getRequestBuilder.setFetchSource(true); GetResponse getResponse = getRequestBuilder.get(); - if( getResponse == null || getResponse.isExists() == false || getResponse.isSourceEmpty() == true ) + if( getResponse == null || getResponse.isExists() || getResponse.isSourceEmpty() ) return result; entry.setDocument(getResponse.getSource()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java index 41cc12d..9a08654 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java +++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java @@ -18,28 +18,18 @@ package org.apache.streams.elasticsearch.processor; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.elasticsearch.ElasticsearchClientManager; -import org.apache.streams.elasticsearch.ElasticsearchConfigurator; import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; -import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.Serializable; -import java.util.Iterator; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -51,7 +41,7 @@ import java.util.Map; */ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializable { - public final static String STREAMS_ID = "DatumFromMetadataProcessor"; + private final static String STREAMS_ID = "DatumFromMetadataProcessor"; private ObjectMapper mapper; @@ -67,7 +57,7 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab @Override public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newArrayList(); + List<StreamsDatum> result = new ArrayList<>(); Object object = entry.getDocument(); ObjectNode metadataObjectNode; @@ -81,7 +71,7 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab Map<String, Object> metadata = ElasticsearchMetadataUtil.asMap(metadataObjectNode); - if(entry == null || metadata == null) + if(metadata == null) return result; entry.setMetadata(metadata); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferConfigurator.java b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferConfigurator.java deleted file mode 100644 index 23e6d7c..0000000 --- a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferConfigurator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.filebuffer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Resolves FileBufferConfiguration from typesafe - * - * Deprecated: use ComponentConfigurator<FileBufferConfiguration> instead. - */ -@Deprecated -public class FileBufferConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(FileBufferConfigurator.class); - - private final static ObjectMapper mapper = new ObjectMapper(); - - public static FileBufferConfiguration detectConfiguration(Config kafka) { - - FileBufferConfiguration fileConfiguration = null; - - try { - fileConfiguration = mapper.readValue(kafka.root().render(ConfigRenderOptions.concise()), FileBufferConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse FileConfiguration"); - } - - return fileConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java index 39eb853..504ea5e 100644 --- a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java +++ b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java @@ -22,11 +22,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Queues; import com.squareup.tape.QueueFile; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistReader; import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.filebuffer.FileBufferConfiguration; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +48,7 @@ import java.util.concurrent.Executors; */ public class FileBufferPersistReader implements StreamsPersistReader, Serializable { - public final static String STREAMS_ID = "FileBufferPersistReader"; + public static final String STREAMS_ID = "FileBufferPersistReader"; private static final Logger LOGGER = LoggerFactory.getLogger(FileBufferPersistReader.class); @@ -66,7 +66,8 @@ public class FileBufferPersistReader implements StreamsPersistReader, Serializab private ExecutorService executor = Executors.newSingleThreadExecutor(); public FileBufferPersistReader() { - 
this(FileBufferConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("filebuffer"))); + this(new ComponentConfigurator<>(FileBufferConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer"))); } public FileBufferPersistReader(FileBufferConfiguration config) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java index 89c45c8..4dea85c 100644 --- a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java +++ b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java @@ -22,10 +22,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.squareup.tape.QueueFile; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; -import org.apache.streams.filebuffer.FileBufferConfiguration; import org.apache.streams.util.GuidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +54,8 @@ public class FileBufferPersistWriter implements StreamsPersistWriter, Serializab private QueueFile queueFile; public FileBufferPersistWriter() { - this(FileBufferConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("filebuffer"))); + this(new ComponentConfigurator<>(FileBufferConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer"))); } public FileBufferPersistWriter(FileBufferConfiguration config) { @@ -71,9 +72,9 @@ public class FileBufferPersistWriter implements StreamsPersistWriter, Serializab String key = entry.getId() != null ? 
entry.getId() : GuidUtils.generateGuid("filewriter"); - Preconditions.checkArgument(Strings.isNullOrEmpty(key) == false); + Preconditions.checkArgument(!Strings.isNullOrEmpty(key)); Preconditions.checkArgument(entry.getDocument() instanceof String); - Preconditions.checkArgument(Strings.isNullOrEmpty((String)entry.getDocument()) == false); + Preconditions.checkArgument(!Strings.isNullOrEmpty((String) entry.getDocument())); byte[] item = ((String)entry.getDocument()).getBytes(); try { @@ -101,7 +102,7 @@ public class FileBufferPersistWriter implements StreamsPersistWriter, Serializab Preconditions.checkNotNull(queueFile); - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this.persistQueue = new ConcurrentLinkedQueue<>(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java deleted file mode 100644 index 0d8c8f3..0000000 --- a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.hbase; - -import com.typesafe.config.Config; -import org.apache.streams.config.StreamsConfigurator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by sblackmon on 12/10/13. 
- */ -public class HbaseConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(HbaseConfigurator.class); - - public static HbaseConfiguration detectConfiguration() { - - Config zookeeper = StreamsConfigurator.config.getConfig("zookeeper"); - Config hbase = StreamsConfigurator.config.getConfig("hbase"); - - String rootdir = hbase.getString("rootdir"); - - Config znode = zookeeper.getConfig("znode"); - - String rootserver = znode.getString("rootserver"); - String parent = znode.getString("parent"); - Integer clientPort = hbase.getConfig("zookeeper").getConfig("property").getInt("clientPort"); - String quorum = hbase.getConfig("zookeeper").getString("quorum"); - - HbaseConfiguration hbaseConfiguration = new HbaseConfiguration(); - - hbaseConfiguration.setRootdir(rootdir); - hbaseConfiguration.setRootserver(rootserver); - hbaseConfiguration.setParent(parent); - hbaseConfiguration.setQuorum(quorum); - hbaseConfiguration.setClientPort(clientPort.longValue()); - hbaseConfiguration.setTable(hbase.getString("table")); - hbaseConfiguration.setFamily(hbase.getString("family")); - hbaseConfiguration.setQualifier(hbase.getString("qualifier")); - - return hbaseConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java index 4a51fab..1e066fb 100644 --- a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java +++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java @@ -20,21 +20,27 @@ package org.apache.streams.hbase; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.client.Put; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.util.GuidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.Flushable; -import java.io.IOException; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Closeable { public final static String STREAMS_ID = "HbasePersistWriter"; @@ -53,12 +59,14 @@ public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Clos private HbaseConfiguration config; public HbasePersistWriter() { - this.config = HbaseConfigurator.detectConfiguration(); - this.persistQueue = new 
ConcurrentLinkedQueue<StreamsDatum>(); + this.config = new ComponentConfigurator<>(HbaseConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase")); + this.persistQueue = new ConcurrentLinkedQueue<>(); } public HbasePersistWriter(Queue<StreamsDatum> persistQueue) { - this.config = HbaseConfigurator.detectConfiguration(); + this.config = new ComponentConfigurator<>(HbaseConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase")); this.persistQueue = persistQueue; } @@ -110,7 +118,6 @@ public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Clos { LOGGER.error("There was an error connecting to HBase, please check your settings and try again"); e.printStackTrace(); - return; } } @@ -164,7 +171,6 @@ public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Clos } catch (IOException e) { e.printStackTrace(); LOGGER.warn("Failure executin put: {}", streamsDatum.getDocument().toString()); - return; } } @@ -191,7 +197,6 @@ public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Clos task.join(); } catch (InterruptedException e) { e.printStackTrace(); - return; } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java index 5484f8e..19a398d 100644 --- a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java +++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java @@ -48,7 +48,7 @@ public class HbasePersistWriterTask implements Runnable { } try { Thread.sleep(new Random().nextInt(1)); - } catch (InterruptedException e) {} + } catch (InterruptedException ignored) {} } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java deleted file mode 100644 index c4823c3..0000000 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.hdfs; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by sblackmon on 12/10/13. - */ -public class HdfsConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(HdfsConfigurator.class); - - private final static ObjectMapper mapper = new ObjectMapper(); - - public static HdfsConfiguration detectConfiguration(Config hdfs) { - - HdfsConfiguration hdfsConfiguration = null; - - try { - hdfsConfiguration = mapper.readValue(hdfs.root().render(ConfigRenderOptions.concise()), HdfsConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse HdfsConfiguration"); - } - return hdfsConfiguration; - } - - public static HdfsReaderConfiguration detectReaderConfiguration(Config hdfs) { - - return mapper.convertValue(detectConfiguration(hdfs), HdfsReaderConfiguration.class); - } - - public static HdfsWriterConfiguration detectWriterConfiguration(Config hdfs) { - - return mapper.convertValue(detectConfiguration(hdfs), HdfsWriterConfiguration.class); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java deleted file mode 100644 index 9f64ae6..0000000 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.kafka; - -import com.typesafe.config.Config; -import org.apache.streams.config.StreamsConfigurator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by sblackmon on 12/10/13. 
- */ -public class KafkaConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(KafkaConfigurator.class); - - public static KafkaConfiguration detectConfiguration(Config kafka) { - String brokerlist = StreamsConfigurator.config.getString("kafka.metadata.broker.list"); - String zkconnect = StreamsConfigurator.config.getString("kafka.zkconnect"); - String topic = StreamsConfigurator.config.getString("kafka.topic"); - String groupId = StreamsConfigurator.config.getString("kafka.groupid"); - - KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); - - kafkaConfiguration.setBrokerlist(brokerlist); - kafkaConfiguration.setZkconnect(zkconnect); - kafkaConfiguration.setTopic(topic); - kafkaConfiguration.setGroupId(groupId); - - return kafkaConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java index 7d31d41..d54e794 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java @@ -19,7 +19,6 @@ package org.apache.streams.kafka; import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; @@ -27,6 +26,7 @@ import kafka.consumer.Whitelist; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistReader; @@ -35,8 +35,6 @@ import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.streams.kafka.KafkaConfiguration; - import java.io.Serializable; import java.math.BigInteger; import java.util.List; @@ -59,7 +57,6 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable { private KafkaConfiguration config; - private ConsumerConfig consumerConfig; private ConsumerConnector consumerConnector; public List<KafkaStream<String, String>> inStreams; @@ -67,14 +64,14 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable { private ExecutorService executor = Executors.newSingleThreadExecutor(); public KafkaPersistReader() { - Config config = StreamsConfigurator.config.getConfig("kafka"); - this.config = KafkaConfigurator.detectConfiguration(config); - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this.config = new ComponentConfigurator<>(KafkaConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); + this.persistQueue = new ConcurrentLinkedQueue<>(); } public KafkaPersistReader(Queue<StreamsDatum> persistQueue) { - Config config = StreamsConfigurator.config.getConfig("kafka"); - this.config = KafkaConfigurator.detectConfiguration(config); + this.config = new ComponentConfigurator<>(KafkaConfiguration.class) + 
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); this.persistQueue = persistQueue; } @@ -93,7 +90,7 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable { Properties props = new Properties(); props.setProperty("serializer.encoding", "UTF8"); - consumerConfig = new ConsumerConfig(props); + ConsumerConfig consumerConfig = new ConsumerConfig(props); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); @@ -154,7 +151,7 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable { while( !executor.isTerminated()) { try { executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) {} + } catch (InterruptedException ignored) {} } } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java index 0f685c5..83032e6 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java @@ -20,10 +20,10 @@ package org.apache.streams.kafka; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; @@ -51,14 +51,14 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R private Producer<String, String> producer; public KafkaPersistWriter() { - Config config = StreamsConfigurator.config.getConfig("kafka"); - this.config = KafkaConfigurator.detectConfiguration(config); - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this.config = new ComponentConfigurator<>(KafkaConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); + this.persistQueue = new ConcurrentLinkedQueue<>(); } public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) { - Config config = StreamsConfigurator.config.getConfig("kafka"); - this.config = KafkaConfigurator.detectConfiguration(config); + this.config = new ComponentConfigurator<>(KafkaConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); this.persistQueue = persistQueue; } @@ -76,7 +76,7 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R ProducerConfig config = new ProducerConfig(props); - producer = new Producer<String, String>(config); + producer = new Producer<>(config); new Thread(new KafkaPersistWriterTask(this)).start(); } @@ -106,7 +106,7 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R String hash = GuidUtils.generateGuid(text); - KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), hash, text); + KeyedMessage<String, String> data = new KeyedMessage<>(config.getTopic(), hash, text); 
producer.send(data);
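
For reference, every call site touched above converges on the same replacement pattern: the hand-written per-component Configurator goes away and the generic ComponentConfigurator is bound to the generated configuration bean instead. A minimal sketch of the new pattern (Kafka shown, other components are analogous; it assumes a resolvable "kafka" subtree in the application config, and the example class is illustrative, not part of the codebase):

import com.typesafe.config.Config;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.kafka.KafkaConfiguration;

public class ComponentConfiguratorExample {

    public static void main(String[] args) {
        // Old style: KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka"))
        // New style: one generic configurator, parameterized by the generated config bean.
        Config kafka = StreamsConfigurator.getConfig().getConfig("kafka");
        KafkaConfiguration config = new ComponentConfigurator<>(KafkaConfiguration.class)
            .detectConfiguration(kafka);
        System.out.println(config.getTopic());
    }
}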

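The deleted FileBufferConfigurator and HdfsConfigurator duplicated the same typesafe-config-to-bean step (KafkaConfigurator and HbaseConfigurator copied individual fields by hand). A sketch of that shared step, under the assumption that ComponentConfigurator<T> centralizes an equivalent Config to concise-JSON to Jackson binding; the bind helper below is illustrative, not part of the streams API:

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;

public class ConfigBindingSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Render the typesafe-config subtree as concise JSON, then let Jackson
    // populate the generated configuration bean (the approach the deleted
    // FileBufferConfigurator and HdfsConfigurator used internally).
    public static <T> T bind(Config config, Class<T> configClass) throws IOException {
        return MAPPER.readValue(config.root().render(ConfigRenderOptions.concise()), configClass);
    }
}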