Repository: incubator-streams Updated Branches: refs/heads/master 7aaba1e9b -> 6f5caa238
Trivial fixes, this closes apache/incubator-streams#311 Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6f5caa23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6f5caa23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6f5caa23 Branch: refs/heads/master Commit: 6f5caa238af20e8eb519a4877011574976fb1089 Parents: 7aaba1e Author: smarthi <[email protected]> Authored: Sat Oct 22 10:53:10 2016 -0400 Committer: smarthi <[email protected]> Committed: Sat Oct 22 10:53:10 2016 -0400 ---------------------------------------------------------------------- .../elasticsearch/ElasticsearchClient.java | 3 --- .../ElasticsearchClientManager.java | 13 +++++------- .../ElasticsearchMetadataUtil.java | 15 ++++++------- .../ElasticsearchPersistReader.java | 22 ++++++++++---------- .../ElasticsearchPersistWriter.java | 13 +----------- .../elasticsearch/ElasticsearchQuery.java | 1 + .../regex/AbstractRegexExtensionExtractor.java | 8 ++++--- 7 files changed, 29 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java index 5bb0e9d..0b2b782 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java @@ -21,9 +21,6 @@ package org.apache.streams.elasticsearch; import org.elasticsearch.Version; import org.elasticsearch.client.Client; -/** - * Created by sblackmon on 2/10/14. - */ public class ElasticsearchClient { private Client client; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java index 60ffb5f..4809334 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java @@ -36,17 +36,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; -/** - * Created by sblackmon on 2/10/14. - */ public class ElasticsearchClientManager { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class); - private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<String, ElasticsearchClient>(); + private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<>(); private ElasticsearchConfiguration elasticsearchConfiguration; @@ -85,7 +82,7 @@ public class ElasticsearchClientManager { } public void start() throws Exception { - /*********************************************************************** + /* * Note: * Everything in these classes is being switched to lazy loading. Within * Heroku you only have 60 seconds to connect, and bind to the service, @@ -130,11 +127,11 @@ public class ElasticsearchClientManager { } public boolean equals(Object o) { - return EqualsBuilder.reflectionEquals(this, o, Arrays.asList(this.elasticsearchConfiguration.toString())); + return EqualsBuilder.reflectionEquals(this, o, Collections.singletonList(this.elasticsearchConfiguration.toString())); } public int hashCode() { - return HashCodeBuilder.reflectionHashCode(this, Arrays.asList(this.elasticsearchConfiguration.toString())); + return HashCodeBuilder.reflectionHashCode(this, Collections.singletonList(this.elasticsearchConfiguration.toString())); } private synchronized void checkAndLoadClient(String clusterName) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java index 6cbd203..100b0c5 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java @@ -19,15 +19,12 @@ package org.apache.streams.elasticsearch; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.Maps; import org.apache.streams.core.StreamsDatum; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; -/** - * Created by sblackmon on 10/20/14. - */ public class ElasticsearchMetadataUtil { public static String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { @@ -100,25 +97,25 @@ public class ElasticsearchMetadataUtil { return id; } - public static String getParent(StreamsDatum datum) { + static String getParent(StreamsDatum datum) { String parent = null; Map<String, Object> metadata = datum.getMetadata(); - if( parent == null && metadata != null && metadata.containsKey("parent")) + if(metadata != null && metadata.containsKey("parent")) parent = (String) datum.getMetadata().get("parent"); return parent; } - public static String getRouting(StreamsDatum datum) { + static String getRouting(StreamsDatum datum) { String routing = null; Map<String, Object> metadata = datum.getMetadata(); - if( routing == null && metadata != null && metadata.containsKey("routing")) + if(metadata != null && metadata.containsKey("routing")) routing = (String) datum.getMetadata().get("routing"); return routing; @@ -133,7 +130,7 @@ public class ElasticsearchMetadataUtil { public static Map<String, Object> asMap(JsonNode node) { Iterator<Map.Entry<String, JsonNode>> iterator = node.fields(); - Map<String, Object> ret = Maps.newHashMap(); + Map<String, Object> ret = new HashMap<>(); Map.Entry<String, JsonNode> entry; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java index 9103614..909f5c4 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java @@ -21,7 +21,10 @@ package org.apache.streams.elasticsearch; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Queues; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; import org.elasticsearch.search.SearchHit; import org.joda.time.DateTime; @@ -31,19 +34,16 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; -import java.util.*; -import java.util.concurrent.*; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -/** - * *********************************************************************************************************** - * Authors: - * smashew - * steveblackmon - * ************************************************************************************************************ - */ - public class ElasticsearchPersistReader implements StreamsPersistReader, Serializable { public static final String STREAMS_ID = "ElasticsearchPersistReader"; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index 49523f8..8f9c7d7 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -24,9 +24,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatus; -import org.apache.streams.core.DatumStatusCountable; -import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; @@ -58,7 +55,7 @@ import java.util.TimerTask; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable { +public class ElasticsearchPersistWriter implements StreamsPersistWriter, Serializable { public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName(); @@ -268,14 +265,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt } } - @Override - public DatumStatusCounter getDatumStatusCounter() { - DatumStatusCounter counters = new DatumStatusCounter(); - counters.incrementStatus(DatumStatus.SUCCESS, (int)this.totalOk.get()); - counters.incrementStatus(DatumStatus.FAIL, (int)this.totalFailed.get()); - return counters; - } - private synchronized void flushInternal() { // we do not have a working bulk request, we can just exit here. if (this.bulkRequest == null || this.currentBatchItems.get() == 0) http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java index d9e9273..3bb4b97 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.script.Script; +import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortBuilders; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java index e4fa0e0..0f46ccd 100644 --- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java +++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java @@ -32,7 +32,9 @@ import org.apache.streams.pojo.json.Activity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,7 +72,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce } else if (entry.getDocument() instanceof ObjectNode) { activity = mapper.convertValue(entry.getDocument(), Activity.class); } else { - return Lists.newArrayList(); + return new ArrayList<>(); } if (Strings.isNullOrEmpty(pattern)) { prepare(null); @@ -81,7 +83,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce entities.add(prepareObject(key)); } - Set<T> set = Sets.newHashSet(); + Set<T> set = new HashSet<>(); set.addAll(entities); entities.clear(); entities.addAll(set); @@ -122,7 +124,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce if(extensions.containsKey(extensionKey) && extensions.get(extensionKey) != null) { hashtags = Sets.newHashSet((Iterable<T>) extensions.get(extensionKey)); } else { - hashtags = Sets.newHashSet(); + hashtags = new HashSet<>(); } extensions.put(extensionKey, hashtags);
