Updated Branches: refs/heads/master ac7f32c8c -> c64dfda7f
SAMZA-128; support scala 2.10 Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/c64dfda7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/c64dfda7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/c64dfda7 Branch: refs/heads/master Commit: c64dfda7f2df40a21521b49f79c4eec862c257c7 Parents: ac7f32c Author: Dan Di Spaltro <[email protected]> Authored: Wed Jan 15 09:18:05 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Wed Jan 15 09:18:05 2014 -0800 ---------------------------------------------------------------------- README.md | 2 +- build.gradle | 14 +++++------ gradle/dependency-versions-scala-2.10.gradle | 7 ++++++ gradle/dependency-versions-scala-2.8.1.gradle | 1 + gradle/dependency-versions-scala-2.9.2.gradle | 1 + .../system/chooser/TieredPriorityChooser.scala | 2 +- .../lib/kafka_2.10-0.8.1-SNAPSHOT-test.jar | Bin 0 -> 913564 bytes samza-kafka/lib/kafka_2.10-0.8.1-SNAPSHOT.jar | Bin 0 -> 2856071 bytes .../samza/config/RegExTopicGenerator.scala | 5 +++- .../checkpoint/TestKafkaCheckpointManager.scala | 21 +++++------------ .../samza/config/TestRegExTopicGenerator.scala | 2 +- .../system/kafka/TestKafkaSystemAdmin.scala | 23 ++++--------------- .../test/integration/TestStatefulTask.scala | 21 +++-------------- 13 files changed, 36 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 3cc9e74..321dad7 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ To build Samza, run: #### Scala and YARN -Samza builds with [Scala](http://www.scala-lang.org/) 2.9.2 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.2.0, by default. 
Use the -PscalaVersion switches to change Scala versions. Samza supports building Scala with 2.8.1 or 2.9.2. +Samza builds with [Scala](http://www.scala-lang.org/) 2.9.2 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.2.0, by default. Use the -PscalaVersion switches to change Scala versions. Samza supports building Scala with 2.8.1, 2.9.2, or 2.10. ./gradlew -PscalaVersion=2.8.1 clean build http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 263511c..31c54be 100644 --- a/build.gradle +++ b/build.gradle @@ -51,7 +51,7 @@ project(":samza-core_$scalaVersion") { dependencies { compile project(':samza-api') - compile "org.scala-lang:scala-library:$scalaVersion" + compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion" @@ -66,7 +66,7 @@ project(":samza-kafka_$scalaVersion") { compile project(':samza-api') compile project(":samza-core_$scalaVersion") compile project(":samza-serializers_$scalaVersion") - compile "org.scala-lang:scala-library:$scalaVersion" + compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion" compile "com.101tec:zkclient:$zkClientVersion" compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion" @@ -97,7 +97,7 @@ project(":samza-serializers_$scalaVersion") { dependencies { compile project(':samza-api') compile project(":samza-core_$scalaVersion") - compile "org.scala-lang:scala-library:$scalaVersion" + compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion" compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion" testCompile 
"junit:junit:$junitVersion" @@ -110,8 +110,8 @@ project(":samza-yarn_$scalaVersion") { dependencies { compile project(':samza-api') compile project(":samza-core_$scalaVersion") - compile "org.scala-lang:scala-library:$scalaVersion" - compile "org.scala-lang:scala-compiler:$scalaVersion" + compile "org.scala-lang:scala-library:$scalaLibVersion" + compile "org.scala-lang:scala-compiler:$scalaLibVersion" compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion" compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion" compile "commons-httpclient:commons-httpclient:$commonsHttpClientVersion" @@ -165,7 +165,7 @@ project(":samza-kv_$scalaVersion") { dependencies { compile project(':samza-api') compile project(":samza-core_$scalaVersion") - compile "org.scala-lang:scala-library:$scalaVersion" + compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion" compile "org.fusesource.leveldbjni:leveldbjni-all:$leveldbVersion" testCompile "junit:junit:$junitVersion" @@ -181,7 +181,7 @@ project(":samza-test_$scalaVersion") { compile project(':samza-api') compile project(":samza-kv_$scalaVersion") compile project(":samza-core_$scalaVersion") - compile "org.scala-lang:scala-library:$scalaVersion" + compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" compile "javax.mail:mail:1.4" http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/gradle/dependency-versions-scala-2.10.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions-scala-2.10.gradle b/gradle/dependency-versions-scala-2.10.gradle new file mode 100644 index 0000000..47de65a --- /dev/null +++ b/gradle/dependency-versions-scala-2.10.gradle @@ -0,0 +1,7 @@ +ext { + scalaVersion = "2.10" + scalaLibVersion = "2.10.2" + grizzledVersion = "1.0.1" + 
scalatraVersion = "2.2.1" + jettyVersion = "8.1.8.v20121106" +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/gradle/dependency-versions-scala-2.8.1.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions-scala-2.8.1.gradle b/gradle/dependency-versions-scala-2.8.1.gradle index 070c1ff..002688c 100644 --- a/gradle/dependency-versions-scala-2.8.1.gradle +++ b/gradle/dependency-versions-scala-2.8.1.gradle @@ -1,5 +1,6 @@ ext { scalaVersion = "2.8.1" + scalaLibVersion = "2.8.1" grizzledVersion = "0.6.10" scalatraVersion = "2.0.4" jettyVersion = "7.0.0.v20091005" http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/gradle/dependency-versions-scala-2.9.2.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions-scala-2.9.2.gradle b/gradle/dependency-versions-scala-2.9.2.gradle index e1e574e..e7b56a6 100644 --- a/gradle/dependency-versions-scala-2.9.2.gradle +++ b/gradle/dependency-versions-scala-2.9.2.gradle @@ -1,5 +1,6 @@ ext { scalaVersion = "2.9.2" + scalaLibVersion = "2.9.2" grizzledVersion = "0.6.10" scalatraVersion = "2.2.1" jettyVersion = "8.1.8.v20121106" http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala index 94b2b66..24c6875 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala @@ -86,7 +86,7 @@ class TieredPriorityChooser( val prioritizedChoosers = choosers .keys .toList - .sort(_ > _) + .sortWith(_ > _) 
.map(choosers(_)) /** http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/samza-kafka/lib/kafka_2.10-0.8.1-SNAPSHOT-test.jar ---------------------------------------------------------------------- diff --git a/samza-kafka/lib/kafka_2.10-0.8.1-SNAPSHOT-test.jar b/samza-kafka/lib/kafka_2.10-0.8.1-SNAPSHOT-test.jar new file mode 100644 index 0000000..8079578 Binary files /dev/null and b/samza-kafka/lib/kafka_2.10-0.8.1-SNAPSHOT-test.jar differ http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/samza-kafka/lib/kafka_2.10-0.8.1-SNAPSHOT.jar ---------------------------------------------------------------------- diff --git a/samza-kafka/lib/kafka_2.10-0.8.1-SNAPSHOT.jar b/samza-kafka/lib/kafka_2.10-0.8.1-SNAPSHOT.jar new file mode 100644 index 0000000..88d81b2 Binary files /dev/null and b/samza-kafka/lib/kafka_2.10-0.8.1-SNAPSHOT.jar differ http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala index 18b49a1..90e4041 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala @@ -29,6 +29,7 @@ import scala.collection._ import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.system.SystemStream import org.apache.samza.util.Util +import scala.util.Sorting /** * Dynamically determine the Kafka topics to use as input streams to the task via a regular expression. 
@@ -86,7 +87,9 @@ class RegExTopicGenerator extends ConfigRewriter with Logging { info("Generated config values for %d new topics" format newInputStreams.size) val inputStreams = TaskConfig.INPUT_STREAMS -> (existingInputStreams ++ newInputStreams) - .map(Util.getNameFromSystemStream(_)) + .map(Util.getNameFromSystemStream) + .toArray + .sortWith(_ < _) .mkString(",") new MapConfig((keysAndValsToAdd ++ config) += inputStreams) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala index cad2231..3f5a609 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala @@ -55,20 +55,11 @@ object TestKafkaCheckpointManager { val (port1, port2, port3) = (ports(0), ports(1), ports(2)) val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - val config1 = new KafkaConfig(props1) { - override val hostName = "localhost" - override val numPartitions = 1 - } + props1.put("controlled.shutdown.enable", "true") val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - val config2 = new KafkaConfig(props2) { - override val hostName = "localhost" - override val numPartitions = 1 - } + props2.put("controlled.shutdown.enable", "true") val props3 = TestUtils.createBrokerConfig(brokerId3, port3) - val config3 = new KafkaConfig(props3) { - override val hostName = "localhost" - override val numPartitions = 1 - } + props3.put("controlled.shutdown.enable", "true") val config = new java.util.Properties() val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) @@ -88,9 
+79,9 @@ object TestKafkaCheckpointManager { @BeforeClass def beforeSetupServers { zookeeper = new EmbeddedZookeeper(zkConnect) - server1 = TestUtils.createServer(config1) - server2 = TestUtils.createServer(config2) - server3 = TestUtils.createServer(config3) + server1 = TestUtils.createServer(new KafkaConfig(props1)) + server2 = TestUtils.createServer(new KafkaConfig(props2)) + server3 = TestUtils.createServer(new KafkaConfig(props3)) metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala index d52a7e4..77cdbe3 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala @@ -53,7 +53,7 @@ class TestRegExTopicGenerator { val rewritten = rewriter.rewrite(REWRITER_NAME, config) val expected = Map( - "task.inputs" -> "test.scaredycat,test.crazycat", + "task.inputs" -> "test.crazycat,test.scaredycat", "systems.test.streams.scaredycat.ford" -> "mustang", "systems.test.streams.scaredycat.alfa.romeo" -> "spider", "systems.test.streams.scaredycat.b.triumph" -> "spitfire", http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index 9f43585..e010c5a 100644 --- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -61,30 +61,15 @@ object TestKafkaSystemAdmin { val (port1, port2, port3) = (ports(0), ports(1), ports(2)) val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - val config1 = new KafkaConfig(props1) { - override val hostName = "localhost" - override val numPartitions = 1 - override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/" - } val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - val config2 = new KafkaConfig(props2) { - override val hostName = "localhost" - override val numPartitions = 1 - override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/" - } val props3 = TestUtils.createBrokerConfig(brokerId3, port3) - val config3 = new KafkaConfig(props3) { - override val hostName = "localhost" - override val numPartitions = 1 - override val zkConnect = TestKafkaSystemAdmin.zkConnect + "/" - } val config = new java.util.Properties() val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) config.put("metadata.broker.list", brokers) config.put("producer.type", "sync") config.put("request.required.acks", "-1") - config.put("serializer.class", "kafka.serializer.StringEncoder"); + config.put("serializer.class", "kafka.serializer.StringEncoder") val producerConfig = new ProducerConfig(config) var producer: Producer[String, String] = null var zookeeper: EmbeddedZookeeper = null @@ -96,9 +81,9 @@ object TestKafkaSystemAdmin { @BeforeClass def beforeSetupServers { zookeeper = new EmbeddedZookeeper(zkConnect) - server1 = TestUtils.createServer(config1) - server2 = TestUtils.createServer(config2) - server3 = TestUtils.createServer(config3) + server1 = TestUtils.createServer(new KafkaConfig(props1)) + server2 = TestUtils.createServer(new KafkaConfig(props2)) + server3 = TestUtils.createServer(new KafkaConfig(props3)) zkClient = new ZkClient(zkConnect 
+ "/", 6000, 6000, ZKStringSerializer) producer = new Producer(producerConfig) metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c64dfda7/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 55c24f8..ae3f663 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -90,23 +90,8 @@ object TestStatefulTask { val (port1, port2, port3) = (ports(0), ports(1), ports(2)) val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - val config1 = new KafkaConfig(props1) { - override val hostName = "localhost" - override val numPartitions = 1 - override val zkConnect = TestStatefulTask.zkConnect + "/" - } val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - val config2 = new KafkaConfig(props2) { - override val hostName = "localhost" - override val numPartitions = 1 - override val zkConnect = TestStatefulTask.zkConnect + "/" - } val props3 = TestUtils.createBrokerConfig(brokerId3, port3) - val config3 = new KafkaConfig(props3) { - override val hostName = "localhost" - override val numPartitions = 1 - override val zkConnect = TestStatefulTask.zkConnect + "/" - } val config = new java.util.Properties() val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) @@ -127,9 +112,9 @@ object TestStatefulTask { @BeforeClass def beforeSetupServers { zookeeper = new EmbeddedZookeeper(zkConnect) - server1 = TestUtils.createServer(config1) - server2 = TestUtils.createServer(config2) - server3 = TestUtils.createServer(config3) + server1 = 
TestUtils.createServer(new KafkaConfig(props1)) + server2 = TestUtils.createServer(new KafkaConfig(props2)) + server3 = TestUtils.createServer(new KafkaConfig(props3)) zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer) producer = new Producer(producerConfig) metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
