http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java index e4d883d..35dbcd5 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java +++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java @@ -15,73 +15,78 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.jackson; +import org.apache.streams.pojo.json.ThroughputQueueBroadcast; + import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonNode; -import org.apache.streams.pojo.json.ThroughputQueueBroadcast; import org.slf4j.Logger; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Arrays; import javax.management.MBeanAttributeInfo; import javax.management.MBeanInfo; import javax.management.MBeanServer; import javax.management.ObjectName; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.Arrays; public class ThroughputQueueDeserializer extends JsonDeserializer<ThroughputQueueBroadcast> { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class); - public ThroughputQueueDeserializer() { + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class); - } + public ThroughputQueueDeserializer() { - @Override - public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - try { - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + } - ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast(); - JsonNode attributes = jsonParser.getCodec().readTree(jsonParser); + @Override + public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + try { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - ObjectName name = new ObjectName(attributes.get("canonicalName").asText()); - MBeanInfo info = server.getMBeanInfo(name); - throughputQueueBroadcast.setName(name.toString()); + ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast(); + JsonNode attributes = jsonParser.getCodec().readTree(jsonParser); - for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) { - try { - switch(attribute.getName()) { - case "CurrentSize": - throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attribute.getName())); - break; - case "AvgWait": - throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attribute.getName())); - break; - case "MaxWait": - throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attribute.getName())); - break; - case "Removed": - throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attribute.getName())); - break; - case "Added": - throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attribute.getName())); - break; - case "Throughput": - throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attribute.getName())); - break; - } - } catch (Exception e) { - LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object: {}", e); - } - } + ObjectName name = new ObjectName(attributes.get("canonicalName").asText()); + MBeanInfo info = server.getMBeanInfo(name); + throughputQueueBroadcast.setName(name.toString()); - return throughputQueueBroadcast; - } catch (Exception e) { - return null; + for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) { + try { + switch (attribute.getName()) { + case "CurrentSize": + throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attribute.getName())); + break; + case "AvgWait": + throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attribute.getName())); + break; + case "MaxWait": + throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attribute.getName())); + break; + case "Removed": + throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attribute.getName())); + break; + case "Added": + throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attribute.getName())); + break; + case "Throughput": + throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attribute.getName())); + break; + default: + break; + } + } catch (Exception ex) { + LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object: {}", ex); } + } + + return throughputQueueBroadcast; + } catch (Exception ex) { + return null; } + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java index 28c7fa7..667e9f6 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java @@ -15,20 +15,21 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.monitoring.persist; import java.util.List; /** - * Interface to define how we persist messages (JMX/monitoring related) + * Interface to define how we persist messages (JMX/monitoring related). */ public interface MessagePersister { - /** - * Given a list of messages, persist them out through whatever appropriate - * broadcast mechanism (HTTP request, SLF4J log, etc.) - * @param messages - * @return statusCode represents whether or not the persist was successful - */ - int persistMessages(List<String> messages); + /** + * Given a list of messages, persist them out through whatever appropriate + * broadcast mechanism (HTTP request, SLF4J log, etc.). + * @param messages List of String messages + * @return statusCode represents whether or not the persist was successful + */ + int persistMessages(List<String> messages); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java index bf0591f..1466f31 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java @@ -15,8 +15,11 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.monitoring.persist.impl; +import org.apache.streams.monitoring.persist.MessagePersister; + import com.google.common.collect.Lists; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; @@ -25,70 +28,70 @@ import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; -import org.apache.streams.monitoring.persist.MessagePersister; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; public class BroadcastMessagePersister implements MessagePersister { - private final static Logger LOGGER = LoggerFactory.getLogger(BroadcastMessagePersister.class); - private String broadcastURI; - public BroadcastMessagePersister(String broadcastURI) { - this.broadcastURI = broadcastURI; - } + private static final Logger LOGGER = LoggerFactory.getLogger(BroadcastMessagePersister.class); + private String broadcastUri; - @Override - /** - * Given a list of messages as Strings, broadcast them to the broadcastURI - * (if one is defined) - * @param messages - * @return int status code from POST response - */ - public int persistMessages(List<String> messages) { - int responseCode = -1; + public BroadcastMessagePersister(String broadcastUri) { + this.broadcastUri = broadcastUri; + } - if(broadcastURI != null) { - try { - HttpClient client = HttpClients.createDefault(); - HttpPost post = new HttpPost(broadcastURI); + @Override + /** + * Given a list of messages as Strings, broadcast them to the broadcastUri + * (if one is defined) + * @param messages + * @return int status code from POST response + */ + public int persistMessages(List<String> messages) { + int responseCode = -1; - post.setHeader("User-Agent", "Streams"); + if (broadcastUri != null) { + try { + HttpClient client = HttpClients.createDefault(); + HttpPost post = new HttpPost(broadcastUri); - List<NameValuePair> urlParameters = Lists.newArrayList(); - urlParameters.add(new BasicNameValuePair("messages", serializeMessages(messages))); + post.setHeader("User-Agent", "Streams"); - post.setEntity(new UrlEncodedFormEntity(urlParameters, "UTF-8")); + List<NameValuePair> urlParameters = Lists.newArrayList(); + urlParameters.add(new BasicNameValuePair("messages", serializeMessages(messages))); - HttpResponse response = client.execute(post); - responseCode = response.getStatusLine().getStatusCode(); + post.setEntity(new UrlEncodedFormEntity(urlParameters, "UTF-8")); - LOGGER.debug("Broadcast {} messages to URI: {}", messages.size(), broadcastURI); - } catch (Exception e) { - LOGGER.error("Failed to broadcast message to URI: {}, exception: {}", broadcastURI, e); - } - } + HttpResponse response = client.execute(post); + responseCode = response.getStatusLine().getStatusCode(); - return responseCode; + LOGGER.debug("Broadcast {} messages to URI: {}", messages.size(), broadcastUri); + } catch (Exception ex) { + LOGGER.error("Failed to broadcast message to URI: {}, exception: {}", broadcastUri, ex); + } } - /** - * Given a List of String messages, convert them to a JSON array - * @param messages - * @return Serialized version of this JSON array - */ - private String serializeMessages(List<String> messages) { - String ser = "{\"messages\":["; - - for(String message : messages) { - if(messages.get(messages.size()-1).equals(message)) { - ser += message + "]}"; - } else { - ser += message + ","; - } - } - - return ser; + return responseCode; + } + + /** + * Given a List of String messages, convert them to a JSON array. + * @param messages List of String messages + * @return Serialized version of this JSON array + */ + private String serializeMessages(List<String> messages) { + String ser = "{\"messages\":["; + + for (String message : messages) { + if (messages.get(messages.size() - 1).equals(message)) { + ser += message + "]}"; + } else { + ser += message + ","; + } } + + return ser; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java index 312502c..c697661 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java @@ -15,90 +15,93 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.monitoring.persist.impl; import org.apache.streams.monitoring.persist.MessagePersister; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.DatagramPacket; import java.net.DatagramSocket; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.SocketException; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.Executors; public class LogstashUdpMessagePersister implements MessagePersister { - private final static Logger LOGGER = LoggerFactory.getLogger(LogstashUdpMessagePersister.class); - private String broadcastURI; - URI uri; + private static final Logger LOGGER = LoggerFactory.getLogger(LogstashUdpMessagePersister.class); + private String broadcastUri; + URI uri; + + public LogstashUdpMessagePersister(String broadcastUri) { + this.broadcastUri = broadcastUri; + setup(); + } - public LogstashUdpMessagePersister(String broadcastURI) { - this.broadcastURI = broadcastURI; - setup(); + /** + * setup. + */ + public void setup() { + + try { + uri = new URI(broadcastUri); + } catch (URISyntaxException ex) { + LOGGER.error(ex.getMessage()); } - public void setup() { + } - try { - uri = new URI(broadcastURI); - } catch (URISyntaxException e) { - LOGGER.error(e.getMessage()); - } + @Override + /** + * Given a list of messages as Strings, broadcast them to the broadcastUri + * (if one is defined) + * @param messages + * @return int status code from POST response + */ + public int persistMessages(List<String> messages) { + int responseCode = -1; - } - @Override - /** - * Given a list of messages as Strings, broadcast them to the broadcastURI - * (if one is defined) - * @param messages - * @return int status code from POST response - */ - public int persistMessages(List<String> messages) { - int responseCode = -1; - - if(broadcastURI != null) { - DatagramSocket socket = null; - try { - socket = new DatagramSocket(); - } catch (SocketException e) { - LOGGER.error("Metrics Broadcast Setup Failed: " + e.getMessage()); - } - try { - ByteBuffer toWrite = ByteBuffer.wrap(serializeMessages(messages).getBytes()); - byte[] byteArray = toWrite.array(); - DatagramPacket packet = new DatagramPacket(byteArray, byteArray.length); - socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort())); - socket.send(packet); - } catch( Exception e ) { - LOGGER.error("Metrics Broadcast Failed: " + e.getMessage()); - } finally { - socket.close(); - } - } - - return responseCode; + if (broadcastUri != null) { + DatagramSocket socket = null; + try { + socket = new DatagramSocket(); + } catch (SocketException ex) { + LOGGER.error("Metrics Broadcast Setup Failed: " + ex.getMessage()); + } + try { + ByteBuffer toWrite = ByteBuffer.wrap(serializeMessages(messages).getBytes()); + byte[] byteArray = toWrite.array(); + DatagramPacket packet = new DatagramPacket(byteArray, byteArray.length); + socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort())); + socket.send(packet); + } catch ( Exception ex ) { + LOGGER.error("Metrics Broadcast Failed: " + ex.getMessage()); + } finally { + socket.close(); + } } - /** - * Given a List of String messages, convert them to a JSON array - * @param messages - * @return Serialized version of this JSON array - */ - private String serializeMessages(List<String> messages) { + return responseCode; + } - StringBuilder json_lines = new StringBuilder(); - for(String message : messages) { - json_lines.append(message).append('\n'); - } + /** + * Given a List of String messages, convert them to a JSON array. + * @param messages List of String messages + * @return Serialized version of this JSON array + */ + private String serializeMessages(List<String> messages) { - return json_lines.toString(); + StringBuilder jsonLines = new StringBuilder(); + for (String message : messages) { + jsonLines.append(message).append('\n'); } + return jsonLines.toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java deleted file mode 100644 index 19c36f2..0000000 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.monitoring.persist.impl; - -import org.apache.streams.monitoring.persist.MessagePersister; -import org.slf4j.Logger; - -import java.util.List; - -public class SLF4JMessagePersister implements MessagePersister { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(SLF4JMessagePersister.class); - private static final int SUCCESS_STATUS = 0; - private static final int FAILURE_STATUS = -1; - - public SLF4JMessagePersister() { - - } - - @Override - public int persistMessages(List<String> messages) { - for(String message : messages) { - LOGGER.info(message); - } - - return SUCCESS_STATUS; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java new file mode 100644 index 0000000..b237871 --- /dev/null +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.monitoring.persist.impl; + +import org.apache.streams.monitoring.persist.MessagePersister; + +import org.slf4j.Logger; + +import java.util.List; + +/** + * Persist montoring messages to SLF4J. + */ +public class Slf4jMessagePersister implements MessagePersister { + + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Slf4jMessagePersister.class); + private static final int SUCCESS_STATUS = 0; + private static final int FAILURE_STATUS = -1; + + public Slf4jMessagePersister() { + + } + + @Override + public int persistMessages(List<String> messages) { + + for (String message : messages) { + LOGGER.info(message); + } + + return SUCCESS_STATUS; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java index f21a212..a797ce5 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java @@ -15,189 +15,204 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.monitoring.tasks; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.Lists; import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.jackson.*; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.jackson.DatumStatusCounterDeserializer; +import org.apache.streams.jackson.MemoryUsageDeserializer; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.jackson.StreamsTaskCounterDeserializer; +import org.apache.streams.jackson.ThroughputQueueDeserializer; import org.apache.streams.local.monitoring.MonitoringConfiguration; import org.apache.streams.monitoring.persist.MessagePersister; import org.apache.streams.monitoring.persist.impl.BroadcastMessagePersister; import org.apache.streams.monitoring.persist.impl.LogstashUdpMessagePersister; -import org.apache.streams.monitoring.persist.impl.SLF4JMessagePersister; +import org.apache.streams.monitoring.persist.impl.Slf4jMessagePersister; import org.apache.streams.pojo.json.Broadcast; import org.apache.streams.pojo.json.DatumStatusCounterBroadcast; import org.apache.streams.pojo.json.MemoryUsageBroadcast; import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast; import org.apache.streams.pojo.json.ThroughputQueueBroadcast; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.Lists; import org.slf4j.Logger; -import javax.management.*; import java.lang.management.ManagementFactory; import java.net.URI; -import java.net.URISyntaxException; import java.util.List; import java.util.Map; import java.util.Set; +import javax.management.MBeanServer; +import javax.management.NotificationBroadcasterSupport; +import javax.management.ObjectName; /** * This thread runs inside of a Streams runtime and periodically persists information - * from relevant JMX beans + * from relevant JMX beans. */ public class BroadcastMonitorThread extends NotificationBroadcasterSupport implements Runnable { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(BroadcastMonitorThread.class); - private static MBeanServer server; - - private MonitoringConfiguration configuration; - private URI broadcastURI = null; - private MessagePersister messagePersister; - private volatile boolean keepRunning; - - private static ObjectMapper objectMapper = StreamsJacksonMapper.getInstance(); - - /** - * DEPRECATED - * Please initialize logging with monitoring object via typesafe - * @param streamConfig - */ - @Deprecated - public BroadcastMonitorThread(Map<String, Object> streamConfig) { - this(objectMapper.convertValue(streamConfig, MonitoringConfiguration.class)); - } - - public BroadcastMonitorThread(StreamsConfiguration streamConfig) { - this(objectMapper.convertValue(streamConfig.getAdditionalProperties().get("monitoring"), MonitoringConfiguration.class)); - } - - public BroadcastMonitorThread(MonitoringConfiguration configuration) { - - this.configuration = configuration; - if( this.configuration == null ) - this.configuration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().atPath("monitoring")); - - LOGGER.info("BroadcastMonitorThread created"); - - initializeObjectMapper(); - - prepare(); - - LOGGER.info("BroadcastMonitorThread initialized"); + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(BroadcastMonitorThread.class); + private static MBeanServer server; + + private MonitoringConfiguration configuration; + private URI broadcastUri = null; + private MessagePersister messagePersister; + private volatile boolean keepRunning; + + private static ObjectMapper objectMapper = StreamsJacksonMapper.getInstance(); + + /** + * DEPRECATED + * Please initialize logging with monitoring object via typesafe. + * @param streamConfig streamConfig map. + */ + @Deprecated + public BroadcastMonitorThread(Map<String, Object> streamConfig) { + this(objectMapper.convertValue(streamConfig, MonitoringConfiguration.class)); + } + + public BroadcastMonitorThread(StreamsConfiguration streamConfig) { + this(objectMapper.convertValue(streamConfig.getAdditionalProperties().get("monitoring"), MonitoringConfiguration.class)); + } + + /** + * BroadcastMonitorThread constructor - uses supplied MonitoringConfiguration. + * @param configuration MonitoringConfiguration + */ + public BroadcastMonitorThread(MonitoringConfiguration configuration) { + + this.configuration = configuration; + if ( this.configuration == null ) { + this.configuration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().atPath("monitoring")); } - /** - * Initialize our object mapper with all of our bean's custom deserializers - * This way we can convert them to and from Strings dictated by our - * POJOs which are generated from JSON schemas - */ - private void initializeObjectMapper() { - SimpleModule simpleModule = new SimpleModule(); - - simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer()); - simpleModule.addDeserializer(ThroughputQueueBroadcast.class, new ThroughputQueueDeserializer()); - simpleModule.addDeserializer(StreamsTaskCounterBroadcast.class, new StreamsTaskCounterDeserializer()); - simpleModule.addDeserializer(DatumStatusCounterBroadcast.class, new DatumStatusCounterDeserializer()); - - objectMapper.registerModule(simpleModule); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } + LOGGER.info("BroadcastMonitorThread created"); + + initializeObjectMapper(); + + prepare(); + + LOGGER.info("BroadcastMonitorThread initialized"); + + } + + /** + * Initialize our object mapper with all of our bean's custom deserializers. + * This way we can convert them to and from Strings dictated by our + * POJOs which are generated from JSON schemas. + */ + private void initializeObjectMapper() { + SimpleModule simpleModule = new SimpleModule(); + + simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer()); + simpleModule.addDeserializer(ThroughputQueueBroadcast.class, new ThroughputQueueDeserializer()); + simpleModule.addDeserializer(StreamsTaskCounterBroadcast.class, new StreamsTaskCounterDeserializer()); + simpleModule.addDeserializer(DatumStatusCounterBroadcast.class, new DatumStatusCounterDeserializer()); + + objectMapper.registerModule(simpleModule); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + /** + * Get all relevant JMX beans, convert their values to strings, and then persist them. + */ + @Override + public void run() { + LOGGER.info("BroadcastMonitorThread running"); + while (keepRunning) { + try { + List<String> messages = Lists.newArrayList(); + Set<ObjectName> beans = server.queryNames(null, null); + + for (ObjectName name : beans) { + String item = objectMapper.writeValueAsString(name); + Broadcast broadcast = null; + + if (name.getKeyPropertyList().get("type") != null) { + if (name.getKeyPropertyList().get("type").equals("ThroughputQueue")) { + broadcast = objectMapper.readValue(item, ThroughputQueueBroadcast.class); + } else if (name.getKeyPropertyList().get("type").equals("StreamsTaskCounter")) { + broadcast = objectMapper.readValue(item, StreamsTaskCounterBroadcast.class); + } else if (name.getKeyPropertyList().get("type").equals("DatumStatusCounter")) { + broadcast = objectMapper.readValue(item, DatumStatusCounterBroadcast.class); + } else if (name.getKeyPropertyList().get("type").equals("Memory")) { + broadcast = objectMapper.readValue(item, MemoryUsageBroadcast.class); + } - /** - * Get all relevant JMX beans, convert their values to strings, and then persist them - */ - @Override - public void run() { - LOGGER.info("BroadcastMonitorThread running"); - while(keepRunning) { - try { - List<String> messages = Lists.newArrayList(); - Set<ObjectName> beans = server.queryNames(null, null); - - for(ObjectName name : beans) { - String item = objectMapper.writeValueAsString(name); - Broadcast broadcast = null; - - if(name.getKeyPropertyList().get("type") != null) { - if (name.getKeyPropertyList().get("type").equals("ThroughputQueue")) { - broadcast = objectMapper.readValue(item, ThroughputQueueBroadcast.class); - } else if (name.getKeyPropertyList().get("type").equals("StreamsTaskCounter")) { - broadcast = objectMapper.readValue(item, StreamsTaskCounterBroadcast.class); - } else if (name.getKeyPropertyList().get("type").equals("DatumStatusCounter")) { - broadcast = objectMapper.readValue(item, DatumStatusCounterBroadcast.class); - } else if (name.getKeyPropertyList().get("type").equals("Memory")) { - broadcast = objectMapper.readValue(item, MemoryUsageBroadcast.class); - } - - if(broadcast != null) { - messages.add(objectMapper.writeValueAsString(broadcast)); - } - } - } - - messagePersister.persistMessages(messages); - Thread.sleep(configuration.getMonitoringBroadcastIntervalMs()); - } catch (InterruptedException e) { - LOGGER.debug("Broadcast Monitor Interrupted!"); - Thread.currentThread().interrupt(); - this.keepRunning = false; - } catch (Exception e) { - LOGGER.error("Exception: {}", e); - this.keepRunning = false; + if (broadcast != null) { + messages.add(objectMapper.writeValueAsString(broadcast)); } + } } + + messagePersister.persistMessages(messages); + Thread.sleep(configuration.getMonitoringBroadcastIntervalMs()); + } catch (InterruptedException ex) { + LOGGER.debug("Broadcast Monitor Interrupted!"); + Thread.currentThread().interrupt(); + this.keepRunning = false; + } catch (Exception ex) { + LOGGER.error("Exception: {}", ex); + this.keepRunning = false; + } } + } - public void prepare() { + /** + * prepare for execution. + */ + public void prepare() { - keepRunning = true; + keepRunning = true; - LOGGER.info("BroadcastMonitorThread setup " + this.configuration); + LOGGER.info("BroadcastMonitorThread setup " + this.configuration); - server = ManagementFactory.getPlatformMBeanServer(); + server = ManagementFactory.getPlatformMBeanServer(); - if (this.configuration != null && - this.configuration.getBroadcastURI() != null) { + if (this.configuration != null && this.configuration.getBroadcastURI() != null) { - try { - broadcastURI = new URI(configuration.getBroadcastURI()); - } catch (Exception e) { - LOGGER.error("invalid URI: ", e); - } + try { + broadcastUri = new URI(configuration.getBroadcastURI()); + } catch (Exception ex) { + LOGGER.error("invalid URI: ", ex); + } - if (broadcastURI != null) { - if (broadcastURI.getScheme().equals("http")) { - messagePersister = new BroadcastMessagePersister(broadcastURI.toString()); - } else if (broadcastURI.getScheme().equals("udp")) { - messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString()); - } else { - LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined."); - throw new RuntimeException(); - } - } else { - messagePersister = new SLF4JMessagePersister(); - } + if (broadcastUri != null) { + if (broadcastUri.getScheme().equals("http")) { + messagePersister = new BroadcastMessagePersister(broadcastUri.toString()); + } else if (broadcastUri.getScheme().equals("udp")) { + messagePersister = new LogstashUdpMessagePersister(broadcastUri.toString()); } else { - messagePersister = new SLF4JMessagePersister(); + LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined."); + throw new RuntimeException(); } - + } else { + messagePersister = new Slf4jMessagePersister(); + } + } else { + messagePersister = new Slf4jMessagePersister(); } - public void shutdown() { - this.keepRunning = false; - LOGGER.debug("Shutting down BroadcastMonitor Thread"); - } + } - public String getBroadcastURI() { - return configuration.getBroadcastURI(); - } + public void shutdown() { + this.keepRunning = false; + LOGGER.debug("Shutting down BroadcastMonitor Thread"); + } - public long getWaitTime() { - return configuration.getMonitoringBroadcastIntervalMs(); - } + public String getBroadcastUri() { + return configuration.getBroadcastURI(); + } + + public long getWaitTime() { + return configuration.getMonitoringBroadcastIntervalMs(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java index 1c68239..8bf3219 100644 --- a/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java +++ b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java @@ -15,13 +15,15 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.jackson; +import org.apache.streams.pojo.json.MemoryUsageBroadcast; + import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.commons.lang3.StringUtils; -import org.apache.streams.pojo.json.MemoryUsageBroadcast; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -35,43 +37,46 @@ import static org.junit.Assert.assertNotNull; public class MemoryUsageDeserializerTest { - private final static Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class); - private ObjectMapper objectMapper; + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class); + private ObjectMapper objectMapper; - @Before - public void setup() { - objectMapper = StreamsJacksonMapper.getInstance(); - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer()); - objectMapper.registerModule(simpleModule); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } + /** + * setup. + */ + @Before + public void setup() { + objectMapper = StreamsJacksonMapper.getInstance(); + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer()); + objectMapper.registerModule(simpleModule); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } - @Test - public void serDeTest() { - InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json"); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); + @Test + public void serDeTest() { + InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); - try { - while (br.ready()) { - String line = br.readLine(); - if (!StringUtils.isEmpty(line)) { - LOGGER.info("raw: {}", line); - MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class); + try { + while (br.ready()) { + String line = br.readLine(); + if (!StringUtils.isEmpty(line)) { + LOGGER.info("raw: {}", line); + MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class); - LOGGER.info("activity: {}", broadcast); + LOGGER.info("activity: {}", broadcast); - assertNotNull(broadcast); - assertNotNull(broadcast.getVerbose()); - assertNotNull(broadcast.getObjectPendingFinalizationCount()); - assertNotNull(broadcast.getHeapMemoryUsage()); - assertNotNull(broadcast.getNonHeapMemoryUsage()); - assertNotNull(broadcast.getName()); - } - } - } catch (Exception e) { - LOGGER.error("Exception while testing serializability: {}", e); + assertNotNull(broadcast); + assertNotNull(broadcast.getVerbose()); + assertNotNull(broadcast.getObjectPendingFinalizationCount()); + assertNotNull(broadcast.getHeapMemoryUsage()); + assertNotNull(broadcast.getNonHeapMemoryUsage()); + assertNotNull(broadcast.getName()); } + } + } catch (Exception ex) { + LOGGER.error("Exception while testing serializability: {}", ex); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java index 6e7ff6d..fc2ff71 100644 --- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java +++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.monitoring.persist.impl; import com.google.common.collect.Lists; @@ -28,33 +29,33 @@ import static org.junit.Assert.assertNotNull; public class BroadcastMessagePersisterTest { - @Test - public void testFailedPersist() { - BroadcastMessagePersister persister = new BroadcastMessagePersister("http://fake.url.com/fake_endpointasdfasdfas"); - - List<String> messages = Lists.newArrayList(); - for(int x = 0; x < 10; x ++) { - messages.add("Fake_message #" + x); - } - - int statusCode = persister.persistMessages(messages); + @Test + public void testFailedPersist() { + BroadcastMessagePersister persister = new BroadcastMessagePersister("http://fake.url.com/fake_endpointasdfasdfas"); - assertNotNull(statusCode); - assertNotEquals(statusCode, 200); + List<String> messages = Lists.newArrayList(); + for (int x = 0; x < 10; x++) { + messages.add("Fake_message #" + x); } - @Test - public void testInvalidURL() { - BroadcastMessagePersister persister = new BroadcastMessagePersister("h"); + int statusCode = persister.persistMessages(messages); - List<String> messages = Lists.newArrayList(); - for(int x = 0; x < 10; x ++) { - messages.add("Fake_message #" + x); - } + assertNotNull(statusCode); + assertNotEquals(statusCode, 200); + } - int statusCode = persister.persistMessages(messages); + @Test + public void testInvalidUrl() { + BroadcastMessagePersister persister = new BroadcastMessagePersister("h"); - assertNotNull(statusCode); - assertEquals(statusCode, -1); + List<String> messages = Lists.newArrayList(); + for (int x = 0; x < 10; x++) { + messages.add("Fake_message #" + x); } + + int statusCode = persister.persistMessages(messages); + + assertNotNull(statusCode); + assertEquals(statusCode, -1); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java index faa99a2..3f9a4c1 100644 --- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java +++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.monitoring.persist.impl; import com.google.common.base.Splitter; @@ -29,47 +30,51 @@ import java.net.DatagramSocket; import java.net.SocketException; import java.util.List; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class LogstashUdpMessagePersisterTest { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LogstashUdpMessagePersisterTest.class); + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LogstashUdpMessagePersisterTest.class); - DatagramSocket socket = null; + DatagramSocket socket = null; - @Before - public void setup() { - try { - socket = new DatagramSocket(56789); - } catch (SocketException e) { - LOGGER.error("Metrics Broadcast Test Setup Failed: " + e.getMessage()); - } + /** + * setup. + */ + @Before + public void setup() { + try { + socket = new DatagramSocket(56789); + } catch (SocketException ex) { + LOGGER.error("Metrics Broadcast Test Setup Failed: " + ex.getMessage()); } + } - @Test - public void testFailedPersist() { - LogstashUdpMessagePersister persister = new LogstashUdpMessagePersister("udp://127.0.0.1:56789"); - - List<String> messageArray = Lists.newArrayList(); - for(int x = 0; x < 10; x ++) { - messageArray.add("Fake_message #" + x); - } + @Test + public void testFailedPersist() { + LogstashUdpMessagePersister persister = new LogstashUdpMessagePersister("udp://127.0.0.1:56789"); - persister.persistMessages(messageArray); - byte[] receiveData = new byte[1024]; + List<String> messageArray = Lists.newArrayList(); + for (int x = 0; x < 10; x ++) { + messageArray.add("Fake_message #" + x); + } - DatagramPacket messageDatagram = new DatagramPacket(receiveData, receiveData.length); + persister.persistMessages(messageArray); + byte[] receiveData = new byte[1024]; - try { - socket.receive(messageDatagram); - assertNotNull(messageDatagram); - List<String> messages = Lists.newArrayList(Splitter.on('\n').split(new String(messageDatagram.getData()))); - assertEquals(messageArray, messages.subList(0,10)); - } catch (IOException e) { - LOGGER.error("Metrics Broadcast Test Failed: " + e.getMessage()); - } + DatagramPacket messageDatagram = new DatagramPacket(receiveData, receiveData.length); + try { + socket.receive(messageDatagram); + assertNotNull(messageDatagram); + List<String> messages = Lists.newArrayList(Splitter.on('\n').split(new String(messageDatagram.getData()))); + assertEquals(messageArray, messages.subList(0,10)); + } catch (IOException ex) { + LOGGER.error("Metrics Broadcast Test Failed: " + ex.getMessage()); } + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java index a959bd2..ad1bf05 100644 --- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java +++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java @@ -15,67 +15,59 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.monitoring.tasks; -import com.google.common.collect.Maps; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.local.monitoring.MonitoringConfiguration; -import org.junit.Ignore; + import org.junit.Test; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class BroadcastMonitorThreadTest { - private ExecutorService executor; - @Test - public void testThreadEmptyBeanConfig() { - StreamsConfiguration streamsConfiguration = new StreamsConfiguration(); - BroadcastMonitorThread thread = new BroadcastMonitorThread(streamsConfiguration); - testThread(thread); - } + private ExecutorService executor; - @Test - public void testThreadEmptyMapConfig() { - Map<String, Object> map = Maps.newHashMap(); - BroadcastMonitorThread thread = new BroadcastMonitorThread(map); - testThread(thread); - } + @Test + public void testThreadEmptyBeanConfig() { + StreamsConfiguration streamsConfiguration = new StreamsConfiguration(); + BroadcastMonitorThread thread = new BroadcastMonitorThread(streamsConfiguration); + testThread(thread); + } - @Test - public void testThreadFakeMapConfig() { - Map<String, Object> config = Maps.newHashMap(); - config.put("broadcastURI", "http://fakeurl.com/fake"); - BroadcastMonitorThread thread = new BroadcastMonitorThread(config); - testThread(thread); - } - @Test - public void testThreadStreamsConfig() { - StreamsConfiguration streams = new StreamsConfiguration(); - MonitoringConfiguration monitoring = new MonitoringConfiguration(); - monitoring.setBroadcastURI("http://fakeurl.com/fake"); - monitoring.setMonitoringBroadcastIntervalMs(30000L); - streams.setAdditionalProperty("monitoring", monitoring); - BroadcastMonitorThread thread = new BroadcastMonitorThread(streams); - testThread(thread); - } - public void testThread(BroadcastMonitorThread thread) { - long testRunLength = thread.getWaitTime() * 1; - executor = Executors.newFixedThreadPool(1); - executor.submit(thread); + @Test + public void testThreadStreamsConfig() { - try { - Thread.sleep(testRunLength); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } + StreamsConfiguration streams = new StreamsConfiguration(); + MonitoringConfiguration monitoring = new MonitoringConfiguration(); + monitoring.setBroadcastURI("http://fakeurl.com/fake"); + monitoring.setMonitoringBroadcastIntervalMs(30000L); + streams.setAdditionalProperty("monitoring", monitoring); + BroadcastMonitorThread thread = new BroadcastMonitorThread(streams); + testThread(thread); + } - executor.shutdown(); + /** + * Base Test. + * @param thread BroadcastMonitorThread + */ + public void testThread(BroadcastMonitorThread thread) { + long testRunLength = thread.getWaitTime() * 1; + executor = Executors.newFixedThreadPool(1); + executor.submit(thread); + + try { + Thread.sleep(testRunLength); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); } + executor.shutdown(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java index 964fff6..971b99f 100644 --- a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java +++ b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.plugins.cassandra; import org.apache.streams.util.schema.GenerationConfig; + import org.jsonschema2pojo.DefaultGenerationConfig; import org.jsonschema2pojo.util.URLUtil; @@ -32,68 +34,71 @@ import java.util.List; import java.util.Set; /** - * Configures StreamsHiveResourceGenerator - * - * + * Configures StreamsCassandraResourceGenerator. */ public class StreamsCassandraGenerationConfig extends DefaultGenerationConfig implements GenerationConfig { - public String getSourceDirectory() { - return sourceDirectory; - } + public String getSourceDirectory() { + return sourceDirectory; + } - public List<String> getSourcePaths() { - return sourcePaths; - } + public List<String> getSourcePaths() { + return sourcePaths; + } - private String sourceDirectory; - private List<String> sourcePaths = new ArrayList<String>(); - private String targetDirectory; - private int maxDepth = 1; + private String sourceDirectory; + private List<String> sourcePaths = new ArrayList<String>(); + private String targetDirectory; + private int maxDepth = 1; - public Set<String> getExclusions() { - return exclusions; - } + public Set<String> getExclusions() { + return exclusions; + } - public void setExclusions(Set<String> exclusions) { - this.exclusions = exclusions; - } + public void setExclusions(Set<String> exclusions) { + this.exclusions = exclusions; + } - private Set<String> exclusions = new HashSet<String>(); + private Set<String> exclusions = new HashSet<String>(); - public int getMaxDepth() { - return maxDepth; - } + public int getMaxDepth() { + return maxDepth; + } - public void setSourceDirectory(String sourceDirectory) { - this.sourceDirectory = sourceDirectory; - } + public void setSourceDirectory(String sourceDirectory) { + this.sourceDirectory = sourceDirectory; + } - public void setSourcePaths(List<String> sourcePaths) { - this.sourcePaths = sourcePaths; - } + public void setSourcePaths(List<String> sourcePaths) { + this.sourcePaths = sourcePaths; + } - public void setTargetDirectory(String targetDirectory) { - this.targetDirectory = targetDirectory; - } + public void setTargetDirectory(String targetDirectory) { + this.targetDirectory = targetDirectory; + } - public File getTargetDirectory() { - return new File(targetDirectory); - } + public File getTargetDirectory() { + return new File(targetDirectory); + } - public Iterator<URL> getSource() { - if (null != sourceDirectory) { - return Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator(); - } - List<URL> sourceURLs = new ArrayList<URL>(); - if( sourcePaths != null && sourcePaths.size() > 0) - for (String source : sourcePaths) { - sourceURLs.add(URLUtil.parseURL(source)); - } - return sourceURLs.iterator(); + /** + * get all sources. + * @return Iterator of URL + */ + public Iterator<URL> getSource() { + if (null != sourceDirectory) { + return Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator(); } - - public void setMaxDepth(int maxDepth) { - this.maxDepth = maxDepth; + List<URL> sourceUrls = new ArrayList<URL>(); + if ( sourcePaths != null && sourcePaths.size() > 0) { + for (String source : sourcePaths) { + sourceUrls.add(URLUtil.parseURL(source)); + } } + return sourceUrls.iterator(); + } + + public void setMaxDepth(int maxDepth) { + this.maxDepth = maxDepth; + } }
