Repository: samza Updated Branches: refs/heads/master bb8a78a85 -> f0809a54b
SAMZA-657: add checkstyle to build script Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f0809a54 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f0809a54 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f0809a54 Branch: refs/heads/master Commit: f0809a54bd48109373cf000056dd652cb8b298f3 Parents: bb8a78a Author: Guozhang Wang <[email protected]> Authored: Thu May 7 17:38:53 2015 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu May 7 17:38:53 2015 -0700 ---------------------------------------------------------------------- README.md | 4 + build.gradle | 27 +++ checkstyle/checkstyle.xml | 88 ++++++++++ checkstyle/import-control.xml | 175 +++++++++++++++++++ .../java/org/apache/samza/config/Config.java | 20 +-- .../java/org/apache/samza/config/MapConfig.java | 2 +- .../samza/system/SystemStreamPartition.java | 2 +- .../apache/samza/util/BlockingEnvelopeMap.java | 4 +- ...inglePartitionWithoutOffsetsSystemAdmin.java | 8 +- .../org/apache/samza/config/TestConfig.java | 26 +-- .../TestSystemStreamPartitionIterator.java | 1 - .../samza/util/TestBlockingEnvelopeMap.java | 14 +- .../samza/checkpoint/CheckpointManager.java | 3 +- .../stream/CoordinatorStreamMessage.java | 8 +- .../stream/CoordinatorStreamSystemConsumer.java | 12 +- .../stream/CoordinatorStreamSystemProducer.java | 3 +- .../serializers/model/SamzaObjectMapper.java | 9 +- .../storage/ChangelogPartitionManager.java | 3 +- .../grouper/task/GroupByContainerCount.scala | 2 - .../MockCoordinatorStreamSystemFactory.java | 2 +- .../MockCoordinatorStreamWrappedConsumer.java | 25 +-- .../stream/TestCoordinatorStreamMessage.java | 14 +- .../TestCoordinatorStreamSystemConsumer.java | 13 +- .../TestCoordinatorStreamSystemProducer.java | 8 +- .../model/TestSamzaObjectMapper.java | 4 +- .../samza/storage/kv/KeyValueIterator.java | 2 +- .../samza/logging/log4j/StreamAppender.java | 14 +- .../serializers/LoggingEventStringSerde.java | 4 +- .../samza/config/TestLog4jSystemConfig.java | 6 +- .../samza/logging/log4j/TestJmxAppender.java | 14 +- .../samza/system/mock/MockSystemAdmin.java | 1 - .../test/integration/SimpleStatefulTask.java | 2 +- .../test/integration/StatePerfTestTask.java | 11 +- .../samza/test/integration/join/Checker.java | 23 +-- .../samza/test/integration/join/Emitter.java | 18 +- .../test/integration/join/EpochPartitioner.java | 35 ---- .../samza/test/integration/join/Joiner.java | 16 +- .../samza/test/integration/join/Watcher.java | 6 +- 38 files changed, 435 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 0492790..f83fd41 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,10 @@ To run all integration tests: ./bin/integration-tests.sh <dir> +### Running checkstyle on the java code ### + + ./gradlew checkstyleMain checkstyleTest + ### Job Management To run a job (defined in a properties file): http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index ebad6eb..ac80a86 100644 --- a/build.gradle +++ b/build.gradle @@ -114,18 +114,26 @@ subprojects { project(':samza-api') { apply plugin: 'java' + apply plugin: 'checkstyle' dependencies { testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } + + checkstyle { + configFile = new File(rootDir, "checkstyle/checkstyle.xml") + } } project(":samza-core_$scalaVersion") { apply plugin: 'scala' + apply plugin: 'checkstyle' + // Force scala joint compilation sourceSets.main.scala.srcDir "src/main/java" sourceSets.main.java.srcDirs = [] + jar { manifest { attributes("Implementation-Version": "$version") @@ -143,6 +151,10 @@ project(":samza-core_$scalaVersion") { testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" } + + checkstyle { + configFile = new File(rootDir, "checkstyle/checkstyle.xml") + } } project(":samza-kafka_$scalaVersion") { @@ -191,6 +203,7 @@ project(":samza-kafka_$scalaVersion") { project(':samza-log4j') { apply plugin: 'java' + apply plugin: 'checkstyle' dependencies { compile "log4j:log4j:$log4jVersion" @@ -199,6 +212,10 @@ project(':samza-log4j') { compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" testCompile "junit:junit:$junitVersion" } + + checkstyle { + configFile = new File(rootDir, "checkstyle/checkstyle.xml") + } } project(":samza-yarn_$scalaVersion") { @@ -324,6 +341,7 @@ project(":samza-shell") { project(":samza-kv_$scalaVersion") { apply plugin: 'scala' + apply plugin: 'checkstyle' dependencies { compile project(':samza-api') @@ -332,6 +350,10 @@ project(":samza-kv_$scalaVersion") { testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } + + checkstyle { + configFile = new File(rootDir, "checkstyle/checkstyle.xml") + } } project(":samza-kv-inmemory_$scalaVersion") { @@ -363,6 +385,7 @@ project(":samza-kv-rocksdb_$scalaVersion") { project(":samza-test_$scalaVersion") { apply plugin: 'scala' + apply plugin: 'checkstyle' configurations { // Remove transitive dependencies from Zookeeper that we don't want. @@ -399,6 +422,10 @@ project(":samza-test_$scalaVersion") { jvmArgs = ["-XX:+UseConcMarkSweepGC", "-server"] } + checkstyle { + configFile = new File(rootDir, "checkstyle/checkstyle.xml") + } + tasks.create(name: "releaseTestJobs", dependsOn: configurations.archives.artifacts, type: Tar) { description 'Build an integration test tarball' compression = Compression.GZIP http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml new file mode 100644 index 0000000..770b5e7 --- /dev/null +++ b/checkstyle/checkstyle.xml @@ -0,0 +1,88 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE module PUBLIC + "-//Puppy Crawl//DTD Check Configuration 1.3//EN" + "http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> +<!-- +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +--> +<module name="Checker"> + <property name="localeLanguage" value="en"/> + + <module name="FileTabCharacter"/> + + <!-- header: use one star only --> + <module name="RegexpHeader"> + <property name="header" value="/\*\nLicensed to the Apache.*"/> + </module> + + <module name="TreeWalker"> + + <!-- code cleanup --> + <module name="UnusedImports"/> + <module name="RedundantImport"/> + <module name="IllegalImport" /> + <module name="EqualsHashCode"/> + <module name="SimplifyBooleanExpression"/> + <module name="OneStatementPerLine"/> + <module name="UnnecessaryParentheses" /> + <module name="SimplifyBooleanReturn"/> + + <!-- style --> + <module name="DefaultComesLast"/> + <module name="EmptyStatement"/> + <module name="ArrayTypeStyle"/> + <module name="UpperEll"/> + <module name="LeftCurly"/> + <module name="RightCurly"/> + <module name="EmptyStatement"/> + <module name="ConstantName"> + <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/> + </module> + <module name="LocalVariableName"/> + <module name="LocalFinalVariableName"/> + <module name="ClassTypeParameterName"/> + <module name="MemberName"/> + <module name="MethodTypeParameterName"/> + <module name="PackageName"/> + <module name="ParameterName"/> + <module name="StaticVariableName"/> + <module name="TypeName"/> + + <!-- dependencies --> + <module name="ImportControl"> + <property name="file" value="${basedir}/checkstyle/import-control.xml"/> + </module> + + <!-- whitespace --> + <module name="GenericWhitespace"/> + <module name="NoWhitespaceBefore"/> + <module name="WhitespaceAfter" /> + <module name="NoWhitespaceAfter"/> + <module name="WhitespaceAround"> + <property name="allowEmptyConstructors" value="true"/> + <property name="allowEmptyMethods" value="true"/> + </module> + <!-- set indentation with 2 spaces to be consistent with Scala --> + <module name="Indentation"> + <property name="basicOffset" value="2"/> + <property name="caseIndent" value="2"/> + <property name="throwsIndent" value="2"/> + </module> + <module name="MethodParamPad"/> + <module name="ParenPad"/> + <module name="TypecastParenPad"/> + </module> +</module> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml new file mode 100644 index 0000000..5f8e103 --- /dev/null +++ b/checkstyle/import-control.xml @@ -0,0 +1,175 @@ +<!DOCTYPE import-control PUBLIC + "-//Puppy Crawl//DTD Import Control 1.1//EN" + "http://www.puppycrawl.com/dtds/import_control_1_1.dtd"> +<!-- +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +--> +<import-control pkg="org.apache.samza"> + + <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE --> + + <!-- common library dependencies --> + <allow pkg="java" /> + <allow pkg="javax.management" /> + <allow pkg="org.slf4j" /> + <allow pkg="org.junit" /> + <allow pkg="org.codehaus" /> + <allow pkg="org.mockito" /> + <allow pkg="org.apache.log4j" /> + <allow pkg="org.apache.kafka" /> + + <subpackage name="config"> + <allow class="org.apache.samza.SamzaException" /> + </subpackage> + + <subpackage name="serializers"> + <allow pkg="org.apache.samza.config" /> + + <subpackage name="model"> + <allow pkg="org.apache.samza.job.model" /> + <allow pkg="org.apache.samza.util" /> + + <allow class="org.apache.samza.Partition" /> + <allow class="org.apache.samza.container.TaskName" /> + <allow class="org.apache.samza.system.SystemStreamPartition" /> + </subpackage> + </subpackage> + + <subpackage name="job"> + <allow pkg="org.apache.samza.config" /> + + <subpackage name="model"> + <allow class="org.apache.samza.Partition" /> + <allow class="org.apache.samza.container.TaskName" /> + <allow class="org.apache.samza.system.SystemStreamPartition" /> + </subpackage> + </subpackage> + + <subpackage name="system"> + <allow pkg="org.apache.samza.config" /> + <allow pkg="org.apache.samza.metrics" /> + <allow pkg="org.apache.samza.serializers" /> + + <allow class="org.apache.samza.Partition" /> + <allow class="org.apache.samza.SamzaException" /> + + <subpackage name="chooser"> + <allow class="org.apache.samza.system.SystemStreamPartition" /> + <allow class="org.apache.samza.system.IncomingMessageEnvelope" /> + </subpackage> + + <subpackage name="mock"> + <allow pkg="org.apache.samza.system" /> + <allow pkg="org.apache.samza.util" /> + </subpackage> + </subpackage> + + <subpackage name="util"> + <allow pkg="org.apache.samza.metrics" /> + <allow pkg="org.apache.samza.system" /> + + <allow class="org.apache.samza.Partition" /> + <allow class="org.apache.samza.SamzaException" /> + </subpackage> + + <subpackage name="metrics"> + <allow pkg="org.apache.samza.config" /> + <allow pkg="org.apache.samza.util" /> + </subpackage> + + <subpackage name="task"> + <allow pkg="org.apache.samza.config" /> + <allow pkg="org.apache.samza.container" /> + <allow pkg="org.apache.samza.metrics" /> + <allow pkg="org.apache.samza.system" /> + </subpackage> + + <subpackage name="container"> + <allow pkg="org.apache.samza.config" /> + + <subpackage name="grouper"> + <subpackage name="stream"> + <allow pkg="org.apache.samza.container" /> + <allow pkg="org.apache.samza.system" /> + </subpackage> + + <subpackage name="task"> + <allow pkg="org.apache.samza.job" /> + </subpackage> + </subpackage> + </subpackage> + + <subpackage name="coordinator"> + <allow pkg="org.apache.samza.checkpoint" /> + <allow pkg="org.apache.samza.config" /> + <allow pkg="org.apache.samza.metrics" /> + <allow pkg="org.apache.samza.system" /> + <allow pkg="org.apache.samza.serializers" /> + <allow pkg="org.apache.samza.util" /> + + <allow class="org.apache.samza.Partition" /> + <allow class="org.apache.samza.SamzaException" /> + </subpackage> + + <subpackage name="checkpoint"> + <allow pkg="org.apache.samza.config" /> + <allow pkg="org.apache.samza.container" /> + <allow pkg="org.apache.samza.coordinator" /> + <allow pkg="org.apache.samza.metrics" /> + <allow pkg="org.apache.samza.system" /> + + <allow class="org.apache.samza.SamzaException" /> + </subpackage> + + <subpackage name="storage"> + <allow pkg="org.apache.samza.container" /> + <allow pkg="org.apache.samza.coordinator" /> + <allow pkg="org.apache.samza.metrics" /> + <allow pkg="org.apache.samza.serializers" /> + <allow pkg="org.apache.samza.system" /> + <allow pkg="org.apache.samza.task" /> + </subpackage> + + <subpackage name="logging"> + <subpackage name="log4j"> + <allow pkg="org.apache.samza.config" /> + <allow pkg="org.apache.samza.coordinator" /> + <allow pkg="org.apache.samza.job" /> + <allow pkg="org.apache.samza.metrics" /> + <allow pkg="org.apache.samza.system" /> + <allow pkg="org.apache.samza.serializers" /> + <allow pkg="org.apache.samza.util" /> + + <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory" /> + <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde" /> + <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory" /> + <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde" /> + <allow class="org.apache.samza.SamzaException" /> + </subpackage> + </subpackage> + + <subpackage name="test"> + <subpackage name="integration"> + <allow pkg="org.apache.samza.config" /> + <allow pkg="org.apache.samza.container" /> + <allow pkg="org.apache.samza.system" /> + <allow pkg="org.apache.samza.storage" /> + <allow pkg="org.apache.samza.task" /> + <allow pkg="org.apache.samza.util" /> + </subpackage> + </subpackage> + +</import-control> http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/config/Config.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java b/samza-api/src/main/java/org/apache/samza/config/Config.java index 2b99050..7abe557 100644 --- a/samza-api/src/main/java/org/apache/samza/config/Config.java +++ b/samza-api/src/main/java/org/apache/samza/config/Config.java @@ -58,18 +58,18 @@ public abstract class Config implements Map<String, String> { } public Config regexSubset(String regex) { - Map<String, String> out = new HashMap<String, String>(); - Pattern pattern = Pattern.compile(regex); - - for (Entry<String, String> entry : entrySet()) { - String k = entry.getKey(); - Matcher matcher = pattern.matcher(k); - if(matcher.find()){ - out.put(k, entry.getValue()); - } + Map<String, String> out = new HashMap<String, String>(); + Pattern pattern = Pattern.compile(regex); + + for (Entry<String, String> entry : entrySet()) { + String k = entry.getKey(); + Matcher matcher = pattern.matcher(k); + if (matcher.find()) { + out.put(k, entry.getValue()); } + } - return new MapConfig(out); + return new MapConfig(out); } public String get(String k, String defaultString) { http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/config/MapConfig.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java index 38d7424..0c3f14a 100644 --- a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java +++ b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java @@ -42,7 +42,7 @@ public class MapConfig extends Config { public MapConfig(List<Map<String, String>> maps) { this.map = new HashMap<String, String>(); - for(Map<String, String> m: maps) + for (Map<String, String> m: maps) this.map.putAll(m); } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java index 8dcea09..95cc266 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java @@ -96,7 +96,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy @Override public String toString() { - return "SystemStreamPartition ["+ system + ", " + stream + ", " + partition.getPartitionId() + "]"; + return "SystemStreamPartition [" + system + ", " + stream + ", " + partition.getPartitionId() + "]"; } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java index e30321d..7dd99fb 100644 --- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java +++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java @@ -227,11 +227,11 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { } public void initMetrics(SystemStreamPartition systemStreamPartition) { - this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, metricsRegistry.<Boolean> newGauge(group, "no-more-messages-" + systemStreamPartition, false)); + this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, metricsRegistry.<Boolean>newGauge(group, "no-more-messages-" + systemStreamPartition, false)); this.blockingPollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-count-" + systemStreamPartition)); this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition)); - metricsRegistry.<Integer> newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition)); + metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition)); } public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) { http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java index a6b14fb..249b8ae 100644 --- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java @@ -37,10 +37,10 @@ import org.apache.samza.system.SystemStreamPartition; * Samza needs at least one partition for an input stream, in order to read it. */ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin { - private static final Map<Partition, SystemStreamPartitionMetadata> fakePartitionMetadata = new HashMap<Partition, SystemStreamPartitionMetadata>(); + private static final Map<Partition, SystemStreamPartitionMetadata> FAKE_PARTITION_METADATA = new HashMap<Partition, SystemStreamPartitionMetadata>(); static { - fakePartitionMetadata.put(new Partition(0), new SystemStreamPartitionMetadata(null, null, null)); + FAKE_PARTITION_METADATA.put(new Partition(0), new SystemStreamPartitionMetadata(null, null, null)); } @Override @@ -48,14 +48,14 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin { Map<String, SystemStreamMetadata> metadata = new HashMap<String, SystemStreamMetadata>(); for (String streamName : streamNames) { - metadata.put(streamName, new SystemStreamMetadata(streamName, fakePartitionMetadata)); + metadata.put(streamName, new SystemStreamMetadata(streamName, FAKE_PARTITION_METADATA)); } return metadata; } @Override - public void createChangelogStream(String streamName, int numOfPartitions){ + public void createChangelogStream(String streamName, int numOfPartitions) { throw new SamzaException("Method not implemented"); } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/test/java/org/apache/samza/config/TestConfig.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java index d9f378d..5d066c5 100644 --- a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java +++ b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java @@ -32,18 +32,20 @@ public class TestConfig { * overloaded args */ Class getClass(long l) { - return Long.class ; + return Long.class; } Class getClass(short s) { - return Short.class ; + return Short.class; } @Test public void testgetShortAndLong() { - Map<String, String> m = new HashMap<String, String>() {{ - put("testkey", "11"); - }}; + Map<String, String> m = new HashMap<String, String>() { + { + put("testkey", "11"); + } + }; MapConfig mc = new MapConfig(m); short defaultShort = 0; @@ -64,12 +66,14 @@ public class TestConfig { @Test public void testSanitize() { - Map<String, String> m = new HashMap<String, String>() {{ - put("key1", "value1"); - put("key2", "value2"); - put("sensitive.key3", "secret1"); - put("sensitive.key4", "secret2"); - }}; + Map<String, String> m = new HashMap<String, String>() { + { + put("key1", "value1"); + put("key2", "value2"); + put("sensitive.key3", "secret1"); + put("sensitive.key4", "secret2"); + } + }; Config config = new MapConfig(m); assertFalse(config.toString().contains("secret")); http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java index 5af2a11..81c483d 100644 --- a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java +++ b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java @@ -28,7 +28,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Queue; import java.util.Set; import org.apache.samza.Partition; http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java index 4eb87eb..d1a0a82 100644 --- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java +++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java @@ -39,7 +39,7 @@ import org.junit.Test; public class TestBlockingEnvelopeMap { private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0)); - private static final IncomingMessageEnvelope envelope = new IncomingMessageEnvelope(SSP, null, null, null); + private static final IncomingMessageEnvelope ENVELOPE = new IncomingMessageEnvelope(SSP, null, null, null); private static final Set<SystemStreamPartition> FETCH = new HashSet<SystemStreamPartition>(); static { @@ -69,12 +69,12 @@ public class TestBlockingEnvelopeMap { public void testShouldGetSomeMessages() throws InterruptedException { BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(); map.register(SSP, "0"); - map.put(SSP, envelope); + map.put(SSP, ENVELOPE); Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0); assertEquals(1, envelopes.size()); assertEquals(1, envelopes.get(SSP).size()); - map.put(SSP, envelope); - map.put(SSP, envelope); + map.put(SSP, ENVELOPE); + map.put(SSP, ENVELOPE); envelopes = map.poll(FETCH, 0); assertEquals(1, envelopes.size()); assertEquals(2, envelopes.get(SSP).size()); @@ -117,10 +117,10 @@ public class TestBlockingEnvelopeMap { // because BlockingEnvelopeMap calls clock.currentTimeMillis twice, and // uses the second call to determine the actual poll time. final BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(q, new Clock() { - private final long NOW = System.currentTimeMillis(); + private final long now = System.currentTimeMillis(); public long currentTimeMillis() { - return NOW; + return now; } }); @@ -166,7 +166,7 @@ public class TestBlockingEnvelopeMap { pollTimeoutBarrier.countDown(); - return envelope; + return ENVELOPE; } } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java index 3ac63ca..7445996 100644 --- a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java +++ b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.samza.SamzaException; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.stream.CoordinatorStreamMessage; import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetCheckpoint; @@ -101,7 +100,7 @@ public class CheckpointManager { for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) { SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage); TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey()); - if(taskNames.contains(taskNameInCheckpoint)) { + if (taskNames.contains(taskNameInCheckpoint)) { taskNamesToOffsets.put(taskNameInCheckpoint, setCheckpoint.getCheckpoint()); log.debug("Adding checkpoint {} for taskName {}", taskNameInCheckpoint, taskName); } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java index f8b705f..0988ded 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java @@ -93,9 +93,9 @@ import org.slf4j.LoggerFactory; * </p> */ public class CoordinatorStreamMessage { - public static int VERSION_INDEX = 0; - public static int TYPE_INDEX = 1; - public static int KEY_INDEX = 2; + public static final int VERSION_INDEX = 0; + public static final int TYPE_INDEX = 1; + public static final int KEY_INDEX = 2; private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamMessage.class); @@ -135,7 +135,7 @@ public class CoordinatorStreamMessage { } public CoordinatorStreamMessage(String source) { - this(source, new Object[] { Integer.valueOf(VERSION), null, null }, new HashMap<String, Object>()); + this(source, new Object[] {Integer.valueOf(VERSION), null, null}, new HashMap<String, Object>()); } public CoordinatorStreamMessage(String source, Object[] keyArray, Map<String, Object> messageMap) { http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java index 2134603..27e0750 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java @@ -29,7 +29,6 @@ import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; -import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.IncomingMessageEnvelope; @@ -110,8 +109,7 @@ public class CoordinatorStreamSystemConsumer { * Starts the underlying SystemConsumer. */ public void start() { - if(isStarted) - { + if (isStarted) { log.info("Coordinator stream consumer already started"); return; } @@ -147,12 +145,12 @@ public class CoordinatorStreamSystemConsumer { CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap); log.debug("Received coordinator stream message: {}", coordinatorStreamMessage); bootstrappedStreamSet.add(coordinatorStreamMessage); - if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) { + if (CoordinatorStreamMessage.SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) { String configKey = coordinatorStreamMessage.getKey(); if (coordinatorStreamMessage.isDelete()) { configMap.remove(configKey); } else { - String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue(); + String configValue = new CoordinatorStreamMessage.SetConfig(coordinatorStreamMessage).getConfigValue(); configMap.put(configKey, configValue); } } @@ -166,7 +164,7 @@ public class CoordinatorStreamSystemConsumer { public Set<CoordinatorStreamMessage> getBoostrappedStream() { log.info("Returning the bootstrapped data from the stream"); - if(!isBootstrapped) + if (!isBootstrapped) bootstrap(); return bootstrappedStreamSet; } @@ -176,7 +174,7 @@ public class CoordinatorStreamSystemConsumer { bootstrap(); HashSet<CoordinatorStreamMessage> bootstrappedStream = new HashSet<CoordinatorStreamMessage>(); for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) { - if(type.equalsIgnoreCase(coordinatorStreamMessage.getType())) { + if (type.equalsIgnoreCase(coordinatorStreamMessage.getType())) { bootstrappedStream.add(coordinatorStreamMessage); } } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java index 0f3e10e..92f8907 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java @@ -74,8 +74,7 @@ public class CoordinatorStreamSystemProducer { * Creates the coordinator stream, and starts the system producer. */ public void start() { - if(isStarted) - { + if (isStarted) { log.info("Coordinator stream producer already started"); return; } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java index 17410c5..717b5dc 100644 --- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java @@ -29,7 +29,6 @@ import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; -import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.Util; import org.codehaus.jackson.JsonGenerator; @@ -108,7 +107,7 @@ public class SamzaObjectMapper { public Config deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException { ObjectCodec oc = jsonParser.getCodec(); JsonNode node = oc.readTree(jsonParser); - return new MapConfig(OBJECT_MAPPER.<Map<String, String>> readValue(node, new TypeReference<Map<String, String>>() { + return new MapConfig(OBJECT_MAPPER.<Map<String, String>>readValue(node, new TypeReference<Map<String, String>>() { })); } } @@ -147,8 +146,7 @@ public class SamzaObjectMapper { public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> { @Override - public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jgen, SerializerProvider provider) - throws IOException { + public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jgen, SerializerProvider provider) throws IOException { String ssp = Util.sspToString(systemStreamPartition); jgen.writeFieldName(ssp); } @@ -156,8 +154,7 @@ public class SamzaObjectMapper { public static class SystemStreamPartitionKeyDeserializer extends KeyDeserializer { @Override - public Object deserializeKey(String sspString, DeserializationContext ctxt) - throws IOException { + public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException { return Util.stringToSsp(sspString); } } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java index fff7634..7d3409c 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java @@ -20,7 +20,6 @@ package org.apache.samza.storage; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.samza.container.TaskName; @@ -73,7 +72,7 @@ public class ChangelogPartitionManager { */ public void register(TaskName taskName) { log.debug("Adding taskName {} to {}", taskName, this); - if(!isCoordinatorConsumerRegistered) { + if (!isCoordinatorConsumerRegistered) { coordinatorStreamConsumer.register(); isCoordinatorConsumerRegistered = true; } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala index 8071fec..be36125 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala @@ -19,10 +19,8 @@ package org.apache.samza.container.grouper.task -import org.apache.samza.container.TaskName import org.apache.samza.job.model.TaskModel import org.apache.samza.job.model.ContainerModel -import org.apache.samza.system.SystemStreamPartition import scala.collection.JavaConversions._ /** http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java index 59782fe..647cadb 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java @@ -61,7 +61,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory { */ public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { - if(useCachedConsumer && mockConsumer != null) { + if (useCachedConsumer && mockConsumer != null) { return mockConsumer; } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java index 00a2d59..e454593 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java @@ -22,13 +22,11 @@ package org.apache.samza.coordinator.stream; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.apache.samza.SamzaException; import org.apache.samza.checkpoint.Checkpoint; import org.apache.samza.config.Config; -import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; @@ -72,8 +70,7 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap { for (Map.Entry<String, String> configPair : config.entrySet()) { byte[] keyBytes = null; byte[] messgeBytes = null; - if(configPair.getKey().startsWith(CHECKPOINTPREFIX)) - { + if (configPair.getKey().startsWith(CHECKPOINTPREFIX)) { String[] checkpointInfo = configPair.getKey().split(":"); String[] sspOffsetPair = configPair.getValue().split(":"); HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>(); @@ -82,16 +79,14 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap { CoordinatorStreamMessage.SetCheckpoint setCheckpoint = new CoordinatorStreamMessage.SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp); keyBytes = MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8"); messgeBytes = MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8"); - } - else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) { + } else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) { String[] changelogInfo = configPair.getKey().split(":"); String changeLogPartition = configPair.getValue(); CoordinatorStreamMessage.SetChangelogMapping changelogMapping = new CoordinatorStreamMessage.SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition)); keyBytes = MAPPER.writeValueAsString(changelogMapping.getKeyArray()).getBytes("UTF-8"); messgeBytes = MAPPER.writeValueAsString(changelogMapping.getMessageMap()).getBytes("UTF-8"); - } - else { - SetConfig setConfig = new SetConfig("source", configPair.getKey(), configPair.getValue()); + } else { + CoordinatorStreamMessage.SetConfig setConfig = new CoordinatorStreamMessage.SetConfig("source", configPair.getKey(), configPair.getValue()); keyBytes = MAPPER.writeValueAsString(setConfig.getKeyArray()).getBytes("UTF-8"); messgeBytes = MAPPER.writeValueAsString(setConfig.getMessageMap()).getBytes("UTF-8"); } @@ -106,23 +101,21 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap { @Override public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( - Set<SystemStreamPartition> systemStreamPartitions, long timeout) - throws InterruptedException { + Set<SystemStreamPartition> systemStreamPartitions, long timeout) + throws InterruptedException { - if(blockpollFlag) { + if (blockpollFlag) { blockConsumerPoll.await(); } return super.poll(systemStreamPartitions, timeout); } - public CountDownLatch blockPool() - { + public CountDownLatch blockPool() { blockpollFlag = true; return blockConsumerPoll; } - public void stop() { - } + public void stop() {} } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java index 15181bb..ac26a01 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java @@ -19,9 +19,11 @@ package org.apache.samza.coordinator.stream; -import static org.junit.Assert.*; -import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete; -import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import org.junit.Test; public class TestCoordinatorStreamMessage { @@ -46,8 +48,8 @@ public class TestCoordinatorStreamMessage { @Test public void testSetConfig() { - SetConfig setConfig = new SetConfig("source", "key", "value"); - assertEquals(SetConfig.TYPE, setConfig.getType()); + CoordinatorStreamMessage.SetConfig setConfig = new CoordinatorStreamMessage.SetConfig("source", "key", "value"); + assertEquals(CoordinatorStreamMessage.SetConfig.TYPE, setConfig.getType()); assertEquals("key", setConfig.getKey()); assertEquals("value", setConfig.getConfigValue()); assertFalse(setConfig.isDelete()); @@ -56,7 +58,7 @@ public class TestCoordinatorStreamMessage { @Test public void testDelete() { - Delete delete = new Delete("source2", "key", "delete-type"); + CoordinatorStreamMessage.Delete delete = new CoordinatorStreamMessage.Delete("source2", "key", "delete-type"); assertEquals("delete-type", delete.getType()); assertEquals("key", delete.getKey()); assertNull(delete.getMessageMap()); http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java index 5e193f8..2fe872b 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java @@ -19,7 +19,10 @@ package org.apache.samza.coordinator.stream; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.HashMap; @@ -29,8 +32,6 @@ import java.util.Set; import org.apache.samza.Partition; import org.apache.samza.SamzaException; -import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete; -import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumer; @@ -102,9 +103,9 @@ public class TestCoordinatorStreamSystemConsumer { if (pollCount++ == 0) { List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>(); - SetConfig setConfig1 = new SetConfig("test", "job.name", "my-job-name"); - SetConfig setConfig2 = new SetConfig("test", "job.id", "1234"); - Delete delete = new Delete("test", "job.name", SetConfig.TYPE); + CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig("test", "job.name", "my-job-name"); + CoordinatorStreamMessage.SetConfig setConfig2 = new CoordinatorStreamMessage.SetConfig("test", "job.id", "1234"); + CoordinatorStreamMessage.Delete delete = new CoordinatorStreamMessage.Delete("test", "job.name", CoordinatorStreamMessage.SetConfig.TYPE); list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig1.getKeyArray()), serialize(setConfig1.getMessageMap()))); list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig2.getKeyArray()), serialize(setConfig2.getMessageMap()))); list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(delete.getKeyArray()), delete.getMessageMap())); http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java index 728fa53..68e3255 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java @@ -28,8 +28,6 @@ import java.util.List; import java.util.Map; import org.apache.samza.SamzaException; -import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete; -import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -47,9 +45,9 @@ public class TestCoordinatorStreamSystemProducer { MockSystemProducer systemProducer = new MockSystemProducer(source); MockSystemAdmin systemAdmin = new MockSystemAdmin(); CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin); - SetConfig setConfig1 = new SetConfig(source, "job.name", "my-job-name"); - SetConfig setConfig2 = new SetConfig(source, "job.id", "1234"); - Delete delete = new Delete(source, "job.name", SetConfig.TYPE); + CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name"); + CoordinatorStreamMessage.SetConfig setConfig2 = new CoordinatorStreamMessage.SetConfig(source, "job.id", "1234"); + CoordinatorStreamMessage.Delete delete = new CoordinatorStreamMessage.Delete(source, "job.name", CoordinatorStreamMessage.SetConfig.TYPE); assertFalse(systemProducer.isRegistered()); producer.register(source); assertTrue(systemProducer.isRegistered()); http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java index 72b134c..ad1fbc5 100644 --- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java +++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java @@ -19,11 +19,9 @@ package org.apache.samza.serializers.model; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java index 2fb26e2..854ebbf 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java @@ -21,6 +21,6 @@ package org.apache.samza.storage.kv; import java.util.Iterator; -public interface KeyValueIterator<K,V> extends Iterator<Entry<K,V>> { +public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>> { public void close(); } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java index d3616fe..8948453 100644 --- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java @@ -32,8 +32,6 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.Log4jSystemConfig; import org.apache.samza.config.SerializerConfig; import org.apache.samza.config.ShellCommandConfig; -import org.apache.samza.config.StreamConfig; -import org.apache.samza.config.SystemConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.job.model.JobModel; import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory; @@ -54,9 +52,9 @@ import org.apache.samza.util.Util; */ public class StreamAppender extends AppenderSkeleton { - private final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name"; - private final String APPLICATION_MASTER_TAG = "samza-application-master"; - private final String SOURCE = "log4j-log"; + private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name"; + private static final String APPLICATION_MASTER_TAG = "samza-application-master"; + private static final String SOURCE = "log4j-log"; private Config config = null; private SystemStream systemStream = null; private SystemProducer systemProducer = null; @@ -104,7 +102,7 @@ public class StreamAppender extends AppenderSkeleton { String systemName = log4jSystemConfig.getSystemName(); String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName); if (systemFactoryName != null) { - systemFactory = Util.<SystemFactory> getObj(systemFactoryName); + systemFactory = Util.<SystemFactory>getObj(systemFactoryName); } else { throw new SamzaException("Please define log4j system name and factory class"); } @@ -214,7 +212,7 @@ public class StreamAppender extends AppenderSkeleton { * @param streamName name of the stream */ private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) { - String serdeClass = LoggingEventJsonSerdeFactory.class.getCanonicalName();; + String serdeClass = LoggingEventJsonSerdeFactory.class.getCanonicalName(); String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName); if (serdeName != null) { @@ -222,7 +220,7 @@ public class StreamAppender extends AppenderSkeleton { } if (serdeClass != null) { - SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>> getObj(serdeClass); + SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass); serde = serdeFactory.getSerde(systemName, config); } else { String serdeKey = String.format(SerializerConfig.SERDE(), serdeName); http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java index 8d8f5e8..908a080 100644 --- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java @@ -32,8 +32,8 @@ import org.apache.samza.serializers.Serde; * LoggingEvent based on the messages, which is deserialized from the bytes. */ public class LoggingEventStringSerde implements Serde<LoggingEvent> { - final private String ENCODING = "UTF-8"; - final Logger logger = Logger.getLogger(LoggingEventStringSerde.class); + private static final String ENCODING = "UTF-8"; + private final Logger logger = Logger.getLogger(LoggingEventStringSerde.class); @Override public byte[] toBytes(LoggingEvent object) { http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java index 6314a3e..f7d3cbe 100644 --- a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java +++ b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java @@ -37,8 +37,8 @@ public class TestLog4jSystemConfig { @Test public void testGetSystemNames() { Map<String, String> map = new HashMap<String, String>(); - map.put("systems.system1.samza.factory","1"); - map.put("systems.system2.samza.factory","2"); + map.put("systems.system1.samza.factory", "1"); + map.put("systems.system2.samza.factory", "2"); Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map)); assertEquals(2, log4jSystemConfig.getSystemNames().size()); @@ -48,7 +48,7 @@ public class TestLog4jSystemConfig { public void testGetLog4jSystemName() { Map<String, String> map = new HashMap<String, String>(); map.put("task.log4j.system", "log4j-system"); - map.put("systems.system1.samza.factory","1"); + map.put("systems.system1.samza.factory", "1"); Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map)); assertEquals("log4j-system", log4jSystemConfig.getSystemName()); http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java index 0bdade0..4859125 100644 --- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java +++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java @@ -48,24 +48,24 @@ import org.junit.Test; * with the JmxAppender added as a root-level appender. */ public class TestJmxAppender { - public static final int port = 5500; - public static final JMXServiceURL url = getJmxServiceURL(); + public static final int PORT = 5500; + public static final JMXServiceURL URL = getJmxServiceURL(); private static JMXConnectorServer cs = null; private static final Logger log = Logger.getLogger(TestJmxAppender.class.getName()); private static JMXServiceURL getJmxServiceURL() { try { - return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + port + "/jmxapitestrmi"); - } catch(Exception e) { + return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + PORT + "/jmxapitestrmi"); + } catch (Exception e) { throw new RuntimeException(e); } } @BeforeClass public static void beforeSetupServers() throws Exception { - LocateRegistry.createRegistry(port); + LocateRegistry.createRegistry(PORT); MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); - cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbeanServer); + cs = JMXConnectorServerFactory.newJMXConnectorServer(URL, null, mbeanServer); cs.start(); } @@ -78,7 +78,7 @@ public class TestJmxAppender { @Test public void testJmxAppender() throws Exception { - MBeanServerConnection mbserver = JMXConnectorFactory.connect(url).getMBeanServerConnection(); + MBeanServerConnection mbserver = JMXConnectorFactory.connect(URL).getMBeanServerConnection(); ObjectName objectName = new ObjectName(JmxAppender.JMX_OBJECT_DOMAIN + ":type=" + JmxAppender.JMX_OBJECT_TYPE + ",name=" + JmxAppender.JMX_OBJECT_NAME); String level = null; MockAppender mockAppender = new MockAppender(); http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java index 8c95211..b063366 100644 --- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java +++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import org.apache.samza.Partition; -import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java index 67e56e0..6fafabc 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java @@ -45,7 +45,7 @@ public class SimpleStatefulTask implements StreamTask, InitableTask { this.store = (KeyValueStore<String, String>) context.getStore("mystore"); System.out.println("Contents of store: "); KeyValueIterator<String, String> iter = store.all(); - while(iter.hasNext()) { + while (iter.hasNext()) { Entry<String, String> entry = iter.next(); System.out.println(entry.getKey() + " => " + entry.getValue()); } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java index d7fecd8..1d524a8 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java @@ -33,10 +33,11 @@ import org.apache.samza.task.TaskCoordinator.RequestScope; * A simple performance test that just reads in messages and writes them to a state store as quickly as possible and periodically prints out throughput numbers */ public class StatePerfTestTask implements StreamTask, InitableTask { - + + private static final int LOG_INTERVAL = 100000; + private KeyValueStore<String, String> store; private int count = 0; - private int LOG_INTERVAL = 100000; private long start = System.currentTimeMillis(); @SuppressWarnings("unchecked") @@ -47,9 +48,9 @@ public class StatePerfTestTask implements StreamTask, InitableTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { store.put((String) envelope.getMessage(), (String) envelope.getMessage()); count++; - if(count % LOG_INTERVAL == 0) { - double ellapsedSecs = (System.currentTimeMillis() - start)/1000.0; - System.out.println(String.format("Throughput = %.2f messages/sec.", count/ellapsedSecs)); + if (count % LOG_INTERVAL == 0) { + double ellapsedSecs = (System.currentTimeMillis() - start) / 1000.0; + System.out.println(String.format("Throughput = %.2f messages/sec.", count / ellapsedSecs)); start = System.currentTimeMillis(); count = 0; coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER); http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java index 0598e51..ea3aeb1 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java @@ -39,7 +39,8 @@ public class Checker implements StreamTask, WindowableTask, InitableTask { private static Logger logger = LoggerFactory.getLogger(Checker.class); - private static String CURRENT_EPOCH = "current-epoch"; + private static final String CURRENT_EPOCH = "current-epoch"; + private KeyValueStore<String, String> store; private int expectedKeys; private int numPartitions; @@ -68,25 +69,25 @@ public class Checker implements StreamTask, WindowableTask, InitableTask { int count = 0; KeyValueIterator<String, String> iter = this.store.all(); - while(iter.hasNext()) { - Entry<String, String> entry= iter.next(); + while (iter.hasNext()) { + Entry<String, String> entry = iter.next(); String foundEpoch = entry.getValue(); - if(foundEpoch.equals(currentEpoch)) { - count += 1; + if (foundEpoch.equals(currentEpoch)) { + count += 1; } else { - logger.info("####### Found a different epoch! - " + foundEpoch + " Current epoch is " + currentEpoch); + logger.info("####### Found a different epoch! - " + foundEpoch + " Current epoch is " + currentEpoch); } } iter.close(); - if(count == expectedKeys + 1) { + if (count == expectedKeys + 1) { logger.info("Epoch " + currentEpoch + " is complete."); int nextEpoch = Integer.parseInt(currentEpoch) + 1; - for(int i = 0; i < numPartitions; i++) { - logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " + Integer.toString(nextEpoch)); - collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), Integer.toString(i), Integer.toString(nextEpoch))); + for (int i = 0; i < numPartitions; i++) { + logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " + Integer.toString(nextEpoch)); + collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), Integer.toString(i), Integer.toString(nextEpoch))); } this.store.put(CURRENT_EPOCH, Integer.toString(nextEpoch)); - } else if(count > expectedKeys + 1) { + } else if (count > expectedKeys + 1) { throw new IllegalStateException("Got " + count + " keys, which is more than the expected " + (expectedKeys + 1)); } else { logger.info("Only found " + count + " valid keys, try again later."); http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java index 82a633d..6661d8e 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java @@ -41,9 +41,9 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask { private static Logger logger = LoggerFactory.getLogger(Emitter.class); - private static String EPOCH = "the-epoch"; - private static String COUNT = "the-count"; - + private static final String EPOCH = "the-epoch"; + private static final String COUNT = "the-count"; + private KeyValueStore<String, String> state; private int max; private TaskName taskName; @@ -57,14 +57,14 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask { @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { - if(envelope.getSystemStreamPartition().getStream().equals("epoch")) { + if (envelope.getSystemStreamPartition().getStream().equals("epoch")) { int newEpoch = Integer.parseInt((String) envelope.getMessage()); logger.info("New epoch in message - " + newEpoch); Integer epoch = getInt(EPOCH); - if(epoch == null || newEpoch == epoch) + if (epoch == null || newEpoch == epoch) return; - if(newEpoch < epoch) + if (newEpoch < epoch) throw new IllegalArgumentException("Got new epoch " + newEpoch + " which is less than current epoch " + epoch); // it's a new era, reset current epoch and count @@ -77,12 +77,12 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask { public void window(MessageCollector collector, TaskCoordinator coordinator) { Integer epoch = getInt(EPOCH); - if(epoch == null) { + if (epoch == null) { resetEpoch(); return; } int counter = getInt(COUNT); - if(counter < max) { + if (counter < max) { logger.info("Emitting: " + counter + ", epoch = " + epoch + ", task = " + taskName); OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + taskName.toString()); collector.send(envelope); @@ -98,7 +98,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask { private Integer getInt(String key) { String value = this.state.get(key); - return value == null? null : Integer.parseInt(value); + return value == null ? null : Integer.parseInt(value); } } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java deleted file mode 100644 index 438d77c..0000000 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.test.integration.join; - -import kafka.producer.Partitioner; -import kafka.utils.VerifiableProperties; - -public class EpochPartitioner implements Partitioner { - - public EpochPartitioner(VerifiableProperties p){} - - public int partition(Object key, int numParts) { - if(key instanceof Integer) - return (Integer) key; - else - return Integer.parseInt((String) key); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java index d2c0c7e..d1dd1f8 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java @@ -19,7 +19,6 @@ package org.apache.samza.test.integration.join; -import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -36,7 +35,6 @@ import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Int; @SuppressWarnings("unchecked") public class Joiner implements StreamTask, InitableTask { @@ -64,20 +62,20 @@ public class Joiner implements StreamTask, InitableTask { int partition = Integer.parseInt(pieces[1].split(" ")[1]); Partitions partitions = loadPartitions(epoch, key); logger.info("Joiner got epoch = " + epoch + ", partition = " + partition + ", parts = " + partitions); - if(partitions.epoch < epoch) { + if (partitions.epoch < epoch) { // we are in a new era - if(partitions.partitions.size() != expected) + if (partitions.partitions.size() != expected) throw new IllegalArgumentException("Should have " + expected + " partitions when new epoch starts."); logger.info("Reseting epoch to " + epoch); this.store.delete(key); partitions.epoch = epoch; partitions.partitions.clear(); partitions.partitions.add(partition); - } else if(partitions.epoch > epoch){ + } else if (partitions.epoch > epoch) { logger.info("Ignoring message for epoch " + epoch); } else { partitions.partitions.add(partition); - if(partitions.partitions.size() == expected) { + if (partitions.partitions.size() == expected) { logger.info("Completed: " + key + " -> " + Integer.toString(epoch)); collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "completed-keys"), key, Integer.toString(epoch))); } @@ -89,7 +87,7 @@ public class Joiner implements StreamTask, InitableTask { private Partitions loadPartitions(int epoch, String key) { String current = this.store.get(key); Partitions partitions; - if(current == null) + if (current == null) partitions = new Partitions(epoch, new HashSet<Integer>()); else partitions = Partitions.parse(current); @@ -109,7 +107,7 @@ public class Joiner implements StreamTask, InitableTask { String[] pieces = s.split("\\|", -1); int epoch = Integer.parseInt(pieces[1]); Set<Integer> set = new HashSet<Integer>(pieces.length); - for(int i = 2; i < pieces.length - 1; i++) + for (int i = 2; i < pieces.length - 1; i++) set.add(Integer.parseInt(pieces[i])); return new Partitions(epoch, set); } @@ -118,7 +116,7 @@ public class Joiner implements StreamTask, InitableTask { StringBuilder b = new StringBuilder("|"); b.append(epoch); b.append("|"); - for(int p: partitions) { + for (int p: partitions) { b.append(p); b.append("|"); } http://git-wip-us.apache.org/repos/asf/samza/blob/f0809a54/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java index 7c82e0a..4a299b6 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java @@ -47,7 +47,7 @@ public class Watcher implements StreamTask, WindowableTask, InitableTask { @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { int epoch = Integer.parseInt((String) envelope.getMessage()); - if(epoch > currentEpoch) { + if (epoch > currentEpoch) { logger.info("Epoch changed to " + epoch + " from " + currentEpoch); this.currentEpoch = epoch; this.lastEpochChange = System.currentTimeMillis(); @@ -58,10 +58,10 @@ public class Watcher implements StreamTask, WindowableTask, InitableTask { @Override public void window(MessageCollector collector, TaskCoordinator coordinator) { boolean isLagging = System.currentTimeMillis() - lastEpochChange > maxTimeBetweenEpochsMs; - if(!inError && isLagging) { + if (!inError && isLagging) { this.inError = true; logger.info("Error state detected, alerting..."); - logger.error("Job failed to make progress!" + String.format("No epoch change for %d minutes.", this.maxTimeBetweenEpochsMs / (60*1000))); + logger.error("Job failed to make progress!" + String.format("No epoch change for %d minutes.", this.maxTimeBetweenEpochsMs / (60 * 1000))); } }
