CHUKWA-771. Improved code quality issue identified by findbugs. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/6012e14f Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/6012e14f Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/6012e14f Branch: refs/heads/master Commit: 6012e14f8d96c98439ab7e71001c9266a467da72 Parents: e29381f Author: Eric Yang <[email protected]> Authored: Sun Jun 28 18:59:12 2015 -0700 Committer: Eric Yang <[email protected]> Committed: Sun Jun 28 18:59:12 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + pom.xml | 12 +- .../org/apache/hadoop/chukwa/ChunkImpl.java | 13 +- .../salsa/fsm/DataNodeClientTraceMapper.java | 2 +- .../chukwa/analysis/salsa/fsm/FSMBuilder.java | 12 +- .../chukwa/analysis/salsa/fsm/FSMType.java | 6 +- .../chukwa/analysis/salsa/fsm/HDFSState.java | 6 +- .../salsa/fsm/JobHistoryTaskDataMapper.java | 2 +- .../chukwa/analysis/salsa/fsm/MapRedState.java | 6 +- .../chukwa/analysis/salsa/fsm/StateType.java | 6 +- .../salsa/fsm/TaskTrackerClientTraceMapper.java | 6 +- .../analysis/salsa/visualization/Swimlanes.java | 4 +- .../hadoop/chukwa/database/Aggregator.java | 6 +- .../datacollection/adaptor/ExecAdaptor.java | 3 +- .../adaptor/HeartbeatAdaptor.java | 5 +- .../datacollection/adaptor/JMXAdaptor.java | 9 +- .../datacollection/adaptor/OozieAdaptor.java | 3 +- .../datacollection/adaptor/RestAdaptor.java | 6 +- .../datacollection/adaptor/SocketAdaptor.java | 3 +- .../datacollection/adaptor/SyslogAdaptor.java | 3 +- .../adaptor/filetailer/FileTailingAdaptor.java | 2 +- .../adaptor/filetailer/LWFTAdaptor.java | 4 +- .../jms/JMSMessagePropertyTransformer.java | 4 +- .../adaptor/sigar/SigarRunner.java | 5 +- .../agent/AgentControlSocketListener.java | 4 +- .../datacollection/agent/ChukwaAgent.java | 193 ++++---- .../datacollection/agent/ChukwaRestServer.java | 29 +- .../datacollection/agent/rest/Examples.java | 4 +- .../datacollection/collector/CollectorStub.java | 142 ------ .../collector/servlet/CommitCheckServlet.java | 2 +- .../collector/servlet/LogDisplayServlet.java | 2 +- .../controller/ChukwaAgentController.java | 26 +- .../datacollection/sender/AsyncAckSender.java | 2 +- .../datacollection/sender/ChukwaHttpSender.java | 5 +- .../datacollection/writer/hbase/Reporter.java | 34 +- .../datacollection/writer/solr/SolrWriter.java | 17 +- .../chukwa/datastore/ChukwaHBaseStore.java | 104 ++-- .../extraction/hbase/AbstractProcessor.java | 11 +- .../extraction/hbase/DefaultProcessor.java | 40 +- .../hbase/HadoopMetricsProcessor.java | 31 +- .../chukwa/extraction/hbase/LogEntry.java | 64 +-- .../chukwa/extraction/hbase/SystemMetrics.java | 90 ++-- .../hadoop/chukwa/hicc/ClusterConfig.java | 7 +- .../hadoop/chukwa/hicc/HiccWebServer.java | 5 +- .../apache/hadoop/chukwa/hicc/bean/Chart.java | 2 +- .../apache/hadoop/chukwa/hicc/bean/Widget.java | 4 +- .../hadoop/chukwa/hicc/proxy/HttpProxy.java | 11 +- .../chukwa/hicc/rest/SessionController.java | 6 +- .../chukwa/hicc/rest/VelocityResolver.java | 2 +- .../inputtools/mdl/TorqueInfoProcessor.java | 473 ------------------- .../chukwa/inputtools/mdl/TorqueTimerTask.java | 51 -- .../hadoop/chukwa/util/ClusterConfig.java | 7 +- .../apache/hadoop/chukwa/util/HBaseUtil.java | 10 +- .../org/apache/hadoop/chukwa/ChunkImplTest.java | 20 - .../analysis/salsa/fsm/TestFSMBuilder.java | 3 +- .../adaptor/JMX/TestJMXAdaptor.java | 3 +- .../datacollection/adaptor/TestAddAdaptor.java | 3 +- .../adaptor/TestBufferingWrappers.java | 3 +- .../adaptor/TestDirTailingAdaptor.java | 18 +- .../datacollection/adaptor/TestExecAdaptor.java | 3 +- .../datacollection/adaptor/TestFileAdaptor.java | 6 +- .../adaptor/TestHeartbeatAdaptor.java | 4 +- .../TestCharFileTailingAdaptorUTF8.java | 3 +- .../filetailer/TestFileExpirationPolicy.java | 6 +- .../adaptor/filetailer/TestFileTailer.java | 3 +- .../TestFileTailingAdaptorBigRecord.java | 3 +- .../TestFileTailingAdaptorPreserveLines.java | 3 +- .../filetailer/TestFileTailingAdaptors.java | 113 +++-- .../adaptor/filetailer/TestLogRotate.java | 1 + .../adaptor/filetailer/TestRCheckAdaptor.java | 3 +- .../adaptor/filetailer/TestRawAdaptor.java | 3 +- .../adaptor/filetailer/TestStartAtOffset.java | 6 +- .../chukwa/datacollection/agent/TestAgent.java | 6 +- .../datacollection/agent/TestAgentConfig.java | 18 +- .../datacollection/agent/TestChukwaSsl.java | 3 +- .../datacollection/agent/TestChunkQueue.java | 4 +- .../chukwa/datacollection/agent/TestCmd.java | 12 +- .../agent/rest/TestAdaptorController.java | 1 + .../collector/TestAdaptorTimeout.java | 3 +- .../collector/TestBackpressure.java | 3 +- .../collector/TestDelayedAcks.java | 6 +- .../collector/TestFailedCollectorAck.java | 3 +- .../controller/TestAgentClient.java | 3 +- .../ChukwaAgentToCollectorValidator.java | 269 ----------- 84 files changed, 596 insertions(+), 1450 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8192ffd..e23338b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -50,6 +50,8 @@ Trunk (unreleased changes) BUGS + CHUKWA-771. Improved code quality issue identified by findbugs. (Eric Yang) + CHUKWA-770. Moved default dashboard population code to login.jsp. (Eric Yang) CHUKWA-766. Updated license on source files. (Eric Yang) http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ad931c8..82a7f03 100644 --- a/pom.xml +++ b/pom.xml @@ -465,9 +465,9 @@ <goals> <goal>compile</goal> </goals> - <compilerVersion>1.6</compilerVersion> - <source>1.6</source> - <target>1.6</target> + <compilerVersion>1.7</compilerVersion> + <source>1.7</source> + <target>1.7</target> <excludes> <exclude>**/ChukwaJobTrackerInstrumentation.java</exclude> </excludes> @@ -1360,7 +1360,7 @@ </plugin> <plugin> <artifactId>maven-pmd-plugin</artifactId> - <version>2.6</version> + <version>3.4</version> <reportSets> <reportSet> <reports> @@ -1370,13 +1370,13 @@ </reportSet> </reportSets> <configuration> - <targetJdk>1.6</targetJdk> + <targetJdk>1.7</targetJdk> </configuration> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>findbugs-maven-plugin</artifactId> - <version>2.3.3</version> + <version>3.0.1</version> <configuration> <threshold>Normal</threshold> <effort>Max</effort> http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java b/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java index 1f184c7..ed90d4a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java +++ b/src/main/java/org/apache/hadoop/chukwa/ChunkImpl.java @@ -24,6 +24,7 @@ import java.io.DataOutput; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.charset.Charset; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -31,7 +32,7 @@ import org.apache.hadoop.chukwa.datacollection.DataFactory; import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk { - public static int PROTOCOL_VERSION = 1; + public final static int PROTOCOL_VERSION = 1; protected DataFactory dataFactory = DataFactory.getInstance(); private String source = ""; @@ -249,9 +250,15 @@ public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk { return w; } - // FIXME: should do something better here, but this is OK for debugging public String toString() { - return source + ":" + streamName + ":" + new String(data) + "/" + seqID; + StringBuilder buffer = new StringBuilder(); + buffer.append(source); + buffer.append(":"); + buffer.append(streamName); + buffer.append(new String(data, Charset.forName("UTF-8"))); + buffer.append("/"); + buffer.append(seqID); + return buffer.toString(); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java index 7cb10ec..ec37f7a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java @@ -45,7 +45,7 @@ public class DataNodeClientTraceMapper { private static Log log = LogFactory.getLog(FSMBuilder.class); protected static final String SEP = "/"; - protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.FILESYSTEM_FSM]; + protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.FILESYSTEM_FSM]; private final Pattern ipPattern = Pattern.compile(".*[a-zA-Z\\-_:\\/]([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z0-9\\-_:\\/].*"); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java index de28597..d3a1656 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java @@ -75,13 +75,13 @@ public class FSMBuilder extends Configured implements Tool { * These are used for the add_info TreeMap; keys not listed here are automatically * prepended with "COUNTER_" */ - protected static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"}; + final static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"}; - protected static String JCDF_ID1 = "JCDF_ID1"; - protected static String JCDF_ID2 = "JCDF_ID2"; - protected static String JCDF_EDGE_TIME = "JCDF_E_TIME"; - protected static String JCDF_EDGE_VOL = "JCDF_E_VOL"; - protected static String JCDF_SEP = "@"; + protected final static String JCDF_ID1 = "JCDF_ID1"; + protected final static String JCDF_ID2 = "JCDF_ID2"; + protected final static String JCDF_EDGE_TIME = "JCDF_E_TIME"; + protected final static String JCDF_EDGE_VOL = "JCDF_E_VOL"; + protected final static String JCDF_SEP = "@"; /** http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java index fddfeb1..234f8bb 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java @@ -23,9 +23,9 @@ public class FSMType { public static final int FILESYSTEM_FSM = 1; public static final int MAPREDUCE_FSM_INCOMPLETE = 2; public static final int FILESYSTEM_FSM_INCOMPLETE = 3; - public static final String [] NAMES = { "MAPREDUCE_FSM", "FILESYSTEM_FSM", "MAPREDUCE_FSM_INCOMPLETE", "FILESYSTEM_FSM_INCOMPLETE" }; + static final String [] NAMES = { "MAPREDUCE_FSM", "FILESYSTEM_FSM", "MAPREDUCE_FSM_INCOMPLETE", "FILESYSTEM_FSM_INCOMPLETE" }; public FSMType() { this.val = 0; } public FSMType(int newval) { this.val = newval; } public int val; - public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); } -} \ No newline at end of file + public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); } +} http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java index a2fe353..96e1bd5 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java @@ -25,9 +25,9 @@ public class HDFSState { public static final int WRITE_LOCAL = 3; public static final int WRITE_REMOTE = 4; public static final int WRITE_REPLICATED = 5; - public static final String [] NAMES = { "NONE", "READ_LOCAL", "READ_REMOTE", "WRITE_LOCAL", "WRITE_REMOTE", "WRITE_REPLICATED"}; + static final String [] NAMES = { "NONE", "READ_LOCAL", "READ_REMOTE", "WRITE_LOCAL", "WRITE_REMOTE", "WRITE_REPLICATED"}; public HDFSState() { this.val = 1; } public HDFSState(int newval) { this.val = newval; } public int val; - public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); } -} \ No newline at end of file + public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); } +} http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java index 883225a..3de268e 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java @@ -47,7 +47,7 @@ public class JobHistoryTaskDataMapper private static Log log = LogFactory.getLog(FSMBuilder.class); protected static final String SEP = "/"; - protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM]; + protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM]; /* * Helper function for mapper to populate TreeMap of FSMIntermedEntr http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java index ae06be5..f059049 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java @@ -27,10 +27,10 @@ public class MapRedState { public static final int REDUCE_REDUCER = 5; public static final int SHUFFLE_LOCAL = 6; public static final int SHUFFLE_REMOTE = 7; - public static final String [] NAMES = { "NONE", "MAP", "REDUCE", "REDUCE_SHUFFLEWAIT", + static final String [] NAMES = { "NONE", "MAP", "REDUCE", "REDUCE_SHUFFLEWAIT", "REDUCE_SORT", "REDUCE_REDUCER", "SHUFFLE_LOCAL", "SHUFFLE_REMOTE"}; public MapRedState() { this.val = 0; } public MapRedState(int newval) { this.val = newval; } public int val; - public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); } -} \ No newline at end of file + public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); } +} http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java index 56e8362..7298a5c 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java @@ -23,9 +23,9 @@ public class StateType { public static final int STATE_START = 1; public static final int STATE_END = 2; public static final int STATE_INSTANT = 3; - public static final String [] NAMES = {"STATE_NOOP", "STATE_START", "STATE_END", "STATE_INSTANT"}; + static final String [] NAMES = {"STATE_NOOP", "STATE_START", "STATE_END", "STATE_INSTANT"}; public StateType() { this.val = 0; } public StateType(int newval) { this.val = newval; } public int val; - public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); } -} \ No newline at end of file + public String toString() { assert(this.val < NAMES.length && this.val >= 0); return String.valueOf(NAMES[this.val]); } +} http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java index 785ba8b..188b076 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java @@ -46,7 +46,7 @@ public class TaskTrackerClientTraceMapper { private static Log log = LogFactory.getLog(FSMBuilder.class); protected static final String SEP = "/"; - protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM]; + protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM]; private final Pattern ipPattern = Pattern.compile("([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z\\-_:\\/].*"); @@ -149,8 +149,8 @@ public class TaskTrackerClientTraceMapper String [] k = key.getKey().split("/"); start_rec.time_orig_epoch = k[0]; - start_rec.time_orig = (new Long(actual_time_ms)).toString(); // not actually used - start_rec.timestamp = (new Long(actual_time_ms)).toString(); + start_rec.time_orig = (Long.valueOf(actual_time_ms)).toString(); // not actually used + start_rec.timestamp = (Long.valueOf(actual_time_ms)).toString(); start_rec.time_end = new String(""); start_rec.time_start = new String(start_rec.timestamp); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java index 36b5e5e..87bf40c 100644 --- a/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java +++ b/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java @@ -317,7 +317,7 @@ public class Swimlanes { return this.states.length; } public String [] getStates() { - return this.states; + return this.states.clone(); } } @@ -903,4 +903,4 @@ public class Swimlanes { return rs_tab; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java b/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java index f92c9e9..1aa50af 100644 --- a/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java +++ b/src/main/java/org/apache/hadoop/chukwa/database/Aggregator.java @@ -20,12 +20,16 @@ package org.apache.hadoop.chukwa.database; import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; import java.io.FileReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; import java.text.ParsePosition; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.chukwa.util.DatabaseWriter; @@ -48,7 +52,7 @@ public class Aggregator { public static String getContents(File aFile) { StringBuffer contents = new StringBuffer(); try { - BufferedReader input = new BufferedReader(new FileReader(aFile)); + BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8"))); try { String line = null; // not declared within while loop while ((line = input.readLine()) != null) { http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java index ba68d73..1a0e2a3 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java @@ -26,6 +26,7 @@ import org.apache.log4j.helpers.ISO8601DateFormat; import org.json.simple.JSONObject; import org.json.simple.parser.ParseException; +import java.nio.charset.Charset; import java.util.*; /** @@ -87,7 +88,7 @@ public class ExecAdaptor extends AbstractAdaptor { result.append(o.get("exitValue")); result.append(": "); result.append((String) o.get("stdout")); - data = result.toString().getBytes(); + data = result.toString().getBytes(Charset.forName("UTF-8")); } else { String stdout = (String) o.get("stdout"); data = stdout.getBytes(); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java index c2792a7..313abc8 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/HeartbeatAdaptor.java @@ -20,6 +20,7 @@ package org.apache.hadoop.chukwa.datacollection.adaptor; import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -72,7 +73,7 @@ public class HeartbeatAdaptor extends AbstractAdaptor { status.put("components", array); if(_shouldUseConnector){ ChunkImpl chunk = new ChunkImpl(type, STREAM_NAME, seqId, status.toString() - .getBytes(), HeartbeatAdaptor.this); + .getBytes(Charset.forName("UTF-8")), HeartbeatAdaptor.this); dest.add(chunk); } else { sendDirectly(status.toString()); @@ -83,7 +84,7 @@ public class HeartbeatAdaptor extends AbstractAdaptor { private void sendDirectly(String data) { DataOutputStream dos = null; Socket sock = null; - byte[] bdata = data.getBytes(); + byte[] bdata = data.getBytes(Charset.forName("UTF-8")); try { sock = new Socket(_host, _port); dos = new DataOutputStream(sock.getOutputStream()); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java index 792957c..c07f6fa 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java @@ -20,8 +20,11 @@ package org.apache.hadoop.chukwa.datacollection.adaptor; import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; import java.io.FileReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; import java.rmi.ConnectException; import java.util.Calendar; import java.util.HashMap; @@ -97,11 +100,11 @@ public class JMXAdaptor extends AbstractAdaptor{ sb.append(File.separator); } sb.append("jmxremote.password"); - String jmx_pw_file = sb.toString(); + File jmx_pw_file = new File(sb.toString()); shutdown = false; while(!shutdown){ try{ - BufferedReader br = new BufferedReader(new FileReader(jmx_pw_file)); + BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(jmx_pw_file.getAbsolutePath()), Charset.forName("UTF-8"))); String[] creds = br.readLine().split(" "); Map<String, String[]> env = new HashMap<String, String[]>(); env.put(JMXConnector.CREDENTIALS, creds); @@ -202,7 +205,7 @@ public class JMXAdaptor extends AbstractAdaptor{ } } - byte[] data = json.toString().getBytes(); + byte[] data = json.toString().getBytes(Charset.forName("UTF-8")); sendOffset+=data.length; ChunkImpl c = new ChunkImpl(type, "JMX", sendOffset, data, adaptor); long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java index b401f2e..39af580 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.chukwa.datacollection.adaptor; import java.io.IOException; +import java.nio.charset.Charset; import java.security.PrivilegedExceptionAction; import java.util.Calendar; import java.util.TimeZone; @@ -164,7 +165,7 @@ public class OozieAdaptor extends AbstractAdaptor { } private int processMetrics() { - return addChunkToReceiver(getOozieMetrics().getBytes()); + return addChunkToReceiver(getOozieMetrics().getBytes(Charset.forName("UTF-8"))); } private String getOozieMetrics() { http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java index 60ac50d..8e3fd8a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java @@ -19,9 +19,9 @@ package org.apache.hadoop.chukwa.datacollection.adaptor; import java.io.FileInputStream; +import java.nio.charset.Charset; import java.security.KeyStore; import java.security.SecureRandom; - import java.util.Calendar; import java.util.TimeZone; import java.util.Timer; @@ -33,6 +33,7 @@ import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; import org.apache.log4j.Logger; import org.apache.hadoop.chukwa.util.ExceptionUtil; import org.apache.hadoop.conf.Configuration; + import static org.apache.hadoop.chukwa.datacollection.agent.ChukwaConstants.*; import com.sun.jersey.api.client.Client; @@ -44,7 +45,6 @@ import com.sun.jersey.client.urlconnection.HTTPSProperties; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; - import javax.ws.rs.core.MediaType; public class RestAdaptor extends AbstractAdaptor { @@ -75,7 +75,7 @@ public class RestAdaptor extends AbstractAdaptor { resource = c.resource(uri); bean = resource.accept(MediaType.APPLICATION_JSON_TYPE).get( String.class); - byte[] data = bean.getBytes(); + byte[] data = bean.getBytes(Charset.forName("UTF-8")); sendOffset += data.length; ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, adaptor); long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")) http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java index 705d310..b37be9c 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.ObjectInputStream; import java.net.*; +import java.nio.charset.Charset; import org.apache.hadoop.chukwa.*; import org.apache.hadoop.chukwa.util.ExceptionUtil; @@ -132,7 +133,7 @@ public class SocketAdaptor extends AbstractAdaptor { while(running) { // read an event from the wire event = (LoggingEvent) ois.readObject(); - byte[] bytes = layout.format(event).getBytes(); + byte[] bytes = layout.format(event).getBytes(Charset.forName("UTF-8")); bytesReceived=bytes.length; Chunk c = new ChunkImpl(type, java.net.InetAddress.getLocalHost().getHostName(), bytesReceived, bytes, SocketAdaptor.this); dest.add(c); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java index 258cdb5..07f6c66 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.chukwa.datacollection.adaptor; import java.io.IOException; import java.net.*; +import java.nio.charset.Charset; import java.util.Arrays; import java.util.HashMap; @@ -63,7 +64,7 @@ public class SyslogAdaptor extends UDPAdaptor { source.append(dp.getAddress()); String dataType = type; byte[] trimmedBuf = Arrays.copyOf(buf, dp.getLength()); - String rawPRI = new String(trimmedBuf, 1, 4); + String rawPRI = new String(trimmedBuf, 1, 4, Charset.forName("UTF-8")); int i = rawPRI.indexOf(">"); if (i <= 3 && i > -1) { String priorityStr = rawPRI.substring(0,i); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java index 4e5fcdf..5fea073 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java @@ -34,7 +34,7 @@ public class FileTailingAdaptor extends LWFTAdaptor { public static int MAX_RETRIES = 300; - public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes + static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes private int attempts = 0; private long gracefulPeriodExpired = 0l; http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java index 1689e0e..9da09d5 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java @@ -51,10 +51,10 @@ public class LWFTAdaptor extends AbstractAdaptor { public static final String MAX_READ_SIZE_OPT = "chukwaAgent.fileTailingAdaptor.maxReadSize"; - public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE; + static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE; static Logger log; - protected static FileTailer tailer; + static FileTailer tailer; static { tailer = null; http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java index 2ac669e..b0ef917 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java @@ -22,6 +22,8 @@ import org.apache.commons.logging.LogFactory; import javax.jms.Message; import javax.jms.JMSException; + +import java.nio.charset.Charset; import java.util.ArrayList; /** @@ -141,7 +143,7 @@ public class JMSMessagePropertyTransformer implements JMSMessageTransformer { return null; } - return sb.toString().getBytes(); + return sb.toString().getBytes(Charset.forName("UTF-8")); } /** http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java index d23675e..5aed762 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SigarRunner.java @@ -18,6 +18,7 @@ package org.apache.hadoop.chukwa.datacollection.adaptor.sigar; +import java.nio.charset.Charset; import java.util.HashMap; import java.util.TimerTask; @@ -203,13 +204,13 @@ public class SigarRunner extends TimerTask { json.put("disk", fsList); } json.put("timestamp", System.currentTimeMillis()); - byte[] data = json.toString().getBytes(); + byte[] data = json.toString().getBytes(Charset.forName("UTF-8")); sendOffset += data.length; ChunkImpl c = new ChunkImpl("SystemMetrics", "Sigar", sendOffset, data, systemMetrics); if(!skip) { receiver.add(c); } - } catch (Exception se) { + } catch (InterruptedException se) { log.error(ExceptionUtil.getStackTrace(se)); } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java index 1160fd3..dda7888 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java @@ -26,7 +26,9 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.*; +import java.nio.charset.Charset; import java.util.Map; + import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException; import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy; import org.apache.log4j.Logger; @@ -70,7 +72,7 @@ public class AgentControlSocketListener extends Thread { public void run() { try { InputStream in = connection.getInputStream(); - BufferedReader br = new BufferedReader(new InputStreamReader(in)); + BufferedReader br = new BufferedReader(new InputStreamReader(in, Charset.forName("UTF-8"))); PrintStream out = new PrintStream(new BufferedOutputStream(connection .getOutputStream())); String cmd = null; http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java index 7dad2d7..4a2e996 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.nio.charset.Charset; import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -71,59 +72,79 @@ public class ChukwaAgent implements AdaptorManager { // boolean WRITE_CHECKPOINTS = true; static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics"); - private static Logger log = Logger.getLogger(ChukwaAgent.class); - private OffsetStatsManager adaptorStatsManager = null; + private final static Logger log = Logger.getLogger(ChukwaAgent.class); + private OffsetStatsManager<Adaptor> adaptorStatsManager = null; private Timer statsCollector = null; - private static volatile Configuration conf = null; - private static volatile ChukwaAgent agent = null; + private static Configuration conf = null; + private volatile static ChukwaAgent agent = null; public Connector connector = null; + private boolean stopped = false; - protected ChukwaAgent() throws AlreadyRunningException { - this(new ChukwaConfiguration()); + private ChukwaAgent() { + agent = new ChukwaAgent(new ChukwaConfiguration()); } - public ChukwaAgent(Configuration conf) throws AlreadyRunningException { - ChukwaAgent.agent = this; + private ChukwaAgent(Configuration conf) { + agent = this; ChukwaAgent.conf = conf; - // almost always just reading this; so use a ConcurrentHM. // since we wrapped the offset, it's not a structural mod. adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>(); adaptorsByName = new HashMap<String, Adaptor>(); checkpointNumber = 0; + stopped = false; + } + + public static ChukwaAgent getAgent() { + if(agent == null || agent.isStopped()) { + agent = new ChukwaAgent(); + } + return agent; + } + + public static ChukwaAgent getAgent(Configuration conf) { + if(agent == null || agent.isStopped()) { + agent = new ChukwaAgent(conf); + } + return agent; + } - boolean DO_CHECKPOINT_RESTORE = conf.getBoolean( + public void start() throws AlreadyRunningException { + boolean checkPointRestore = conf.getBoolean( "chukwaAgent.checkpoint.enabled", true); - CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name", + checkPointBaseName = conf.get("chukwaAgent.checkpoint.name", "chukwa_checkpoint_"); - final int CHECKPOINT_INTERVAL_MS = conf.getInt( + final int checkPointIntervalMs = conf.getInt( "chukwaAgent.checkpoint.interval", 5000); - final int STATS_INTERVAL_MS = conf.getInt( + final int statsIntervalMs = conf.getInt( "chukwaAgent.stats.collection.interval", 10000); - final int STATS_DATA_TTL_MS = conf.getInt( + int statsDataTTLMs = conf.getInt( "chukwaAgent.stats.data.ttl", 1200000); if (conf.get("chukwaAgent.checkpoint.dir") != null) checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null)); else - DO_CHECKPOINT_RESTORE = false; + checkPointRestore = false; if (checkpointDir != null && !checkpointDir.exists()) { - checkpointDir.mkdirs(); + boolean result = checkpointDir.mkdirs(); + if(!result) { + log.error("Failed to create check point directory."); + } } - tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\""); + String tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\""); DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\"")); - log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]"); + log.info("Config - CHECKPOINT_BASE_NAME: [" + checkPointBaseName + "]"); log.info("Config - checkpointDir: [" + checkpointDir + "]"); - log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS + log.info("Config - CHECKPOINT_INTERVAL_MS: [" + checkPointIntervalMs + "]"); - log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]"); - log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]"); + log.info("Config - DO_CHECKPOINT_RESTORE: [" + checkPointRestore + "]"); + log.info("Config - STATS_INTERVAL_MS: [" + statsIntervalMs + "]"); log.info("Config - tags: [" + tags + "]"); - if (DO_CHECKPOINT_RESTORE) { - log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS); + if (checkPointRestore) { + log.info("checkpoints are enabled, period is " + checkPointIntervalMs); } File initialAdaptors = null; @@ -131,7 +152,7 @@ public class ChukwaAgent implements AdaptorManager { initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors")); try { - if (DO_CHECKPOINT_RESTORE) { + if (checkPointRestore) { restoreFromCheckpoint(); } } catch (IOException e) { @@ -152,44 +173,31 @@ public class ChukwaAgent implements AdaptorManager { // another agent is running. controlSock.start(); // this sets us up as a daemon log.info("control socket started on port " + controlSock.portno); - - // start the HTTP server with stats collection - try { - this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS); - this.statsCollector = new Timer("ChukwaAgent Stats Collector"); - - startHttpServer(conf); - - statsCollector.scheduleAtFixedRate(new StatsCollectorTask(), - STATS_INTERVAL_MS, STATS_INTERVAL_MS); - } catch (Exception e) { - log.error("Couldn't start HTTP server", e); - throw new RuntimeException(e); - } - - // shouldn't start checkpointing until we're finishing launching - // adaptors on boot - if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) { - checkpointer = new Timer(); - checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS); - } } catch (IOException e) { log.info("failed to bind to socket; aborting agent launch", e); throw new AlreadyRunningException(); } - } + // start the HTTP server with stats collection + try { + adaptorStatsManager = new OffsetStatsManager<Adaptor>(statsDataTTLMs); + statsCollector = new Timer("ChukwaAgent Stats Collector"); - public static ChukwaAgent getAgent() { - if(agent == null) { - try { - agent = new ChukwaAgent(); - } catch(AlreadyRunningException e) { - log.error("Chukwa Agent is already running", e); - agent = null; - } - } - return agent; + startHttpServer(conf); + + statsCollector.scheduleAtFixedRate(new StatsCollectorTask(), + statsIntervalMs, statsIntervalMs); + } catch (Exception e) { + log.error("Couldn't start HTTP server", e); + throw new RuntimeException(e); + } + + // shouldn't start check pointing until we're finishing launching + // adaptors on boot + if (checkPointIntervalMs > 0 && checkpointDir != null) { + checkpointer = new Timer(); + checkpointer.schedule(new CheckpointTask(), 0, checkPointIntervalMs); + } } // doesn't need an equals(), comparator, etc @@ -219,17 +227,16 @@ public class ChukwaAgent implements AdaptorManager { } } - private final Map<Adaptor, Offset> adaptorPositions; + private static Map<Adaptor, Offset> adaptorPositions; // basically only used by the control socket thread. //must be locked before access - private final Map<String, Adaptor> adaptorsByName; + private static Map<String, Adaptor> adaptorsByName; private File checkpointDir; // lock this object to indicate checkpoint in // progress - private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files + private String checkPointBaseName; // base filename for checkpoint files // checkpoints - private static String tags = ""; private Timer checkpointer; private volatile boolean needNewCheckpoint = false; // set to true if any @@ -238,13 +245,13 @@ public class ChukwaAgent implements AdaptorManager { private int checkpointNumber; // id number of next checkpoint. // should be protected by grabbing lock on checkpointDir - private final AgentControlSocketListener controlSock; + private AgentControlSocketListener controlSock; public int getControllerPort() { return controlSock.getPort(); } - public OffsetStatsManager getAdaptorStatsManager() { + public OffsetStatsManager<Adaptor> getAdaptorStatsManager() { return adaptorStatsManager; } @@ -261,12 +268,10 @@ public class ChukwaAgent implements AdaptorManager { System.exit(0); } - conf = ChukwaUtil.readConfiguration(); - agent = new ChukwaAgent(conf); - + Configuration conf = ChukwaUtil.readConfiguration(); + agent = ChukwaAgent.getAgent(conf); if (agent.anotherAgentIsRunning()) { - System.out - .println("another agent is running (or port has been usurped). " + log.error("another agent is running (or port has been usurped). " + "Bailing out now"); throw new AlreadyRunningException(); } @@ -286,11 +291,12 @@ public class ChukwaAgent implements AdaptorManager { "org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector"); agent.connector = (Connector) Class.forName(connectorType).newInstance(); } + agent.start(); agent.connector.start(); log.info("local agent started on port " + agent.getControlSock().portno); - //System.out.close(); - //System.err.close(); + System.out.close(); + System.err.close(); } catch (AlreadyRunningException e) { log.error("agent started already on this machine with same portno;" + " bailing out"); @@ -304,7 +310,11 @@ public class ChukwaAgent implements AdaptorManager { } private boolean anotherAgentIsRunning() { - return !controlSock.isBound(); + boolean result = false; + if(controlSock!=null) { + result = !controlSock.isBound(); + } + return result; } /** @@ -475,30 +485,26 @@ public class ChukwaAgent implements AdaptorManager { synchronized (checkpointDir) { String[] checkpointNames = checkpointDir.list(new FilenameFilter() { public boolean accept(File dir, String name) { - return name.startsWith(CHECKPOINT_BASE_NAME); + return name.startsWith(checkPointBaseName); } }); if (checkpointNames == null) { log.error("Unable to list files in checkpoint dir"); return false; - } - if (checkpointNames.length == 0) { + } else if (checkpointNames.length == 0) { log.info("No checkpoints found in " + checkpointDir); return false; - } - - if (checkpointNames.length > 2) + } else if (checkpointNames.length > 2) { log.warn("expected at most two checkpoint files in " + checkpointDir + "; saw " + checkpointNames.length); - else if (checkpointNames.length == 0) - return false; + } String lowestName = null; int lowestIndex = Integer.MAX_VALUE; for (String n : checkpointNames) { int index = Integer - .parseInt(n.substring(CHECKPOINT_BASE_NAME.length())); + .parseInt(n.substring(checkPointBaseName.length())); if (index < lowestIndex) { lowestName = n; lowestIndex = index; @@ -516,7 +522,7 @@ public class ChukwaAgent implements AdaptorManager { IOException { log.info("starting adaptors listed in " + checkpoint.getAbsolutePath()); BufferedReader br = new BufferedReader(new InputStreamReader( - new FileInputStream(checkpoint))); + new FileInputStream(checkpoint), Charset.forName("UTF-8"))); String cmd = null; while ((cmd = br.readLine()) != null) processAddCommand(cmd); @@ -534,20 +540,23 @@ public class ChukwaAgent implements AdaptorManager { log.info("writing checkpoint " + checkpointNumber); FileOutputStream fos = new FileOutputStream(new File(checkpointDir, - CHECKPOINT_BASE_NAME + checkpointNumber)); + checkPointBaseName + checkpointNumber)); PrintWriter out = new PrintWriter(new BufferedWriter( - new OutputStreamWriter(fos))); + new OutputStreamWriter(fos, Charset.forName("UTF-8")))); for (Map.Entry<String, String> stat : getAdaptorList().entrySet()) { out.println("ADD "+ stat.getKey()+ " = " + stat.getValue()); } out.close(); - File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME + File lastCheckpoint = new File(checkpointDir, checkPointBaseName + (checkpointNumber - 1)); log.debug("hopefully removing old checkpoint file " + lastCheckpoint.getAbsolutePath()); - lastCheckpoint.delete(); + boolean result = lastCheckpoint.delete(); + if(!result) { + log.warn("Unable to delete lastCheckpoint file: "+lastCheckpoint.getAbsolutePath()); + } checkpointNumber++; } } @@ -729,11 +738,27 @@ public class ChukwaAgent implements AdaptorManager { adaptorsByName.clear(); adaptorPositions.clear(); adaptorStatsManager.clear(); + agent.stop(); if (exit) System.exit(0); } /** + * Set agent into stop state. + */ + private void stop() { + stopped = true; + } + + /** + * Check if agent is in stop state. + * @return true if agent is in stop state. + */ + private boolean isStopped() { + return stopped; + } + + /** * Returns the control socket for this agent. */ private AgentControlSocketListener getControlSock() { http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java index cbabbc0..f549614 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaRestServer.java @@ -41,33 +41,24 @@ public class ChukwaRestServer { private static ChukwaRestServer instance = null; - public static void startInstance(Configuration conf) throws Exception{ + public static synchronized void startInstance(Configuration conf) throws Exception{ if(instance == null){ - synchronized(ChukwaRestServer.class) { - if(instance == null){ - instance = new ChukwaRestServer(conf); - instance.start(); - } - } + instance = new ChukwaRestServer(conf); + instance.start(); } } - - public static void stopInstance() throws Exception { + + public static synchronized void stopInstance() throws Exception { if(instance != null) { - synchronized(ChukwaRestServer.class) { - if(instance != null){ - instance.stop(); - instance = null; - } - } + instance.stop(); + instance = null; } - } - + private ChukwaRestServer(Configuration conf){ this.conf = conf; } - + private void start() throws Exception{ int portNum = conf.getInt(AGENT_HTTP_PORT, 9090); String jaxRsAddlPackages = conf.get(AGENT_REST_CONTROLLER_PACKAGES); @@ -127,7 +118,7 @@ public class ChukwaRestServer { log.info("started Chukwa http agent interface on port " + portNum); } - + private void stop() throws Exception{ jettyServer.stop(); log.info("Successfully stopped Chukwa http agent interface"); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java index 5d3f71a..b7c912b 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/Examples.java @@ -23,13 +23,13 @@ import java.util.List; public class Examples { public static final AdaptorConfig CREATE_ADAPTOR_SAMPLE = new AdaptorConfig(); public static final AdaptorInfo ADAPTOR_STATUS_SAMPLE = new AdaptorInfo(); - public static final List<AdaptorAveragedRate> ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>(); + final static List<AdaptorAveragedRate> ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>(); public static final AdaptorAveragedRate ADAPTOR_RATE_SAMPLE_PER_MINUTE = new AdaptorAveragedRate(); public static final AdaptorAveragedRate ADAPTOR_RATE_SAMPLE_PER_FIVE_MINUTE = new AdaptorAveragedRate(); public static final AdaptorAveragedRate ADAPTOR_RATE_SAMPLE_PER_TEN_MINUTE = new AdaptorAveragedRate(); public static final AdaptorInfo SYS_ADAPTOR_STATUS_SAMPLE = new AdaptorInfo(); - public static final List<AdaptorAveragedRate> SYS_ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>(); + final static List<AdaptorAveragedRate> SYS_ADAPTOR_RATES = new ArrayList<AdaptorAveragedRate>(); public static final AdaptorAveragedRate SYS_ADAPTOR_RATE_SAMPLE_PER_MINUTE = new AdaptorAveragedRate(); public static final AdaptorAveragedRate SYS_ADAPTOR_RATE_SAMPLE_PER_FIVE_MINUTE = new AdaptorAveragedRate(); public static final AdaptorAveragedRate SYS_ADAPTOR_RATE_SAMPLE_PER_TEN_MINUTE = new AdaptorAveragedRate(); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java deleted file mode 100644 index 2a17417..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.chukwa.datacollection.collector; - - -import org.mortbay.jetty.*; -import org.mortbay.jetty.nio.SelectChannelConnector; -import org.mortbay.jetty.servlet.*; -import org.apache.hadoop.chukwa.datacollection.collector.servlet.*; -import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector; -import org.apache.hadoop.chukwa.datacollection.writer.*; -import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import javax.servlet.http.HttpServlet; -import java.io.File; -import java.util.*; - -@Deprecated -public class CollectorStub { - - static int THREADS = 120; - public static Server jettyServer = null; - - public static void main(String[] args) { - - try { - if (args.length > 0 && (args[0].equalsIgnoreCase("help")|| args[0].equalsIgnoreCase("-help"))) { - System.out.println("usage: Normally you should just invoke CollectorStub without arguments."); - System.out.println("A number of options can be specified here for debugging or special uses. e.g.: "); - System.out.println("Options include:\n\tportno=<#> \n\t" + "writer=pretend | <classname>" - + "\n\tservlet=<classname>@path"); - System.out.println("Command line options will override normal configuration."); - System.exit(0); - } - - ChukwaConfiguration conf = new ChukwaConfiguration(); - - try { - Configuration collectorConf = new Configuration(false); - collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-common.xml")); - collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-collector-conf.xml")); - } catch(Exception e) {e.printStackTrace();} - - int portNum = conf.getInt("chukwaCollector.http.port", 9999); - THREADS = conf.getInt("chukwaCollector.http.threads", THREADS); - - // pick a writer. - ChukwaWriter w = null; - Map<String, HttpServlet> servletsToAdd = new TreeMap<String, HttpServlet>(); - ServletCollector servletCollector = new ServletCollector(conf); - for(String arg: args) { - if(arg.startsWith("writer=")) { //custom writer class - String writerCmd = arg.substring("writer=".length()); - if (writerCmd.equals("pretend") || writerCmd.equals("pretend-quietly")) { - boolean verbose = !writerCmd.equals("pretend-quietly"); - w = new ConsoleWriter(verbose); - w.init(conf); - servletCollector.setWriter(w); - } else - conf.set("chukwaCollector.writerClass", writerCmd); - } else if(arg.startsWith("servlet=")) { //adding custom servlet - String servletCmd = arg.substring("servlet=".length()); - String[] halves = servletCmd.split("@"); - try { - Class<?> servletClass = Class.forName(halves[0]); - HttpServlet srvlet = (HttpServlet) servletClass.newInstance(); - if(!halves[1].startsWith("/")) - halves[1] = "/" + halves[1]; - servletsToAdd.put(halves[1], srvlet); - } catch(Exception e) { - e.printStackTrace(); - } - } else if(arg.startsWith("portno=")) { - portNum = Integer.parseInt(arg.substring("portno=".length())); - } else { //unknown arg - System.out.println("WARNING: unknown command line arg " + arg); - System.out.println("Invoke collector with command line arg 'help' for usage"); - } - } - - // Set up jetty connector - SelectChannelConnector jettyConnector = new SelectChannelConnector(); - jettyConnector.setLowResourcesConnections(THREADS - 10); - jettyConnector.setLowResourceMaxIdleTime(1500); - jettyConnector.setPort(portNum); - - // Set up jetty server proper, using connector - jettyServer = new Server(portNum); - jettyServer.setConnectors(new Connector[] { jettyConnector }); - org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool(); - pool.setMaxThreads(THREADS); - jettyServer.setThreadPool(pool); - - // Add the collector servlet to server - Context root = new Context(jettyServer, "/", Context.SESSIONS); - root.addServlet(new ServletHolder(servletCollector), "/*"); - - if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false)) - root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH); - - if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false)) - root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH); - - - root.setAllowNullPathInfo(false); - - // Add in any user-specified servlets - for(Map.Entry<String, HttpServlet> e: servletsToAdd.entrySet()) { - root.addServlet(new ServletHolder(e.getValue()), e.getKey()); - } - - // And finally, fire up the server - jettyServer.start(); - jettyServer.setStopAtShutdown(true); - - System.out.println("started Chukwa http collector on port " + portNum); - System.out.close(); - System.err.close(); - } catch (Exception e) { - e.printStackTrace(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java index c3e72d2..d4c2df4 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java @@ -38,7 +38,7 @@ public class CommitCheckServlet extends HttpServlet { private static final long serialVersionUID = -4627538252371890849L; - protected static Logger log = Logger.getLogger(CommitCheckServlet.class); + protected final static Logger log = Logger.getLogger(CommitCheckServlet.class); CommitCheckThread commitCheck; Configuration conf; //interval at which to scan the filesystem, ms http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java index b1ead05..613fa3e 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java @@ -67,7 +67,7 @@ public class LogDisplayServlet extends HttpServlet { long totalStoredSize = 0; private static final long serialVersionUID = -4602082382919009285L; - protected static Logger log = Logger.getLogger(LogDisplayServlet.class); + protected final static Logger log = Logger.getLogger(LogDisplayServlet.class); public LogDisplayServlet() { conf = new Configuration(); http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java index 03a88df..69e3566 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java @@ -27,6 +27,7 @@ import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.net.SocketException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; + import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; import org.apache.log4j.Logger; @@ -128,14 +130,14 @@ public class ChukwaAgentController { e.printStackTrace(); } PrintWriter bw = new PrintWriter(new OutputStreamWriter(s - .getOutputStream())); + .getOutputStream(), Charset.forName("UTF-8"))); if(id != null) bw.println("ADD " + id + " = " + className + " " + appType + " " + params + " " + offset); else bw.println("ADD " + className + " " + appType + " " + params + " " + offset); bw.flush(); BufferedReader br = new BufferedReader(new InputStreamReader(s - .getInputStream())); + .getInputStream(), Charset.forName("UTF-8"))); String resp = br.readLine(); if (resp != null) { String[] fields = resp.split(" "); @@ -153,15 +155,14 @@ public class ChukwaAgentController { s.setSoTimeout(60000); } catch (SocketException e) { log.warn("Error while settin soTimeout to 60000"); - e.printStackTrace(); } PrintWriter bw = new PrintWriter(new OutputStreamWriter(s - .getOutputStream())); + .getOutputStream(), Charset.forName("UTF-8"))); bw.println("SHUTDOWN " + id); bw.flush(); BufferedReader br = new BufferedReader(new InputStreamReader(s - .getInputStream())); + .getInputStream(), Charset.forName("UTF-8"))); String resp = br.readLine(); if (resp == null || !resp.startsWith("OK")) { log.error("adaptor unregister error, id: " + id); @@ -348,12 +349,12 @@ public class ChukwaAgentController { e.printStackTrace(); } PrintWriter bw = new PrintWriter( - new OutputStreamWriter(s.getOutputStream())); + new OutputStreamWriter(s.getOutputStream(), Charset.forName("UTF-8"))); bw.println("LIST"); bw.flush(); BufferedReader br = new BufferedReader(new InputStreamReader(s - .getInputStream())); + .getInputStream(), Charset.forName("UTF-8"))); String ln; Map<String, Adaptor> listResult = new HashMap<String, Adaptor>(); while ((ln = br.readLine()) != null) { @@ -370,11 +371,13 @@ public class ChukwaAgentController { // - // paren long offset = Long.parseLong(parts[parts.length - 1]); - String tmpParams = parts[3]; + StringBuilder tmpParams = new StringBuilder(); + tmpParams.append(parts[3]); for (int i = 4; i < parts.length - 1; i++) { - tmpParams += " " + parts[i]; + tmpParams.append(" "); + tmpParams.append(parts[i]); } - listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams, + listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams.toString(), offset)); } } @@ -563,8 +566,7 @@ public class ChukwaAgentController { if (adaptorID != null) { log.info("Successfully added adaptor, id is:" + adaptorID); } else { - System.err.println("Agent reported failure to add adaptor, adaptor id returned was:" - + adaptorID); + log.error("Agent reported failure to add adaptor."); } return adaptorID; } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java index 11e9305..6f818e4 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java @@ -44,7 +44,7 @@ import org.apache.log4j.Logger; */ public class AsyncAckSender extends ChukwaHttpSender{ - protected static Logger log = Logger.getLogger(AsyncAckSender.class); + protected final static Logger log = Logger.getLogger(AsyncAckSender.class); /* * Represents the state required for an asynchronous ack. * http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java index 22460d7..1c8c3d2 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -354,7 +355,7 @@ public class ChukwaHttpSender implements ChukwaSender { } }); - pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT)); + pars.setParameter(HttpMethodParams.SO_TIMEOUT, Integer.valueOf(COLLECTOR_TIMEOUT)); method.setParams(pars); method.setPath(dest); @@ -385,7 +386,7 @@ public class ChukwaHttpSender implements ChukwaSender { // Get the response body byte[] resp_buf = method.getResponseBody(); rstream = new ByteArrayInputStream(resp_buf); - BufferedReader br = new BufferedReader(new InputStreamReader(rstream)); + BufferedReader br = new BufferedReader(new InputStreamReader(rstream, Charset.forName("UTF-8"))); String line; List<String> resp = new ArrayList<String>(); while ((line = br.readLine()) != null) { http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java index 30442e2..dea2d07 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java @@ -18,8 +18,8 @@ package org.apache.hadoop.chukwa.datacollection.writer.hbase; -import java.io.UnsupportedEncodingException; import java.lang.reflect.Type; +import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -36,6 +36,7 @@ import com.google.gson.reflect.TypeToken; public class Reporter { private ArrayList<Put> meta = new ArrayList<Put>(); private MessageDigest md5 = null; + private final static Charset UTF8 = Charset.forName("UTF-8"); public Reporter() throws NoSuchAlgorithmException { md5 = MessageDigest.getInstance("md5"); @@ -48,11 +49,11 @@ public class Reporter { try { Type metaType = new TypeToken<Map<String, String>>(){}.getType(); Map<String, String> meta = new HashMap<String, String>(); - meta.put("sig", new String(value, "UTF-8")); + meta.put("sig", new String(value, UTF8)); meta.put("type", "source"); Gson gson = new Gson(); buffer = gson.toJson(meta, metaType); - put(type.getBytes(), source.getBytes(), buffer.toString().getBytes()); + put(type.getBytes(UTF8), source.getBytes(UTF8), buffer.toString().getBytes(UTF8)); } catch (Exception e) { Log.warn("Error encoding metadata."); Log.warn(e); @@ -70,7 +71,7 @@ public class Reporter { meta.put("type", "metric"); Gson gson = new Gson(); buffer = gson.toJson(meta, metaType); - put(type.getBytes(), metric.getBytes(), buffer.toString().getBytes()); + put(type.getBytes(UTF8), metric.getBytes(UTF8), buffer.toString().getBytes(UTF8)); } catch (Exception e) { Log.warn("Error encoding metadata."); Log.warn(e); @@ -78,12 +79,12 @@ public class Reporter { } public void put(String key, String source, String info) { - put(key.getBytes(), source.getBytes(), info.getBytes()); + put(key.getBytes(UTF8), source.getBytes(UTF8), info.getBytes(UTF8)); } public void put(byte[] key, byte[] source, byte[] info) { Put put = new Put(key); - put.addColumn("k".getBytes(), source, info); + put.addColumn("k".getBytes(UTF8), source, info); meta.add(put); } @@ -97,25 +98,20 @@ public class Reporter { private byte[] getHash(String key) { byte[] hash = new byte[5]; - System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 5); + System.arraycopy(md5.digest(key.getBytes(UTF8)), 0, hash, 0, 5); return hash; } public void putClusterName(String type, String clusterName) { byte[] value = getHash(clusterName); String buffer; - try { - Type metaType = new TypeToken<Map<String, String>>(){}.getType(); - Map<String, String> meta = new HashMap<String, String>(); - meta.put("sig", new String(value, "UTF-8")); - meta.put("type", "cluster"); - Gson gson = new Gson(); - buffer = gson.toJson(meta, metaType); - put(type.getBytes(), clusterName.getBytes(), buffer.toString().getBytes()); - } catch (UnsupportedEncodingException e) { - Log.warn("Error encoding metadata."); - Log.warn(e); - } + Type metaType = new TypeToken<Map<String, String>>(){}.getType(); + Map<String, String> meta = new HashMap<String, String>(); + meta.put("sig", new String(value, UTF8)); + meta.put("type", "cluster"); + Gson gson = new Gson(); + buffer = gson.toJson(meta, metaType); + put(type.getBytes(UTF8), clusterName.getBytes(UTF8), buffer.toString().getBytes(UTF8)); } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/6012e14f/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java index f67fe87..bf64b24 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.chukwa.datacollection.writer.solr; +import java.io.IOException; +import java.nio.charset.Charset; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -34,12 +36,13 @@ import org.apache.hadoop.chukwa.datacollection.writer.WriterException; import org.apache.hadoop.chukwa.util.ExceptionUtil; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; +import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrServer; import org.apache.solr.common.SolrInputDocument; public class SolrWriter extends PipelineableWriter { private static Logger log = Logger.getLogger(SolrWriter.class); - private static CloudSolrServer server; + private CloudSolrServer server; private final static String ID = "id"; private final static String SEQ_ID = "seqId"; private final static String DATA_TYPE = "type"; @@ -64,8 +67,10 @@ public class SolrWriter extends PipelineableWriter { throw new WriterException("Solr server address is not defined."); } String collection = c.get("solr.collection", "logs"); - server = new CloudSolrServer(serverName); - server.setDefaultCollection(collection); + if(server == null) { + server = new CloudSolrServer(serverName); + server.setDefaultCollection(collection); + } } @Override @@ -84,10 +89,10 @@ public class SolrWriter extends PipelineableWriter { doc.addField(SOURCE, chunk.getSource()); doc.addField(SEQ_ID, chunk.getSeqID()); doc.addField(DATA_TYPE, chunk.getDataType()); - doc.addField(DATA, new String(chunk.getData())); + doc.addField(DATA, new String(chunk.getData(), Charset.forName("UTF-8"))); // TODO: improve parsing logic for more sophisticated tagging - String data = new String(chunk.getData()); + String data = new String(chunk.getData(), Charset.forName("UTF-8")); Matcher m = userPattern.matcher(data); if(m.find()) { doc.addField(USER, m.group(1)); @@ -109,7 +114,7 @@ public class SolrWriter extends PipelineableWriter { } server.add(doc); server.commit(); - } catch (Exception e) { + } catch (SolrServerException | IOException e) { log.warn("Failed to store data to Solr Cloud."); log.warn(ExceptionUtil.getStackTrace(e)); }
