fix #1641, add exactly-once integration test
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/313b6c45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/313b6c45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/313b6c45 Branch: refs/heads/master Commit: 313b6c45e7e9950b445649832b2b4a7f977e8d5b Parents: 269838e Author: manuzhang <[email protected]> Authored: Tue Feb 16 16:55:05 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:23:04 2016 +0800 ---------------------------------------------------------------------- .../kafka/wordcount/KafkaWordCount.scala | 4 +- .../examples/state/MessageCountApp.scala | 39 ++++++++-- .../state/processor/CountProcessor.scala | 12 +-- .../examples/state/MessageCountAppSpec.scala | 17 ++++- .../state/processor/CountProcessorSpec.scala | 33 +++++---- .../processor/WindowAverageProcessorSpec.scala | 31 ++++---- .../checklist/MessageDeliverySpec.scala | 78 ++++++++++++++++++++ .../checklist/StormCompatibilitySpec.scala | 26 +------ .../suites/StandaloneModeSuite.scala | 3 +- .../integrationtest/hadoop/HadoopCluster.scala | 41 ++++++++++ .../integrationtest/kafka/KafkaCluster.scala | 26 +++++++ .../kafka/NumericalDataProducer.scala | 5 +- .../integrationtest/kafka/ResultVerifier.scala | 9 ++- .../kafka/SimpleKafkaReader.scala | 6 +- project/BuildExample.scala | 2 +- project/BuildIntegrationTest.scala | 3 +- .../streaming/state/api/PersistentTask.scala | 53 +++++++++---- .../state/impl/CheckpointManager.scala | 33 ++++----- .../state/impl/CheckpointManagerSpec.scala | 18 ++--- 19 files changed, 318 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala 
---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala index 17cfd50..51bc3d6 100644 --- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala +++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala @@ -37,10 +37,10 @@ object KafkaWordCount extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) override val options: Array[(String, CLIOption[Any])] = Array( - "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = false, defaultValue = Some(1)), + "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false, defaultValue = Some(1)), "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)), "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), - "sink" -> CLIOption[Int]("<hom many kafka processor tasks", required = false, defaultValue = Some(1)) + "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false, defaultValue = Some(1)) ) def application(config: ParseResult, system: ActorSystem) : StreamApplication = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala index 9d0980f..e9ceb8b 100644 --- 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala +++ b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala @@ -19,8 +19,11 @@ package io.gearpump.streaming.examples.state import akka.actor.ActorSystem +import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} +import io.gearpump.streaming.sink.DataSinkProcessor +import io.gearpump.streaming.source.DataSourceProcessor import io.gearpump.streaming.{StreamApplication, Processor} -import io.gearpump.streaming.examples.state.processor.{CountProcessor, NumberGeneratorProcessor} +import io.gearpump.streaming.examples.state.processor.CountProcessor import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation import io.gearpump.streaming.state.impl.PersistentStateConfig @@ -33,15 +36,29 @@ import io.gearpump.util.{AkkaApp, Graph} import org.apache.hadoop.conf.Configuration object MessageCountApp extends AkkaApp with ArgumentsParser { + val SOURCE_TASK = "sourceTask" + val COUNT_TASK = "countTask" + val SINK_TASK = "sinkTask" + val SOURCE_TOPIC = "sourceTopic" + val SINK_TOPIC = "sinkTopic" + val ZOOKEEPER_CONNECT = "zookeeperConnect" + val BROKER_LIST = "brokerList" + val DEFAULT_FS = "defaultFS" override val options: Array[(String, CLIOption[Any])] = Array( - "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue = Some(1)), - "count" -> CLIOption("<how mange count tasks", required = false, defaultValue = Some(1)) + SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false, defaultValue = Some(1)), + COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)), + SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false, defaultValue = Some(1)), + SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true), + SINK_TOPIC -> 
CLIOption[String]("<kafka sink topic>", required = true), + ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>", required = true), + BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true), + DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>", required = true) ) def application(config: ParseResult)(implicit system: ActorSystem) : StreamApplication = { val hadoopConfig = new Configuration - hadoopConfig.set("fs.defaultFS", "hdfs://localhost:9000") + hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS)) val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig, // rotate on 1KB new FileSizeRotation(1000)) @@ -50,10 +67,18 @@ object MessageCountApp extends AkkaApp with ArgumentsParser { .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L) .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory) - val gen = Processor[NumberGeneratorProcessor](config.getInt("gen")) - val count = Processor[CountProcessor](config.getInt("count"), taskConf = taskConfig) + val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT) + val brokerList = config.getString(BROKER_LIST) + val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList) + val sourceTopic = config.getString(SOURCE_TOPIC) + val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory) + val sourceProcessor = DataSourceProcessor(kafkaSource, config.getInt(SOURCE_TASK)) + val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), taskConf = taskConfig) + val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList) + val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK)) val partitioner = new HashPartitioner() - val app = StreamApplication("MessageCount", Graph(gen ~ partitioner ~> count), UserConfig.empty) + val graph = 
Graph(sourceProcessor ~ partitioner ~> countProcessor ~ partitioner ~> sinkProcessor) + val app = StreamApplication("MessageCount", graph, UserConfig.empty) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala index ba16261..04cea69 100644 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala +++ b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala @@ -27,15 +27,15 @@ import io.gearpump.cluster.UserConfig import io.gearpump.Message class CountProcessor(taskContext: TaskContext, conf: UserConfig) - extends PersistentTask[Long](taskContext, conf) { + extends PersistentTask[Int](taskContext, conf) { - override def persistentState: PersistentState[Long] = { - import com.twitter.algebird.Monoid.longMonoid - new NonWindowState[Long](new AlgebirdMonoid(longMonoid), new ChillSerializer[Long]) + override def persistentState: PersistentState[Int] = { + import com.twitter.algebird.Monoid.intMonoid + new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new ChillSerializer[Int]) } - override def processMessage(state: PersistentState[Long], message: Message): Unit = { - state.update(message.timestamp, 1L) + override def processMessage(state: PersistentState[Int], message: Message): Unit = { + state.update(message.timestamp, 1) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala 
---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala index eb0c4e9..8053f57 100644 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala +++ b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala @@ -18,12 +18,14 @@ package io.gearpump.streaming.examples.state +import io.gearpump.streaming.examples.state.MessageCountApp._ import io.gearpump.cluster.ClientToMaster.SubmitApplication import io.gearpump.cluster.MasterToClient.SubmitApplicationResult import io.gearpump.cluster.{MasterHarness, TestUtil} import org.scalatest.{Matchers, BeforeAndAfter, PropSpec} import org.scalatest.prop.PropertyChecks + import scala.concurrent.Future import scala.util.Success @@ -40,10 +42,17 @@ class MessageCountAppSpec extends PropSpec with PropertyChecks with Matchers wit override def config = TestUtil.DEFAULT_CONFIG property("MessageCount should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] + val requiredArgs = Array( + s"-$SOURCE_TOPIC", "source", + s"-$SINK_TOPIC", "sink", + s"-$ZOOKEEPER_CONNECT", "localhost:2181", + s"-$BROKER_LIST", "localhost:9092", + s"-$DEFAULT_FS", "hdfs://localhost:9000" + ) val optionalArgs = Array( - "-gen", "2", - "-count", "2" + s"-$SOURCE_TASK", "2", + s"-$COUNT_TASK", "2", + s"-$SINK_TASK", "2" ) val args = { @@ -51,9 +60,11 @@ class MessageCountAppSpec extends PropSpec with PropertyChecks with Matchers wit ("requiredArgs", "optionalArgs"), (requiredArgs, optionalArgs.take(0)), (requiredArgs, optionalArgs.take(2)), + (requiredArgs, optionalArgs.take(4)), (requiredArgs, optionalArgs) ) } + val masterReceiver = createMockMaster() forAll(args) { (requiredArgs: 
Array[String], optionalArgs: Array[String]) => val args = requiredArgs ++ optionalArgs http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala index c24dd14..cbc9cce 100644 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala @@ -21,8 +21,9 @@ package io.gearpump.streaming.examples.state.processor import akka.actor.ActorSystem import akka.testkit.TestProbe import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig} -import io.gearpump.streaming.task.{StartTime, ReportCheckpointClock} +import io.gearpump.streaming.state.api.PersistentTask +import io.gearpump.streaming.state.impl.PersistentStateConfig +import io.gearpump.streaming.task.ReportCheckpointClock import io.gearpump.streaming.transaction.api.CheckpointStoreFactory import io.gearpump.Message import io.gearpump.cluster.UserConfig @@ -33,6 +34,8 @@ import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import scala.concurrent.duration._ + class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { property("CountProcessor should update state") { @@ -42,13 +45,14 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { implicit val system = ActorSystem("test") val longGen = Gen.chooseNum[Long](1, 1000) - forAll(longGen, longGen) { - 
(data: Long, num: Long) => + forAll(longGen) { + (num: Long) => val conf = UserConfig.empty .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num) .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, new InMemoryCheckpointStoreFactory) + val count = new CountProcessor(taskContext, conf) val appMaster = TestProbe()(system) @@ -57,19 +61,22 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { count.onStart(StartTime(0L)) appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L)) - for (i <- 1L to num) { - when(taskContext.upstreamMinClock).thenReturn(0L) - count.onNext(Message("" + data, 0L)) - } - count.state.get shouldBe Some(num) + for (i <- 0L to num) { + count.onNext(Message("", i)) + count.state.get shouldBe Some(i + 1) + } + // next checkpoint time is at num + // not yet + when(taskContext.upstreamMinClock).thenReturn(0L) + count.onNext(PersistentTask.CHECKPOINT) + appMaster.expectNoMsg(10 milliseconds) + // time to checkpoint when(taskContext.upstreamMinClock).thenReturn(num) - count.onNext(Message("" + data, num)) + count.onNext(PersistentTask.CHECKPOINT) + // only the state before checkpoint time is checkpointed appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) - - - count.state.get shouldBe Some(num + 1) } system.shutdown() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala index bfc1ad0..03bc8fb 100644 --- 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala @@ -22,8 +22,9 @@ import akka.actor.ActorSystem import akka.testkit.TestProbe import com.twitter.algebird.AveragedValue import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, WindowConfig, PersistentStateConfig} -import io.gearpump.streaming.task.{StartTime, ReportCheckpointClock} +import io.gearpump.streaming.state.api.PersistentTask +import io.gearpump.streaming.state.impl.{WindowConfig, PersistentStateConfig} +import io.gearpump.streaming.task.ReportCheckpointClock import io.gearpump.streaming.transaction.api.CheckpointStoreFactory import io.gearpump.Message import io.gearpump.cluster.UserConfig @@ -34,17 +35,18 @@ import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import scala.concurrent.duration._ + class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers { property("WindowAverageProcessor should update state") { - val taskContext = MockUtil.mockTaskContext - implicit val system = ActorSystem("test") - val longGen = Gen.chooseNum[Long](1, 1000) forAll(longGen, longGen) { (data: Long, num: Long) => + val taskContext = MockUtil.mockTaskContext + val windowSize = num val windowStep = num @@ -52,7 +54,6 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num) .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, new InMemoryCheckpointStoreFactory) - .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep)) val windowAverage = new WindowAverageProcessor(taskContext, conf) @@ 
-63,19 +64,21 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match windowAverage.onStart(StartTime(0L)) appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L)) - for (i <- 1L to num) { - when(taskContext.upstreamMinClock).thenReturn(0L) - windowAverage.onNext(Message("" + data, 0L)) + for (i <- 0L until num) { + windowAverage.onNext(Message("" + data, i)) + windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data)) } - windowAverage.state.get shouldBe Some(AveragedValue(num, data)) + // next checkpoint time is at num + // not yet + when(taskContext.upstreamMinClock).thenReturn(0L) + windowAverage.onNext(PersistentTask.CHECKPOINT) + appMaster.expectNoMsg(10 milliseconds) + // time to checkpoint when(taskContext.upstreamMinClock).thenReturn(num) - windowAverage.onNext(Message("" + data, num)) + windowAverage.onNext(PersistentTask.CHECKPOINT) appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) - - - windowAverage.state.get shouldBe Some(AveragedValue(1L, data)) } system.shutdown() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala new file mode 100644 index 0000000..3f029ef --- /dev/null +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala @@ -0,0 +1,78 @@ +package io.gearpump.integrationtest.checklist + +import io.gearpump.integrationtest.hadoop.HadoopCluster._ +import io.gearpump.integrationtest.{Util, TestSpecBase} +import io.gearpump.integrationtest.kafka.{SimpleKafkaReader, MessageLossDetector, NumericalDataProducer} +import 
io.gearpump.integrationtest.kafka.KafkaCluster._ + +class MessageDeliverySpec extends TestSpecBase { + + override def beforeAll(): Unit = { + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + } + + override def afterEach() = { + super.afterEach() + } + + "Gearpump" should { + "support exactly-once message delivery" in { + withKafkaCluster(cluster) { kafkaCluster => + // setup + val sourcePartitionNum = 1 + val sourceTopic = "topic1" + val sinkTopic = "topic2" + + // Generate number sequence (1, 2, 3, ...) to the topic + kafkaCluster.createTopic(sourceTopic, sourcePartitionNum) + + withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) { producer => + + withHadoopCluster { hadoopCluster => + // exercise + val args = Array("io.gearpump.streaming.examples.state.MessageCountApp", + "-defaultFS", hadoopCluster.getDefaultFS, + "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, + "-brokerList", kafkaCluster.getBrokerListConnectString, + "-sourceTopic", sourceTopic, + "-sinkTopic", sinkTopic, + "-sourceTask", sourcePartitionNum).mkString(" ") + val appId = restClient.getNextAvailableAppId() + + val stateJar = cluster.queryBuiltInExampleJars("state-").head + val success = restClient.submitApp(stateJar, args) + success shouldBe true + + // verify #1 + expectAppIsRunning(appId, "MessageCount") + Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + + // wait for checkpoint to take place + Thread.sleep(1000) + + // verify #2 + val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max + restClient.killExecutor(appId, executorToKill) shouldBe true + Util.retryUntil(restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill) + + producer.stop() + + // verify #3 + val count = kafkaCluster.getLatestOffset(sourceTopic) + 1 + val detector = new MessageLossDetector(count) + val kafkaReader = new SimpleKafkaReader(detector, sinkTopic, + host = kafkaCluster.advertisedHost, 
port = kafkaCluster.advertisedPort) + Util.retryUntil({ + kafkaReader.read() + detector.received(count) + }) + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala index 3baabf6..ef737a0 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala @@ -17,7 +17,7 @@ */ package io.gearpump.integrationtest.checklist -import io.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, NumericalDataProducer, SimpleKafkaReader} +import io.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader} import io.gearpump.integrationtest.storm.StormClient import io.gearpump.integrationtest.{TestSpecBase, Util} @@ -43,27 +43,6 @@ class StormCompatibilitySpec extends TestSpecBase { testCode("010") } - def withKafkaCluster(testCode: KafkaCluster => Unit): Unit = { - val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka") - try { - kafkaCluster.start() - testCode(kafkaCluster) - } finally { - kafkaCluster.shutDown() - } - } - - def withDataProducer(topic: String, brokerList: String) - (testCode: NumericalDataProducer => Unit): Unit = { - val producer = new NumericalDataProducer(topic, brokerList) - try { - producer.start() - testCode(producer) - } finally { - producer.stop() - } - } - def getTopologyName(name: String, stormVersion: String): String = { s"${name}_$stormVersion" } @@ -146,7 +125,8 @@ class StormCompatibilitySpec extends TestSpecBase { val 
topologyName = getTopologyName("storm_kafka", stormVersion) val stormKafkaTopology = s"io.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology" - withKafkaCluster { + import KafkaCluster._ + withKafkaCluster(cluster) { kafkaCluster => val sourcePartitionNum = 2 val sinkPartitionNum = 1 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala index c7a2c3e..b202d01 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala @@ -33,7 +33,8 @@ class StandaloneModeSuite extends Suites( new ExampleSpec, new DynamicDagSpec, new StabilitySpec, - new StormCompatibilitySpec + new StormCompatibilitySpec, + new MessageDeliverySpec ) with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala new file mode 100644 index 0000000..1f14094 --- /dev/null +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala @@ -0,0 +1,41 @@ +package io.gearpump.integrationtest.hadoop + +import io.gearpump.integrationtest.Docker +import org.apache.log4j.Logger + +object HadoopCluster { + + def withHadoopCluster(testCode: 
HadoopCluster => Unit): Unit = { + val hadoopCluster = new HadoopCluster + try { + hadoopCluster.start() + testCode(hadoopCluster) + } finally { + hadoopCluster.shutDown() + } + } +} +/** + * This class maintains a single node Hadoop cluster + */ +class HadoopCluster { + + private val LOG = Logger.getLogger(getClass) + private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0" + private val HADOOP_HOST = "hadoop0" + + def start(): Unit = { + Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "") + LOG.info("Hadoop cluster is started.") + } + + def getDefaultFS: String = { + val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST) + s"hdfs://$hostIPAddr:9000" + } + + def shutDown(): Unit = { + Docker.killAndRemoveContainer(HADOOP_HOST) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala index 4e3ebe2..b053ffe 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala @@ -17,9 +17,35 @@ */ package io.gearpump.integrationtest.kafka +import io.gearpump.integrationtest.minicluster.MiniCluster import io.gearpump.integrationtest.{Docker, Util} import org.apache.log4j.Logger +object KafkaCluster { + + def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = { + val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka") + try { + kafkaCluster.start() + testCode(kafkaCluster) + } finally { + kafkaCluster.shutDown() + } + } + + def withDataProducer(topic: String, brokerList: String) + (testCode: 
NumericalDataProducer => Unit): Unit = { + val producer = new NumericalDataProducer(topic, brokerList) + try { + producer.start() + testCode(producer) + } finally { + producer.stop() + } + } + +} + /** * This class maintains a single node Kafka cluster with integrated Zookeeper. */ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala index db3fe3a..13de780 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala @@ -19,7 +19,7 @@ package io.gearpump.integrationtest.kafka import java.util.Properties -import com.twitter.bijection.Injection +import io.gearpump.streaming.serializer.ChillSerializer import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.log4j.Logger @@ -29,6 +29,7 @@ class NumericalDataProducer(topic: String, bootstrapServers: String) { private val LOG = Logger.getLogger(getClass) private val producer = createProducer private val WRITE_SLEEP_NANOS = 10 + private val serializer = new ChillSerializer[Int] var lastWriteNum = 0 def start(): Unit = { @@ -54,7 +55,7 @@ class NumericalDataProducer(topic: String, bootstrapServers: String) { try { while (!Thread.currentThread.isInterrupted) { lastWriteNum += 1 - val msg = Injection[String, Array[Byte]](lastWriteNum.toString) + val msg = serializer.serialize(lastWriteNum) val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg) producer.send(record) 
Thread.sleep(0, WRITE_SLEEP_NANOS) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala index 58aa914..7330529 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala @@ -20,14 +20,13 @@ package io.gearpump.integrationtest.kafka import scala.collection.mutable trait ResultVerifier { - def onNext(msg: String): Unit + def onNext(num: Int): Unit } class MessageLossDetector(totalNum: Int) extends ResultVerifier { private val bitSets = new mutable.BitSet(totalNum) - override def onNext(msg: String): Unit = { - val num = msg.toInt + override def onNext(num: Int): Unit = { bitSets.add(num) } @@ -35,4 +34,8 @@ class MessageLossDetector(totalNum: Int) extends ResultVerifier { 1.to(totalNum).forall(bitSets) } + def received(num: Int): Boolean = { + bitSets(num) + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala index 73c60e0..e2e1dbc 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala @@ 
-17,17 +17,17 @@ */ package io.gearpump.integrationtest.kafka -import com.twitter.bijection.Injection +import io.gearpump.streaming.serializer.ChillSerializer import kafka.api.FetchRequestBuilder import kafka.consumer.SimpleConsumer import kafka.utils.Utils - import scala.util.{Failure, Success} class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0, host: String, port: Int) { private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "") + private val serializer = new ChillSerializer[Int] private var offset = 0L def read(): Unit = { @@ -36,7 +36,7 @@ class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int ).messageSet(topic, partition) for (messageAndOffset <- messageSet) { - Injection.invert[String, Array[Byte]](Utils.readBytes(messageAndOffset.message.payload)) match { + serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match { case Success(msg) => offset = messageAndOffset.nextOffset verifier.onNext(msg) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/project/BuildExample.scala ---------------------------------------------------------------------- diff --git a/project/BuildExample.scala b/project/BuildExample.scala index e239c41..0bee08e 100644 --- a/project/BuildExample.scala +++ b/project/BuildExample.scala @@ -176,7 +176,7 @@ object BuildExample extends sbt.Build { target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) ) - ) dependsOn (streaming % "test->test; provided", external_hadoopfs, external_monoid, external_serializer) + ) dependsOn (streaming % "test->test; provided", external_hadoopfs, external_monoid, external_serializer, external_kafka) lazy val pagerank = Project( id = "gearpump-examples-pagerank", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/project/BuildIntegrationTest.scala 
---------------------------------------------------------------------- diff --git a/project/BuildIntegrationTest.scala b/project/BuildIntegrationTest.scala index acdb708..17c7a3a 100644 --- a/project/BuildIntegrationTest.scala +++ b/project/BuildIntegrationTest.scala @@ -34,7 +34,8 @@ object BuildIntegrationTest extends sbt.Build { streaming % "test->test; provided", services % "test->test; provided", external_kafka, - storm + storm, + external_serializer ) // integration test for Storm 0.9.x http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala index c8a5fd1..fbf507f 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala @@ -18,12 +18,22 @@ package io.gearpump.streaming.state.api +import java.util.concurrent.TimeUnit + import io.gearpump.cluster.UserConfig import io.gearpump.streaming.state.impl.{PersistentStateConfig, CheckpointManager} import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime, Task, TaskContext} import io.gearpump.streaming.transaction.api.CheckpointStoreFactory +import io.gearpump.util.LogUtil import io.gearpump.{Message, TimeStamp} +import scala.concurrent.duration.FiniteDuration + +object PersistentTask { + val CHECKPOINT = Message("checkpoint") + val LOG = LogUtil.getLogger(getClass) +} + /** * PersistentTask is part of the transaction API * @@ -32,11 +42,15 @@ import io.gearpump.{Message, TimeStamp} */ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import io.gearpump.streaming.state.api.PersistentTask._ + import 
taskContext._ val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, taskContext) val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) + // system time interval to attempt checkpoint + private val checkpointAttemptInterval = 1000L /** * subclass should override this method to pass in @@ -64,24 +78,29 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) checkpointManager .recover(timestamp) .foreach(state.recover(timestamp, _)) - state.setNextCheckpointTime(checkpointManager.getCheckpointTime) + reportCheckpointClock(timestamp) + scheduleCheckpoint(checkpointAttemptInterval) } final override def onNext(message: Message): Unit = { - val checkpointTime = checkpointManager.getCheckpointTime - - processMessage(state, message) - - checkpointManager.update(message.timestamp) - val upstreamMinClock = taskContext.upstreamMinClock - if (checkpointManager.shouldCheckpoint(upstreamMinClock)) { - val serialized = state.checkpoint() - checkpointManager.checkpoint(checkpointTime, serialized) - reportCheckpointClock(checkpointTime) - - val nextCheckpointTime = checkpointManager.updateCheckpointTime() - state.setNextCheckpointTime(nextCheckpointTime) + message match { + case CHECKPOINT => + val upstreamMinClock = taskContext.upstreamMinClock + if (checkpointManager.shouldCheckpoint(upstreamMinClock)) { + checkpointManager.getCheckpointTime.foreach { checkpointTime => + val serialized = state.checkpoint() + checkpointManager.checkpoint(checkpointTime, serialized) + .foreach(state.setNextCheckpointTime) + taskContext.output(Message(serialized, checkpointTime)) + reportCheckpointClock(checkpointTime) + } + } + scheduleCheckpoint(checkpointAttemptInterval) + case _ => + 
checkpointManager.update(message.timestamp) + .foreach(state.setNextCheckpointTime) + processMessage(state, message) } } @@ -89,7 +108,11 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) checkpointManager.close() } + private def scheduleCheckpoint(interval: Long): Unit = { + scheduleOnce(new FiniteDuration(interval, TimeUnit.MILLISECONDS))(self ! CHECKPOINT) + } + private def reportCheckpointClock(timestamp: TimeStamp): Unit = { - taskContext.appMaster ! ReportCheckpointClock(taskContext.taskId, timestamp) + appMaster ! ReportCheckpointClock(taskContext.taskId, timestamp) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala index 5f7d86d..5fcbbcb 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala @@ -21,40 +21,39 @@ package io.gearpump.streaming.state.impl import io.gearpump.TimeStamp import io.gearpump.streaming.transaction.api.CheckpointStore - class CheckpointManager(checkpointInterval: Long, checkpointStore: CheckpointStore) { - private var maxMessageTime = 0L - private var checkpointTime = checkpointInterval - private var lastCheckpointTime = 0L + private var maxMessageTime: Long = 0L + private var checkpointTime: Option[Long] = None def recover(timestamp: TimeStamp): Option[Array[Byte]] = { - checkpointTime = (timestamp / checkpointInterval + 1) * checkpointInterval checkpointStore.recover(timestamp) } - def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = { + def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): 
Option[TimeStamp] = { checkpointStore.persist(timestamp, checkpoint) - lastCheckpointTime = checkpointTime + checkpointTime = checkpointTime.collect { case time if maxMessageTime > time => + time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval + } + + checkpointTime } - def update(messageTime: TimeStamp): Unit = { + def update(messageTime: TimeStamp): Option[TimeStamp] = { maxMessageTime = Math.max(maxMessageTime, messageTime) + if (checkpointTime.isEmpty) { + checkpointTime = Some((1 + messageTime / checkpointInterval) * checkpointInterval) + } + + checkpointTime } def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = { - upstreamMinClock >= checkpointTime && checkpointTime > lastCheckpointTime + checkpointTime.exists(time => upstreamMinClock >= time) } - def getCheckpointTime: TimeStamp = checkpointTime - - def updateCheckpointTime(): TimeStamp = { - if (maxMessageTime >= checkpointTime) { - checkpointTime += (1 + (maxMessageTime - checkpointTime) / checkpointInterval) * checkpointInterval - } - checkpointTime - } + def getCheckpointTime: Option[TimeStamp] = checkpointTime def close(): Unit = { checkpointStore.close() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala index 6e6f915..483200f 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala @@ -40,7 +40,6 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w checkpointManager.recover(timestamp) verify(checkpointStore).recover(timestamp) - 
checkpointManager.getCheckpointTime - timestamp should be <= checkpointInterval } } @@ -69,22 +68,21 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w } property("CheckpointManager should update checkpoint time according to max message timestamp") { - val timestampListGen = Gen.containerOf[Array, TimeStamp](timestampGen) suchThat (_.nonEmpty) - forAll(timestampListGen, checkpointIntervalGen) { - (timestamps: Array[TimeStamp], checkpointInterval: Long) => + forAll(timestampGen, checkpointIntervalGen) { + (timestamp: TimeStamp, checkpointInterval: Long) => val checkpointStore = mock[CheckpointStore] val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) - timestamps.foreach(checkpointManager.update) - val maxTimestamp = timestamps.max - checkpointManager.getMaxMessageTime shouldBe maxTimestamp + checkpointManager.update(timestamp) + checkpointManager.getMaxMessageTime shouldBe timestamp + + val checkpointTime = checkpointManager.getCheckpointTime.get + timestamp should (be < checkpointTime and be >= (checkpointTime - checkpointInterval)) - val checkpointTime = checkpointManager.getCheckpointTime checkpointManager.checkpoint(checkpointTime, Array.empty[Byte]) verify(checkpointStore).persist(MockitoMatchers.eq(checkpointTime), MockitoMatchers.anyObject[Array[Byte]]()) - val newCheckpointTime = checkpointManager.updateCheckpointTime() - maxTimestamp should (be < newCheckpointTime and be >= (newCheckpointTime - checkpointInterval)) + checkpointManager.getCheckpointTime shouldBe empty } }
