http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
new file mode 100644
index 0000000..c0a4ee2
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+import java.util.Random
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
+import org.apache.gearpump.util.FileUtils
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.Try
+
+/**
+ * Unit tests for [[LaunchCluster]], run against Mockito mocks of the YARN glue layer
+ * ([[YarnConfig]], [[YarnClient]], [[FileSystem]]) and of [[AppMasterResolver]].
+ */
+class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  // Created in beforeAll and terminated in afterAll.
+  implicit var system: ActorSystem = null
+
+  val rand = new Random()
+
+  /** Returns `size` random bytes; used below as the content of a corrupted package. */
+  private def randomArray(size: Int): Array[Byte] = {
+    val array = new Array[Byte](size)
+    rand.nextBytes(array)
+    array
+  }
+
+  val appId = ApplicationId.newInstance(0L, 0)
+
+  // NOTE(review): the key "package -path" below contains a space; presumably "package-path"
+  // was intended. Left unchanged -- confirm against the configuration keys the launcher reads.
+  val akka = ConfigFactory.parseString(
+
+    """
+      |gearpump {
+      |  yarn {
+      |    client {
+      |      package -path = "/user/gearpump/gearpump.zip"
+      |    }
+      |
+      |    applicationmaster {
+      |      ## Memory of YarnAppMaster
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |      queue = "default"
+      |    }
+      |
+      |    master {
+      |      ## Memory of master daemon
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      containers = "2"
+      |      memory = "512"
+      |      vcores = "1"
+      |    }
+      |
+      |    worker {
+      |      ## memory of worker daemon
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      containers = "4"
+      |      ## This also contains all memory for child executors.
+      |      memory = "4096"
+      |      vcores = "1"
+      |    }
+      |    services {
+      |      enabled = true
+      |    }
+      |  }
+      |}
+    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem(getClass.getSimpleName, akka)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "reject non-zip files" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
+    val packagePath = "gearpump.zip2"
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+  }
+
+  it should "reject if we cannot find the package file on HDFS" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
+    val packagePath = "gearpump.zip"
+    when(fs.exists(anyString)).thenReturn(false)
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+  }
+
+  it should "throw when package exists on HDFS, but the file is corrupted" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
+    val packagePath = "gearpump.zip"
+    when(fs.exists(anyString)).thenReturn(true)
+
+    val content = new ByteArrayInputStream(randomArray(10))
+    when(fs.open(anyString)).thenReturn(content)
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+    content.close()
+  }
+
+  it should "throw when the HDFS package version is not consistent with local version" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val version = "gearpump-0.2"
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
+      appMasterResolver, version)
+    val packagePath = "gearpump.zip"
+    when(fs.exists(anyString)).thenReturn(true)
+
+    val oldVersion = "gearpump-0.1"
+    when(fs.open(anyString)).thenReturn(zipInputStream(oldVersion))
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+  }
+
+  it should "upload config file to HDFS when submitting" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val version = "gearpump-0.2"
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient,
+      fs, system, appMasterResolver, version)
+    val packagePath = "gearpump.zip"
+
+    val out = mock(classOf[OutputStream])
+    when(fs.exists(anyString)).thenReturn(true)
+    when(fs.create(anyString)).thenReturn(out)
+    when(fs.getHomeDirectory).thenReturn("/root")
+
+    when(fs.open(anyString)).thenReturn(zipInputStream(version))
+
+    val report = mock(classOf[ApplicationReport])
+    when(yarnClient.awaitApplication(any[ApplicationId], anyLong())).thenReturn(report)
+
+    when(report.getApplicationId).thenReturn(appId)
+    when(yarnClient.createApplication).thenReturn(appId)
+    assert(appId == launcher.submit("gearpump", packagePath))
+
+    // 3 Config files are uploaded to HDFS, one is akka.conf,
+    // one is yarn-site.xml, one is log4j.properties.
+    verify(fs, times(3)).create(anyString)
+    verify(out, times(3)).close()
+
+    // scalastyle:off line.size.limit
+    val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
+    verify(yarnClient).submit("gearpump", appId, expectedCommand,
+      Resource.newInstance(512, 1), "default",
+      "gearpump.zip", "/root/.gearpump_application_0_0000/conf/")
+  }
+
+  it should "save active config from Gearpump cluster" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+    val appMaster = TestProbe()
+
+    val version = "gearpump-0.2"
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
+      appMasterResolver, version)
+    val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf")
+
+    // Delete the temporary file even when an expectation or assertion below fails.
+    try {
+      when(appMasterResolver.resolve(any[ApplicationId], anyInt)).thenReturn(appMaster.ref)
+      val fileFuture = launcher.saveConfig(appId, outputPath.getPath)
+      appMaster.expectMsgType[GetActiveConfig]
+      appMaster.reply(ActiveConfig(ConfigFactory.empty()))
+
+      import scala.concurrent.duration._
+      val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File]
+
+      assert(!FileUtils.read(file).isEmpty)
+      file.delete()
+    } finally {
+      outputPath.delete()
+    }
+  }
+
+  /** Builds an in-memory zip whose only entry is `version/README.md` and returns it as a stream. */
+  private def zipInputStream(version: String): InputStream = {
+    val bytes = new ByteArrayOutputStream(1000)
+    val zipOut = new ZipOutputStream(bytes)
+
+    // Not available on BufferedOutputStream
+    zipOut.putNextEntry(new ZipEntry(s"$version/README.md"))
+    zipOut.write("README".getBytes())
+    // Not available on BufferedOutputStream
+    zipOut.closeEntry()
+    zipOut.close()
+    new ByteArrayInputStream(bytes.toByteArray)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala
new file mode 100644
index 0000000..01960ad
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.main.ParseResult
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
+import org.apache.gearpump.experiments.yarn.client.ManageCluster._
+import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.gearpump.util.FileUtils
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Unit tests for [[ManageCluster]]: each test issues one command and answers it from a
+ * [[TestProbe]] that stands in for the application master.
+ */
+class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  implicit var system: ActorSystem = null
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "getConfig from remote Gearpump" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
+
+    // Delete the temporary file even when an expectation or assertion below fails.
+    try {
+      val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> output.toString),
+        Array.empty[String]))
+      appMaster.expectMsgType[GetActiveConfig]
+      appMaster.reply(ActiveConfig(ConfigFactory.empty()))
+      import scala.concurrent.duration._
+      Await.result(future, 30.seconds)
+
+      val content = FileUtils.read(output)
+      assert(content.length > 0)
+    } finally {
+      output.delete()
+    }
+  }
+
+  it should "addworker" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 1.toString),
+      Array.empty[String]))
+    appMaster.expectMsg(AddWorker(1))
+    appMaster.reply(CommandResult(true))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
+    assert(result.success)
+  }
+
+  it should "removeworker" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(REMOVE_WORKER, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
+    appMaster.expectMsg(RemoveWorker("1"))
+    appMaster.reply(CommandResult(true))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
+    assert(result.success)
+  }
+
+  it should "get version" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+    val future = manager.command(VERSION, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
+    appMaster.expectMsg(QueryVersion)
+    appMaster.reply(Version("version 0.1"))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[Version]
+    assert(result.version == "version 0.1")
+  }
+
+  it should "get cluster info" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(QUERY, new ParseResult(Map.empty[String, String],
+      Array.empty[String]))
+    appMaster.expectMsg(QueryClusterInfo)
+    appMaster.reply(ClusterInfo(List("master"), List("worker")))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[ClusterInfo]
+    assert(result.masters.sameElements(List("master")))
+    assert(result.workers.sameElements(List("worker")))
+  }
+
+  it should "kill the cluster" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(KILL, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
+    appMaster.expectMsg(Kill)
+    appMaster.reply(CommandResult(true))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
+    assert(result.success)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
deleted file mode 100644
index 194c9a5..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.hadoop.lib.rotation.Rotation
-import io.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter}
-import io.gearpump.streaming.transaction.api.CheckpointStore
-import io.gearpump.util.LogUtil
-
-object HadoopCheckpointStore {
-  val LOG: Logger = LogUtil.getLogger(classOf[HadoopCheckpointStore])
-}
-
-/**
- * Stores timestamp-checkpoint mapping to Hadoop-compatible filesystem.
- *
- * Store file layout:
- * {{{
- * timestamp1, index1,
- * timestamp2, index2,
- * ...
- * timestampN, indexN
- * }}}
- */
-class HadoopCheckpointStore(
-    dir: Path,
-    fs: FileSystem,
-    hadoopConfig: Configuration,
-    rotation: Rotation)
-  extends CheckpointStore {
-
-  private[hadoop] var curTime = 0L
-  private[hadoop] var curStartTime = curTime
-  private[hadoop] var curFile: Option[String] = None
-  private[hadoop] var curWriter: Option[HadoopCheckpointStoreWriter] = None
-  // regex (checkpoints-$startTime-$endTime.store) for complete checkpoint file,
-  // the dot is escaped so that only a literal ".store" suffix matches
-  private val compRegex =
-    """checkpoints-(\d+)-(\d+)\.store""".r
-  // regex (checkpoints-$startTime.store) for temporary checkpoint file
-  private val tempRegex =
-    """checkpoints-(\d+)\.store""".r
-
-  /**
-   * Persists a pair of timestamp and checkpoint, which:
-   *
-   *   1. creates a temporary checkpoint file, checkpoints-\$startTime.store, if not exist
-   *   2. writes out (timestamp, checkpoint) and marks rotation
-   *   3. rotates checkpoint file if needed
-   *     a. renames temporary checkpoint file to checkpoints-\$startTime-\$endTime.store
-   *     b. closes current writer and reset
-   *     c. rotation rotates
-   */
-  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
-    curTime = timestamp
-    if (curWriter.isEmpty) {
-      curStartTime = curTime
-      curFile = Some(s"checkpoints-$curStartTime.store")
-      curWriter = curFile.map(file =>
-        new HadoopCheckpointStoreWriter(new Path(dir, file), hadoopConfig))
-    }
-
-    curWriter.foreach { w =>
-      val offset = w.write(timestamp, checkpoint)
-      rotation.mark(timestamp, offset)
-    }
-
-    if (rotation.shouldRotate) {
-      curFile.foreach { f =>
-        fs.rename(new Path(dir, f), new Path(dir, s"checkpoints-$curStartTime-$curTime.store"))
-        curWriter.foreach(_.close())
-        curWriter = None
-      }
-      rotation.rotate
-    }
-  }
-
-  /**
-   * Recovers checkpoint given timestamp, which
-   * {{{
-   * 1. returns None if no store exists
-   * 2. searches checkpoint stores for
-   *   a. complete store checkpoints-\$startTime-\$endTime.store
-   *      where startTime <= timestamp <= endTime
-   *   b. temporary store checkpoints-\$startTime.store
-   *      where startTime <= timestamp
-   * 3. renames store to checkpoints-\$startTime-\$endTime.store
-   * 4. deletes all stores whose name has a startTime larger than timestamp
-   * 5. looks for the checkpoint in the found store
-   * }}}
-   *
-   * Files in the directory whose names match neither store pattern are ignored.
-   */
-  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
-    var checkpoint: Option[Array[Byte]] = None
-
-    if (fs.exists(dir)) {
-      var checkpointFile: Option[Path] = None
-      fs.listStatus(dir).map(_.getPath).foreach { file =>
-        val fileName = file.getName
-        fileName match {
-          case compRegex(start, end) =>
-            val startTime = start.toLong
-            val endTime = end.toLong
-            if (timestamp >= startTime && timestamp <= endTime) {
-              checkpointFile = Some(new Path(dir, fileName))
-            } else if (timestamp < startTime) {
-              fs.delete(file, true)
-            }
-          case tempRegex(start) =>
-            val startTime = start.toLong
-            if (timestamp >= startTime) {
-              val newFile = new Path(dir, s"checkpoints-$startTime-$timestamp.store")
-              fs.rename(new Path(dir, fileName), newFile)
-              checkpointFile = Some(newFile)
-            }
-          case _ =>
-            // Not a checkpoint store: skip it instead of failing with a MatchError.
-        }
-      }
-
-      checkpointFile.foreach { file =>
-        val reader = new HadoopCheckpointStoreReader(file, hadoopConfig)
-
-        @annotation.tailrec
-        def read: Option[Array[Byte]] = {
-          if (reader.hasNext) {
-            val (time, bytes) = reader.next()
-            if (time == timestamp) {
-              Some(bytes)
-            } else {
-              read
-            }
-          } else {
-            None
-          }
-        }
-        // Close the reader even when reading the store throws.
-        try {
-          checkpoint = read
-        } finally {
-          reader.close()
-        }
-      }
-    }
-    checkpoint
-  }
-
-  override def close(): Unit = {
-    curWriter.foreach(_.close())
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
deleted file mode 100644
index 5a81ecd..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache
Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop
-
-import java.io.{ObjectInputStream, ObjectOutputStream}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.hadoop.lib.HadoopUtil
-import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
-
-object HadoopCheckpointStoreFactory {
-  val VERSION = 1
-}
-
-/**
- * Creates a [[HadoopCheckpointStore]] for a task, rooted at a per-task directory below
- * `dir` and the `v$VERSION` sub-directory.
- *
- * The Hadoop configuration field is transient; it is written and restored explicitly by
- * the `writeObject` / `readObject` serialization hooks.
- *
- * @param dir base directory of all checkpoint stores
- * @param hadoopConfig configuration used to resolve the file system and open the stores
- * @param rotation rotation policy handed to every created store
- */
-class HadoopCheckpointStoreFactory(
-    dir: String,
-    @transient private var hadoopConfig: Configuration,
-    rotation: Rotation = new FileSizeRotation(128L * 1024 * 1024))
-  extends CheckpointStoreFactory {
-  import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory._
-
-  /** Serializes the default fields, then the Hadoop configuration. */
-  private def writeObject(out: ObjectOutputStream): Unit = {
-    out.defaultWriteObject()
-    hadoopConfig.write(out)
-  }
-
-  /** Restores the default fields, then reads the Hadoop configuration back. */
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-    hadoopConfig = new Configuration(false)
-    hadoopConfig.readFields(in)
-  }
-
-  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = {
-    val task = taskContext.taskId
-    val versionDir = s"$dir${Path.SEPARATOR}v$VERSION"
-    val taskDir = s"app${taskContext.appId}-task${task.processorId}_${task.index}"
-    val storePath = new Path(versionDir, taskDir)
-    new HadoopCheckpointStore(
-      storePath,
-      HadoopUtil.getFileSystemForPath(storePath, hadoopConfig),
-      hadoopConfig,
-      rotation)
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
deleted file mode 100644
index a07dbbc..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.hadoop
-
-import java.text.SimpleDateFormat
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hdfs.HdfsConfiguration
-import org.apache.hadoop.io.SequenceFile
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.hadoop.lib.HadoopUtil
-import io.gearpump.streaming.hadoop.lib.format.{DefaultSequenceFormatter, OutputFormatter}
-import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.task.{TaskContext, TaskId}
-
-/**
- * A [[DataSink]] that appends every message to a Hadoop sequence file and starts a new
- * file whenever `rotation` asks for it.
- *
- * @param userConfig user configuration passed to `HadoopUtil.login`
- * @param basePath directory under which the per-task output directories are created
- * @param rotation decides when the current file is closed and a new one is started
- * @param sequenceFormat extracts the key and value written for each message
- */
-class SequenceFileSink(
-    userConfig: UserConfig,
-    basePath: String,
-    rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong),
-    sequenceFormat: OutputFormatter = new DefaultSequenceFormatter)
-  extends DataSink {
-  @transient private lazy val configuration = new HdfsConfiguration()
-  private val dateFormat = new SimpleDateFormat("yyyy_MM_dd-HH-mm-ss")
-  // null while no file is open: before open() and after the writer has been closed.
-  private var writer: SequenceFile.Writer = null
-  private var taskId: TaskId = null
-  private var appName: String = null
-
-  /**
-   * Starts connection to data sink
-   *
-   * Invoked at onStart() method of [[io.gearpump.streaming.task.Task]]
-   *
-   * @param context is the task context at runtime
-   */
-  override def open(context: TaskContext): Unit = {
-    HadoopUtil.login(userConfig, configuration)
-    this.appName = context.appName
-    this.taskId = context.taskId
-    this.writer = getNextWriter
-  }
-
-  /**
-   * Writes message into data sink
-   *
-   * Invoked at onNext() method of [[io.gearpump.streaming.task.Task]]
-   * @param message wraps data to be written out
-   */
-  override def write(message: Message): Unit = {
-    val key = sequenceFormat.getKey(message)
-    val value = sequenceFormat.getValue(message)
-    if (writer == null) {
-      writer = getNextWriter
-    }
-    writer.append(key, value)
-    rotation.mark(message.timestamp, writer.getLength)
-    if (rotation.shouldRotate) {
-      closeWriter()
-      this.writer = getNextWriter
-      rotation.rotate
-    }
-  }
-
-  /**
-   * Closes connection to data sink
-   *
-   * Invoked at onClose() method of [[io.gearpump.streaming.task.Task]]
-   */
-  override def close(): Unit = {
-    closeWriter()
-  }
-
-  /**
-   * Flushes and closes the current writer, if any, and forgets it, so that closing twice
-   * is harmless and a later write() opens a fresh file through its null check.
-   */
-  private def closeWriter(): Unit = {
-    Option(writer).foreach { w =>
-      // Drop the reference first: a failure below must not leave a dead writer behind.
-      writer = null
-      try {
-        w.hflush()
-      } finally {
-        w.close()
-      }
-    }
-  }
-
-  /** Opens a writer on a new file for the configured key and value classes. */
-  private def getNextWriter: SequenceFile.Writer = {
-    SequenceFile.createWriter(
-      configuration,
-      SequenceFile.Writer.file(getNextFilePath),
-      SequenceFile.Writer.keyClass(sequenceFormat.getKeyClass),
-      SequenceFile.Writer.valueClass(sequenceFormat.getValueClass)
-    )
-  }
-
-  /**
-   * Builds the path of the next file: a per-task directory under `basePath`, with the
-   * current time as the file name.
-   */
-  private def getNextFilePath: Path = {
-    // NOTE(review): the file name has one-second resolution, so two rotations within the
-    // same second map to the same path -- confirm how createWriter treats an existing file.
-    val base = new Path(basePath, s"$appName-task${taskId.processorId}_${taskId.index}")
-    new Path(base, dateFormat.format(new java.util.Date))
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
deleted file mode 100644
index 52acbac..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib
-
-import java.io.EOFException
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.TimeStamp
-
-/**
- * Iterates over the (timestamp, checkpoint bytes) records of the checkpoint store at `path`.
- *
- * Each record is read as a long timestamp, an int length and then that many bytes.
- * The stream is closed when the end of the file is reached or a read fails; a caller
- * that stops iterating earlier has to call `close()` itself.
- */
-class HadoopCheckpointStoreReader(
-    path: Path,
-    hadoopConfig: Configuration)
-  extends Iterator[(TimeStamp, Array[Byte])] {
-
-  private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
-  // Record read ahead by hasNext and not yet handed out by next().
-  private var nextRecord: Option[(TimeStamp, Array[Byte])] = None
-  // Set once the stream has been closed, so that no further read is attempted.
-  private var closed = false
-
-  /**
-   * Reads the next record ahead if none is pending.
-   *
-   * @return true if a complete record is available; false at end of file, including a
-   *         file that ends in the middle of a record
-   */
-  override def hasNext: Boolean = {
-    if (nextRecord.isDefined) {
-      true
-    } else if (closed) {
-      false
-    } else {
-      try {
-        val time = stream.readLong()
-        val length = stream.readInt()
-        val buffer = new Array[Byte](length)
-        stream.readFully(buffer)
-        // Publish the record only once it is complete, so that a truncated record can
-        // never leave a timestamp pending without its data.
-        nextRecord = Some((time, buffer))
-        true
-      } catch {
-        case e: EOFException =>
-          close()
-          false
-        case e: Exception =>
-          close()
-          throw e
-      }
-    }
-  }
-
-  /**
-   * Returns the pending record, reading it first if hasNext was not called.
-   *
-   * @throws NoSuchElementException if there is no further record
-   */
-  override def next(): (TimeStamp, Array[Byte]) = {
-    val record = if (hasNext) nextRecord else None
-    nextRecord = None
-    record.getOrElse(throw new NoSuchElementException(s"no more checkpoints in $path"))
-  }
-
-  /** Closes the underlying stream; calling it again has no effect. */
-  def close(): Unit = {
-    if (!closed) {
-      closed = true
-      stream.close()
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala deleted file mode 100644 index 35f2f51..0000000 --- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Appends (timestamp, checkpoint bytes) records to a checkpoint file.
 *
 * The output stream is opened lazily on the first [[write]].
 *
 * @param path location of the checkpoint file to write
 * @param hadoopConfig Hadoop configuration used to resolve the file system of `path`
 */
class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
  // True once the lazy stream below has actually been opened.
  private var opened = false

  private lazy val stream = {
    val out = HadoopUtil.getOutputStream(path, hadoopConfig)
    opened = true
    out
  }

  /**
   * Writes one record (timestamp, payload length, payload) and flushes it.
   *
   * @param timestamp timestamp of the checkpoint
   * @param data serialized checkpoint
   * @return the stream position after the record has been written
   */
  def write(timestamp: TimeStamp, data: Array[Byte]): Long = {
    stream.writeLong(timestamp)
    stream.writeInt(data.length)
    stream.write(data)
    stream.hflush()
    stream.getPos()
  }

  /**
   * Closes the stream if it has been opened.
   *
   * Closing a writer that was never written to must not force the lazy stream,
   * otherwise the file would be opened (or created) just to be closed again.
   */
  def close(): Unit = {
    if (opened) {
      stream.close()
    }
  }
}
/**
 * Helpers for opening streams on Hadoop-compatible file systems and for Kerberos login.
 */
private[hadoop] object HadoopUtil {

  /**
   * Opens `path` for writing: appends when `path` is an existing file, creates it otherwise.
   */
  def getOutputStream(path: Path, hadoopConfig: Configuration): FSDataOutputStream = {
    val dfs = getFileSystemForPath(path, hadoopConfig)
    if (dfs.isFile(path)) {
      dfs.append(path)
    } else {
      dfs.create(path)
    }
  }

  /** Opens `path` for reading. */
  def getInputStream(path: Path, hadoopConfig: Configuration): FSDataInputStream = {
    getFileSystemForPath(path, hadoopConfig).open(path)
  }

  /** Resolves the file system that owns `path`. */
  def getFileSystemForPath(path: Path, hadoopConfig: Configuration): FileSystem = {
    // For local file systems, return the raw local file system, so that calls to flush()
    // actually flush the stream.
    path.getFileSystem(hadoopConfig) match {
      case localFs: LocalFileSystem => localFs.getRawFileSystem
      case fs => fs
    }
  }

  /**
   * Logs in from a keytab when Hadoop security is enabled; does nothing otherwise.
   *
   * The keytab bytes are taken from `userConfig`, written to a temporary file that is
   * readable by the owner only, and the file is removed again whether or not the login
   * succeeds.
   *
   * @param userConfig must contain the Kerberos principal and the keytab content
   * @param configuration Hadoop configuration handed to [[UserGroupInformation]]
   * @throws Exception when security is enabled but principal or keytab is missing
   */
  def login(userConfig: UserConfig, configuration: Configuration): Unit = {
    if (UserGroupInformation.isSecurityEnabled) {
      val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
      val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
      if (principal.isEmpty || keytabContent.isEmpty) {
        val errorMsg = s"HDFS is security enabled, user should provide kerberos principal in " +
          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} " +
          s"and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}"
        throw new Exception(errorMsg)
      }
      val keytabFile = File.createTempFile("login", ".keytab")
      try {
        // Lock the file down BEFORE the secret is written. setReadable(true, true) alone
        // only grants owner read and never revokes access for group/others.
        keytabFile.setReadable(false, false)
        keytabFile.setWritable(false, false)
        keytabFile.setExecutable(false, false)
        keytabFile.setReadable(true, true)
        keytabFile.setWritable(true, true)
        FileUtils.writeByteArrayToFile(keytabFile, keytabContent.get)
        keytabFile.setWritable(false, false)

        UserGroupInformation.setConfiguration(configuration)
        UserGroupInformation.loginUserFromKeytab(principal.get, keytabFile.getAbsolutePath)
      } finally {
        // Never leave the keytab on disk, even when the login fails.
        keytabFile.delete()
      }
    }
  }
}
/**
 * [[OutputFormatter]] that writes the message timestamp as a `LongWritable` key and the
 * message payload, cast to `String`, as a `Text` value.
 */
class DefaultSequenceFormatter extends OutputFormatter {

  override def getKeyClass: Class[_ <: Writable] = classOf[LongWritable]

  override def getValueClass: Class[_ <: Writable] = classOf[Text]

  /** Key of the record: the timestamp of `message`. */
  override def getKey(message: Message): Writable = {
    val timestamp = message.timestamp
    new LongWritable(timestamp)
  }

  /** Value of the record: the payload of `message`, which is cast to `String`. */
  override def getValue(message: Message): Writable = {
    val text = message.msg.asInstanceOf[String]
    new Text(text)
  }
}
/**
 * Describes how a message is turned into a key/value pair of Hadoop `Writable`s.
 */
trait OutputFormatter extends Serializable {

  /** Class of the keys returned by [[getKey]]. */
  def getKeyClass: Class[_ <: Writable]

  /** Builds the key for `message`. */
  def getKey(message: Message): Writable

  /** Class of the values returned by [[getValue]]. */
  def getValueClass: Class[_ <: Writable]

  /** Builds the value for `message`. */
  def getValue(message: Message): Writable
}
/**
 * [[Rotation]] that asks for a rotation once the last marked offset reaches `maxBytes`.
 *
 * @param maxBytes offset at which (or beyond which) rotation is requested
 */
case class FileSizeRotation(maxBytes: Long) extends Rotation {

  // Offset reported by the most recent mark(); reset to zero by rotate().
  private var lastOffset = 0L

  /** Remembers `offset`; the timestamp is not used by this policy. */
  override def mark(timestamp: TimeStamp, offset: Long): Unit = {
    lastOffset = offset
  }

  override def shouldRotate: Boolean = maxBytes <= lastOffset

  override def rotate(): Unit = {
    lastOffset = 0L
  }
}
/**
 * Policy deciding when an output file should be rotated.
 */
trait Rotation extends Serializable {

  /** Whether a rotation is due according to what has been marked so far. */
  def shouldRotate: Boolean

  /** Records that data with `timestamp` has been written up to `offset`. */
  def mark(timestamp: TimeStamp, offset: Long): Unit

  /** Signals that the rotation has been carried out. */
  def rotate(): Unit
}
object HadoopCheckpointStore {
  val LOG: Logger = LogUtil.getLogger(classOf[HadoopCheckpointStore])
}

/**
 * Stores timestamp-checkpoint mapping to Hadoop-compatible filesystem.
 *
 * Store file layout:
 * {{{
 * timestamp1, index1,
 * timestamp2, index2,
 * ...
 * timestampN, indexN
 * }}}
 *
 * @param dir directory holding the checkpoint store files
 * @param fs file system used to list, rename and delete store files
 * @param hadoopConfig Hadoop configuration handed to the store readers and writers
 * @param rotation policy deciding when the current store file is completed
 */
class HadoopCheckpointStore(
    dir: Path,
    fs: FileSystem,
    hadoopConfig: Configuration,
    rotation: Rotation)
  extends CheckpointStore {

  private[hadoop] var curTime = 0L
  private[hadoop] var curStartTime = curTime
  private[hadoop] var curFile: Option[String] = None
  private[hadoop] var curWriter: Option[HadoopCheckpointStoreWriter] = None
  // regex (checkpoints-$startTime-$endTime.store) for complete checkpoint file,
  // the dot is escaped so that only a literal ".store" suffix matches
  private val compRegex =
    """checkpoints-(\d+)-(\d+)\.store""".r
  // regex (checkpoints-$startTime.store) for temporary checkpoint file
  private val tempRegex =
    """checkpoints-(\d+)\.store""".r

  /**
   * Persists a pair of timestamp and checkpoint, which:
   *
   *   1. creates a temporary checkpoint file, checkpoints-\$startTime.store, if not exist
   *   2. writes out (timestamp, checkpoint) and marks rotation
   *   3. rotates checkpoint file if needed
   *     a. closes current writer and reset
   *     b. renames temporary checkpoint file to checkpoints-\$startTime-\$endTime.store
   *     c. rotation rotates
   */
  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
    curTime = timestamp
    if (curWriter.isEmpty) {
      curStartTime = curTime
      val file = s"checkpoints-$curStartTime.store"
      curFile = Some(file)
      curWriter = Some(new HadoopCheckpointStoreWriter(new Path(dir, file), hadoopConfig))
    }

    curWriter.foreach { w =>
      val offset = w.write(timestamp, checkpoint)
      rotation.mark(timestamp, offset)
    }

    if (rotation.shouldRotate) {
      // Close the writer first so the file is complete before it is renamed.
      curWriter.foreach(_.close())
      curWriter = None
      curFile.foreach { f =>
        fs.rename(new Path(dir, f), new Path(dir, s"checkpoints-$curStartTime-$curTime.store"))
      }
      rotation.rotate()
    }
  }

  /**
   * Recovers checkpoint given timestamp, which
   * {{{
   * 1. returns None if no store exists
   * 2. searches checkpoint stores for
   *   a. complete store checkpoints-\$startTime-\$endTime.store
   *      where startTime <= timestamp <= endTime
   *   b. temporary store checkpoints-\$startTime.store
   *      where startTime <= timestamp
   * 3. renames store to checkpoints-\$startTime-\$endTime.store
   * 4. deletes all stores whose name has a startTime larger than timestamp
   * 5. looks for the checkpoint in the found store
   * }}}
   * Files in the directory that are not checkpoint stores are left untouched.
   */
  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
    if (!fs.exists(dir)) {
      None
    } else {
      var checkpointFile: Option[Path] = None
      fs.listStatus(dir).map(_.getPath).foreach { file =>
        val fileName = file.getName
        fileName match {
          case compRegex(start, end) =>
            val startTime = start.toLong
            val endTime = end.toLong
            if (timestamp >= startTime && timestamp <= endTime) {
              checkpointFile = Some(new Path(dir, fileName))
            } else if (timestamp < startTime) {
              fs.delete(file, true)
            }
          case tempRegex(start) =>
            val startTime = start.toLong
            if (timestamp >= startTime) {
              val newFile = new Path(dir, s"checkpoints-$startTime-$timestamp.store")
              fs.rename(new Path(dir, fileName), newFile)
              checkpointFile = Some(newFile)
            }
          case _ =>
            // Not a checkpoint store; without this case an unrelated file name would
            // abort the recovery with a MatchError.
        }
      }

      checkpointFile.flatMap { file =>
        val reader = new HadoopCheckpointStoreReader(file, hadoopConfig)
        try {
          // First record whose timestamp equals the requested one, if any.
          reader.find { case (time, _) => time == timestamp }.map { case (_, bytes) => bytes }
        } finally {
          // Release the stream even when reading the store fails.
          reader.close()
        }
      }
    }
  }

  /** Closes the writer of the current store file, if one is open. */
  override def close(): Unit = {
    curWriter.foreach(_.close())
  }
}
object HadoopCheckpointStoreFactory {
  val VERSION = 1
}

/**
 * Creates [[HadoopCheckpointStore]]s below `dir`, one directory per application task.
 *
 * The Hadoop configuration is a transient field; it is written and read back by the
 * custom `writeObject` / `readObject` hooks below.
 *
 * @param dir base directory of all checkpoint stores
 * @param hadoopConfig Hadoop configuration used to access the file system
 * @param rotation rotation policy handed to every created store
 */
class HadoopCheckpointStoreFactory(
    dir: String,
    @transient private var hadoopConfig: Configuration,
    rotation: Rotation = new FileSizeRotation(128L << 20))
  extends CheckpointStoreFactory {
  import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory._

  // Serialization hook: default fields first, then the configuration itself.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    hadoopConfig.write(out)
  }

  // Deserialization hook: mirrors writeObject, reading into a configuration that is
  // created without loading defaults.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val restored = new Configuration(false)
    hadoopConfig = restored
    restored.readFields(in)
  }

  /**
   * Builds the store for the task described by `taskContext`, located at
   * `dir/v<VERSION>/app<appId>-task<processorId>_<index>`.
   */
  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = {
    val appId = taskContext.appId
    val taskId = taskContext.taskId
    val versionDir = dir + Path.SEPARATOR + s"v$VERSION"
    val taskDir = s"app$appId-task${taskId.processorId}_${taskId.index}"
    val storeDir = new Path(versionDir, taskDir)
    new HadoopCheckpointStore(
      storeDir,
      HadoopUtil.getFileSystemForPath(storeDir, hadoopConfig),
      hadoopConfig,
      rotation)
  }
}
/**
 * [[DataSink]] that writes messages to Hadoop sequence files.
 *
 * Files are created below `basePath/<appName>-task<processorId>_<index>/` and are named
 * after the time at which they were opened. A new file is started whenever `rotation`
 * asks for it.
 *
 * NOTE(review): file names have second resolution, so two rotations within the same
 * second resolve to the same path - confirm whether that can happen in practice.
 *
 * @param userConfig user configuration, used for the Kerberos login
 * @param basePath base directory of the output files
 * @param rotation policy deciding when a new file is started
 * @param sequenceFormat converts messages into sequence file keys and values
 */
class SequenceFileSink(
    userConfig: UserConfig,
    basePath: String,
    rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong),
    sequenceFormat: OutputFormatter = new DefaultSequenceFormatter)
  extends DataSink{
  @transient private lazy val configuration = new HdfsConfiguration()
  private val dateFormat = new SimpleDateFormat("yyyy_MM_dd-HH-mm-ss")
  // null while no file is open; see closeWriter() and write()
  private var writer: SequenceFile.Writer = null
  private var taskId: TaskId = null
  private var appName: String = null

  /**
   * Starts connection to data sink
   *
   * Invoked at onStart() method of [[org.apache.gearpump.streaming.task.Task]]
   *
   * @param context is the task context at runtime
   */
  override def open(context: TaskContext): Unit = {
    HadoopUtil.login(userConfig, configuration)
    this.appName = context.appName
    this.taskId = context.taskId
    this.writer = getNextWriter
  }

  /**
   * Writes message into data sink
   *
   * Invoked at onNext() method of [[org.apache.gearpump.streaming.task.Task]]
   * @param message wraps data to be written out
   */
  override def write(message: Message): Unit = {
    val key = sequenceFormat.getKey(message)
    val value = sequenceFormat.getValue(message)
    if (writer == null) {
      writer = getNextWriter
    }
    writer.append(key, value)
    rotation.mark(message.timestamp, writer.getLength)
    if (rotation.shouldRotate) {
      closeWriter()
      this.writer = getNextWriter
      rotation.rotate()
    }
  }

  /**
   * Closes connection to data sink
   *
   * Invoked at onClose() method of [[org.apache.gearpump.streaming.task.Task]]
   */
  override def close(): Unit = {
    closeWriter()
  }

  /**
   * Flushes and closes the current writer, if any, and forgets it.
   *
   * The field is reset so that a later write() opens a fresh file instead of appending
   * to a closed writer, and close() runs even when the flush fails.
   */
  private def closeWriter(): Unit = {
    val current = Option(writer)
    writer = null
    current.foreach { w =>
      try {
        w.hflush()
      } finally {
        w.close()
      }
    }
  }

  // Opens a writer on a new, time-stamped file using the formatter's key/value classes.
  private def getNextWriter: SequenceFile.Writer = {
    SequenceFile.createWriter(
      configuration,
      SequenceFile.Writer.file(getNextFilePath),
      SequenceFile.Writer.keyClass(sequenceFormat.getKeyClass),
      SequenceFile.Writer.valueClass(sequenceFormat.getValueClass)
    )
  }

  // Path of the next output file: <basePath>/<app>-task<processor>_<index>/<current time>.
  private def getNextFilePath: Path = {
    val base = new Path(basePath, s"$appName-task${taskId.processorId}_${taskId.index}")
    new Path(base, dateFormat.format(new java.util.Date))
  }
}
/**
 * Iterates over the (timestamp, checkpoint bytes) records of a checkpoint file.
 *
 * Every record is read as a long timestamp, an int payload length and then that many
 * payload bytes. The underlying stream is closed as soon as the end of the file or a
 * read error is hit; [[close]] may also be called explicitly and is idempotent.
 *
 * @param path location of the checkpoint file to read
 * @param hadoopConfig Hadoop configuration used to resolve the file system of `path`
 */
class HadoopCheckpointStoreReader(
    path: Path,
    hadoopConfig: Configuration)
  extends Iterator[(TimeStamp, Array[Byte])] {

  private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
  // Record read ahead by hasNext that has not been handed out by next() yet.
  private var nextRecord: Option[(TimeStamp, Array[Byte])] = None
  // Set once the stream has been closed, so that it is never read or closed again.
  private var closed = false

  /**
   * Returns true when a complete record is buffered or could be read from the stream.
   *
   * A truncated trailing record (EOF in the middle of a record) is treated as end of
   * data. Any other read failure closes the stream and is rethrown.
   */
  override def hasNext: Boolean = {
    if (nextRecord.isDefined) {
      true
    } else if (closed) {
      false
    } else {
      try {
        // Publish the record only after all three reads succeeded; otherwise a partial
        // record would leave the iterator in an inconsistent state.
        val time = stream.readLong()
        val length = stream.readInt()
        val buffer = new Array[Byte](length)
        stream.readFully(buffer)
        nextRecord = Some((time, buffer))
        true
      } catch {
        case _: EOFException =>
          close()
          false
        case e: Exception =>
          close()
          throw e
      }
    }
  }

  /**
   * Returns the next record.
   *
   * @throws NoSuchElementException when no further record is available
   */
  override def next(): (TimeStamp, Array[Byte]) = {
    if (!hasNext) {
      throw new NoSuchElementException(s"no more checkpoints in $path")
    }
    nextRecord match {
      case Some(record) =>
        nextRecord = None
        record
      case None =>
        throw new NoSuchElementException(s"no more checkpoints in $path")
    }
  }

  /** Closes the underlying stream; further calls are no-ops. */
  def close(): Unit = {
    if (!closed) {
      closed = true
      stream.close()
    }
  }
}
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala new file mode 100644 index 0000000..11c12c4 --- /dev/null +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Appends (timestamp, checkpoint bytes) records to a checkpoint file.
 *
 * The output stream is opened lazily on the first [[write]].
 *
 * @param path location of the checkpoint file to write
 * @param hadoopConfig Hadoop configuration used to resolve the file system of `path`
 */
class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
  // True once the lazy stream below has actually been opened.
  private var opened = false

  private lazy val stream = {
    val out = HadoopUtil.getOutputStream(path, hadoopConfig)
    opened = true
    out
  }

  /**
   * Writes one record (timestamp, payload length, payload) and flushes it.
   *
   * @param timestamp timestamp of the checkpoint
   * @param data serialized checkpoint
   * @return the stream position after the record has been written
   */
  def write(timestamp: TimeStamp, data: Array[Byte]): Long = {
    stream.writeLong(timestamp)
    stream.writeInt(data.length)
    stream.write(data)
    stream.hflush()
    stream.getPos()
  }

  /**
   * Closes the stream if it has been opened.
   *
   * Closing a writer that was never written to must not force the lazy stream,
   * otherwise the file would be opened (or created) just to be closed again.
   */
  def close(): Unit = {
    if (opened) {
      stream.close()
    }
  }
}
/**
 * Helpers for opening streams on Hadoop-compatible file systems and for Kerberos login.
 */
private[hadoop] object HadoopUtil {

  /**
   * Opens `path` for writing: appends when `path` is an existing file, creates it otherwise.
   */
  def getOutputStream(path: Path, hadoopConfig: Configuration): FSDataOutputStream = {
    val dfs = getFileSystemForPath(path, hadoopConfig)
    if (dfs.isFile(path)) {
      dfs.append(path)
    } else {
      dfs.create(path)
    }
  }

  /** Opens `path` for reading. */
  def getInputStream(path: Path, hadoopConfig: Configuration): FSDataInputStream = {
    getFileSystemForPath(path, hadoopConfig).open(path)
  }

  /** Resolves the file system that owns `path`. */
  def getFileSystemForPath(path: Path, hadoopConfig: Configuration): FileSystem = {
    // For local file systems, return the raw local file system, so that calls to flush()
    // actually flush the stream.
    path.getFileSystem(hadoopConfig) match {
      case localFs: LocalFileSystem => localFs.getRawFileSystem
      case fs => fs
    }
  }

  /**
   * Logs in from a keytab when Hadoop security is enabled; does nothing otherwise.
   *
   * The keytab bytes are taken from `userConfig`, written to a temporary file that is
   * readable by the owner only, and the file is removed again whether or not the login
   * succeeds.
   *
   * @param userConfig must contain the Kerberos principal and the keytab content
   * @param configuration Hadoop configuration handed to [[UserGroupInformation]]
   * @throws Exception when security is enabled but principal or keytab is missing
   */
  def login(userConfig: UserConfig, configuration: Configuration): Unit = {
    if (UserGroupInformation.isSecurityEnabled) {
      val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
      val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
      if (principal.isEmpty || keytabContent.isEmpty) {
        val errorMsg = s"HDFS is security enabled, user should provide kerberos principal in " +
          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} " +
          s"and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}"
        throw new Exception(errorMsg)
      }
      val keytabFile = File.createTempFile("login", ".keytab")
      try {
        // Lock the file down BEFORE the secret is written. setReadable(true, true) alone
        // only grants owner read and never revokes access for group/others.
        keytabFile.setReadable(false, false)
        keytabFile.setWritable(false, false)
        keytabFile.setExecutable(false, false)
        keytabFile.setReadable(true, true)
        keytabFile.setWritable(true, true)
        FileUtils.writeByteArrayToFile(keytabFile, keytabContent.get)
        keytabFile.setWritable(false, false)

        UserGroupInformation.setConfiguration(configuration)
        UserGroupInformation.loginUserFromKeytab(principal.get, keytabFile.getAbsolutePath)
      } finally {
        // Never leave the keytab on disk, even when the login fails.
        keytabFile.delete()
      }
    }
  }
}
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop.lib.format
+
+import org.apache.hadoop.io.{LongWritable, Text, Writable}
+
+import org.apache.gearpump.Message
+
+/**
+ * [[OutputFormatter]] that keys every message by its timestamp (as a `LongWritable`) and
+ * writes the payload as `Text`.
+ *
+ * The payload is cast to `String`, so a message carrying any other payload type fails with a
+ * `ClassCastException` in `getValue`.
+ */
+class DefaultSequenceFormatter extends OutputFormatter {
+
+  /** Keys are always `LongWritable`. */
+  override def getKeyClass: Class[_ <: Writable] = classOf[LongWritable]
+
+  /** Values are always `Text`. */
+  override def getValueClass: Class[_ <: Writable] = classOf[Text]
+
+  /** Wraps the message timestamp as the record key. */
+  override def getKey(message: Message): Writable = {
+    val timestamp = message.timestamp
+    new LongWritable(timestamp)
+  }
+
+  /** Wraps the message payload, which must be a `String`, as the record value. */
+  override def getValue(message: Message): Writable = {
+    val payload = message.msg.asInstanceOf[String]
+    new Text(payload)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
new file mode 100644
index 0000000..435d0fc
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor
license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop.lib.format
+
+import org.apache.hadoop.io.Writable
+
+import org.apache.gearpump.Message
+
+/**
+ * Maps a [[Message]] to a Hadoop key/value pair of `Writable`s and declares the classes of
+ * both. Implementations are `Serializable`.
+ */
+trait OutputFormatter extends Serializable {
+
+  /** Builds the key to write for `message`. */
+  def getKey(message: Message): Writable
+
+  /** Builds the value to write for `message`. */
+  def getValue(message: Message): Writable
+
+  /** Class of the keys; presumably the runtime class of what `getKey` returns. */
+  def getKeyClass: Class[_ <: Writable]
+
+  /** Class of the values; presumably the runtime class of what `getValue` returns. */
+  def getValueClass: Class[_ <: Writable]
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
new file mode 100644
index 0000000..72be9c3
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib.rotation
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * [[Rotation]] that requests a rotation once the most recently marked offset has reached
+ * `maxBytes`.
+ *
+ * @param maxBytes offset at which `shouldRotate` starts returning true
+ */
+case class FileSizeRotation(maxBytes: Long) extends Rotation {
+
+  // Offset passed to the latest mark() call; 0 initially and after every rotate().
+  private var markedOffset: Long = 0L
+
+  /** Remembers `offset`; `timestamp` is not used by this policy. */
+  override def mark(timestamp: TimeStamp, offset: Long): Unit = markedOffset = offset
+
+  /** True once the remembered offset is at least `maxBytes`. */
+  override def shouldRotate: Boolean = maxBytes <= markedOffset
+
+  /** Resets the remembered offset to zero. */
+  override def rotate(): Unit = markedOffset = 0L
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
new file mode 100644
index 0000000..cd8c04a
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib.rotation
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Policy that decides when an output should be rotated. Implementations are `Serializable`.
+ */
+trait Rotation extends Serializable {
+
+  /** Reports a timestamp and an offset to the policy. */
+  def mark(timestamp: TimeStamp, offset: Long): Unit
+
+  /** Whether the policy currently asks for a rotation. */
+  def shouldRotate: Boolean
+
+  /** Tells the policy that a rotation takes place. */
+  def rotate(): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
deleted file mode 100644
index cc8a5f0..0000000
--- a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.hadoop - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.hadoop.lib.HadoopUtil -import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation -import io.gearpump.streaming.task.TaskId - -class HadoopCheckpointStoreIntegrationSpec - extends PropSpec with PropertyChecks with MockitoSugar with Matchers { - - property("HadoopCheckpointStore should persist and recover checkpoints") { - val fileSizeGen = Gen.chooseNum[Int](100, 1000) - forAll(fileSizeGen) { (fileSize: Int) => - val userConfig = UserConfig.empty - val taskContext = MockUtil.mockTaskContext - val hadoopConfig = new Configuration() - - when(taskContext.appId).thenReturn(0) - when(taskContext.taskId).thenReturn(TaskId(0, 0)) - - val rootDirName = "test" - val rootDir = new Path(rootDirName + Path.SEPARATOR + - s"v${HadoopCheckpointStoreFactory.VERSION}") - val subDir = new Path(rootDir, "app0-task0_0") - - val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig) - fs.delete(rootDir, true) - fs.exists(rootDir) shouldBe false - - val checkpointStoreFactory = new HadoopCheckpointStoreFactory( - rootDirName, hadoopConfig, new FileSizeRotation(fileSize)) - val checkpointStore = checkpointStoreFactory.getCheckpointStore(userConfig, 
taskContext) - - checkpointStore.persist(0L, Array(0.toByte)) - - val tempFile = new Path(subDir, "checkpoints-0.store") - fs.exists(tempFile) shouldBe true - - checkpointStore.persist(1L, Array.fill(fileSize)(0.toByte)) - fs.exists(tempFile) shouldBe false - fs.exists(new Path(subDir, "checkpoints-0-1.store")) shouldBe true - - checkpointStore.persist(2L, Array(0.toByte)) - val newTempFile = new Path(subDir, "checkpoints-2.store") - fs.exists(newTempFile) shouldBe true - - for (i <- 0 to 2) { - val optCp = checkpointStore.recover(i) - optCp should not be empty - } - fs.exists(newTempFile) shouldBe false - fs.exists(new Path(subDir, "checkpoints-2-2.store")) shouldBe true - - checkpointStore.close() - fs.delete(rootDir, true) - fs.close() - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala deleted file mode 100644 index 9b4057c..0000000 --- a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.hadoop.lib.rotation - -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.TimeStamp - -class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers { - - val timestampGen = Gen.chooseNum[Long](0L, 1000L) - val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue) - - property("FileSize rotation rotates on file size") { - forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) => - val rotation = new FileSizeRotation(fileSize) - rotation.shouldRotate shouldBe false - rotation.mark(timestamp, rotation.maxBytes / 2) - rotation.shouldRotate shouldBe false - rotation.mark(timestamp, rotation.maxBytes) - rotation.shouldRotate shouldBe true - rotation.rotate - rotation.shouldRotate shouldBe false - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala new file mode 100644 index 0000000..4fd8dc1 --- /dev/null +++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.task.TaskId
+
+/**
+ * Property test that drives a checkpoint store built by HadoopCheckpointStoreFactory with a
+ * FileSizeRotation and checks, after each step, which checkpoint files exist under the
+ * relative directory "test".
+ */
+class HadoopCheckpointStoreIntegrationSpec
+  extends PropSpec with PropertyChecks with MockitoSugar with Matchers {
+
+  property("HadoopCheckpointStore should persist and recover checkpoints") {
+    // Rotation threshold in bytes, drawn from [100, 1000].
+    val fileSizeGen = Gen.chooseNum[Int](100, 1000)
+    forAll(fileSizeGen) { (fileSize: Int) =>
+      val userConfig = UserConfig.empty
+      val taskContext = MockUtil.mockTaskContext
+      val hadoopConfig = new Configuration()
+
+      // App 0 / task (0, 0); the "app0-task0_0" sub-directory below presumably derives
+      // from these ids.
+      when(taskContext.appId).thenReturn(0)
+      when(taskContext.taskId).thenReturn(TaskId(0, 0))
+
+      val rootDirName = "test"
+      val rootDir = new Path(rootDirName + Path.SEPARATOR +
+        s"v${HadoopCheckpointStoreFactory.VERSION}")
+      val subDir = new Path(rootDir, "app0-task0_0")
+
+      // Start from a clean directory so earlier iterations or runs cannot interfere.
+      val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig)
+      fs.delete(rootDir, true)
+      fs.exists(rootDir) shouldBe false
+
+      val checkpointStoreFactory = new HadoopCheckpointStoreFactory(
+        rootDirName, hadoopConfig, new FileSizeRotation(fileSize))
+      val checkpointStore = checkpointStoreFactory.getCheckpointStore(userConfig, taskContext)
+
+      // A one-byte checkpoint at time 0 must show up as "checkpoints-0.store".
+      checkpointStore.persist(0L, Array(0.toByte))
+
+      val tempFile = new Path(subDir, "checkpoints-0.store")
+      fs.exists(tempFile) shouldBe true
+
+      // A checkpoint of fileSize bytes at time 1 must make that file disappear and
+      // "checkpoints-0-1.store" appear (presumably the rotation renaming the file to
+      // <first>-<last> timestamps).
+      checkpointStore.persist(1L, Array.fill(fileSize)(0.toByte))
+      fs.exists(tempFile) shouldBe false
+      fs.exists(new Path(subDir, "checkpoints-0-1.store")) shouldBe true
+
+      // The next checkpoint must go to a new file named after its own timestamp.
+      checkpointStore.persist(2L, Array(0.toByte))
+      val newTempFile = new Path(subDir, "checkpoints-2.store")
+      fs.exists(newTempFile) shouldBe true
+
+      // Every persisted timestamp must be recoverable.
+      for (i <- 0 to 2) {
+        val optCp = checkpointStore.recover(i)
+        optCp should not be empty
+      }
+      // After recovering, the open file must be gone and "checkpoints-2-2.store" present.
+      fs.exists(newTempFile) shouldBe false
+      fs.exists(new Path(subDir, "checkpoints-2-2.store")) shouldBe true
+
+      // Clean up; not reached when an assertion above fails.
+      checkpointStore.close()
+      fs.delete(rootDir, true)
+      fs.close()
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
new file mode 100644
index 0000000..4eab3c9
--- /dev/null
+++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib.rotation
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Property test for [[FileSizeRotation]]: rotation is requested only once the marked offset
+ * reaches the configured size, and the request is cleared by `rotate()`.
+ */
+class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
+
+  // Timestamps in [0, 1000].
+  val timestampGen: Gen[Long] = Gen.chooseNum[Long](0L, 1000L)
+  // Size limits in [1, Long.MaxValue].
+  val fileSizeGen: Gen[Long] = Gen.chooseNum[Long](1, Long.MaxValue)
+
+  property("FileSize rotation rotates on file size") {
+    forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) =>
+      val rotation = new FileSizeRotation(fileSize)
+      val limit = rotation.maxBytes
+
+      // Nothing marked yet.
+      rotation.shouldRotate shouldBe false
+
+      // Half of the limit is still below it.
+      rotation.mark(timestamp, limit / 2)
+      rotation.shouldRotate shouldBe false
+
+      // Reaching the limit exactly requests a rotation.
+      rotation.mark(timestamp, limit)
+      rotation.shouldRotate shouldBe true
+
+      // Rotating clears the request.
+      rotation.rotate()
+      rotation.shouldRotate shouldBe false
+    }
+  }
+}
