http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala index 67c64c4..da99d64 100644 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala @@ -22,7 +22,7 @@ import java.util.Properties import com.twitter.bijection.Injection import kafka.api.OffsetRequest import kafka.common.TopicAndPartition -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, KafkaConsumer} import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient @@ -92,7 +92,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc property("KafkaStore should read checkpoint from timestamp on recover") { forAll(Gen.alphaStr, timestampGen) { - (topic: String, recoverTime: TimeStamp) => + (topic: String, recoverTime: MilliSeconds) => val consumer = mock[KafkaConsumer] val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) @@ -104,7 +104,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc } forAll(Gen.alphaStr, timestampGen) { - (topic: String, recoverTime: TimeStamp) => + (topic: String, recoverTime: MilliSeconds) => val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] val kafkaStore = new KafkaStore(topic, producer, None) @@ -113,12 +113,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc } 
forAll(Gen.alphaStr, timestampGen, timestampGen) { - (topic: String, recoverTime: TimeStamp, checkpointTime: TimeStamp) => + (topic: String, recoverTime: MilliSeconds, checkpointTime: MilliSeconds) => val consumer = mock[KafkaConsumer] val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) - val key = Injection[TimeStamp, Array[Byte]](checkpointTime) + val key = Injection[MilliSeconds, Array[Byte]](checkpointTime) val msg = key val kafkaMsg = KafkaMessage(TopicAndPartition(topic, 0), 0, Some(key), msg) @@ -139,7 +139,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc property("KafkaStore persist should write checkpoint with monotonically increasing timestamp") { forAll(Gen.alphaStr, timestampGen, Gen.alphaStr) { - (topic: String, checkpointTime: TimeStamp, data: String) => + (topic: String, checkpointTime: MilliSeconds, data: String) => val consumer = mock[KafkaConsumer] val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) @@ -155,12 +155,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc } def verifyProducer(producer: Producer[Array[Byte], Array[Byte]], count: Int, - topic: String, partition: Int, time: TimeStamp, data: String): Unit = { + topic: String, partition: Int, time: MilliSeconds, data: String): Unit = { verify(producer, times(count)).send( MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](record => record.topic() == topic && record.partition() == partition - && Injection.invert[TimeStamp, Array[Byte]](record.key()).get == time + && Injection.invert[MilliSeconds, Array[Byte]](record.key()).get == time && Injection.invert[String, Array[Byte]](record.value()).get == data )) }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/BuildExamples.scala ---------------------------------------------------------------------- diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala index afb7459..b3a8e4a 100644 --- a/project/BuildExamples.scala +++ b/project/BuildExamples.scala @@ -106,7 +106,7 @@ object BuildExamples extends sbt.Build { "commons-io" % "commons-io" % commonsIOVersion, "io.spray" %% "spray-can" % sprayVersion, "io.spray" %% "spray-routing-shapeless2" % sprayVersion - ) + ) ++ annotationDependencies ) ++ include("examples/distributeservice") ).dependsOn(core % "provided; test->test") @@ -160,11 +160,12 @@ object BuildExamples extends sbt.Build { CrossVersion.binaryScalaVersion(scalaVersion.value) ) - private def include(files: String*): Seq[Def.Setting[_]] = Seq( - assemblyExcludedJars in assembly := { - val cp = (fullClasspath in assembly).value - cp.filterNot(p => - files.exists(p.data.getAbsolutePath.contains)) - } - ) + private def include(files: String*): Seq[Def.Setting[_]] = + Seq( + assemblyExcludedJars in assembly := { + val cp = (fullClasspath in assembly).value + cp.filterNot(p => + files.exists(p.data.getAbsolutePath.contains)) + } + ) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/BuildExperiments.scala ---------------------------------------------------------------------- diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala index c603ec1..84c80f0 100644 --- a/project/BuildExperiments.scala +++ b/project/BuildExperiments.scala @@ -60,7 +60,7 @@ object BuildExperiments extends sbt.Build { libraryDependencies ++= Seq( "org.json4s" %% "json4s-jackson" % "3.2.11", "com.typesafe.akka" %% "akka-stream" % akkaVersion - ), + ) ++ annotationDependencies, mainClass in(Compile, packageBin) := Some("akka.stream.gearpump.example.Test") )) .dependsOn (core % "provided", streaming % "test->test; provided") 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/BuildGearpump.scala ---------------------------------------------------------------------- diff --git a/project/BuildGearpump.scala b/project/BuildGearpump.scala index 895c042..6c2acad 100644 --- a/project/BuildGearpump.scala +++ b/project/BuildGearpump.scala @@ -54,7 +54,17 @@ object BuildGearpump extends sbt.Build { useGpg := false, pgpSecretRing := file("./secring.asc"), pgpPublicRing := file("./pubring.asc"), - scalacOptions ++= Seq("-Yclosure-elim", "-Yinline"), + scalacOptions ++= Seq( + "-deprecation", // Emit warning and location for usages of deprecated APIs + "-encoding", "UTF-8", // Specify character encoding used by source files + "-feature", // Emit warning and location for usages of features + // that should be imported explicitly + "-language:existentials", // Enable existential types + "-language:implicitConversions", // Enable implicit conversions + "-Yclosure-elim", // Perform closure elimination + "-Yinline", // Perform inlining when possible + "-Ywarn-unused-import" // Warn on unused imports + ), publishMavenStyle := true, pgpPassphrase := Option(System.getenv().get("PASSPHRASE")).map(_.toArray), @@ -74,30 +84,32 @@ object BuildGearpump extends sbt.Build { } }, - publishArtifact in Test := true, - pomExtra := { + // scalastyle:off line.size.limit <url>https://github.com/apache/incubator-gearpump</url> - <licenses> - <license> - <name>Apache 2</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - </license> - </licenses> - <scm> - <connection>scm:git://git.apache.org/incubator-gearpump.git</connection> - <developerConnection>scm:git:g...@github.com:apache/incubator-gearpump</developerConnection> - <url>github.com/apache/incubator-gearpump</url> - </scm> - <developers> - <developer> - <id>gearpump</id> - <name>Gearpump Team</name> - <url>http://gearpump.incubator.apache.org/community.html#who-we-are</url> - </developer> - </developers> + <licenses> + 
<license> + <name>Apache 2</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + <scm> + <connection>scm:git://git.apache.org/incubator-gearpump.git</connection> + <developerConnection>scm:git:g...@github.com:apache/incubator-gearpump</developerConnection> + <url>github.com/apache/incubator-gearpump</url> + </scm> + <developers> + <developer> + <id>gearpump</id> + <name>Gearpump Team</name> + <url>http://gearpump.incubator.apache.org/community.html#who-we-are</url> + </developer> + </developers> + // scalastyle:on line.size.limit }, + publishArtifact in Test := true, + pomPostProcess := { (node: xml.Node) => changeShadedDeps( Set( @@ -194,7 +206,7 @@ object BuildGearpump extends sbt.Build { libraryDependencies ++= Seq( "com.goldmansachs" % "gs-collections" % gsCollectionsVersion - ), + ) ++ annotationDependencies, pomPostProcess := { (node: xml.Node) => changeShadedDeps( http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/Dependencies.scala ---------------------------------------------------------------------- diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 40b6380..b146c08 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -56,6 +56,14 @@ object Dependencies { val rabbitmqVersion = "3.5.3" val calciteVersion = "1.12.0" + val annotationDependencies = Seq( + // work around for compiler warnings like + // "Class javax.annotation.CheckReturnValue not found - continuing with a stub" + // see https://issues.scala-lang.org/browse/SI-8978 + // marked as "provided" to be excluded from assembling + "com.google.code.findbugs" % "jsr305" % "3.0.2" % "provided" + ) + val coreDependencies = Seq( libraryDependencies ++= Seq( "org.slf4j" % "slf4j-api" % slf4jVersion, @@ -94,11 +102,12 @@ object Dependencies { exclude("org.slf4j", "slf4j-api"), "com.codahale.metrics" % "metrics-jvm" % codahaleVersion exclude("org.slf4j", "slf4j-api"), + "com.typesafe.akka" %% 
"akka-testkit" % akkaVersion % "test", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test", "org.mockito" % "mockito-core" % mockitoVersion % "test", "junit" % "junit" % junitVersion % "test" - ) + ) ++ annotationDependencies ) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala index be96577..1f6141a 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -40,7 +40,7 @@ import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig, import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.worker.WorkerSummary import org.apache.gearpump.cluster.{ClusterConfig, UserConfig} -import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, JarStoreServer} +import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective} import org.apache.gearpump.streaming.partitioner.{PartitionerByClassName, PartitionerDescription} import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} // NOTE: This cannot be removed!!! 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala index 7b33987..762b9e4 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala @@ -23,7 +23,6 @@ import akka.http.scaladsl.marshalling.ToResponseMarshallable import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.marshalling.ToResponseMarshallable._ -import akka.http.scaladsl.server.{RejectionHandler, StandardRoute} import akka.stream.Materializer import org.apache.gearpump.util.Util // NOTE: This cannot be removed!!! http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala index 80264d2..4f2b642 100644 --- a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala @@ -27,16 +27,14 @@ import akka.testkit.TestActor.{AutoPilot, KeepRunning} import akka.testkit.{TestKit, TestProbe} import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import org.slf4j.Logger import upickle.default.read import org.apache.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary import 
org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId} import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest} import org.apache.gearpump.cluster.MasterToClient._ import org.apache.gearpump.cluster.{ApplicationStatus, TestUtil} -import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} +import org.apache.gearpump.jarstore.JarStoreClient import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} -import org.apache.gearpump.util.LogUtil // NOTE: This cannot be removed!!! import org.apache.gearpump.services.util.UpickleUtil._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala index 39c0de0..07e44c1 100644 --- a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala @@ -40,7 +40,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersD import org.apache.gearpump.cluster.MasterToClient._ import org.apache.gearpump.cluster.TestUtil import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} +import org.apache.gearpump.jarstore.JarStoreClient import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} // NOTE: This cannot be removed!!! 
import org.apache.gearpump.services.util.UpickleUtil._ @@ -166,7 +166,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), - FileIO.fromFile(file, chunkSize = 100000)) + FileIO.fromPath(file.toPath, chunkSize = 100000)) val body = Source.single( Multipart.FormData.BodyPart( http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala index 8a76916..f0e4f84 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala @@ -18,11 +18,9 @@ package org.apache.gearpump.streaming -import scala.language.existentials - import akka.actor.ActorRef -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.appmaster.WorkerInfo import org.apache.gearpump.cluster.scheduler.Resource import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations @@ -63,7 +61,7 @@ object AppMasterToExecutor { case class StartAllTasks(dagVersion: Int) case class StartDynamicDag(dagVersion: Int) - case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp) + case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: MilliSeconds) case class TaskRejected(taskId: TaskId) case object RestartClockService http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index 435414b..f15e1b3 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -18,10 +18,10 @@ package org.apache.gearpump.streaming -import scala.language.implicitConversions import scala.reflect.ClassTag import akka.actor.ActorSystem -import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, TimeStamp} +import org.apache.gearpump.Time +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster._ import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} import org.apache.gearpump.streaming.appmaster.AppMaster @@ -102,8 +102,8 @@ object Processor { * When input message's timestamp is beyond current processor's lifetime, * then it will not be processed by this processor. 
*/ -case class LifeTime(birth: TimeStamp, death: TimeStamp) { - def contains(timestamp: TimeStamp): Boolean = { +case class LifeTime(birth: MilliSeconds, death: MilliSeconds) { + def contains(timestamp: MilliSeconds): Boolean = { timestamp >= birth && timestamp < death } @@ -113,8 +113,7 @@ case class LifeTime(birth: TimeStamp, death: TimeStamp) { } object LifeTime { - // MAX_TIME_MILLIS is Long.MaxValue - 1 - val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1) + val Immortal = LifeTime(Time.MIN_TIME_MILLIS, Time.UNREACHABLE) } /** @@ -159,7 +158,7 @@ object StreamApplication { val graph = dag.mapVertex { processor => val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor) updatedProcessor - }.mapEdge { (node1, edge, node2) => + }.mapEdge { (_, edge, _) => PartitionerDescription(new PartitionerObject( Option(edge).getOrElse(StreamApplication.hashPartitioner))) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index ba4b058..3c5c7da 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.appmaster import java.lang.management.ManagementFactory import akka.actor._ -import org.apache.gearpump._ +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.AppMasterToMaster.ApplicationStatusChanged import org.apache.gearpump.cluster.ClientToMaster._ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge} 
@@ -67,7 +67,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli import akka.pattern.ask private implicit val dispatcher = context.dispatcher - private val startTime: TimeStamp = System.currentTimeMillis() + private val startTime: MilliSeconds = System.currentTimeMillis() private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx") @@ -322,7 +322,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli context.stop(self) } - private def getMinClock: Future[TimeStamp] = { + private def getMinClock: Future[MilliSeconds] = { clockService match { case Some(service) => (service ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala index 77a966a..90141d4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala @@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit import akka.actor.{Actor, ActorRef, Cancellable, Stash} import com.google.common.primitives.Longs -import org.apache.gearpump.{MIN_TIME_MILLIS, TimeStamp} +import org.apache.gearpump.Time +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks import org.apache.gearpump.streaming._ @@ -38,7 +39,6 @@ import org.apache.gearpump.util.LogUtil import org.slf4j.Logger import 
scala.concurrent.duration.FiniteDuration -import scala.language.implicitConversions /** * Maintains a global view of message timestamp in the application @@ -61,8 +61,8 @@ class ClockService( LOG.info("Initializing Clock service, get snapshotted StartClock ....") store.get(START_CLOCK).map { clock => // check for null first since - // (null).asInstanceOf[TimeStamp] is zero - val startClock = if (clock != null) clock.asInstanceOf[TimeStamp] else MIN_TIME_MILLIS + // (null).asInstanceOf[MilliSeconds] is zero + val startClock = if (clock != null) clock.asInstanceOf[MilliSeconds] else Time.MIN_TIME_MILLIS minCheckpointClock = Some(startClock) @@ -89,32 +89,32 @@ class ClockService( // We use Array instead of List for Performance consideration private var processorClocks = Array.empty[ProcessorClock] - private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = _ + private var checkpointClocks: Map[TaskId, Vector[MilliSeconds]] = _ - private var minCheckpointClock: Option[TimeStamp] = None + private var minCheckpointClock: Option[MilliSeconds] = None private def checkpointEnabled(processor: ProcessorDescription): Boolean = { val taskConf = processor.taskConf taskConf != null && taskConf.getBoolean("state.checkpoint.enable").contains(true) } - private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = { + private def resetCheckpointClocks(dag: DAG, startClock: MilliSeconds): Unit = { this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death) .filter { case (_, processor) => checkpointEnabled(processor) }.flatMap { case (id, processor) => - (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp]) + (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[MilliSeconds]) } if (this.checkpointClocks.isEmpty) { minCheckpointClock = None } } - private def initDag(startClock: TimeStamp): Unit = { + private def initDag(startClock: MilliSeconds): Unit = { recoverDag(this.dag, startClock) } - private def 
recoverDag(dag: DAG, startClock: TimeStamp): Unit = { + private def recoverDag(dag: DAG, startClock: MilliSeconds): Unit = { this.clocks = dag.processors.filter(startClock < _._2.life.death). map { pair => val (processorId, processor) = pair @@ -131,7 +131,7 @@ class ClockService( resetCheckpointClocks(dag, startClock) } - private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = { + private def dynamicDAG(dag: DAG, startClock: MilliSeconds): Unit = { val newClocks = dag.processors.filter(startClock < _._2.life.death). map { pair => val (processorId, processor) = pair @@ -208,7 +208,7 @@ class ClockService( } } - private def getUpStreamMinClock(processorId: ProcessorId): Option[TimeStamp] = { + private def getUpStreamMinClock(processorId: ProcessorId): Option[MilliSeconds] = { upstreamClocks.get(processorId).map(ProcessorClocks.minClock) } @@ -304,7 +304,7 @@ class ClockService( } } - private def minClock: TimeStamp = { + private def minClock: MilliSeconds = { ProcessorClocks.minClock(processorClocks) } @@ -314,7 +314,7 @@ class ClockService( healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis()) } - private def getStartClock: TimeStamp = { + private def getStartClock: MilliSeconds = { minCheckpointClock.getOrElse(minClock) } @@ -322,7 +322,7 @@ class ClockService( store.put(START_CLOCK, getStartClock) } - private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = { + private def updateCheckpointClocks(task: TaskId, time: MilliSeconds): Unit = { val clocks = checkpointClocks(task) :+ time checkpointClocks += task -> clocks @@ -341,17 +341,17 @@ object ClockService { case object HealthCheck class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int, - private var _min: TimeStamp = MIN_TIME_MILLIS, - private var _taskClocks: Array[TimeStamp] = null) { + private var _min: MilliSeconds = Time.MIN_TIME_MILLIS, + private var _taskClocks: Array[MilliSeconds] = null) { def copy(life: LifeTime): 
ProcessorClock = { new ProcessorClock(processorId, life, parallelism, _min, _taskClocks) } - def min: TimeStamp = _min - def taskClocks: Array[TimeStamp] = _taskClocks + def min: MilliSeconds = _min + def taskClocks: Array[MilliSeconds] = _taskClocks - def init(startClock: TimeStamp): Unit = { + def init(startClock: MilliSeconds): Unit = { if (taskClocks == null) { this._min = startClock this._taskClocks = new Array(parallelism) @@ -359,7 +359,7 @@ object ClockService { } } - def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = { + def updateMinClock(taskIndex: Int, clock: MilliSeconds): Unit = { taskClocks(taskIndex) = clock _min = Longs.min(taskClocks: _*) } @@ -382,8 +382,8 @@ object ClockService { /** Check for stalling tasks */ def check( - currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock], - dag: DAG, now: TimeStamp): Unit = { + currentMinClock: MilliSeconds, processorClocks: Map[ProcessorId, ProcessorClock], + dag: DAG, now: MilliSeconds): Unit = { var isClockStalling = false if (null == minClock || currentMinClock > minClock.appClock) { minClock = ClockValue(systemClock = now, appClock = currentMinClock) @@ -424,7 +424,7 @@ object ClockService { } object HealthChecker { - case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) { + case class ClockValue(systemClock: MilliSeconds, appClock: MilliSeconds) { def prettyPrint: String = { "(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")" } @@ -434,7 +434,7 @@ object ClockService { object ProcessorClocks { // Get the Min clock of all processors - def minClock(clock: Array[ProcessorClock]): TimeStamp = { + def minClock(clock: Array[ProcessorClock]): MilliSeconds = { var i = 0 var min = if (clock.length == 0) 0L else clock(0).min while (i < clock.length) { @@ -446,7 +446,7 @@ object ClockService { } case class ChangeToNewDAG(dag: DAG) - case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp]) + case class 
ChangeToNewDAGSuccess(clocks: Map[ProcessorId, MilliSeconds]) - case class StoredStartClock(clock: TimeStamp) + case class StoredStartClock(clock: MilliSeconds) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala index e023cdf..e31f863 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor._ import akka.pattern.ask import com.typesafe.config.Config -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.AppJar import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId @@ -47,7 +47,7 @@ class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRe private implicit val timeout = Constants.FUTURE_TIMEOUT /** Set the current DAG version active */ - def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = { + def setDag(dag: DAG, startClock: Future[MilliSeconds]): Unit = { actor ! TransitToNewDag startClock.map { start => actor ! 
NewDag(dag, start) @@ -82,7 +82,7 @@ object JarScheduler { case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest]) - case class NewDag(dag: DAG, startTime: TimeStamp) + case class NewDag(dag: DAG, startTime: MilliSeconds) case object TransitToNewDag http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala index 126ab92..1214cd0 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.appmaster -import org.apache.gearpump._ +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig} import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief @@ -32,10 +32,10 @@ case class StreamAppMasterSummary( appId: Int, appName: String = null, actorPath: String = null, - clock: TimeStamp = 0L, + clock: MilliSeconds = 0L, status: ApplicationStatus = ApplicationStatus.ACTIVE, - startTime: TimeStamp = 0L, - uptime: TimeStamp = 0L, + startTime: MilliSeconds = 0L, + uptime: MilliSeconds = 0L, user: String = null, homeDirectory: String = "", logFile: String = "", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala index 51c4de9..bae5c02 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor._ import akka.pattern.ask -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge import org.apache.gearpump.streaming.AppMasterToExecutor._ import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask} @@ -86,11 +86,11 @@ private[appmaster] class TaskManager( dagManager ! WatchChange(watcher = self) executorManager ! SetTaskManager(self) - private def getStartClock: Future[TimeStamp] = { + private def getStartClock: Future[MilliSeconds] = { (clockService ? 
GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock) } - private var startClock: Future[TimeStamp] = getStartClock + private var startClock: Future[MilliSeconds] = getStartClock def receive: Receive = applicationReady(DagReadyState.empty) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala index 25a0929..8d3ffb3 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala @@ -17,8 +17,6 @@ */ package org.apache.gearpump.streaming.dsl.api.functions -import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction - object FilterFunction { def apply[T](fn: T => Boolean): FilterFunction[T] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala index 9ff44a8..1525d6e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala @@ -18,8 +18,6 @@ package org.apache.gearpump.streaming.dsl.api.functions -import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction - /** * Combines input into 
an accumulator. * http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala index a4fdca6..7880c2f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala @@ -17,8 +17,6 @@ */ package org.apache.gearpump.streaming.dsl.api.functions -import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction - object MapFunction { def apply[T, R](fn: T => R): MapFunction[T, R] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala new file mode 100644 index 0000000..b90ba28 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.api.functions + +/** + * Superclass for all user defined function interfaces. + * This ensures all functions are serializable and provides common methods + * like setup and teardown. Users should not extend this class directly + * but subclasses like [[org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction]]. + */ +abstract class SerializableFunction extends java.io.Serializable { + + def setup(): Unit = {} + + def teardown(): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala index 11e2416..adad878 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala @@ -17,7 +17,7 @@ */ package org.apache.gearpump.streaming.dsl.javaapi.functions -import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction +import org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction /** * Transforms one input into zero or more outputs of possibly different types. 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala new file mode 100644 index 0000000..6d43f16 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming + + +// scalastyle:off line.size.limit +/** + * + * The architecture of Gearpump Streaming DSL consists of several layers: + * + * * User facing [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL. Stream is created by [[org.apache.gearpump.streaming.dsl.scalaapi.StreamApp]] + * from input source like Kafka or by applying high level operations (e.g. flatMap, window, groupBy) to user defined functions(UDFs). 
UDFs are subclasses + * of [[org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction]], represented by [[org.apache.gearpump.streaming.dsl.plan.Op]] + * in the underlying [[org.apache.gearpump.util.Graph]]. + * * [[org.apache.gearpump.streaming.dsl.plan.Planner]], responsible for interpreting the Op Graph, optimizing it and building a low level Graph of + * [[org.apache.gearpump.streaming.Processor]]. Finally, it creates a runnable Graph of [[org.apache.gearpump.streaming.task.Task]]. + * * The execution layer is usually composed of the following four tasks. + * + * * [[org.apache.gearpump.streaming.source.DataSourceTask]] for [[org.apache.gearpump.streaming.source.DataSource]] to ingest data into Gearpump + * * [[org.apache.gearpump.streaming.sink.DataSinkTask]] for [[org.apache.gearpump.streaming.sink.DataSink]] to write data out. + * * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]] + * * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops. + * + * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally + * runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with + * [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually + * executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]]. 
+ * + */ +// scalastyle:on line.size.limit +package object dsl { + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index 2a45a8f..c37ced6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -35,16 +35,29 @@ import scala.reflect.ClassTag object Op { + /** + * Concatenates two descriptions with "." or returns one if the other is empty. + */ def concatenate(desc1: String, desc2: String): String = { if (desc1 == null || desc1.isEmpty) desc2 else if (desc2 == null || desc2.isEmpty) desc1 else desc1 + "." + desc2 } + /** + * Concatenates two configs according to the following rules + * 1. The first config cannot be null. + * 2. The first config is returned if the second config is null + * 3. The second config takes precedence for overlapping config keys + */ def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = { config1.withConfig(config2) } + /** + * This adds a [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] in + * [[GlobalWindows]] if a targeting [[Task]] has no executable UDF. + */ def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig, processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = { if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) { @@ -59,22 +72,37 @@ object Op { } /** - * This is a vertex on the logical plan. + * This is a vertex on the logical Graph, representing user defined functions in + * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL. 
*/ sealed trait Op { + /** + * This comes from user function description and is used to display it on front end. + */ def description: String + /** + * This will ship user function to [[org.apache.gearpump.streaming.task.Task]] to be executed. + */ def userConfig: UserConfig + /** + * This creates a new Op by merging their user functions, user configs and descriptions. + */ def chain(op: Op)(implicit system: ActorSystem): Op + /** + * This creates a Processor after chaining. + */ def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] } /** - * This represents a low level Processor. + * This represents a low level Processor. It is deprecated since it + * doesn't work with other Ops. */ +@deprecated case class ProcessorOp[T <: Task]( processor: Class[T], parallelism: Int, @@ -99,7 +127,8 @@ case class ProcessorOp[T <: Task]( } /** - * This represents a DataSource. + * This represents a DataSource and creates a + * [[org.apache.gearpump.streaming.source.DataSourceTask]] */ case class DataSourceOp( dataSource: DataSource, @@ -142,7 +171,7 @@ case class DataSourceOp( } /** - * This represents a DataSink. + * This represents a DataSink and creates a [[org.apache.gearpump.streaming.sink.DataSinkTask]]. */ case class DataSinkOp( dataSink: DataSink, @@ -163,7 +192,7 @@ case class DataSinkOp( /** * This represents operations that can be chained together * (e.g. flatMap, map, filter, reduce) and further chained - * to another Op to be used + * to another Op to be executed */ case class TransformOp[IN, OUT]( fn: FunctionRunner[IN, OUT], @@ -201,61 +230,7 @@ case class TransformOp[IN, OUT]( } } -/** - * This is an intermediate operation, produced by chaining WindowOp and TransformOp. - * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp. - * Otherwise, it will be translated to a Processor of TransformTask. 
- */ -case class WindowTransformOp[IN, OUT]( - windowRunner: WindowRunner[IN, OUT], - description: String, - userConfig: UserConfig) extends Op { - - override def chain(other: Op)(implicit system: ActorSystem): Op = { - other match { - case op: WindowTransformOp[OUT, _] => - WindowTransformOp( - WindowRunnerAT(windowRunner, op.windowRunner), - Op.concatenate(description, op.description), - Op.concatenate(userConfig, op.userConfig) - ) - case _ => - throw new OpChainException(this, other) - } - } - - override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp - Processor[TransformTask[Any, Any]](1, description, userConfig.withValue( - Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)) - } -} - -/** - * This is an intermediate operation, produced by chaining TransformOp and WindowOp. - * It will later be chained to a WindowOp, which results in two WindowTransformOps. - * Finally, they will be chained to a single WindowTransformOp. 
- */ -case class TransformWindowTransformOp[IN, MIDDLE, OUT]( - transformOp: TransformOp[IN, MIDDLE], - windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op { - - override def description: String = { - throw new UnsupportedOperationException(s"description is not supported on $this") - } - - override def userConfig: UserConfig = { - throw new UnsupportedOperationException(s"userConfig is not supported on $this") - } - - override def chain(op: Op)(implicit system: ActorSystem): Op = { - throw new UnsupportedOperationException(s"chain is not supported on $this") - } - override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - WindowOp(GlobalWindows()).chain(this).toProcessor - } -} /** * This represents a window aggregation, together with a following TransformOp @@ -290,7 +265,14 @@ case class WindowOp( } /** - * This represents a Processor with groupBy and window aggregation + * This represents an operation with groupBy followed by window aggregation. + * + * It can only be chained with [[WindowTransformOp]] to be executed in + * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]]. + * However, it's possible a window function has no following aggregations. In that case, + * we manually tail a [[WindowOp]] with [[TransformOp]] of + * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a + * [[WindowTransformOp]]. */ case class GroupByOp[IN, GROUP] private( groupBy: IN => GROUP, @@ -325,7 +307,14 @@ case class GroupByOp[IN, GROUP] private( } /** - * This represents a Processor transforming merged streams + * This represents an operation with merge followed by window aggregation. + * + * It can only be chained with [[WindowTransformOp]] to be executed in + * [[org.apache.gearpump.streaming.dsl.task.TransformTask]]. + * However, it's possible a merge function has no following aggregations. 
In that case, + * we manually tail a [[WindowOp]] with [[TransformOp]] of + * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a + * [[WindowTransformOp]]. */ case class MergeOp( parallelism: Int = 1, @@ -357,7 +346,65 @@ case class MergeOp( } /** - * This is an edge on the logical plan. + * This is an intermediate operation, produced by chaining [[WindowOp]] and [[TransformOp]]. + * Usually, it will be chained to a [[DataSourceOp]], [[GroupByOp]] or [[MergeOp]]. Nonetheless, + * Op with more than 1 outgoing edge or incoming edge cannot be chained. In that case, + * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]]. + */ +private case class WindowTransformOp[IN, OUT]( + windowRunner: WindowRunner[IN, OUT], + description: String, + userConfig: UserConfig) extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: WindowTransformOp[OUT, _] => + WindowTransformOp( + WindowRunnerAT(windowRunner, op.windowRunner), + Op.concatenate(description, op.description), + Op.concatenate(userConfig, op.userConfig) + ) + case _ => + throw new OpChainException(this, other) + } + } + + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp + Processor[TransformTask[Any, Any]](1, description, userConfig.withValue( + Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)) + } +} + +/** + * This is an intermediate operation, produced by chaining [[TransformOp]] and + * [[WindowTransformOp]]. It will later be chained to a [[WindowOp]], which results in + * two [[WindowTransformOp]]s. Finally, they will be chained to a single WindowTransformOp. 
+ */ +private case class TransformWindowTransformOp[IN, MIDDLE, OUT]( + transformOp: TransformOp[IN, MIDDLE], + windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op { + + override def description: String = { + throw new UnsupportedOperationException(s"description is not supported on $this") + } + + override def userConfig: UserConfig = { + throw new UnsupportedOperationException(s"userConfig is not supported on $this") + } + + override def chain(op: Op)(implicit system: ActorSystem): Op = { + throw new UnsupportedOperationException(s"chain is not supported on $this") + } + + override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + WindowOp(GlobalWindows()).chain(this).toProcessor + } +} + +/** + * This is an edge on the logical plan. It defines whether data should be transported locally + * or shuffled remotely between [[Op]]. */ trait OpEdge http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index b1b39c9..04b5337 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -24,11 +24,26 @@ import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.Graph +/** + * This class is responsible for turning the high level + * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL into low level + * [[org.apache.gearpump.streaming.Processor]] API. + */ class Planner { /** - * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low - * level Graph API. 
+ * This method interprets a Graph of [[Op]] and creates a Graph of + * [[org.apache.gearpump.streaming.Processor]]. + * + * It firstly reversely traverses the Graph from a leaf Op and merges it with + * its downstream Op according to the following rules. + * + * 1. The Op has only one outgoing edge and the downstream Op has only one incoming edge + * 2. Neither Op is [[ProcessorOp]] + * 3. The edge is [[Direct]] + * + * Finally the vertices of the optimized Graph are translated to Processors + * and the edges to Partitioners. */ def plan(dag: Graph[Op, OpEdge]) (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = { @@ -43,6 +58,7 @@ class Planner { case _ => new HashPartitioner } case Direct => + // FIXME: This is never used new CoLocationPartitioner } }.mapVertex(_.toProcessor) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala index ef2753e..d0c733e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -210,6 +210,7 @@ class Stream[T]( * @param parallelism parallelism level * @return new stream after processing with type [R] */ + @deprecated def process[R]( processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, description: String = "process"): Stream[R] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala index bce8c0c..17d77bc 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala @@ -62,6 +62,17 @@ class StreamApp( val dag = planner.plan(graph) StreamApplication(name, dag, userConfig) } + + def source[T](dataSource: DataSource, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { + val sourceOp = DataSourceOp(dataSource, parallelism, description, conf) + graph.addVertex(sourceOp) + new Stream[T](graph, sourceOp) + } + + def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { + this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) + } } object StreamApp { @@ -73,20 +84,6 @@ object StreamApp { implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { streamApp.plan() } - - implicit class Source(app: StreamApp) extends java.io.Serializable { - - def source[T](dataSource: DataSource, parallelism: Int = 1, - conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { - implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf) - app.graph.addVertex(sourceOp) - new Stream[T](app.graph, sourceOp) - } - - def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { - this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) - } - } } /** A test message source which generated message sequence repeatedly. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala index 252b5bd..2d26df6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala @@ -17,7 +17,7 @@ */ package org.apache.gearpump.streaming.dsl.scalaapi.functions -import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction} +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, SerializableFunction} import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction} import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala deleted file mode 100644 index ab88bf1..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.scalaapi.functions - -/** - * Superclass for all user defined function interfaces. - * This ensures all functions are serializable and provides common methods - * like setup and teardown. Users should not extend this class directly - * but subclasses like [[FlatMapFunction]]. - */ -abstract class SerializableFunction extends java.io.Serializable { - - def setup(): Unit = {} - - def teardown(): Unit = {} - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala index a4524a8..46b8e92 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala @@ -17,8 +17,18 @@ */ package org.apache.gearpump.streaming.dsl.window.api + +/** + * Determines relationship between multiple results for the same window. 
+ */ sealed trait AccumulationMode +/** + * Window results are accumulated. + */ case object Accumulating extends AccumulationMode +/** + * Window results are independent. + */ case object Discarding extends AccumulationMode http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala index 02d52a0..b9a8695 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala @@ -17,7 +17,16 @@ */ package org.apache.gearpump.streaming.dsl.window.api +/** + * Determines when window results are emitted. + * For now, [[EventTimeTrigger]] is used for all applications. + */ +// TODO: Make this a public API sealed trait Trigger +/** + * Triggers emitting when watermark passes the end of window on event time.
+ */ +// FIXME: This is no more than a tag now and the logic is hard coded in WindowRunner case object EventTimeTrigger extends Trigger http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala index a2f51c7..4db02e7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala @@ -19,7 +19,8 @@ package org.apache.gearpump.streaming.dsl.window.api import java.time.{Duration, Instant} -import org.apache.gearpump.{MIN_TIME_MILLIS, MAX_TIME_MILLIS, TimeStamp} +import org.apache.gearpump.Time +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.dsl.window.impl.Window import scala.collection.mutable.ArrayBuffer @@ -32,8 +33,14 @@ object WindowFunction { } } +/** + * Determines how elements are assigned to windows for calculation. + */ trait WindowFunction { + /** + * Assigns elements into windows. + */ def apply[T](context: WindowFunction.Context[T]): Array[Window] def isNonMerging: Boolean @@ -46,10 +53,13 @@ abstract class NonMergingWindowFunction extends WindowFunction { object GlobalWindowFunction { - val globalWindow = Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS), - Instant.ofEpochMilli(MAX_TIME_MILLIS))) + val globalWindow = Array(Window(Instant.ofEpochMilli(Time.MIN_TIME_MILLIS), + Instant.ofEpochMilli(Time.MAX_TIME_MILLIS))) } +/** + * All elements are assigned to the same global window for calculation.
+ */ case class GlobalWindowFunction() extends NonMergingWindowFunction { override def apply[T](context: WindowFunction.Context[T]): Array[Window] = { @@ -57,6 +67,12 @@ case class GlobalWindowFunction() extends NonMergingWindowFunction { } } +/** + * Elements are assigned to non-merging sliding windows for calculation. + * + * @param size window size + * @param step window step to slide forward + */ case class SlidingWindowFunction(size: Duration, step: Duration) extends NonMergingWindowFunction { @@ -80,11 +96,17 @@ case class SlidingWindowFunction(size: Duration, step: Duration) windows.toArray } - private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = { + private def lastStartFor(timestamp: MilliSeconds, windowStep: Long): MilliSeconds = { timestamp - (timestamp + windowStep) % windowStep } } +/** + * Elements are assigned to merging windows for calculation. Windows are merged + * if their distance is within the defined gap. + * + * @param gap session gap + */ case class SessionWindowFunction(gap: Duration) extends WindowFunction { override def apply[T](context: WindowFunction.Context[T]): Array[Window] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala index d53bc96..e15b5c4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala @@ -20,11 +20,14 @@ package org.apache.gearpump.streaming.dsl.window.api import java.time.Duration /** - * Defines how to apply window functions. + * User facing Window DSL. 
+ * Defines how to apply [[WindowFunction]], [[Trigger]] + * and [[AccumulationMode]]. * * @param windowFn how to divide windows * @param trigger when to trigger window result - * @param accumulationMode whether to accumulate results across windows + * @param accumulationMode whether to accumulate window results + * @param description window description */ case class Windows( windowFn: WindowFunction, @@ -47,6 +50,11 @@ case class Windows( object GlobalWindows { + /** + * Defines a [[GlobalWindowFunction]]. + * + * @return a Window definition + */ def apply(): Windows = { Windows(GlobalWindowFunction(), description = "globalWindows") } @@ -55,7 +63,7 @@ object GlobalWindows { object FixedWindows { /** - * Defines a FixedWindow. + * Defines a non-overlapping [[SlidingWindowFunction]]. * * @param size window size * @return a Window definition @@ -68,7 +76,7 @@ object FixedWindows { object SlidingWindows { /** - * Defines a SlidingWindow. + * Defines an overlapping [[SlidingWindowFunction]]. * * @param size window size * @param step window step to slide forward @@ -82,7 +90,7 @@ object SlidingWindows { object SessionWindows { /** - * Defines a SessionWindow. + * Defines a [[SessionWindowFunction]]. 
* * @param gap session gap * @return a Window definition http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala deleted file mode 100644 index e978983..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.window.impl - -import org.apache.gearpump.Message -import org.apache.gearpump.streaming.dsl.window.api.Trigger - -trait ReduceFnRunner { - - def process(message: Message): Unit - - def onTrigger(trigger: Trigger): Unit - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala index 870c334..d6d08c9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -19,22 +19,16 @@ package org.apache.gearpump.streaming.dsl.window.impl import java.time.Instant -import akka.actor.ActorSystem -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.Processor -import org.apache.gearpump.{Message, TimeStamp} -import org.apache.gearpump.streaming.dsl.window.api._ -import 
org.apache.gearpump.streaming.task.Task +import org.apache.gearpump.Time.MilliSeconds object Window { - def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Window = { + def ofEpochMilli(startTime: MilliSeconds, endTime: MilliSeconds): Window = { Window(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime)) } } /** - * A window unit including startTime and excluding endTime. + * A window unit from startTime(including) to endTime(excluding). */ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 17a9525..ee3c067 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -31,11 +31,23 @@ import org.apache.gearpump.streaming.task.TaskUtil import scala.collection.mutable.ArrayBuffer +/** + * Inputs for [[WindowRunner]]. + */ case class TimestampedValue[T](value: T, timestamp: Instant) +/** + * Outputs triggered by [[WindowRunner]] + */ case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]], watermark: Instant) +/** + * This is responsible for executing window calculation. + * 1. Groups elements into windows as defined by window function + * 2. Applies window calculation to each group + * 3. 
Emits results on triggering + */ trait WindowRunner[IN, OUT] extends java.io.Serializable { def process(timestampedValue: TimestampedValue[IN]): Unit @@ -43,6 +55,10 @@ trait WindowRunner[IN, OUT] extends java.io.Serializable { def trigger(time: Instant): TriggeredOutputs[OUT] } +/** + * A composite WindowRunner that first executes its left child and feeds results + * into its right child. + */ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] { @@ -57,6 +73,9 @@ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], } } +/** + * Default implementation for [[WindowRunner]]. + */ class DefaultWindowRunner[IN, OUT]( windows: Windows, fnRunner: FunctionRunner[IN, OUT]) @@ -137,11 +156,15 @@ class DefaultWindowRunner[IN, OUT]( } onTrigger(outputs, newWmk) } else { - // minimum of end of last triggered window and start of first un-triggered window + // The output watermark is the minimum of end of last triggered window + // and start of first un-triggered window TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime)) } } else { + // All windows have been triggered. if (time == Watermark.MAX) { + // This means there will be no more inputs + // so it's safe to advance to the maximum watermark. 
TriggeredOutputs(outputs, Watermark.MAX) } else { TriggeredOutputs(outputs, wmk) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala index 8f8b7ab..058d36b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala @@ -22,7 +22,7 @@ import java.util import com.google.common.collect.Iterators import com.typesafe.config.Config -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.ClientToMaster.ReadOption import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem import org.apache.gearpump.metrics.Metrics.{Histogram, Meter} @@ -64,7 +64,7 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met } def aggregate( - readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp) + readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: MilliSeconds) : List[HistoryMetricsItem] = { val (start, end, interval) = getTimeRange(readOption, now) val timeSlotsCount = ((end - start - 1) / interval + 1).toInt @@ -103,8 +103,8 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met } // Returns (start, end, interval) - private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp) - : (TimeStamp, TimeStamp, TimeStamp) = { + private def getTimeRange(readOption: ReadOption.ReadOption, now: MilliSeconds) + : (MilliSeconds, MilliSeconds, MilliSeconds) = { readOption match { case 
ReadOption.ReadRecent => val end = now @@ -229,7 +229,7 @@ object ProcessorAggregator { var p99: Double = 0 var p999: Double = 0 - var startTime: TimeStamp = Long.MaxValue + var startTime: MilliSeconds = Long.MaxValue override def aggregate(item: HistoryMetricsItem): Unit = { val input = item.value.asInstanceOf[Histogram] @@ -263,7 +263,7 @@ object ProcessorAggregator { var m1: Double = 0 var rateUnit: String = null - var startTime: TimeStamp = Long.MaxValue + var startTime: MilliSeconds = Long.MaxValue override def aggregate(item: HistoryMetricsItem): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala index 0db44f2..932c750 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala @@ -52,4 +52,8 @@ class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: D LOG.info("closing data sink...") sink.close() } + + override def onWatermarkProgress(watermark: Instant): Unit = { + context.updateWatermark(watermark) + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala index 14abff8..607af85 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala @@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.source import java.time.Instant -import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message} +import org.apache.gearpump.{Message, Time} /** * message used by source task to report source watermark. @@ -28,9 +28,14 @@ case class Watermark(instant: Instant) { def toMessage: Message = Message("watermark", instant) } +/** + * All input data with event times less than watermark have been observed + */ object Watermark { - val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1) + // all input data have been observed + val MAX: Instant = Instant.ofEpochMilli(Time.MAX_TIME_MILLIS + 1) - val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS) + // no input data have been observed + val MIN: Instant = Instant.ofEpochMilli(Time.MIN_TIME_MILLIS) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala index 0e2f83a..0118c07 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.state.api -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds /** * MonoidState uses Algebird Monoid to aggregate state @@ -37,11 +37,11 @@ abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] { override def get: Option[T] = Option(monoid.plus(left, right)) - override def setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = { + override def 
setNextCheckpointTime(nextCheckpointTime: MilliSeconds): Unit = { checkpointTime = nextCheckpointTime } - protected def updateState(timestamp: TimeStamp, t: T): Unit = { + protected def updateState(timestamp: MilliSeconds, t: T): Unit = { if (timestamp < checkpointTime) { left = monoid.plus(left, t) } else { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala index 906d331..39b17c9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.state.api -import org.apache.gearpump._ +import org.apache.gearpump.Time.MilliSeconds /** * PersistentState is part of the transaction API @@ -33,19 +33,19 @@ trait PersistentState[T] { * Recovers state to a previous checkpoint * usually invoked by the framework */ - def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit + def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit /** * Updates state on a new message * this is invoked by user */ - def update(timestamp: TimeStamp, t: T): Unit + def update(timestamp: MilliSeconds, t: T): Unit /** * Sets next checkpoint time * should be invoked by the framework */ - def setNextCheckpointTime(timeStamp: TimeStamp): Unit + def setNextCheckpointTime(timeStamp: MilliSeconds): Unit /** * Gets a binary snapshot of state http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala index d3ffaa9..3a3b0a7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala @@ -20,12 +20,13 @@ package org.apache.gearpump.streaming.state.api import java.time.Instant +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} object PersistentTask { val LOG = LogUtil.getLogger(getClass) @@ -41,8 +42,6 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext._ - import org.apache.gearpump.streaming.state.api.PersistentTask._ - val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory]( PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get val checkpointStore = checkpointStoreFactory.getCheckpointStore( @@ -99,7 +98,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) checkpointManager.close() } - private def reportCheckpointClock(timestamp: TimeStamp): Unit = { + private def reportCheckpointClock(timestamp: MilliSeconds): Unit = { appMaster ! 
UpdateCheckpointClock(taskContext.taskId, timestamp) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala index 82b7952..7d9e92a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.state.impl -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.transaction.api.CheckpointStore /** Manage physical checkpoints to persitent storage like HDFS */ @@ -28,11 +28,11 @@ class CheckpointManager(checkpointInterval: Long, private var maxMessageTime: Long = 0L private var checkpointTime: Option[Long] = None - def recover(timestamp: TimeStamp): Option[Array[Byte]] = { + def recover(timestamp: MilliSeconds): Option[Array[Byte]] = { checkpointStore.recover(timestamp) } - def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = { + def checkpoint(timestamp: MilliSeconds, checkpoint: Array[Byte]): Option[MilliSeconds] = { checkpointStore.persist(timestamp, checkpoint) checkpointTime = checkpointTime.collect { case time if maxMessageTime > time => time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval @@ -41,7 +41,7 @@ class CheckpointManager(checkpointInterval: Long, checkpointTime } - def update(messageTime: TimeStamp): Option[TimeStamp] = { + def update(messageTime: MilliSeconds): Option[MilliSeconds] = { maxMessageTime = Math.max(maxMessageTime, messageTime) if (checkpointTime.isEmpty) { 
checkpointTime = Some((1 + messageTime / checkpointInterval) * checkpointInterval) @@ -50,15 +50,15 @@ class CheckpointManager(checkpointInterval: Long, checkpointTime } - def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = { + def shouldCheckpoint(upstreamMinClock: MilliSeconds): Boolean = { checkpointTime.exists(time => upstreamMinClock >= time) } - def getCheckpointTime: Option[TimeStamp] = checkpointTime + def getCheckpointTime: Option[MilliSeconds] = checkpointTime def close(): Unit = { checkpointStore.close() } - private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime + private[impl] def getMaxMessageTime: MilliSeconds = maxMessageTime }