http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala index 5e11c34..7b863fa 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,14 +17,18 @@ */ package io.gearpump.experiments.yarn.glue -import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource} -import org.apache.hadoop.yarn.util.{Records => YarnRecords} +import scala.language.implicitConversions + +import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource, YarnApplicationState} +import org.apache.hadoop.yarn.util.{Records => YarnRecords} object Records { def newRecord[T](clazz: Class[T]): T = YarnRecords.newRecord(clazz) - def newAppSubmissionContext = 
YarnRecords.newRecord(classOf[ApplicationSubmissionContext]) + def newAppSubmissionContext: ApplicationSubmissionContext = { + YarnRecords.newRecord(classOf[ApplicationSubmissionContext]) + } class ApplicationId(private[glue] val impl: YarnApplicationId) { def getId: Int = impl.getId @@ -38,9 +42,13 @@ object Records { false } } + + override def hashCode(): Int = { + impl.hashCode() + } } - object ApplicationId{ + object ApplicationId { def newInstance(timestamp: Long, id: Int): ApplicationId = { YarnApplicationId.newInstance(timestamp, id) } @@ -53,9 +61,9 @@ object Records { def getFinishTime: Long = impl.getFinishTime - def getOriginalTrackingUrl = impl.getOriginalTrackingUrl + def getOriginalTrackingUrl: String = impl.getOriginalTrackingUrl - def getYarnApplicationState = impl.getYarnApplicationState + def getYarnApplicationState: YarnApplicationState = impl.getYarnApplicationState override def toString: String = impl.toString } @@ -69,6 +77,10 @@ object Records { false } } + + override def hashCode(): Int = { + impl.hashCode() + } } object Resource { @@ -93,6 +105,10 @@ object Records { false } } + + override def hashCode(): Int = { + impl.hashCode() + } } class ContainerId(private[glue] val impl: YarnContainerId) { @@ -105,6 +121,10 @@ object Records { false } } + + override def hashCode(): Int = { + impl.hashCode() + } } object ContainerId { @@ -123,6 +143,10 @@ object Records { false } } + + override def hashCode(): Int = { + impl.hashCode() + } } class ContainerStatus(private[glue] val impl: YarnContainerStatus) { @@ -167,11 +191,13 @@ object Records { app.impl } - private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: YarnContainerStatus): ContainerStatus = { + private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: YarnContainerStatus) + : ContainerStatus = { new ContainerStatus(yarn) } - private[glue] implicit def containerStatusToYarnContainerStatus(app: ContainerStatus): YarnContainerStatus = { + private[glue] implicit def 
containerStatusToYarnContainerStatus(app: ContainerStatus) + : YarnContainerStatus = { app.impl }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala index 2719025..db7d5d7 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,12 @@ package io.gearpump.experiments.yarn.glue -import io.gearpump.experiments.yarn.glue.Records._ -import io.gearpump.util.LogUtil import org.apache.hadoop.yarn.api.records.YarnApplicationState import org.apache.hadoop.yarn.client.api +import io.gearpump.experiments.yarn.glue.Records._ +import io.gearpump.util.LogUtil + /** * Adapter for api.YarnClient */ @@ -36,7 +37,7 @@ class YarnClient(yarn: YarnConfig) { LOG.info("Starting YarnClient...") def createApplication: ApplicationId = { - val app = client.createApplication() + val app = client.createApplication() val response = app.getNewApplicationResponse() LOG.info("Create application, appId: " + response.getApplicationId()) response.getApplicationId() @@ -46,7 +47,9 @@ class YarnClient(yarn: YarnConfig) { client.getApplicationReport(appId) } - def submit(name: String, appId: ApplicationId, command: String, resource: Resource, queue: String, packagePath: String, configPath: String): ApplicationId = { + def submit( + name: String, 
appId: ApplicationId, command: String, resource: Resource, queue: String, + packagePath: String, configPath: String): ApplicationId = { val appContext = Records.newAppSubmissionContext appContext.setApplicationName(name) @@ -61,13 +64,13 @@ class YarnClient(yarn: YarnConfig) { client.submitApplication(appContext) } - def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = Long.MaxValue): ApplicationReport = { - import YarnApplicationState._ + def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = Long.MaxValue) + : ApplicationReport = { + import org.apache.hadoop.yarn.api.records.YarnApplicationState._ val terminated = Set(FINISHED, KILLED, FAILED, RUNNING) var result: ApplicationReport = null var done = false - val start = System.currentTimeMillis() def timeout: Boolean = { val now = System.currentTimeMillis() @@ -78,7 +81,7 @@ class YarnClient(yarn: YarnConfig) { } } - while(!done && !timeout) { + while (!done && !timeout) { val report = client.getApplicationReport(appId) val status = report.getYarnApplicationState if (terminated.contains(status)) { @@ -90,15 +93,13 @@ class YarnClient(yarn: YarnConfig) { } } - Console.println() - if (timeout) { throw new Exception(s"Launch Application $appId timeout...") } result } - def stop: Unit = { + def stop(): Unit = { client.stop() } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala index 688ffed..87f199a 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala @@ -7,7 +7,7 @@ * 
"License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,8 +18,6 @@ package io.gearpump.experiments.yarn.glue -import java.io.{OutputStream, OutputStreamWriter} - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.conf.YarnConfiguration http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala new file mode 100644 index 0000000..24adbaa --- /dev/null +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.gearpump.experiments.yarn + + +/** + * YARN facade to decouple Gearpump from YARN. + */ +package object glue { +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala index 3f1aed1..2a6bf38 100644 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala +++ b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,9 +19,10 @@ package io.gearpump.experiments.yarn.appmaster import com.typesafe.config.ConfigFactory +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.TestUtil import io.gearpump.transport.HostPort -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val config = ConfigFactory.parseString( @@ -68,21 +69,27 @@ class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val version = "gearpump-0.1" val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080)) - val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 
-Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> io.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" + // scalastyle:off line.size.limit + val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 -Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> io.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" + // scalastyle:on line.size.limit assert(master.get == expected) } "WorkerCommand" should "create correct command line" in { val version = "gearpump-0.1" val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), "worker-machine") - val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine io.gearpump.cluster.main.Worker 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" + // scalastyle:off line.size.limit + val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} 
-Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine io.gearpump.cluster.main.Worker 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" + // scalastyle:on line.size.limit assert(worker.get == expected) } "AppMasterCommand" should "create correct command line" in { val version = "gearpump-0.1" val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", "arg3")) - val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" + // scalastyle:off line.size.limit + val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" + // scalastyle:on line.size.limit assert(appmaster.get == expected) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala ---------------------------------------------------------------------- diff --git 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala index d8150ac..f8f9fe8 100644 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala +++ b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,25 +18,29 @@ package io.gearpump.experiments.yarn.appmaster +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.TestUtil import io.gearpump.experiments.yarn.appmaster.UIServiceSpec.{Info, MockUI} import io.gearpump.transport.HostPort import io.gearpump.util.Constants -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -class UIServiceSpec extends FlatSpec with Matchers with BeforeAndAfterAll { +class UIServiceSpec extends FlatSpec with Matchers with BeforeAndAfterAll { implicit var system: ActorSystem = null - override def beforeAll() = { + override def beforeAll(): Unit = { system = ActorSystem(getClass.getSimpleName, TestUtil.DEFAULT_CONFIG) } - override def afterAll() = { - system.shutdown() - system.awaitTermination() + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } it should "start UI server correctly" in { @@ -51,14 +55,15 @@ class UIServiceSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll { val ui = system.actorOf(Props(new MockUI(masters, host, port, probe.ref))) - probe.expectMsgPF(){ - case info: Info => + probe.expectMsgPF() { + case info: Info => { assert(info.masterHost == "127.0.0.1") assert(info.masterPort == 3000) val conf = ConfigFactory.parseFile(new java.io.File(info.configFile)) assert(conf.getString(Constants.GEARPUMP_SERVICE_HOST) == host) assert(conf.getString(Constants.GEARPUMP_SERVICE_HTTP) == "8091") assert(conf.getString(Constants.NETTY_TCP_HOSTNAME) == host) + } } system.stop(ui) @@ -70,9 +75,10 @@ object UIServiceSpec { case class Info(supervisor: String, masterHost: String, masterPort: Int, configFile: String) class MockUI(masters: List[HostPort], host: String, port: Int, probe: ActorRef) - extends UIService(masters, host, port) { + extends UIService(masters, host, port) { - override def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = { + override def launch( + supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = { probe ! Info(supervisor, masterHost, masterPort, configFile) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala index 880c1d0..84d6d37 100644 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala +++ b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,9 +18,17 @@ package io.gearpump.experiments.yarn.appmaster +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{Actor, ActorSystem, Props} import akka.testkit.{TestActorRef, TestProbe} import com.typesafe.config.ConfigFactory +import org.mockito.ArgumentCaptor +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker} import io.gearpump.cluster.TestUtil import io.gearpump.experiments.yarn.Constants @@ -29,11 +37,6 @@ import io.gearpump.experiments.yarn.appmaster.YarnAppMasterSpec.UI import io.gearpump.experiments.yarn.glue.Records.{Container, Resource, _} import io.gearpump.experiments.yarn.glue.{NMClient, RMClient} import io.gearpump.transport.HostPort -import org.mockito.ArgumentCaptor -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { @@ -83,24 +86,23 @@ class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val packagePath = "/user/gearpump/gearpump.zip" val configPath = "/user/my/conf" - - override def beforeAll() = { + override def beforeAll(): Unit = { system = ActorSystem("test", config) } - override def afterAll() = { - system.shutdown() - system.awaitTermination() + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } - private def startAppMaster: (TestActorRef[_ <: Actor], TestProbe, NMClient, RMClient) = { + private def startAppMaster(): 
(TestActorRef[YarnAppMaster], TestProbe, NMClient, RMClient) = { val rmClient = mock(classOf[RMClient]) val nmClient = mock(classOf[NMClient]) val ui = mock(classOf[UIFactory]) when(ui.props(any[List[HostPort]], anyString, anyInt)).thenReturn(Props(new UI)) - - val appMaster = TestActorRef(Props(new YarnAppMaster(rmClient, nmClient, packagePath, configPath, ui))) + val appMaster = TestActorRef[YarnAppMaster](Props( + new YarnAppMaster(rmClient, nmClient, packagePath, configPath, ui))) verify(rmClient).start(appMaster) verify(nmClient).start(appMaster) @@ -118,48 +120,50 @@ class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { when(masterContainer.getNodeId).thenReturn(mockNode) when(masterContainer.getId).thenReturn(mockId) - // launch master + // Launches master appMaster ! ContainersAllocated(List.fill(masterCount)(masterContainer)) - verify(nmClient, times(masterCount)).launchCommand(any[Container], anyString, anyString, anyString) + verify(nmClient, + times(masterCount)).launchCommand(any[Container], anyString, anyString, anyString) - // master containers started + // Master containers started (0 until masterCount).foreach(_ => appMaster ! ContainerStarted(mockId)) - //transition to start workers + // Transition to start workers val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]]) verify(rmClient, times(2)).requestContainers(workerResources.capture()) assert(workerResources.getValue.size == workerCount) - // launch workers + // Launches workers val workerContainer = mock(classOf[Container]) when(workerContainer.getNodeId).thenReturn(mockNode) val workerContainerId = ContainerId.fromString("container_1449802454214_0034_01_000006") when(workerContainer.getId).thenReturn(workerContainerId) appMaster ! 
ContainersAllocated(List.fill(workerCount)(workerContainer)) - verify(nmClient, times(workerCount + masterCount)).launchCommand(any[Container], anyString, anyString, anyString) + verify(nmClient, times(workerCount + masterCount)) + .launchCommand(any[Container], anyString, anyString, anyString) - // worker containers started + // Worker containers started (0 until workerCount).foreach(_ => appMaster ! ContainerStarted(mockId)) - // start UI server + // Starts UI server verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt) - //Application Ready... + // Application Ready... val client = TestProbe() - // get active config + // Gets active config appMaster.tell(GetActiveConfig("client"), client.ref) client.expectMsgType[ActiveConfig] - // query version + // Queries version appMaster.tell(QueryVersion, client.ref) client.expectMsgType[Version] - // query version + // Queries cluster info appMaster.tell(QueryClusterInfo, client.ref) client.expectMsgType[ClusterInfo] - // add worker + // Adds worker val newWorkerCount = 2 appMaster.tell(AddWorker(newWorkerCount), client.ref) client.expectMsgType[CommandResult] @@ -167,18 +171,18 @@ class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { verify(rmClient, times(3)).requestContainers(newWorkerResources.capture()) assert(newWorkerResources.getValue.size == newWorkerCount) - // new container allocated + // New container allocated appMaster ! ContainersAllocated(List.fill(newWorkerCount)(workerContainer)) verify(nmClient, times(workerCount + masterCount + newWorkerCount)). launchCommand(any[Container], anyString, anyString, anyString) - // new worker containers started + // New worker containers started (0 until newWorkerCount).foreach(_ => appMaster ! 
ContainerStarted(mockId)) - // same UI server + // Same UI server verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt) - // remove worker + // Removes worker appMaster.tell(RemoveWorker(workerContainerId.toString), client.ref) client.expectMsgType[CommandResult] verify(nmClient).stopContainer(any[ContainerId], any[NodeId]) @@ -187,21 +191,22 @@ class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { } it should "start master, worker and UI on YARN" in { - val (appMaster, client, nmClient, rmClient) = startAppMaster + val env = startAppMaster() + val (appMaster, client, nmClient, rmClient) = env - // kill the app + // Kills the app appMaster.tell(Kill, client.ref) client.expectMsgType[CommandResult] verify(nmClient, times(1)).stop() verify(rmClient, times(1)).shutdownApplication() - } it should "handle resource manager errors" in { - val (appMaster, client, nmClient, rmClient) = startAppMaster + val env = startAppMaster() + val (appMaster, client, nmClient, rmClient) = env // on error - val ex = new Exception + val ex = new Exception("expected resource manager exception") appMaster.tell(ResourceManagerException(ex), client.ref) verify(nmClient, times(1)).stop() verify(rmClient, times(1)).failApplication(ex) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala index c794cae..3bd7f4f 100644 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala +++ b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in 
compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,24 +18,25 @@ package io.gearpump.experiments.yarn.client -import java.io.{OutputStream, ByteArrayOutputStream, BufferedOutputStream, FileOutputStream, InputStream, ByteArrayInputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream} +import java.util.Random import java.util.zip.{ZipEntry, ZipOutputStream} +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.util.Try import akka.actor.ActorSystem import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.TestUtil import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig} +import io.gearpump.experiments.yarn.glue.Records._ import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig} import io.gearpump.util.FileUtils -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import org.mockito.ArgumentCaptor -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import java.util.Random -import scala.concurrent.Await -import scala.util.Try -import io.gearpump.experiments.yarn.glue.Records._ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { implicit var system: ActorSystem = null @@ -88,14 +89,13 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { |} """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG) - - override def beforeAll() = { + override def beforeAll(): Unit = { system = ActorSystem(getClass.getSimpleName, akka) } - override def afterAll() = { - 
system.shutdown() - system.awaitTermination() + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } it should "reject non-zip files" in { @@ -109,7 +109,6 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { assert(Try(launcher.submit("gearpump", packagePath)).isFailure) } - it should "reject if we cannot find the package file on HDFS" in { val yarnConfig = mock(classOf[YarnConfig]) val yarnClient = mock(classOf[YarnClient]) @@ -145,7 +144,8 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val appMasterResolver = mock(classOf[AppMasterResolver]) val version = "gearpump-0.2" - val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver, version) + val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, + appMasterResolver, version) val packagePath = "gearpump.zip" when(fs.exists(anyString)).thenReturn(true) @@ -161,7 +161,8 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val appMasterResolver = mock(classOf[AppMasterResolver]) val version = "gearpump-0.2" - val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver, version) + val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, + fs, system, appMasterResolver, version) val packagePath = "gearpump.zip" val out = mock(classOf[OutputStream]) @@ -178,12 +179,15 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { when(yarnClient.createApplication).thenReturn(appId) assert(appId == launcher.submit("gearpump", packagePath)) - // 3 config files are uploaded to HDFS, one is akka.conf, one is yarn-site.xml, one is log4j.properties. + // 3 Config files are uploaded to HDFS, one is akka.conf, + // one is yarn-site.xml, one is log4j.properties. 
verify(fs, times(3)).create(anyString) verify(out, times(3)).close() - //val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]]) - val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" + // val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]]) + // scalastyle:off line.size.limit + val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" + // scalastyle:on line.size.limit verify(yarnClient).submit("gearpump", appId, expectedCommand, Resource.newInstance(512, 1), "default", "gearpump.zip", "/root/.gearpump_application_0_0000/conf/") @@ -197,7 +201,8 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val appMaster = TestProbe() val version = "gearpump-0.2" - val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver, version) + val 
launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, + appMasterResolver, version) val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf") when(appMasterResolver.resolve(any[ApplicationId], anyInt)).thenReturn(appMaster.ref) @@ -206,7 +211,7 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { appMaster.reply(ActiveConfig(ConfigFactory.empty())) import scala.concurrent.duration._ - val file = Await.result(fileFuture, 30 seconds).asInstanceOf[java.io.File] + val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File] assert(!FileUtils.read(file).isEmpty) file.delete() @@ -216,10 +221,10 @@ class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val bytes = new ByteArrayOutputStream(1000) val zipOut = new ZipOutputStream(bytes) - // not available on BufferedOutputStream + // Not available on BufferedOutputStream zipOut.putNextEntry(new ZipEntry(s"$version/README.md")) zipOut.write("README".getBytes()) - // not available on BufferedOutputStream + // Not available on BufferedOutputStream zipOut.closeEntry() zipOut.close() new ByteArrayInputStream(bytes.toByteArray) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala index 07a07d7..b324ece 100644 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala +++ b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,9 +18,14 @@ package io.gearpump.experiments.yarn.client +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.ActorSystem import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker} import io.gearpump.cluster.TestUtil import io.gearpump.cluster.main.ParseResult @@ -28,21 +33,18 @@ import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, Clust import io.gearpump.experiments.yarn.client.ManageCluster._ import io.gearpump.experiments.yarn.glue.Records.ApplicationId import io.gearpump.util.FileUtils -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import scala.concurrent.Await class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { implicit var system: ActorSystem = null - override def beforeAll() = { + override def beforeAll(): Unit = { system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) } - override def afterAll() = { - system.shutdown() - system.awaitTermination() + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } it should "getConfig from remote Gearpump" in { @@ -52,11 +54,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val output = java.io.File.createTempFile("managerClusterSpec", ".conf") - val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> output.toString), Array.empty[String])) + val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> output.toString), + Array.empty[String])) 
appMaster.expectMsgType[GetActiveConfig] appMaster.reply(ActiveConfig(ConfigFactory.empty())) import scala.concurrent.duration._ - Await.result(future, 30 seconds) + Await.result(future, 30.seconds) val content = FileUtils.read(output) assert(content.length > 0) @@ -68,11 +71,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val appMaster = TestProbe() val manager = new ManageCluster(appId, appMaster.ref, system) - val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 1.toString), Array.empty[String])) + val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 1.toString), + Array.empty[String])) appMaster.expectMsg(AddWorker(1)) appMaster.reply(CommandResult(true)) import scala.concurrent.duration._ - val result = Await.result(future, 30 seconds).asInstanceOf[CommandResult] + val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult] assert(result.success) } @@ -81,11 +85,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val appMaster = TestProbe() val manager = new ManageCluster(appId, appMaster.ref, system) - val future = manager.command(REMOVE_WORKER, new ParseResult(Map("container" -> "1"), Array.empty[String])) + val future = manager.command(REMOVE_WORKER, new ParseResult(Map("container" -> "1"), + Array.empty[String])) appMaster.expectMsg(RemoveWorker("1")) appMaster.reply(CommandResult(true)) import scala.concurrent.duration._ - val result = Await.result(future, 30 seconds).asInstanceOf[CommandResult] + val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult] assert(result.success) } @@ -93,11 +98,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val appId = ApplicationId.newInstance(0L, 0) val appMaster = TestProbe() val manager = new ManageCluster(appId, appMaster.ref, system) - val future = manager.command(VERSION, new ParseResult(Map("container" -> "1"), Array.empty[String])) + val 
future = manager.command(VERSION, new ParseResult(Map("container" -> "1"), + Array.empty[String])) appMaster.expectMsg(QueryVersion) appMaster.reply(Version("version 0.1")) import scala.concurrent.duration._ - val result = Await.result(future, 30 seconds).asInstanceOf[Version] + val result = Await.result(future, 30.seconds).asInstanceOf[Version] assert(result.version == "version 0.1") } @@ -108,11 +114,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val output = java.io.File.createTempFile("managerClusterSpec", ".conf") - val future = manager.command(QUERY, new ParseResult(Map.empty[String, String], Array.empty[String])) + val future = manager.command(QUERY, new ParseResult(Map.empty[String, String], + Array.empty[String])) appMaster.expectMsg(QueryClusterInfo) appMaster.reply(ClusterInfo(List("master"), List("worker"))) import scala.concurrent.duration._ - val result = Await.result(future, 30 seconds).asInstanceOf[ClusterInfo] + val result = Await.result(future, 30.seconds).asInstanceOf[ClusterInfo] assert(result.masters.sameElements(List("master"))) assert(result.workers.sameElements(List("worker"))) } @@ -124,11 +131,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val output = java.io.File.createTempFile("managerClusterSpec", ".conf") - val future = manager.command(KILL, new ParseResult(Map("container" -> "1"), Array.empty[String])) + val future = manager.command(KILL, new ParseResult(Map("container" -> "1"), + Array.empty[String])) appMaster.expectMsg(Kill) appMaster.reply(CommandResult(true)) import scala.concurrent.duration._ - val result = Await.result(future, 30 seconds).asInstanceOf[CommandResult] + val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult] assert(result.success) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/README.md 
---------------------------------------------------------------------- diff --git a/external/hadoopfs/README.md b/external/hadoopfs/README.md index 73eee9e..7a9aeef 100644 --- a/external/hadoopfs/README.md +++ b/external/hadoopfs/README.md @@ -30,7 +30,7 @@ This interface is used by SequenceFileSink to determinate the output format, the The HDFS cluster should run on where Gearpump is deployed. Suppose HDFS is installed at ```/usr/lib/hadoop2.6``` on every node and you already have your application built into a jar file. Then before submitting the application, you need to add Hdfs lib folder and conf folder into ```gearpump.executor.extraClasspath``` in ```conf/gear.conf```, for example ```/usr/lib/hadoop2.6/share/hadoop/common/*:/usr/lib/hadoop2.6/share/hadoop/common/lib/*:/usr/lib/hadoop2.6/share/hadoop/hdfs/*:/usr/lib/hadoop2.6/share/hadoop/hdfs/lib/*:/usr/lib/hadoop2.6/etc/conf```. -Please note only client side's configuration change is needed. After that, you are able to submmit the application. +Please note only client side's configuration change is needed. After that, you are able to submit the application. ## Working with Secured HDFS http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala index eb03e3c..194c9a5 100644 --- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,27 +18,30 @@ package io.gearpump.streaming.hadoop -import io.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreWriter, HadoopCheckpointStoreReader} -import io.gearpump.streaming.hadoop.lib.rotation.Rotation -import io.gearpump.streaming.transaction.api.CheckpointStore -import io.gearpump.TimeStamp -import io.gearpump.util.LogUtil import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.slf4j.Logger +import io.gearpump.TimeStamp +import io.gearpump.streaming.hadoop.lib.rotation.Rotation +import io.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter} +import io.gearpump.streaming.transaction.api.CheckpointStore +import io.gearpump.util.LogUtil + object HadoopCheckpointStore { val LOG: Logger = LogUtil.getLogger(classOf[HadoopCheckpointStore]) } - /** - * stores timestamp-checkpoint mapping to Hadoop-compatible filesystem - * store file layout - * timestamp1, index1, - * timestamp2, index2, - * ... - * timestampN, indexN + * Stores timestamp-checkpoint mapping to Hadoop-compatible filesystem. + * + * Store file layout: + * {{{ + * timestamp1, index1, + * timestamp2, index2, + * ... 
+ * timestampN, indexN + * }}} */ class HadoopCheckpointStore( dir: Path, @@ -52,12 +55,15 @@ class HadoopCheckpointStore( private[hadoop] var curFile: Option[String] = None private[hadoop] var curWriter: Option[HadoopCheckpointStoreWriter] = None // regex (checkpoints-$startTime-$endTime.store) for complete checkpoint file, - private val compRegex = """checkpoints-(\d+)-(\d+).store""".r + private val compRegex = + """checkpoints-(\d+)-(\d+).store""".r // regex (checkpoints-$startTime.store) for temporary checkpoint file - private val tempRegex = """checkpoints-(\d+).store""".r + private val tempRegex = + """checkpoints-(\d+).store""".r /** - * persists a pair of timestamp and checkpoint, which + * Persists a pair of timestamp and checkpoint, which: + * * 1. creates a temporary checkpoint file, checkpoints-\$startTime.store, if not exist * 2. writes out (timestamp, checkpoint) and marks rotation * 3. rotates checkpoint file if needed @@ -70,7 +76,8 @@ class HadoopCheckpointStore( if (curWriter.isEmpty) { curStartTime = curTime curFile = Some(s"checkpoints-$curStartTime.store") - curWriter = curFile.map(file => new HadoopCheckpointStoreWriter(new Path(dir, file), hadoopConfig)) + curWriter = curFile.map(file => + new HadoopCheckpointStoreWriter(new Path(dir, file), hadoopConfig)) } curWriter.foreach { w => @@ -89,7 +96,8 @@ class HadoopCheckpointStore( } /** - * recovers checkpoint given timestamp, which + * Recovers checkpoint given timestamp, which + * {{{ * 1. returns None if no store exists * 2. searches checkpoint stores for * a. complete store checkpoints-\$startTime-\$endTime.store @@ -99,6 +107,7 @@ class HadoopCheckpointStore( * 3. renames store to checkpoints-\$startTime-\$endTime.store * 4. deletes all stores whose name has a startTime larger than timestamp * 5. 
looks for the checkpoint in the found store + * }}} */ override def recover(timestamp: TimeStamp): Option[Array[Byte]] = { var checkpoint: Option[Array[Byte]] = None @@ -152,4 +161,4 @@ class HadoopCheckpointStore( override def close(): Unit = { curWriter.foreach(_.close()) } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala index a6c6d9b..5a81ecd 100644 --- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,13 +20,14 @@ package io.gearpump.streaming.hadoop import java.io.{ObjectInputStream, ObjectOutputStream} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import io.gearpump.cluster.UserConfig import io.gearpump.streaming.hadoop.lib.HadoopUtil import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation} import io.gearpump.streaming.task.TaskContext -import io.gearpump.streaming.transaction.api.{CheckpointStoreFactory, CheckpointStore} -import io.gearpump.cluster.UserConfig -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import io.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory} object HadoopCheckpointStoreFactory { val VERSION = 1 @@ -37,7 +38,7 @@ class HadoopCheckpointStoreFactory( @transient private var hadoopConfig: Configuration, rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong)) extends CheckpointStoreFactory { - import HadoopCheckpointStoreFactory._ + import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory._ private def writeObject(out: ObjectOutputStream): Unit = { out.defaultWriteObject() @@ -52,8 +53,9 @@ class HadoopCheckpointStoreFactory( override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = { import taskContext.{appId, taskId} - val dirPath = new Path(dir + Path.SEPARATOR + s"v$VERSION", s"app$appId-task${taskId.processorId}_${taskId.index}") + val dirPath = new Path(dir + Path.SEPARATOR + s"v$VERSION", + s"app$appId-task${taskId.processorId}_${taskId.index}") val fs = HadoopUtil.getFileSystemForPath(dirPath, hadoopConfig) new HadoopCheckpointStore(dirPath, fs, hadoopConfig, rotation) } -} +} \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala index bf4f2ea..a07dbbc 100644 --- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,6 +19,10 @@ package io.gearpump.streaming.hadoop import java.text.SimpleDateFormat +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hdfs.HdfsConfiguration +import org.apache.hadoop.io.SequenceFile + import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.streaming.hadoop.lib.HadoopUtil @@ -26,9 +30,6 @@ import io.gearpump.streaming.hadoop.lib.format.{DefaultSequenceFormatter, Output import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation} import io.gearpump.streaming.sink.DataSink import io.gearpump.streaming.task.{TaskContext, TaskId} -import org.apache.hadoop.fs.Path -import org.apache.hadoop.hdfs.HdfsConfiguration -import org.apache.hadoop.io.SequenceFile class SequenceFileSink( userConfig: UserConfig, @@ -43,10 +44,12 @@ class SequenceFileSink( private var appName: String = null /** - * open connection to data sink - * invoked at onStart() method of [[Task]] - * @param context is the task 
context at runtime - */ + * Starts connection to data sink + * + * Invoked at onStart() method of [[io.gearpump.streaming.task.Task]] + * + * @param context is the task context at runtime + */ override def open(context: TaskContext): Unit = { HadoopUtil.login(userConfig, configuration) this.appName = context.appName @@ -55,10 +58,11 @@ class SequenceFileSink( } /** - * write message into data sink - * invoked at onNext() method of [[Task]] - * @param message wraps data to be written out - */ + * Writes message into data sink + * + * Invoked at onNext() method of [[io.gearpump.streaming.task.Task]] + * @param message wraps data to be written out + */ override def write(message: Message): Unit = { val key = sequenceFormat.getKey(message) val value = sequenceFormat.getValue(message) @@ -67,7 +71,7 @@ class SequenceFileSink( } writer.append(key, value) rotation.mark(message.timestamp, writer.getLength) - if(rotation.shouldRotate){ + if (rotation.shouldRotate) { closeWriter this.writer = getNextWriter rotation.rotate @@ -75,15 +79,16 @@ class SequenceFileSink( } /** - * close connection to data sink - * invoked at onClose() method of [[Task]] - */ + * Closes connection to data sink + * + * Invoked at onClose() method of [[io.gearpump.streaming.task.Task]] + */ override def close(): Unit = { - closeWriter + closeWriter() } - private def closeWriter: Unit = { - Option(writer).foreach{ w => + private def closeWriter(): Unit = { + Option(writer).foreach { w => w.hflush() w.close() } @@ -102,4 +107,4 @@ class SequenceFileSink( val base = new Path(basePath, s"$appName-task${taskId.processorId}_${taskId.index}") new Path(base, dateFormat.format(new java.util.Date)) } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala ---------------------------------------------------------------------- diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala index 40ee2f5..52acbac 100644 --- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,18 +20,19 @@ package io.gearpump.streaming.hadoop.lib import java.io.EOFException -import io.gearpump.TimeStamp import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import io.gearpump.TimeStamp + class HadoopCheckpointStoreReader( path: Path, hadoopConfig: Configuration) - extends Iterator[(TimeStamp, Array[Byte])] { + extends Iterator[(TimeStamp, Array[Byte])] { private val stream = HadoopUtil.getInputStream(path, hadoopConfig) private var nextTimeStamp: Option[TimeStamp] = None - private var nextData : Option[Array[Byte]] = None + private var nextData: Option[Array[Byte]] = None override def hasNext: Boolean = { if (nextTimeStamp.isDefined) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala index f8d99ee..35f2f51 100644 --- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,10 +18,11 @@ package io.gearpump.streaming.hadoop.lib -import io.gearpump.TimeStamp import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import io.gearpump.TimeStamp + class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) { private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala index f1db219..eb579e4 100644 --- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,19 +19,20 @@ package io.gearpump.streaming.hadoop.lib import java.io.File -import io.gearpump.cluster.UserConfig -import io.gearpump.util.{FileUtils, Constants} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.security.UserGroupInformation +import io.gearpump.cluster.UserConfig +import io.gearpump.util.{Constants, FileUtils} + private[hadoop] object HadoopUtil { def getOutputStream(path: Path, hadoopConfig: Configuration): FSDataOutputStream = { val dfs = getFileSystemForPath(path, hadoopConfig) val stream: FSDataOutputStream = { if (dfs.isFile(path)) { - dfs.append(path) + dfs.append(path) } else { dfs.create(path) } @@ -56,12 +57,13 @@ private[hadoop] object HadoopUtil { } def login(userConfig: UserConfig, configuration: Configuration): Unit = { - if(UserGroupInformation.isSecurityEnabled) { + if (UserGroupInformation.isSecurityEnabled) { val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL) val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE) - if(principal.isEmpty || keytabContent.isEmpty) { + if (principal.isEmpty || keytabContent.isEmpty) { val errorMsg = s"HDFS is security enabled, user should provide kerberos principal in " + - s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}" + s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} " + + s"and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}" throw new Exception(errorMsg) } val keytabFile = File.createTempFile("login", ".keytab") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala 
---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala index 8b693d0..d19e71f 100644 --- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,10 +17,11 @@ */ package io.gearpump.streaming.hadoop.lib.format +import org.apache.hadoop.io.{LongWritable, Text, Writable} + import io.gearpump.Message -import org.apache.hadoop.io.{LongWritable, Writable, Text} -class DefaultSequenceFormatter extends OutputFormatter{ +class DefaultSequenceFormatter extends OutputFormatter { override def getKey(message: Message): Writable = new LongWritable(message.timestamp) override def getValue(message: Message): Writable = new Text(message.msg.asInstanceOf[String]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala index bc385f0..fe8e52e 100644 --- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,10 +17,11 @@ */ package io.gearpump.streaming.hadoop.lib.format -import io.gearpump.Message import org.apache.hadoop.io.Writable -trait OutputFormatter extends Serializable{ +import io.gearpump.Message + +trait OutputFormatter extends Serializable { def getKeyClass: Class[_ <: Writable] def getValueClass: Class[_ <: Writable] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala index 104a7b9..cd83ea5 100644 --- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -30,11 +30,8 @@ case class FileSizeRotation(maxBytes: Long) extends Rotation { override def shouldRotate: Boolean = bytesWritten >= maxBytes - override def rotate: Unit = { + override def rotate(): Unit = { bytesWritten = 0L } } - - - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala index c56dd65..e28b222 100644 --- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala +++ b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,5 +23,5 @@ import io.gearpump.TimeStamp trait Rotation extends Serializable { def mark(timestamp: TimeStamp, offset: Long): Unit def shouldRotate: Boolean - def rotate: Unit + def rotate(): Unit } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala index db5615d..cc8a5f0 100644 --- a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala +++ b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,6 @@ package io.gearpump.streaming.hadoop -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.hadoop.lib.HadoopUtil -import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation -import io.gearpump.streaming.task.TaskId -import io.gearpump.cluster.UserConfig import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.mockito.Mockito._ @@ -31,7 +26,14 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -class HadoopCheckpointStoreIntegrationSpec extends PropSpec with PropertyChecks with MockitoSugar with Matchers { +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.hadoop.lib.HadoopUtil +import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation +import io.gearpump.streaming.task.TaskId + +class HadoopCheckpointStoreIntegrationSpec + extends PropSpec with PropertyChecks with MockitoSugar with Matchers { property("HadoopCheckpointStore should persist and recover checkpoints") { val fileSizeGen = Gen.chooseNum[Int](100, 1000) @@ -44,7 +46,8 @@ class HadoopCheckpointStoreIntegrationSpec extends PropSpec with PropertyChecks when(taskContext.taskId).thenReturn(TaskId(0, 0)) val rootDirName = "test" - val rootDir = new Path(rootDirName + Path.SEPARATOR + s"v${HadoopCheckpointStoreFactory.VERSION}") + val rootDir = new Path(rootDirName + Path.SEPARATOR + + s"v${HadoopCheckpointStoreFactory.VERSION}") val subDir = new Path(rootDir, "app0-task0_0") val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig) @@ -60,7 +63,7 @@ class HadoopCheckpointStoreIntegrationSpec extends PropSpec with PropertyChecks 
val tempFile = new Path(subDir, "checkpoints-0.store") fs.exists(tempFile) shouldBe true - checkpointStore.persist(1L , Array.fill(fileSize)(0.toByte)) + checkpointStore.persist(1L, Array.fill(fileSize)(0.toByte)) fs.exists(tempFile) shouldBe false fs.exists(new Path(subDir, "checkpoints-0-1.store")) shouldBe true @@ -80,6 +83,4 @@ class HadoopCheckpointStoreIntegrationSpec extends PropSpec with PropertyChecks fs.close() } } - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala index 9e1890e..9b4057c 100644 --- a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala +++ b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,12 @@ package io.gearpump.streaming.hadoop.lib.rotation -import io.gearpump.TimeStamp import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} +import io.gearpump.TimeStamp + class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers { val timestampGen = Gen.chooseNum[Long](0L, 1000L) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hbase/README.md ---------------------------------------------------------------------- diff --git a/external/hbase/README.md b/external/hbase/README.md index d4b8cd0..0e55a5d 100644 --- a/external/hbase/README.md +++ b/external/hbase/README.md @@ -10,7 +10,7 @@ The message type that HBaseSink is able to handle including: 2. Tuple4[Array[Byte], Array[Byte], Array[Byte], Array[Byte]] which means (rowKey, columnGroup, columnName, value) 3. Sequence of type 1 and 2 -Suppose there is a DataSource Task will output above-mentitioned messages, you can write a simple application then: +Suppose there is a DataSource Task will output above-mentioned messages, you can write a simple application then: ```scala val sink = new HBaseSink(UserConfig.empty, "$tableName") @@ -25,7 +25,7 @@ val application = StreamApplication("HBase", Graph(computation), UserConfig.empt The HBase cluster should run on where Gearpump is deployed. Suppose HBase is installed at ```/usr/lib/hbase``` on every node and you already have your application built into a jar file. Then before submitting the application, you need to add HBase lib folder and conf folder into ```gearpump.executor.extraClasspath``` in ```conf/gear.conf```, for example ```/usr/lib/hbase/lib/*:/usr/lib/hbase/conf```. 
-Please note only client side's configuration change is needed. After that, you are able to submmit the application. +Please note only client side's configuration change is needed. After that, you are able to submit the application. ## Working with Secured HBASE @@ -49,4 +49,3 @@ val application = StreamApplication("HBase", Graph(computation), UserConfig.empt Note here the keytab file set into config should be a byte array. - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala index 3503b7f..a477ba6 100644 --- a/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,18 +19,21 @@ package io.gearpump.external.hbase import java.io.{File, ObjectInputStream, ObjectOutputStream} -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.sink.DataSink -import io.gearpump.streaming.task.TaskContext -import io.gearpump.util.{FileUtils, Constants} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{ConnectionFactory, Connection, Put} +import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.hadoop.security.UserGroupInformation -class HBaseSink(userconfig: UserConfig, tableName: String, @transient var configuration: Configuration) extends DataSink{ +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.sink.DataSink +import io.gearpump.streaming.task.TaskContext +import io.gearpump.util.{Constants, FileUtils} + +class HBaseSink( + userconfig: UserConfig, tableName: String, @transient var configuration: Configuration) + extends DataSink{ lazy val connection = HBaseSink.getConnection(userconfig, configuration) lazy val table = connection.getTable(TableName.valueOf(tableName)) @@ -41,10 +44,13 @@ class HBaseSink(userconfig: UserConfig, tableName: String, @transient var config } def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = { - insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), Bytes.toBytes(columnName), Bytes.toBytes(value)) + insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), + Bytes.toBytes(columnName), Bytes.toBytes(value)) } - def insert(rowKey: 
Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte]): Unit = { + def insert( + rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte]) + : Unit = { val put = new Put(rowKey) put.addColumn(columnGroup, columnName, value) table.put(put) @@ -54,10 +60,20 @@ class HBaseSink(userconfig: UserConfig, tableName: String, @transient var config msg match { case seq: Seq[Any] => seq.foreach(put) - case tuple: (String, String, String, String) => - insert(tuple._1, tuple._2, tuple._3, tuple._4) - case tuple: (Array[Byte], Array[Byte], Array[Byte], Array[Byte]) => - insert(tuple._1, tuple._2, tuple._3, tuple._4) + case tuple: (_, _, _, _) => { + tuple._1 match { + case str: String => { + insert(tuple._1.asInstanceOf[String], tuple._2.asInstanceOf[String], + tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String]) + } + case byteArray: Array[Byte@unchecked] => { + insert(tuple._1.asInstanceOf[Array[Byte]], tuple._2.asInstanceOf[Array[Byte]], + tuple._3.asInstanceOf[Array[Byte]], tuple._4.asInstanceOf[Array[Byte]]) + } + case _ => + // Skip + } + } } } @@ -93,17 +109,19 @@ object HBaseSink { new HBaseSink(userconfig, tableName) } - def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration): HBaseSink = { + def apply[T](userconfig: UserConfig, tableName: String, configuration: Configuration) + : HBaseSink = { new HBaseSink(userconfig, tableName, configuration) } private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = { - if(UserGroupInformation.isSecurityEnabled) { + if (UserGroupInformation.isSecurityEnabled) { val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL) val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE) - if(principal.isEmpty || keytabContent.isEmpty) { + if (principal.isEmpty || keytabContent.isEmpty) { val errorMsg = s"HBase is security enabled, user should provide kerberos principal in " + - 
s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}" + s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file " + + s"in ${Constants.GEARPUMP_KEYTAB_FILE}" throw new Exception(errorMsg) } val keytabFile = File.createTempFile("login", ".keytab") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala index 8d4746e..11a20dc 100644 --- a/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala +++ b/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,18 +17,25 @@ */ package io.gearpump.external.hbase.dsl +import scala.language.implicitConversions + import org.apache.hadoop.conf.Configuration + import io.gearpump.cluster.UserConfig import io.gearpump.external.hbase.HBaseSink import io.gearpump.streaming.dsl.Stream -import Stream.Sink +import io.gearpump.streaming.dsl.Stream.Sink +/** Create a HBase DSL Sink */ class HBaseDSLSink[T](stream: Stream[T]) { - def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, description: String): Stream[T] = { + + def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, description: String) + : Stream[T] = { stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, description) } - def 
writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, parallism: Int, description: String): Stream[T] = { + def writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, + parallism: Int, description: String): Stream[T] = { stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, userConfig, description) } } @@ -37,4 +44,4 @@ object HBaseDSLSink { implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = { new HBaseDSLSink[T](stream) } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala index a0fec76..cad6581 100644 --- a/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,29 +17,25 @@ */ package io.gearpump.external.hbase -import org.apache.hadoop.hbase.client.{HTable, Put} -import org.apache.hadoop.hbase.util.Bytes -import org.mockito.Mockito import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers { - property("HBaseSink should insert a row successfully") { - /* - import Mockito._ - val htable = Mockito.mock(classOf[HTable]) - val row = "row" - val group = "group" - val name = "name" - val value = "1.2" - val put = new Put(Bytes.toBytes(row)) - put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) - val hbaseSink = HBaseSink(htable) - hbaseSink.insert(put) - verify(htable).put(put) - */ + + // import Mockito._ + // val htable = Mockito.mock(classOf[HTable]) + // val row = "row" + // val group = "group" + // val name = "name" + // val value = "1.2" + // val put = new Put(Bytes.toBytes(row)) + // put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) + // val hbaseSink = HBaseSink(htable) + // hbaseSink.insert(put) + // verify(htable).put(put) + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala index f947960..b482c7c 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala @@ -7,7 +7,7 @@ 
* "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,12 +20,13 @@ package io.gearpump.streaming.kafka import java.util.Properties +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.serialization.ByteArraySerializer + +import io.gearpump.Message import io.gearpump.streaming.kafka.lib.KafkaUtil import io.gearpump.streaming.sink.DataSink import io.gearpump.streaming.task.TaskContext -import io.gearpump.Message -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.serialization.ByteArraySerializer /** * kafka sink connectors that invokes org.apache.kafka.clients.producer.KafkaProducer to send @@ -33,7 +34,8 @@ import org.apache.kafka.common.serialization.ByteArraySerializer * @param getProducer is a function to construct a KafkaProducer * @param topic is the kafka topic to write to */ -class KafkaSink private[kafka](getProducer: () => KafkaProducer[Array[Byte], Array[Byte]], topic: String) extends DataSink { +class KafkaSink private[kafka]( + getProducer: () => KafkaProducer[Array[Byte], Array[Byte]], topic: String) extends DataSink { /** * @param topic producer topic @@ -55,7 +57,7 @@ class KafkaSink private[kafka](getProducer: () => KafkaProducer[Array[Byte], Arr this(topic, KafkaUtil.buildProducerConfig(bootstrapServers)) } - // lazily construct producer since KafkaProducer is not serializable + // Lazily construct producer since KafkaProducer is not serializable private lazy val producer = getProducer() override def open(context: TaskContext): Unit = {} @@ -63,7 +65,8 @@ class KafkaSink private[kafka](getProducer: () => KafkaProducer[Array[Byte], Arr 
override def write(message: Message): Unit = { val record = message.msg match { case (k, v) => - new ProducerRecord[Array[Byte], Array[Byte]](topic, k.asInstanceOf[Array[Byte]], v.asInstanceOf[Array[Byte]]) + new ProducerRecord[Array[Byte], Array[Byte]](topic, k.asInstanceOf[Array[Byte]], + v.asInstanceOf[Array[Byte]]) case v => new ProducerRecord[Array[Byte], Array[Byte]](topic, v.asInstanceOf[Array[Byte]]) }
