Repository: samza
Updated Branches:
  refs/heads/master bb8a78a85 -> f0809a54b


SAMZA-657: add checkstyle to build script


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f0809a54
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f0809a54
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f0809a54

Branch: refs/heads/master
Commit: f0809a54bd48109373cf000056dd652cb8b298f3
Parents: bb8a78a
Author: Guozhang Wang <[email protected]>
Authored: Thu May 7 17:38:53 2015 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Thu May 7 17:38:53 2015 -0700

----------------------------------------------------------------------
 README.md                                       |   4 +
 build.gradle                                    |  27 +++
 checkstyle/checkstyle.xml                       |  88 ++++++++++
 checkstyle/import-control.xml                   | 175 +++++++++++++++++++
 .../java/org/apache/samza/config/Config.java    |  20 +--
 .../java/org/apache/samza/config/MapConfig.java |   2 +-
 .../samza/system/SystemStreamPartition.java     |   2 +-
 .../apache/samza/util/BlockingEnvelopeMap.java  |   4 +-
 ...inglePartitionWithoutOffsetsSystemAdmin.java |   8 +-
 .../org/apache/samza/config/TestConfig.java     |  26 +--
 .../TestSystemStreamPartitionIterator.java      |   1 -
 .../samza/util/TestBlockingEnvelopeMap.java     |  14 +-
 .../samza/checkpoint/CheckpointManager.java     |   3 +-
 .../stream/CoordinatorStreamMessage.java        |   8 +-
 .../stream/CoordinatorStreamSystemConsumer.java |  12 +-
 .../stream/CoordinatorStreamSystemProducer.java |   3 +-
 .../serializers/model/SamzaObjectMapper.java    |   9 +-
 .../storage/ChangelogPartitionManager.java      |   3 +-
 .../grouper/task/GroupByContainerCount.scala    |   2 -
 .../MockCoordinatorStreamSystemFactory.java     |   2 +-
 .../MockCoordinatorStreamWrappedConsumer.java   |  25 +--
 .../stream/TestCoordinatorStreamMessage.java    |  14 +-
 .../TestCoordinatorStreamSystemConsumer.java    |  13 +-
 .../TestCoordinatorStreamSystemProducer.java    |   8 +-
 .../model/TestSamzaObjectMapper.java            |   4 +-
 .../samza/storage/kv/KeyValueIterator.java      |   2 +-
 .../samza/logging/log4j/StreamAppender.java     |  14 +-
 .../serializers/LoggingEventStringSerde.java    |   4 +-
 .../samza/config/TestLog4jSystemConfig.java     |   6 +-
 .../samza/logging/log4j/TestJmxAppender.java    |  14 +-
 .../samza/system/mock/MockSystemAdmin.java      |   1 -
 .../test/integration/SimpleStatefulTask.java    |   2 +-
 .../test/integration/StatePerfTestTask.java     |  11 +-
 .../samza/test/integration/join/Checker.java    |  23 +--
 .../samza/test/integration/join/Emitter.java    |  18 +-
 .../test/integration/join/EpochPartitioner.java |  35 ----
 .../samza/test/integration/join/Joiner.java     |  16 +-
 .../samza/test/integration/join/Watcher.java    |   6 +-
 38 files changed, 435 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 0492790..f83fd41 100644
--- a/README.md
+++ b/README.md
@@ -50,6 +50,10 @@ To run all integration tests:
 
     ./bin/integration-tests.sh <dir>
 
+### Running checkstyle on the java code ###
+
+    ./gradlew checkstyleMain checkstyleTest
+
 ### Job Management
 
 To run a job (defined in a properties file):

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index ebad6eb..ac80a86 100644
--- a/build.gradle
+++ b/build.gradle
@@ -114,18 +114,26 @@ subprojects {
 
 project(':samza-api') {
   apply plugin: 'java'
+  apply plugin: 'checkstyle'
 
   dependencies {
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
 }
 
 project(":samza-core_$scalaVersion") {
   apply plugin: 'scala'
+  apply plugin: 'checkstyle'
+
   // Force scala joint compilation
   sourceSets.main.scala.srcDir "src/main/java"
   sourceSets.main.java.srcDirs = []
+
   jar {
     manifest {
       attributes("Implementation-Version": "$version")
@@ -143,6 +151,10 @@ project(":samza-core_$scalaVersion") {
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
   }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
 }
 
 project(":samza-kafka_$scalaVersion") {
@@ -191,6 +203,7 @@ project(":samza-kafka_$scalaVersion") {
 
 project(':samza-log4j') {
   apply plugin: 'java'
+  apply plugin: 'checkstyle'
 
   dependencies {
     compile "log4j:log4j:$log4jVersion"
@@ -199,6 +212,10 @@ project(':samza-log4j') {
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     testCompile "junit:junit:$junitVersion"
   }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
 }
 
 project(":samza-yarn_$scalaVersion") {
@@ -324,6 +341,7 @@ project(":samza-shell") {
 
 project(":samza-kv_$scalaVersion") {
   apply plugin: 'scala'
+  apply plugin: 'checkstyle'
 
   dependencies {
     compile project(':samza-api')
@@ -332,6 +350,10 @@ project(":samza-kv_$scalaVersion") {
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
 }
 
 project(":samza-kv-inmemory_$scalaVersion") {
@@ -363,6 +385,7 @@ project(":samza-kv-rocksdb_$scalaVersion") {
 
 project(":samza-test_$scalaVersion") {
   apply plugin: 'scala'
+  apply plugin: 'checkstyle'
 
   configurations {
     // Remove transitive dependencies from Zookeeper that we don't want.
@@ -399,6 +422,10 @@ project(":samza-test_$scalaVersion") {
     jvmArgs = ["-XX:+UseConcMarkSweepGC", "-server"]
   }
 
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+
   tasks.create(name: "releaseTestJobs", dependsOn: 
configurations.archives.artifacts, type: Tar) {
     description 'Build an integration test tarball'
     compression = Compression.GZIP

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
new file mode 100644
index 0000000..770b5e7
--- /dev/null
+++ b/checkstyle/checkstyle.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN" 
+     "http://www.puppycrawl.com/dtds/configuration_1_3.dtd";>
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+// 
+//    http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+--> 
+<module name="Checker">
+  <property name="localeLanguage" value="en"/>
+  
+  <module name="FileTabCharacter"/>
+  
+  <!-- header: use one star only -->
+  <module name="RegexpHeader">
+    <property name="header" value="/\*\nLicensed to the Apache.*"/>
+  </module>
+  
+  <module name="TreeWalker">
+    
+    <!-- code cleanup -->
+    <module name="UnusedImports"/>
+    <module name="RedundantImport"/>
+    <module name="IllegalImport" />
+    <module name="EqualsHashCode"/>
+    <module name="SimplifyBooleanExpression"/>
+    <module name="OneStatementPerLine"/>
+    <module name="UnnecessaryParentheses" />
+    <module name="SimplifyBooleanReturn"/>
+    
+    <!-- style -->
+    <module name="DefaultComesLast"/>
+    <module name="EmptyStatement"/>
+    <module name="ArrayTypeStyle"/>
+    <module name="UpperEll"/>
+    <module name="LeftCurly"/>
+    <module name="RightCurly"/>
+    <module name="EmptyStatement"/>
+    <module name="ConstantName">
+      <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+    </module>
+    <module name="LocalVariableName"/>
+    <module name="LocalFinalVariableName"/>
+    <module name="ClassTypeParameterName"/>
+    <module name="MemberName"/>
+    <module name="MethodTypeParameterName"/>
+    <module name="PackageName"/>
+    <module name="ParameterName"/>
+    <module name="StaticVariableName"/>
+    <module name="TypeName"/>
+    
+    <!-- dependencies -->
+    <module name="ImportControl">
+      <property name="file" value="${basedir}/checkstyle/import-control.xml"/>
+    </module>
+    
+    <!-- whitespace -->
+    <module name="GenericWhitespace"/>
+    <module name="NoWhitespaceBefore"/>
+    <module name="WhitespaceAfter" />
+    <module name="NoWhitespaceAfter"/>
+    <module name="WhitespaceAround">
+      <property name="allowEmptyConstructors" value="true"/>
+      <property name="allowEmptyMethods" value="true"/>
+    </module>
+    <!-- set indentation with 2 spaces to be consistent with Scala -->
+    <module name="Indentation">
+      <property name="basicOffset" value="2"/>
+      <property name="caseIndent" value="2"/>
+      <property name="throwsIndent" value="2"/>
+    </module>
+    <module name="MethodParamPad"/>
+    <module name="ParenPad"/>
+    <module name="TypecastParenPad"/>
+  </module>
+</module>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
new file mode 100644
index 0000000..5f8e103
--- /dev/null
+++ b/checkstyle/import-control.xml
@@ -0,0 +1,175 @@
+<!DOCTYPE import-control PUBLIC
+    "-//Puppy Crawl//DTD Import Control 1.1//EN"
+    "http://www.puppycrawl.com/dtds/import_control_1_1.dtd";>
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+// 
+//    http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+--> 
+<import-control pkg="org.apache.samza">
+       
+       <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS 
FILE -->
+       
+       <!-- common library dependencies -->
+       <allow pkg="java" />
+       <allow pkg="javax.management" />
+       <allow pkg="org.slf4j" />
+       <allow pkg="org.junit" />
+    <allow pkg="org.codehaus" />
+    <allow pkg="org.mockito" />
+    <allow pkg="org.apache.log4j" />
+    <allow pkg="org.apache.kafka" />
+
+    <subpackage name="config">
+        <allow class="org.apache.samza.SamzaException" />
+    </subpackage>
+
+    <subpackage name="serializers">
+        <allow pkg="org.apache.samza.config" />
+
+        <subpackage name="model">
+            <allow pkg="org.apache.samza.job.model" />
+            <allow pkg="org.apache.samza.util" />
+
+            <allow class="org.apache.samza.Partition" />
+            <allow class="org.apache.samza.container.TaskName" />
+            <allow class="org.apache.samza.system.SystemStreamPartition" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="job">
+        <allow pkg="org.apache.samza.config" />
+
+        <subpackage name="model">
+            <allow class="org.apache.samza.Partition" />
+            <allow class="org.apache.samza.container.TaskName" />
+            <allow class="org.apache.samza.system.SystemStreamPartition" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="system">
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.serializers" />
+
+        <allow class="org.apache.samza.Partition" />
+        <allow class="org.apache.samza.SamzaException" />
+
+        <subpackage name="chooser">
+            <allow class="org.apache.samza.system.SystemStreamPartition" />
+            <allow class="org.apache.samza.system.IncomingMessageEnvelope" />
+        </subpackage>
+
+        <subpackage name="mock">
+            <allow pkg="org.apache.samza.system" />
+            <allow pkg="org.apache.samza.util" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="util">
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.system" />
+
+        <allow class="org.apache.samza.Partition" />
+        <allow class="org.apache.samza.SamzaException" />
+    </subpackage>
+
+    <subpackage name="metrics">
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.util" />
+    </subpackage>
+
+    <subpackage name="task">
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.container" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.system" />
+    </subpackage>
+
+    <subpackage name="container">
+        <allow pkg="org.apache.samza.config" />
+
+        <subpackage name="grouper">
+            <subpackage name="stream">
+                <allow pkg="org.apache.samza.container" />
+                <allow pkg="org.apache.samza.system" />
+            </subpackage>
+
+            <subpackage name="task">
+                <allow pkg="org.apache.samza.job" />
+            </subpackage>
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="coordinator">
+        <allow pkg="org.apache.samza.checkpoint" />
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.system" />
+        <allow pkg="org.apache.samza.serializers" />
+        <allow pkg="org.apache.samza.util" />
+
+        <allow class="org.apache.samza.Partition" />
+        <allow class="org.apache.samza.SamzaException" />
+    </subpackage>
+
+    <subpackage name="checkpoint">
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.container" />
+        <allow pkg="org.apache.samza.coordinator" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.system" />
+
+        <allow class="org.apache.samza.SamzaException" />
+    </subpackage>
+
+    <subpackage name="storage">
+        <allow pkg="org.apache.samza.container" />
+        <allow pkg="org.apache.samza.coordinator" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.serializers" />
+        <allow pkg="org.apache.samza.system" />
+        <allow pkg="org.apache.samza.task" />
+    </subpackage>
+
+    <subpackage name="logging">
+        <subpackage name="log4j">
+            <allow pkg="org.apache.samza.config" />
+            <allow pkg="org.apache.samza.coordinator" />
+            <allow pkg="org.apache.samza.job" />
+            <allow pkg="org.apache.samza.metrics" />
+            <allow pkg="org.apache.samza.system" />
+            <allow pkg="org.apache.samza.serializers" />
+            <allow pkg="org.apache.samza.util" />
+
+            <allow 
class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory" 
/>
+            <allow 
class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde" />
+            <allow 
class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory"
 />
+            <allow 
class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde" />
+            <allow class="org.apache.samza.SamzaException" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="test">
+        <subpackage name="integration">
+            <allow pkg="org.apache.samza.config" />
+            <allow pkg="org.apache.samza.container" />
+            <allow pkg="org.apache.samza.system" />
+            <allow pkg="org.apache.samza.storage" />
+            <allow pkg="org.apache.samza.task" />
+            <allow pkg="org.apache.samza.util" />
+        </subpackage>
+    </subpackage>
+
+</import-control>

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/config/Config.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java 
b/samza-api/src/main/java/org/apache/samza/config/Config.java
index 2b99050..7abe557 100644
--- a/samza-api/src/main/java/org/apache/samza/config/Config.java
+++ b/samza-api/src/main/java/org/apache/samza/config/Config.java
@@ -58,18 +58,18 @@ public abstract class Config implements Map<String, String> 
{
   }
 
   public Config regexSubset(String regex) {
-      Map<String, String> out = new HashMap<String, String>();
-      Pattern pattern = Pattern.compile(regex);
-
-      for (Entry<String, String> entry : entrySet()) {
-        String k = entry.getKey();
-        Matcher matcher = pattern.matcher(k);
-        if(matcher.find()){
-            out.put(k, entry.getValue());
-        }
+    Map<String, String> out = new HashMap<String, String>();
+    Pattern pattern = Pattern.compile(regex);
+
+    for (Entry<String, String> entry : entrySet()) {
+      String k = entry.getKey();
+      Matcher matcher = pattern.matcher(k);
+      if (matcher.find()) {
+        out.put(k, entry.getValue());
       }
+    }
 
-      return new MapConfig(out);
+    return new MapConfig(out);
   }
 
   public String get(String k, String defaultString) {

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java 
b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
index 38d7424..0c3f14a 100644
--- a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
+++ b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
@@ -42,7 +42,7 @@ public class MapConfig extends Config {
 
   public MapConfig(List<Map<String, String>> maps) {
     this.map = new HashMap<String, String>();
-    for(Map<String, String> m: maps)
+    for (Map<String, String> m: maps)
       this.map.putAll(m);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java 
b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
index 8dcea09..95cc266 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
@@ -96,7 +96,7 @@ public class SystemStreamPartition extends SystemStream 
implements Comparable<Sy
 
   @Override
   public String toString() {
-    return "SystemStreamPartition ["+ system + ", " + stream + ", " + 
partition.getPartitionId() + "]";
+    return "SystemStreamPartition [" + system + ", " + stream + ", " + 
partition.getPartitionId() + "]";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java 
b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index e30321d..7dd99fb 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -227,11 +227,11 @@ public abstract class BlockingEnvelopeMap implements 
SystemConsumer {
     }
 
     public void initMetrics(SystemStreamPartition systemStreamPartition) {
-      this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, 
metricsRegistry.<Boolean> newGauge(group, "no-more-messages-" + 
systemStreamPartition, false));
+      this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, 
metricsRegistry.<Boolean>newGauge(group, "no-more-messages-" + 
systemStreamPartition, false));
       this.blockingPollCountMap.putIfAbsent(systemStreamPartition, 
metricsRegistry.newCounter(group, "blocking-poll-count-" + 
systemStreamPartition));
       this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, 
metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + 
systemStreamPartition));
 
-      metricsRegistry.<Integer> newGauge(group, new 
BufferGauge(systemStreamPartition, "buffered-message-count-" + 
systemStreamPartition));
+      metricsRegistry.<Integer>newGauge(group, new 
BufferGauge(systemStreamPartition, "buffered-message-count-" + 
systemStreamPartition));
     }
 
     public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, 
boolean noMoreMessages) {

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 
b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index a6b14fb..249b8ae 100644
--- 
a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ 
b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -37,10 +37,10 @@ import org.apache.samza.system.SystemStreamPartition;
  * Samza needs at least one partition for an input stream, in order to read it.
  */
 public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
-  private static final Map<Partition, SystemStreamPartitionMetadata> 
fakePartitionMetadata = new HashMap<Partition, SystemStreamPartitionMetadata>();
+  private static final Map<Partition, SystemStreamPartitionMetadata> 
FAKE_PARTITION_METADATA = new HashMap<Partition, 
SystemStreamPartitionMetadata>();
 
   static {
-    fakePartitionMetadata.put(new Partition(0), new 
SystemStreamPartitionMetadata(null, null, null));
+    FAKE_PARTITION_METADATA.put(new Partition(0), new 
SystemStreamPartitionMetadata(null, null, null));
   }
 
   @Override
@@ -48,14 +48,14 @@ public class SinglePartitionWithoutOffsetsSystemAdmin 
implements SystemAdmin {
     Map<String, SystemStreamMetadata> metadata = new HashMap<String, 
SystemStreamMetadata>();
 
     for (String streamName : streamNames) {
-      metadata.put(streamName, new SystemStreamMetadata(streamName, 
fakePartitionMetadata));
+      metadata.put(streamName, new SystemStreamMetadata(streamName, 
FAKE_PARTITION_METADATA));
     }
 
     return metadata;
   }
 
   @Override
-  public void createChangelogStream(String streamName, int numOfPartitions){
+  public void createChangelogStream(String streamName, int numOfPartitions) {
     throw new SamzaException("Method not implemented");
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java 
b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
index d9f378d..5d066c5 100644
--- a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
+++ b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
@@ -32,18 +32,20 @@ public class TestConfig {
    * overloaded args
    */
   Class getClass(long l) {
-    return Long.class ;
+    return Long.class;
   }
 
   Class getClass(short s) {
-    return Short.class ;
+    return Short.class;
   }
 
   @Test
   public void testgetShortAndLong() {
-    Map<String, String> m = new HashMap<String, String>() {{
-      put("testkey", "11");
-    }};
+    Map<String, String> m = new HashMap<String, String>() {
+      {
+        put("testkey", "11");
+      }
+    };
 
     MapConfig mc = new MapConfig(m);
     short defaultShort = 0;
@@ -64,12 +66,14 @@ public class TestConfig {
 
   @Test
   public void testSanitize() {
-    Map<String, String> m = new HashMap<String, String>() {{
-      put("key1", "value1");
-      put("key2", "value2");
-      put("sensitive.key3", "secret1");
-      put("sensitive.key4", "secret2");
-    }};
+    Map<String, String> m = new HashMap<String, String>() {
+      {
+        put("key1", "value1");
+        put("key2", "value2");
+        put("sensitive.key3", "secret1");
+        put("sensitive.key4", "secret2");
+      }
+    };
 
     Config config = new MapConfig(m);
     assertFalse(config.toString().contains("secret"));

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
 
b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
index 5af2a11..81c483d 100644
--- 
a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
+++ 
b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Queue;
 import java.util.Set;
 
 import org.apache.samza.Partition;

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java 
b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index 4eb87eb..d1a0a82 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -39,7 +39,7 @@ import org.junit.Test;
 
 public class TestBlockingEnvelopeMap {
   private static final SystemStreamPartition SSP = new 
SystemStreamPartition("test", "test", new Partition(0));
-  private static final IncomingMessageEnvelope envelope = new 
IncomingMessageEnvelope(SSP, null, null, null);
+  private static final IncomingMessageEnvelope ENVELOPE = new 
IncomingMessageEnvelope(SSP, null, null, null);
   private static final Set<SystemStreamPartition> FETCH = new 
HashSet<SystemStreamPartition>();
 
   static {
@@ -69,12 +69,12 @@ public class TestBlockingEnvelopeMap {
   public void testShouldGetSomeMessages() throws InterruptedException {
     BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
     map.register(SSP, "0");
-    map.put(SSP, envelope);
+    map.put(SSP, ENVELOPE);
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = 
map.poll(FETCH, 0);
     assertEquals(1, envelopes.size());
     assertEquals(1, envelopes.get(SSP).size());
-    map.put(SSP, envelope);
-    map.put(SSP, envelope);
+    map.put(SSP, ENVELOPE);
+    map.put(SSP, ENVELOPE);
     envelopes = map.poll(FETCH, 0);
     assertEquals(1, envelopes.size());
     assertEquals(2, envelopes.get(SSP).size());
@@ -117,10 +117,10 @@ public class TestBlockingEnvelopeMap {
     // because BlockingEnvelopeMap calls clock.currentTimeMillis twice, and
     // uses the second call to determine the actual poll time.
     final BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(q, new Clock() 
{
-      private final long NOW = System.currentTimeMillis();
+      private final long now = System.currentTimeMillis();
 
       public long currentTimeMillis() {
-        return NOW;
+        return now;
       }
     });
 
@@ -166,7 +166,7 @@ public class TestBlockingEnvelopeMap {
 
       pollTimeoutBarrier.countDown();
 
-      return envelope;
+      return ENVELOPE;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index 3ac63ca..7445996 100644
--- 
a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
 import 
org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetCheckpoint;
@@ -101,7 +100,7 @@ public class CheckpointManager {
     for (CoordinatorStreamMessage coordinatorStreamMessage : 
bootstrappedStream) {
       SetCheckpoint setCheckpoint = new 
SetCheckpoint(coordinatorStreamMessage);
       TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey());
-      if(taskNames.contains(taskNameInCheckpoint)) {
+      if (taskNames.contains(taskNameInCheckpoint)) {
         taskNamesToOffsets.put(taskNameInCheckpoint, 
setCheckpoint.getCheckpoint());
         log.debug("Adding checkpoint {} for taskName {}", 
taskNameInCheckpoint, taskName);
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
index f8b705f..0988ded 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -93,9 +93,9 @@ import org.slf4j.LoggerFactory;
  * </p>
  */
 public class CoordinatorStreamMessage {
-  public static int VERSION_INDEX = 0;
-  public static int TYPE_INDEX = 1;
-  public static int KEY_INDEX = 2;
+  public static final int VERSION_INDEX = 0;
+  public static final int TYPE_INDEX = 1;
+  public static final int KEY_INDEX = 2;
 
   private static final Logger log = 
LoggerFactory.getLogger(CoordinatorStreamMessage.class);
 
@@ -135,7 +135,7 @@ public class CoordinatorStreamMessage {
   }
 
   public CoordinatorStreamMessage(String source) {
-    this(source, new Object[] { Integer.valueOf(VERSION), null, null }, new 
HashMap<String, Object>());
+    this(source, new Object[] {Integer.valueOf(VERSION), null, null}, new 
HashMap<String, Object>());
   }
 
   public CoordinatorStreamMessage(String source, Object[] keyArray, 
Map<String, Object> messageMap) {

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 2134603..27e0750 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -29,7 +29,6 @@ import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -110,8 +109,7 @@ public class CoordinatorStreamSystemConsumer {
    * Starts the underlying SystemConsumer.
    */
   public void start() {
-    if(isStarted)
-    {
+    if (isStarted) {
       log.info("Coordinator stream consumer already started");
       return;
     }
@@ -147,12 +145,12 @@ public class CoordinatorStreamSystemConsumer {
         CoordinatorStreamMessage coordinatorStreamMessage = new 
CoordinatorStreamMessage(keyArray, valueMap);
         log.debug("Received coordinator stream message: {}", 
coordinatorStreamMessage);
         bootstrappedStreamSet.add(coordinatorStreamMessage);
-        if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
+        if 
(CoordinatorStreamMessage.SetConfig.TYPE.equals(coordinatorStreamMessage.getType()))
 {
           String configKey = coordinatorStreamMessage.getKey();
           if (coordinatorStreamMessage.isDelete()) {
             configMap.remove(configKey);
           } else {
-            String configValue = new 
SetConfig(coordinatorStreamMessage).getConfigValue();
+            String configValue = new 
CoordinatorStreamMessage.SetConfig(coordinatorStreamMessage).getConfigValue();
             configMap.put(configKey, configValue);
           }
         }
@@ -166,7 +164,7 @@ public class CoordinatorStreamSystemConsumer {
 
   public Set<CoordinatorStreamMessage> getBoostrappedStream() {
     log.info("Returning the bootstrapped data from the stream");
-    if(!isBootstrapped)
+    if (!isBootstrapped)
       bootstrap();
     return bootstrappedStreamSet;
   }
@@ -176,7 +174,7 @@ public class CoordinatorStreamSystemConsumer {
     bootstrap();
     HashSet<CoordinatorStreamMessage> bootstrappedStream = new 
HashSet<CoordinatorStreamMessage>();
     for (CoordinatorStreamMessage coordinatorStreamMessage : 
bootstrappedStreamSet) {
-      if(type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
+      if (type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
         bootstrappedStream.add(coordinatorStreamMessage);
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index 0f3e10e..92f8907 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -74,8 +74,7 @@ public class CoordinatorStreamSystemProducer {
    * Creates the coordinator stream, and starts the system producer.
    */
   public void start() {
-    if(isStarted)
-    {
+    if (isStarted) {
       log.info("Coordinator stream producer already started");
       return;
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
 
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 17410c5..717b5dc 100644
--- 
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -29,7 +29,6 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Util;
 import org.codehaus.jackson.JsonGenerator;
@@ -108,7 +107,7 @@ public class SamzaObjectMapper {
     public Config deserialize(JsonParser jsonParser, DeserializationContext 
context) throws IOException, JsonProcessingException {
       ObjectCodec oc = jsonParser.getCodec();
       JsonNode node = oc.readTree(jsonParser);
-      return new MapConfig(OBJECT_MAPPER.<Map<String, String>> readValue(node, 
new TypeReference<Map<String, String>>() {
+      return new MapConfig(OBJECT_MAPPER.<Map<String, String>>readValue(node, 
new TypeReference<Map<String, String>>() {
       }));
     }
   }
@@ -147,8 +146,7 @@ public class SamzaObjectMapper {
 
   public static class SystemStreamPartitionKeySerializer extends 
JsonSerializer<SystemStreamPartition> {
     @Override
-    public void serialize(SystemStreamPartition systemStreamPartition, 
JsonGenerator jgen, SerializerProvider provider)
-        throws IOException {
+    public void serialize(SystemStreamPartition systemStreamPartition, 
JsonGenerator jgen, SerializerProvider provider) throws IOException {
       String ssp = Util.sspToString(systemStreamPartition);
       jgen.writeFieldName(ssp);
     }
@@ -156,8 +154,7 @@ public class SamzaObjectMapper {
 
   public static class SystemStreamPartitionKeyDeserializer extends 
KeyDeserializer {
     @Override
-    public Object deserializeKey(String sspString, DeserializationContext ctxt)
-        throws IOException {
+    public Object deserializeKey(String sspString, DeserializationContext 
ctxt) throws IOException {
       return Util.stringToSsp(sspString);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
index fff7634..7d3409c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
@@ -20,7 +20,6 @@
 package org.apache.samza.storage;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.container.TaskName;
@@ -73,7 +72,7 @@ public class ChangelogPartitionManager {
    */
   public void register(TaskName taskName) {
     log.debug("Adding taskName {} to {}", taskName, this);
-    if(!isCoordinatorConsumerRegistered) {
+    if (!isCoordinatorConsumerRegistered) {
       coordinatorStreamConsumer.register();
       isCoordinatorConsumerRegistered = true;
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
index 8071fec..be36125 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
@@ -19,10 +19,8 @@
 
 package org.apache.samza.container.grouper.task
 
-import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.system.SystemStreamPartition
 import scala.collection.JavaConversions._
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 59782fe..647cadb 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -61,7 +61,7 @@ public class MockCoordinatorStreamSystemFactory implements 
SystemFactory {
    */
   public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
 
-    if(useCachedConsumer && mockConsumer != null) {
+    if (useCachedConsumer && mockConsumer != null) {
       return mockConsumer;
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index 00a2d59..e454593 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -22,13 +22,11 @@ package org.apache.samza.coordinator.stream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.Checkpoint;
 import org.apache.samza.config.Config;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
@@ -72,8 +70,7 @@ public class MockCoordinatorStreamWrappedConsumer extends 
BlockingEnvelopeMap {
       for (Map.Entry<String, String> configPair : config.entrySet()) {
         byte[] keyBytes = null;
         byte[] messgeBytes = null;
-        if(configPair.getKey().startsWith(CHECKPOINTPREFIX))
-        {
+        if (configPair.getKey().startsWith(CHECKPOINTPREFIX)) {
           String[] checkpointInfo = configPair.getKey().split(":");
           String[] sspOffsetPair = configPair.getValue().split(":");
           HashMap<SystemStreamPartition, String> checkpointMap = new 
HashMap<SystemStreamPartition, String>();
@@ -82,16 +79,14 @@ public class MockCoordinatorStreamWrappedConsumer extends 
BlockingEnvelopeMap {
           CoordinatorStreamMessage.SetCheckpoint setCheckpoint = new 
CoordinatorStreamMessage.SetCheckpoint(checkpointInfo[1], checkpointInfo[2], 
cp);
           keyBytes = 
MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8");
           messgeBytes = 
MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8");
-        }
-        else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
+        } else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
           String[] changelogInfo = configPair.getKey().split(":");
           String changeLogPartition = configPair.getValue();
           CoordinatorStreamMessage.SetChangelogMapping changelogMapping = new 
CoordinatorStreamMessage.SetChangelogMapping(changelogInfo[1], 
changelogInfo[2], Integer.parseInt(changeLogPartition));
           keyBytes = 
MAPPER.writeValueAsString(changelogMapping.getKeyArray()).getBytes("UTF-8");
           messgeBytes = 
MAPPER.writeValueAsString(changelogMapping.getMessageMap()).getBytes("UTF-8");
-        }
-        else {
-          SetConfig setConfig = new SetConfig("source", configPair.getKey(), 
configPair.getValue());
+        } else {
+          CoordinatorStreamMessage.SetConfig setConfig = new 
CoordinatorStreamMessage.SetConfig("source", configPair.getKey(), 
configPair.getValue());
           keyBytes = 
MAPPER.writeValueAsString(setConfig.getKeyArray()).getBytes("UTF-8");
           messgeBytes = 
MAPPER.writeValueAsString(setConfig.getMessageMap()).getBytes("UTF-8");
         }
@@ -106,23 +101,21 @@ public class MockCoordinatorStreamWrappedConsumer extends 
BlockingEnvelopeMap {
 
   @Override
   public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
-      Set<SystemStreamPartition> systemStreamPartitions, long timeout)
-      throws InterruptedException {
+    Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+    throws InterruptedException {
 
-    if(blockpollFlag) {
+    if (blockpollFlag) {
       blockConsumerPoll.await();
     }
 
     return super.poll(systemStreamPartitions, timeout);
   }
 
-  public CountDownLatch blockPool()
-  {
+  public CountDownLatch blockPool() {
     blockpollFlag = true;
     return blockConsumerPoll;
   }
 
 
-  public void stop() {
-  }
+  public void stop() {}
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
index 15181bb..ac26a01 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
@@ -19,9 +19,11 @@
 
 package org.apache.samza.coordinator.stream;
 
-import static org.junit.Assert.*;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import org.junit.Test;
 
 public class TestCoordinatorStreamMessage {
@@ -46,8 +48,8 @@ public class TestCoordinatorStreamMessage {
 
   @Test
   public void testSetConfig() {
-    SetConfig setConfig = new SetConfig("source", "key", "value");
-    assertEquals(SetConfig.TYPE, setConfig.getType());
+    CoordinatorStreamMessage.SetConfig setConfig = new 
CoordinatorStreamMessage.SetConfig("source", "key", "value");
+    assertEquals(CoordinatorStreamMessage.SetConfig.TYPE, setConfig.getType());
     assertEquals("key", setConfig.getKey());
     assertEquals("value", setConfig.getConfigValue());
     assertFalse(setConfig.isDelete());
@@ -56,7 +58,7 @@ public class TestCoordinatorStreamMessage {
 
   @Test
   public void testDelete() {
-    Delete delete = new Delete("source2", "key", "delete-type");
+    CoordinatorStreamMessage.Delete delete = new 
CoordinatorStreamMessage.Delete("source2", "key", "delete-type");
     assertEquals("delete-type", delete.getType());
     assertEquals("key", delete.getKey());
     assertNull(delete.getMessageMap());

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 5e193f8..2fe872b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -19,7 +19,10 @@
 
 package org.apache.samza.coordinator.stream;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,8 +32,6 @@ import java.util.Set;
 
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
@@ -102,9 +103,9 @@ public class TestCoordinatorStreamSystemConsumer {
 
       if (pollCount++ == 0) {
         List<IncomingMessageEnvelope> list = new 
ArrayList<IncomingMessageEnvelope>();
-        SetConfig setConfig1 = new SetConfig("test", "job.name", 
"my-job-name");
-        SetConfig setConfig2 = new SetConfig("test", "job.id", "1234");
-        Delete delete = new Delete("test", "job.name", SetConfig.TYPE);
+        CoordinatorStreamMessage.SetConfig setConfig1 = new 
CoordinatorStreamMessage.SetConfig("test", "job.name", "my-job-name");
+        CoordinatorStreamMessage.SetConfig setConfig2 = new 
CoordinatorStreamMessage.SetConfig("test", "job.id", "1234");
+        CoordinatorStreamMessage.Delete delete = new 
CoordinatorStreamMessage.Delete("test", "job.name", 
CoordinatorStreamMessage.SetConfig.TYPE);
         list.add(new IncomingMessageEnvelope(systemStreamPartition, null, 
serialize(setConfig1.getKeyArray()), serialize(setConfig1.getMessageMap())));
         list.add(new IncomingMessageEnvelope(systemStreamPartition, null, 
serialize(setConfig2.getKeyArray()), serialize(setConfig2.getMessageMap())));
         list.add(new IncomingMessageEnvelope(systemStreamPartition, null, 
serialize(delete.getKeyArray()), delete.getMessageMap()));

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
index 728fa53..68e3255 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -28,8 +28,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -47,9 +45,9 @@ public class TestCoordinatorStreamSystemProducer {
     MockSystemProducer systemProducer = new MockSystemProducer(source);
     MockSystemAdmin systemAdmin = new MockSystemAdmin();
     CoordinatorStreamSystemProducer producer = new 
CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
-    SetConfig setConfig1 = new SetConfig(source, "job.name", "my-job-name");
-    SetConfig setConfig2 = new SetConfig(source, "job.id", "1234");
-    Delete delete = new Delete(source, "job.name", SetConfig.TYPE);
+    CoordinatorStreamMessage.SetConfig setConfig1 = new 
CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name");
+    CoordinatorStreamMessage.SetConfig setConfig2 = new 
CoordinatorStreamMessage.SetConfig(source, "job.id", "1234");
+    CoordinatorStreamMessage.Delete delete = new 
CoordinatorStreamMessage.Delete(source, "job.name", 
CoordinatorStreamMessage.SetConfig.TYPE);
     assertFalse(systemProducer.isRegistered());
     producer.register(source);
     assertTrue(systemProducer.isRegistered());

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
 
b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 72b134c..ad1fbc5 100644
--- 
a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ 
b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -19,11 +19,9 @@
 
 package org.apache.samza.serializers.model;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
index 2fb26e2..854ebbf 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
@@ -21,6 +21,6 @@ package org.apache.samza.storage.kv;
 
 import java.util.Iterator;
 
-public interface KeyValueIterator<K,V> extends Iterator<Entry<K,V>> {
+public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>> {
   public void close();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index d3616fe..8948453 100644
--- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -32,8 +32,6 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.config.SystemConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
@@ -54,9 +52,9 @@ import org.apache.samza.util.Util;
  */
 public class StreamAppender extends AppenderSkeleton {
 
-  private final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
-  private final String APPLICATION_MASTER_TAG = "samza-application-master";
-  private final String SOURCE = "log4j-log";
+  private static final String JAVA_OPTS_CONTAINER_NAME = 
"samza.container.name";
+  private static final String APPLICATION_MASTER_TAG = 
"samza-application-master";
+  private static final String SOURCE = "log4j-log";
   private Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
@@ -104,7 +102,7 @@ public class StreamAppender extends AppenderSkeleton {
     String systemName = log4jSystemConfig.getSystemName();
     String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
     if (systemFactoryName != null) {
-      systemFactory = Util.<SystemFactory> getObj(systemFactoryName);
+      systemFactory = Util.<SystemFactory>getObj(systemFactoryName);
     } else {
       throw new SamzaException("Please define log4j system name and factory 
class");
     }
@@ -214,7 +212,7 @@ public class StreamAppender extends AppenderSkeleton {
    * @param streamName name of the stream
    */
   private void setSerde(Log4jSystemConfig log4jSystemConfig, String 
systemName, String streamName) {
-    String serdeClass = LoggingEventJsonSerdeFactory.class.getCanonicalName();;
+    String serdeClass = LoggingEventJsonSerdeFactory.class.getCanonicalName();
     String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, 
streamName);
 
     if (serdeName != null) {
@@ -222,7 +220,7 @@ public class StreamAppender extends AppenderSkeleton {
     }
 
     if (serdeClass != null) {
-      SerdeFactory<LoggingEvent> serdeFactory = 
Util.<SerdeFactory<LoggingEvent>> getObj(serdeClass);
+      SerdeFactory<LoggingEvent> serdeFactory = 
Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass);
       serde = serdeFactory.getSerde(systemName, config);
     } else {
       String serdeKey = String.format(SerializerConfig.SERDE(), serdeName);

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
index 8d8f5e8..908a080 100644
--- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
@@ -32,8 +32,8 @@ import org.apache.samza.serializers.Serde;
  * LoggingEvent based on the messages, which is deserialized from the bytes.
  */
 public class LoggingEventStringSerde implements Serde<LoggingEvent> {
-  final private String ENCODING = "UTF-8";
-  final Logger logger = Logger.getLogger(LoggingEventStringSerde.class);
+  private static final String ENCODING = "UTF-8";
+  private final Logger logger = 
Logger.getLogger(LoggingEventStringSerde.class);
 
   @Override
   public byte[] toBytes(LoggingEvent object) {

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java 
b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
index 6314a3e..f7d3cbe 100644
--- 
a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
+++ 
b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
@@ -37,8 +37,8 @@ public class TestLog4jSystemConfig {
   @Test
   public void testGetSystemNames() {
     Map<String, String> map = new HashMap<String, String>();
-    map.put("systems.system1.samza.factory","1");
-    map.put("systems.system2.samza.factory","2");
+    map.put("systems.system1.samza.factory", "1");
+    map.put("systems.system2.samza.factory", "2");
     Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new 
MapConfig(map));
 
     assertEquals(2, log4jSystemConfig.getSystemNames().size());
@@ -48,7 +48,7 @@ public class TestLog4jSystemConfig {
   public void testGetLog4jSystemName() {
     Map<String, String> map = new HashMap<String, String>();
     map.put("task.log4j.system", "log4j-system");
-    map.put("systems.system1.samza.factory","1");
+    map.put("systems.system1.samza.factory", "1");
 
     Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new 
MapConfig(map));
     assertEquals("log4j-system", log4jSystemConfig.getSystemName());

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java
index 0bdade0..4859125 100644
--- 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java
+++ 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java
@@ -48,24 +48,24 @@ import org.junit.Test;
  * with the JmxAppender added as a root-level appender.
  */
 public class TestJmxAppender {
-  public static final int port = 5500;
-  public static final JMXServiceURL url = getJmxServiceURL();
+  public static final int PORT = 5500;
+  public static final JMXServiceURL URL = getJmxServiceURL();
   private static JMXConnectorServer cs = null;
   private static final Logger log = 
Logger.getLogger(TestJmxAppender.class.getName());
 
   private static JMXServiceURL getJmxServiceURL() {
     try {
-      return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + 
port + "/jmxapitestrmi");
-    } catch(Exception e) {
+      return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + 
PORT + "/jmxapitestrmi");
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
   @BeforeClass
   public static void beforeSetupServers() throws Exception {
-    LocateRegistry.createRegistry(port);
+    LocateRegistry.createRegistry(PORT);
     MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
-    cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, 
mbeanServer);
+    cs = JMXConnectorServerFactory.newJMXConnectorServer(URL, null, 
mbeanServer);
     cs.start();
   }
 
@@ -78,7 +78,7 @@ public class TestJmxAppender {
 
   @Test
   public void testJmxAppender() throws Exception {
-    MBeanServerConnection mbserver = 
JMXConnectorFactory.connect(url).getMBeanServerConnection();
+    MBeanServerConnection mbserver = 
JMXConnectorFactory.connect(URL).getMBeanServerConnection();
     ObjectName objectName = new ObjectName(JmxAppender.JMX_OBJECT_DOMAIN + 
":type=" + JmxAppender.JMX_OBJECT_TYPE + ",name=" + 
JmxAppender.JMX_OBJECT_NAME);
     String level = null;
     MockAppender mockAppender = new MockAppender();

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index 8c95211..b063366 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.samza.Partition;
-import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
 
b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
index 67e56e0..6fafabc 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
@@ -45,7 +45,7 @@ public class SimpleStatefulTask implements StreamTask, 
InitableTask {
     this.store = (KeyValueStore<String, String>) context.getStore("mystore");
     System.out.println("Contents of store: ");
     KeyValueIterator<String, String> iter = store.all();
-    while(iter.hasNext()) {
+    while (iter.hasNext()) {
       Entry<String, String> entry = iter.next();
       System.out.println(entry.getKey() + " => " + entry.getValue());
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
 
b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
index d7fecd8..1d524a8 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
@@ -33,10 +33,11 @@ import org.apache.samza.task.TaskCoordinator.RequestScope;
  * A simple performance test that just reads in messages and writes them to a 
state store as quickly as possible and periodically prints out throughput 
numbers
  */
 public class StatePerfTestTask implements StreamTask, InitableTask {
-  
+
+  private static final int LOG_INTERVAL = 100000;
+
   private KeyValueStore<String, String> store;
   private int count = 0;
-  private int LOG_INTERVAL = 100000;
   private long start = System.currentTimeMillis();
 
   @SuppressWarnings("unchecked")
@@ -47,9 +48,9 @@ public class StatePerfTestTask implements StreamTask, 
InitableTask {
   public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
     store.put((String) envelope.getMessage(), (String) envelope.getMessage());
     count++;
-    if(count % LOG_INTERVAL == 0) {
-      double ellapsedSecs = (System.currentTimeMillis() - start)/1000.0;
-      System.out.println(String.format("Throughput = %.2f messages/sec.", 
count/ellapsedSecs));
+    if (count % LOG_INTERVAL == 0) {
+      double ellapsedSecs = (System.currentTimeMillis() - start) / 1000.0;
+      System.out.println(String.format("Throughput = %.2f messages/sec.", 
count / ellapsedSecs));
       start = System.currentTimeMillis();
       count = 0;
       coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java 
b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
index 0598e51..ea3aeb1 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
@@ -39,7 +39,8 @@ public class Checker implements StreamTask, WindowableTask, 
InitableTask {
   
   private static Logger logger = LoggerFactory.getLogger(Checker.class);
 
-  private static String CURRENT_EPOCH = "current-epoch";
+  private static final String CURRENT_EPOCH = "current-epoch";
+
   private KeyValueStore<String, String> store;
   private int expectedKeys;
   private int numPartitions;
@@ -68,25 +69,25 @@ public class Checker implements StreamTask, WindowableTask, 
InitableTask {
     int count = 0;
     KeyValueIterator<String, String> iter = this.store.all();
 
-    while(iter.hasNext()) {
-      Entry<String, String> entry= iter.next();
+    while (iter.hasNext()) {
+      Entry<String, String> entry = iter.next();
       String foundEpoch = entry.getValue();
-      if(foundEpoch.equals(currentEpoch)) {
-          count += 1;
+      if (foundEpoch.equals(currentEpoch)) {
+        count += 1;
       } else {
-          logger.info("####### Found a different epoch! - " + foundEpoch + " 
Current epoch is " + currentEpoch);
+        logger.info("####### Found a different epoch! - " + foundEpoch + " 
Current epoch is " + currentEpoch);
       }
     }
     iter.close();
-    if(count == expectedKeys + 1) {
+    if (count == expectedKeys + 1) {
       logger.info("Epoch " + currentEpoch + " is complete.");
       int nextEpoch = Integer.parseInt(currentEpoch) + 1;
-      for(int i = 0; i < numPartitions; i++) {
-          logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " 
+ Integer.toString(nextEpoch));
-          collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
"epoch"), Integer.toString(i), Integer.toString(nextEpoch)));
+      for (int i = 0; i < numPartitions; i++) {
+        logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " + 
Integer.toString(nextEpoch));
+        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
"epoch"), Integer.toString(i), Integer.toString(nextEpoch)));
       }
       this.store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
-    } else if(count > expectedKeys + 1) {
+    } else if (count > expectedKeys + 1) {
       throw new IllegalStateException("Got " + count + " keys, which is more 
than the expected " + (expectedKeys + 1));
     } else {
       logger.info("Only found " + count + " valid keys, try again later.");

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 
b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
index 82a633d..6661d8e 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
@@ -41,9 +41,9 @@ public class Emitter implements StreamTask, InitableTask, 
WindowableTask {
   
   private static Logger logger = LoggerFactory.getLogger(Emitter.class);
   
-  private static String EPOCH = "the-epoch";
-  private static String COUNT = "the-count";
-  
+  private static final String EPOCH = "the-epoch";
+  private static final String COUNT = "the-count";
+
   private KeyValueStore<String, String> state;
   private int max;
   private TaskName taskName;
@@ -57,14 +57,14 @@ public class Emitter implements StreamTask, InitableTask, 
WindowableTask {
 
   @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
-    if(envelope.getSystemStreamPartition().getStream().equals("epoch")) {
+    if (envelope.getSystemStreamPartition().getStream().equals("epoch")) {
       int newEpoch = Integer.parseInt((String) envelope.getMessage());
       logger.info("New epoch in message - " + newEpoch);
 
       Integer epoch = getInt(EPOCH);
-      if(epoch == null || newEpoch == epoch)
+      if (epoch == null || newEpoch == epoch)
         return;
-      if(newEpoch < epoch)
+      if (newEpoch < epoch)
         throw new IllegalArgumentException("Got new epoch " + newEpoch + " 
which is less than current epoch " + epoch);
       
       // it's a new era, reset current epoch and count
@@ -77,12 +77,12 @@ public class Emitter implements StreamTask, InitableTask, 
WindowableTask {
   
   public void window(MessageCollector collector, TaskCoordinator coordinator) {
     Integer epoch = getInt(EPOCH);
-    if(epoch == null) {
+    if (epoch == null) {
       resetEpoch();
       return;
     }
     int counter = getInt(COUNT);
-    if(counter < max) {
+    if (counter < max) {
       logger.info("Emitting: " + counter + ", epoch = " + epoch + ", task = " 
+ taskName);
       OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new 
SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + 
taskName.toString());
       collector.send(envelope);
@@ -98,7 +98,7 @@ public class Emitter implements StreamTask, InitableTask, 
WindowableTask {
   
   private Integer getInt(String key) {
     String value = this.state.get(key);
-    return value == null? null : Integer.parseInt(value);
+    return value == null ? null : Integer.parseInt(value);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java
 
b/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java
deleted file mode 100644
index 438d77c..0000000
--- 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.integration.join;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-public class EpochPartitioner implements Partitioner {
-  
-  public EpochPartitioner(VerifiableProperties p){}
-  
-  public int partition(Object key, int numParts) {
-    if(key instanceof Integer)
-      return (Integer) key;
-    else
-      return Integer.parseInt((String) key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java 
b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
index d2c0c7e..d1dd1f8 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.test.integration.join;
 
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -36,7 +35,6 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Int;
 
 @SuppressWarnings("unchecked")
 public class Joiner implements StreamTask, InitableTask {
@@ -64,20 +62,20 @@ public class Joiner implements StreamTask, InitableTask {
     int partition = Integer.parseInt(pieces[1].split(" ")[1]);
     Partitions partitions = loadPartitions(epoch, key);
     logger.info("Joiner got epoch = " + epoch + ", partition = " + partition + 
", parts = " + partitions);
-    if(partitions.epoch < epoch) {
+    if (partitions.epoch < epoch) {
       // we are in a new era
-      if(partitions.partitions.size() != expected)
+      if (partitions.partitions.size() != expected)
         throw new IllegalArgumentException("Should have " + expected + " 
partitions when new epoch starts.");
       logger.info("Reseting epoch to " + epoch);
       this.store.delete(key);
       partitions.epoch = epoch;
       partitions.partitions.clear();
       partitions.partitions.add(partition);
-    } else if(partitions.epoch > epoch){
+    } else if (partitions.epoch > epoch) {
       logger.info("Ignoring message for epoch " + epoch);
     } else {
       partitions.partitions.add(partition);
-      if(partitions.partitions.size() == expected) {
+      if (partitions.partitions.size() == expected) {
         logger.info("Completed: " + key + " -> " + Integer.toString(epoch));
         collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
"completed-keys"), key, Integer.toString(epoch)));
       }
@@ -89,7 +87,7 @@ public class Joiner implements StreamTask, InitableTask {
   private Partitions loadPartitions(int epoch, String key) {
     String current = this.store.get(key);
     Partitions partitions;
-    if(current == null)
+    if (current == null)
       partitions = new Partitions(epoch, new HashSet<Integer>());
     else
       partitions = Partitions.parse(current);
@@ -109,7 +107,7 @@ public class Joiner implements StreamTask, InitableTask {
       String[] pieces = s.split("\\|", -1);
       int epoch = Integer.parseInt(pieces[1]);
       Set<Integer> set = new HashSet<Integer>(pieces.length);
-      for(int i = 2; i < pieces.length - 1; i++)
+      for (int i = 2; i < pieces.length - 1; i++)
         set.add(Integer.parseInt(pieces[i]));
       return new Partitions(epoch, set);
     }
@@ -118,7 +116,7 @@ public class Joiner implements StreamTask, InitableTask {
       StringBuilder b = new StringBuilder("|");
       b.append(epoch);
       b.append("|");
-      for(int p: partitions) {
+      for (int p: partitions) {
         b.append(p);
         b.append("|");
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java 
b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
index 7c82e0a..4a299b6 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
@@ -47,7 +47,7 @@ public class Watcher implements StreamTask, WindowableTask, 
InitableTask {
   @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
     int epoch = Integer.parseInt((String) envelope.getMessage());
-    if(epoch > currentEpoch) {
+    if (epoch > currentEpoch) {
       logger.info("Epoch changed to " + epoch + " from " + currentEpoch);
       this.currentEpoch = epoch;
       this.lastEpochChange = System.currentTimeMillis();
@@ -58,10 +58,10 @@ public class Watcher implements StreamTask, WindowableTask, 
InitableTask {
   @Override
   public void window(MessageCollector collector, TaskCoordinator coordinator) {
     boolean isLagging = System.currentTimeMillis() - lastEpochChange > 
maxTimeBetweenEpochsMs;
-    if(!inError && isLagging) {
+    if (!inError && isLagging) {
       this.inError = true;
       logger.info("Error state detected, alerting...");
-      logger.error("Job failed to make progress!" + String.format("No epoch 
change for %d minutes.", this.maxTimeBetweenEpochsMs / (60*1000)));
+      logger.error("Job failed to make progress!" + String.format("No epoch 
change for %d minutes.", this.maxTimeBetweenEpochsMs / (60 * 1000)));
     }
   }
   

Reply via email to