http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala index 32d8bb7..4a161c7 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest +package org.apache.gearpump.integrationtest -import io.gearpump.integrationtest.minicluster.MiniCluster +import org.apache.gearpump.integrationtest.minicluster.MiniCluster /** * Provides a min cluster of Gearpump, which contains one or more masters, and workers.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala index 6e4a471..dabcc71 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest +package org.apache.gearpump.integrationtest import org.scalatest._ -import io.gearpump.cluster.MasterToAppMaster -import io.gearpump.cluster.MasterToAppMaster.AppMasterData -import io.gearpump.util.LogUtil +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData +import org.apache.gearpump.util.LogUtil /** * The abstract test spec http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala index 9c0c779..eabc684 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala @@ -15,10 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.checklist +package org.apache.gearpump.integrationtest.checklist -import io.gearpump.cluster.MasterToAppMaster -import io.gearpump.integrationtest.TestSpecBase +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.integrationtest.TestSpecBase /** * The test spec checks the command-line usage http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala index 321b395..d8bdc1e 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.checklist +package org.apache.gearpump.integrationtest.checklist import org.scalatest.TestData -import io.gearpump.integrationtest.kafka._ -import io.gearpump.integrationtest.{TestSpecBase, Util} +import org.apache.gearpump.integrationtest.kafka._ +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} /** * The test spec checks the Kafka datasource connector @@ -58,7 +58,7 @@ class ConnectorKafkaSpec extends TestSpecBase { kafkaCluster.produceDataToKafka(sourceTopic, messageNum) // exercise - val args = Array("io.gearpump.streaming.examples.kafka.KafkaReadWrite", + val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite", "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, "-brokerList", kafkaCluster.getBrokerListConnectString, "-sourceTopic", sourceTopic, @@ -86,7 +86,7 @@ class ConnectorKafkaSpec extends TestSpecBase { producer.start() // exercise - val args = Array("io.gearpump.streaming.examples.kafka.KafkaReadWrite", + val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite", "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, "-brokerList", kafkaCluster.getBrokerListConnectString, "-sourceTopic", sourceTopic, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala index 588e7c7..56b33c1 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala @@ -15,18 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.checklist +package org.apache.gearpump.integrationtest.checklist -import io.gearpump.integrationtest.{TestSpecBase, Util} -import io.gearpump.metrics.Metrics.Meter -import io.gearpump.streaming._ -import io.gearpump.streaming.appmaster.ProcessorSummary +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} +import org.apache.gearpump.metrics.Metrics.Meter +import org.apache.gearpump.streaming._ +import org.apache.gearpump.streaming.appmaster.ProcessorSummary class DynamicDagSpec extends TestSpecBase { lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head - val splitTaskClass = "io.gearpump.streaming.examples.wordcount.Split" - val sumTaskClass = "io.gearpump.streaming.examples.wordcount.Sum" + val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split" + val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum" val solName = "sol" "dynamic dag" should { @@ -34,7 +34,7 @@ class DynamicDagSpec extends TestSpecBase { val partitioners = restClient.queryBuiltInPartitioners() partitioners.length should be > 0 partitioners.foreach(clazz => - clazz should startWith("io.gearpump.partitioner.") + clazz should startWith("org.apache.gearpump.partitioner.") ) } @@ -86,7 +86,7 @@ class DynamicDagSpec extends TestSpecBase { }, "new processor added") processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") - val fakeTaskClass = "io.gearpump.streaming.examples.wordcount.Fake" + val fakeTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Fake" replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass) Util.retryUntil(() => { val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala index 20d1b12..27e4665 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.checklist +package org.apache.gearpump.integrationtest.checklist import org.apache.log4j.Logger -import io.gearpump.integrationtest.{Docker, TestSpecBase, Util} -import io.gearpump.streaming._ -import io.gearpump.streaming.appmaster.ProcessorSummary +import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util} +import org.apache.gearpump.streaming._ +import org.apache.gearpump.streaming.appmaster.ProcessorSummary /** * The test spec will perform destructive operations to check the stability @@ -33,8 +33,8 @@ class ExampleSpec extends TestSpecBase { "distributed shell" should { "execute commands on machines where its executors are running" in { val distShellJar = cluster.queryBuiltInExampleJars("distributedshell-").head - val mainClass = "io.gearpump.examples.distributedshell.DistributedShell" - val clientClass = "io.gearpump.examples.distributedshell.DistributedShellClient" + val mainClass = "org.apache.gearpump.examples.distributedshell.DistributedShell" + val clientClass = "org.apache.gearpump.examples.distributedshell.DistributedShellClient" val appId = restClient.getNextAvailableAppId() val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass) success shouldBe true http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala index 3f70339..bb9982a 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala @@ -16,14 +16,14 @@ * limitations under the License. */ -package io.gearpump.integrationtest.checklist +package org.apache.gearpump.integrationtest.checklist import org.apache.log4j.Logger -import io.gearpump.integrationtest.hadoop.HadoopCluster._ -import io.gearpump.integrationtest.kafka.KafkaCluster._ -import io.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader} -import io.gearpump.integrationtest.{TestSpecBase, Util} +import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._ +import org.apache.gearpump.integrationtest.kafka.KafkaCluster._ +import org.apache.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader} +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} /** * Checks message delivery consistency, like at-least-once, and exactly-once. @@ -55,7 +55,7 @@ class MessageDeliverySpec extends TestSpecBase { withHadoopCluster { hadoopCluster => // exercise - val args = Array("io.gearpump.streaming.examples.state.MessageCountApp", + val args = Array("org.apache.gearpump.streaming.examples.state.MessageCountApp", "-defaultFS", hadoopCluster.getDefaultFS, "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, "-brokerList", kafkaCluster.getBrokerListConnectString, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala index 3ce8a5d..2f5bb64 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.checklist +package org.apache.gearpump.integrationtest.checklist import scala.concurrent.duration._ -import io.gearpump.cluster.MasterToAppMaster -import io.gearpump.cluster.master.MasterStatus -import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import io.gearpump.integrationtest.{TestSpecBase, Util} +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.master.MasterStatus +import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} /** * The test spec checks REST service usage http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala index b002f70..4b15055 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.checklist +package org.apache.gearpump.integrationtest.checklist import scala.concurrent.duration.Duration -import io.gearpump.cluster.MasterToAppMaster -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.integrationtest.{TestSpecBase, Util} -import io.gearpump.util.{Constants, LogUtil} +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} +import org.apache.gearpump.util.{Constants, LogUtil} /** * The test spec will perform destructive operations to check the stability. Operations http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala index 7d0a672..b327bf4 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala @@ -15,11 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.checklist +package org.apache.gearpump.integrationtest.checklist -import io.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader} -import io.gearpump.integrationtest.storm.StormClient -import io.gearpump.integrationtest.{TestSpecBase, Util} +import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader} +import org.apache.gearpump.integrationtest.storm.StormClient +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} /** * The test spec checks the compatibility of running Storm applications @@ -129,9 +129,9 @@ class StormCompatibilitySpec extends TestSpecBase { val stormJar = getStormJar(stormVersion) val topologyName = getTopologyName("storm_kafka", stormVersion) val stormKafkaTopology = - s"io.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology" + s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology" - import io.gearpump.integrationtest.kafka.KafkaCluster._ + import org.apache.gearpump.integrationtest.kafka.KafkaCluster._ withKafkaCluster(cluster) { kafkaCluster => val sourcePartitionNum = 2 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala index 4178e66..7942308 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.suites +package org.apache.gearpump.integrationtest.suites import org.scalatest._ -import io.gearpump.integrationtest.MiniClusterProvider -import io.gearpump.integrationtest.checklist._ -import io.gearpump.integrationtest.minicluster.MiniCluster +import org.apache.gearpump.integrationtest.MiniClusterProvider +import org.apache.gearpump.integrationtest.checklist._ +import org.apache.gearpump.integrationtest.minicluster.MiniCluster /** * Launch a Gearpump cluster in standalone mode and run all test specs. To test a specific http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala index aee2681..f315ad3 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest +package org.apache.gearpump.integrationtest import org.apache.log4j.Logger http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala index 9045efe..25d7ee3 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest +package org.apache.gearpump.integrationtest import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala index 71429b2..7e7085d 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest +package org.apache.gearpump.integrationtest import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala index 6d277f0..f836abd 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala @@ -16,11 +16,11 @@ * limitations under the License. */ -package io.gearpump.integrationtest.hadoop +package org.apache.gearpump.integrationtest.hadoop import org.apache.log4j.Logger -import io.gearpump.integrationtest.{Docker, Util} +import org.apache.gearpump.integrationtest.{Docker, Util} object HadoopCluster { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala index 862fc58..15ba084 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.kafka +package org.apache.gearpump.integrationtest.kafka import org.apache.log4j.Logger -import io.gearpump.integrationtest.minicluster.MiniCluster -import io.gearpump.integrationtest.{Docker, Util} +import org.apache.gearpump.integrationtest.minicluster.MiniCluster +import org.apache.gearpump.integrationtest.{Docker, Util} object KafkaCluster { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala index 03bb9fb..1cf3125 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.kafka +package org.apache.gearpump.integrationtest.kafka import java.util.Properties @@ -23,7 +23,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.log4j.Logger -import io.gearpump.streaming.serializer.ChillSerializer +import org.apache.gearpump.streaming.serializer.ChillSerializer class NumericalDataProducer(topic: String, bootstrapServers: String) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala index 98aceba..1f773d3 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.kafka +package org.apache.gearpump.integrationtest.kafka import scala.collection.mutable http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala index 29289d9..392ca86 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.kafka +package org.apache.gearpump.integrationtest.kafka import scala.util.{Failure, Success} @@ -23,7 +23,7 @@ import kafka.api.FetchRequestBuilder import kafka.consumer.SimpleConsumer import kafka.utils.Utils -import io.gearpump.streaming.serializer.ChillSerializer +import org.apache.gearpump.streaming.serializer.ChillSerializer class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0, host: String, port: Int) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala index e5c7cb7..73413da 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala @@ -15,11 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.minicluster +package org.apache.gearpump.integrationtest.minicluster import scala.sys.process._ -import io.gearpump.integrationtest.Docker +import org.apache.gearpump.integrationtest.Docker /** * A helper to instantiate the base image for different usage. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala index 5eebd30..884a8d1 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.minicluster +package org.apache.gearpump.integrationtest.minicluster import org.apache.log4j.Logger -import io.gearpump.cluster.MasterToAppMaster -import io.gearpump.integrationtest.Docker +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.integrationtest.Docker /** * A command-line client to operate a Gearpump cluster http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala index bfdecee..4d439e8 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.minicluster +package org.apache.gearpump.integrationtest.minicluster import java.io.IOException import scala.collection.mutable.ListBuffer import org.apache.log4j.Logger -import io.gearpump.integrationtest.{Docker, Util} +import org.apache.gearpump.integrationtest.{Docker, Util} /** * This class is a test driver for end-to-end integration test. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala index 72b0f84..a2d8f99 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.minicluster +package org.apache.gearpump.integrationtest.minicluster import scala.reflect.ClassTag @@ -24,23 +24,23 @@ import org.apache.log4j.Logger import upickle.Js import upickle.default._ -import io.gearpump.cluster.AppMasterToMaster.MasterData -import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData} -import io.gearpump.cluster.MasterToClient.HistoryMetrics -import io.gearpump.cluster.master.MasterSummary -import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import io.gearpump.cluster.{AppJar, MasterToAppMaster} -import io.gearpump.integrationtest.{Docker, Util} -import io.gearpump.services.AppMasterService.Status -import io.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners} +import org.apache.gearpump.cluster.AppMasterToMaster.MasterData +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData} +import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics +import org.apache.gearpump.cluster.master.MasterSummary +import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} +import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster} +import org.apache.gearpump.integrationtest.{Docker, Util} +import org.apache.gearpump.services.AppMasterService.Status +import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners} // NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ -import io.gearpump.streaming.ProcessorDescription -import io.gearpump.streaming.appmaster.AppMaster.ExecutorBrief -import io.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor} -import io.gearpump.streaming.appmaster.StreamAppMasterSummary -import io.gearpump.streaming.executor.Executor.ExecutorSummary -import io.gearpump.util.{Constants, Graph} +import org.apache.gearpump.services.util.UpickleUtil._ +import org.apache.gearpump.streaming.ProcessorDescription +import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief +import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor} +import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary +import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary +import org.apache.gearpump.util.{Constants, Graph} /** * A REST client to operate a Gearpump cluster http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala index ea9ca65..79adfc4 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala @@ -16,14 +16,14 @@ * limitations under the License. */ -package io.gearpump.integrationtest.storm +package org.apache.gearpump.integrationtest.storm import scala.util.Random import backtype.storm.utils.{DRPCClient, Utils} -import io.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient} -import io.gearpump.integrationtest.{Docker, Util} +import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient} +import org.apache.gearpump.integrationtest.{Docker, Util} class StormClient(cluster: MiniCluster, restClient: RestClient) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala index 6e4f257..67a2491 100644 --- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala +++ b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.storm +package org.apache.gearpump.integrationtest.storm import backtype.storm.topology.base.BaseBasicBolt import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala index fa48112..a0d9a42 100644 --- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala +++ b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package io.gearpump.integrationtest.storm +package org.apache.gearpump.integrationtest.storm import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} @@ -25,7 +25,7 @@ import backtype.storm.{Config, StormSubmitter} import storm.kafka.bolt.KafkaBolt import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts} -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} /** * Tests Storm 0.10.x compatibility over Gearpump http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala index 6e4f257..67a2491 100644 --- a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala +++ b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.storm +package org.apache.gearpump.integrationtest.storm import backtype.storm.topology.base.BaseBasicBolt import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala ---------------------------------------------------------------------- diff --git a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala index 918dc56..5b74d60 100644 --- a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala +++ b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.integrationtest.storm +package org.apache.gearpump.integrationtest.storm import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} @@ -24,7 +24,7 @@ import backtype.storm.{Config, StormSubmitter} import storm.kafka.bolt.KafkaBolt import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts} -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} /** * Tests Storm 0.9.x compatibility over Gearpump http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index b9e6f94..9a3e980 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -226,8 +226,7 @@ object Build extends sbt.Build { external_monoid, external_serializer, external_hbase, - external_hadoopfs, - streaming) + external_hadoopfs) lazy val gearpumpUnidocSetting = scalaJavaUnidocSettings ++ Seq( unidocProjectFilter in(ScalaUnidoc, unidoc) := projectsWithDoc, @@ -244,7 +243,9 @@ object Build extends sbt.Build { ) private def ignoreUndocumentedPackages(packages: Seq[Seq[File]]): Seq[Seq[File]] = { - packages.map(_.filterNot(_.getCanonicalPath.contains("akka"))) + packages + .map(_.filterNot(_.getName.contains("$"))) + .map(_.filterNot(_.getCanonicalPath.contains("akka"))) } lazy val root = Project( http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/project/BuildExample.scala ---------------------------------------------------------------------- diff --git a/project/BuildExample.scala b/project/BuildExample.scala index 3bd93c3..a4ae87b 100644 --- a/project/BuildExample.scala +++ b/project/BuildExample.scala @@ -36,7 +36,7 @@ object BuildExample extends sbt.Build { settings = commonSettings ++ noPublish ++ myAssemblySettings ++ Seq( mainClass in(Compile, packageBin) := - Some("io.gearpump.streaming.examples.wordcountjava.WordCount"), + Some("org.apache.gearpump.streaming.examples.wordcountjava.WordCount"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -49,7 +49,7 @@ object BuildExample extends sbt.Build { settings = commonSettings ++ noPublish ++ myAssemblySettings ++ Seq( mainClass in(Compile, packageBin) := - Some("io.gearpump.streaming.examples.wordcount.WordCount"), + Some("org.apache.gearpump.streaming.examples.wordcount.WordCount"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -61,7 +61,7 @@ object BuildExample extends sbt.Build { base = file("examples/streaming/sol"), settings = commonSettings ++ noPublish ++ myAssemblySettings ++ Seq( - mainClass in(Compile, packageBin) := Some("io.gearpump.streaming.examples.sol.SOL"), + mainClass in(Compile, packageBin) := Some("org.apache.gearpump.streaming.examples.sol.SOL"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -74,7 +74,7 @@ object BuildExample extends sbt.Build { settings = commonSettings ++ noPublish ++ myAssemblySettings ++ Seq( mainClass in(Compile, packageBin) := - Some("io.gearpump.streaming.examples.complexdag.Dag"), + Some("org.apache.gearpump.streaming.examples.complexdag.Dag"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -93,7 +93,7 @@ object BuildExample extends sbt.Build { "com.lihaoyi" %% "upickle" % upickleVersion ), mainClass in(Compile, packageBin) := - Some("io.gearpump.streaming.examples.transport.Transport"), + Some("org.apache.gearpump.streaming.examples.transport.Transport"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -106,7 +106,7 @@ object BuildExample extends sbt.Build { settings = commonSettings ++ noPublish ++ myAssemblySettings ++ Seq( mainClass in(Compile, packageBin) := - Some("io.gearpump.examples.distributedshell.DistributedShell"), + Some("org.apache.gearpump.examples.distributedshell.DistributedShell"), target in assembly := baseDirectory.value.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -126,7 +126,7 @@ object BuildExample extends sbt.Build { "io.spray" %% "spray-routing-shapeless2" % sprayVersion ), mainClass in(Compile, packageBin) := - Some("io.gearpump.experiments.distributeservice.DistributeService"), + Some("org.apache.gearpump.experiments.distributeservice.DistributeService"), target in assembly := baseDirectory.value.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -150,7 +150,7 @@ object BuildExample extends sbt.Build { exclude("org.ow2.asm", "asm") ), mainClass in(Compile, packageBin) := - Some("io.gearpump.streaming.examples.fsio.SequenceFileIO"), + Some("org.apache.gearpump.streaming.examples.fsio.SequenceFileIO"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -163,7 +163,7 @@ object BuildExample extends sbt.Build { settings = commonSettings ++ noPublish ++ myAssemblySettings ++ Seq( mainClass in(Compile, packageBin) := - Some("io.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount"), + Some("org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -186,7 +186,7 @@ object BuildExample extends sbt.Build { "io.spray" %% "spray-json" % sprayJsonVersion ), mainClass in(Compile, packageBin) := - Some("io.gearpump.streaming.examples.stock.main.Stock"), + Some("org.apache.gearpump.streaming.examples.stock.main.Stock"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -210,7 +210,7 @@ object BuildExample extends sbt.Build { "org.apache.hadoop" % "hadoop-hdfs" % clouderaVersion ), mainClass in(Compile, packageBin) := - Some("io.gearpump.streaming.examples.state.MessageCountApp"), + Some("org.apache.gearpump.streaming.examples.state.MessageCountApp"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) @@ -223,7 +223,7 @@ object BuildExample extends sbt.Build { base = file("examples/pagerank"), settings = commonSettings ++ noPublish ++ myAssemblySettings ++ Seq( mainClass in(Compile, packageBin) := - Some("io.gearpump.experiments.pagerank.example.PageRankExample"), + Some("org.apache.gearpump.experiments.pagerank.example.PageRankExample"), target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index 7339d1d..a675bdf 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -62,13 +62,13 @@ object Pack extends sbt.Build { packSettings ++ Seq( packMain := Map( - "gear" -> "io.gearpump.cluster.main.Gear", - "local" -> "io.gearpump.cluster.main.Local", - "master" -> "io.gearpump.cluster.main.Master", - "worker" -> "io.gearpump.cluster.main.Worker", - "services" -> "io.gearpump.services.main.Services", - "yarnclient" -> "io.gearpump.experiments.yarn.client.Client", - "storm" -> "io.gearpump.experiments.storm.StormRunner" + "gear" -> "org.apache.gearpump.cluster.main.Gear", + "local" -> "org.apache.gearpump.cluster.main.Local", + "master" -> "org.apache.gearpump.cluster.main.Master", + "worker" -> "org.apache.gearpump.cluster.main.Worker", + "services" -> "org.apache.gearpump.services.main.Services", + "yarnclient" -> "org.apache.gearpump.experiments.yarn.client.Client", + "storm" -> "org.apache.gearpump.experiments.storm.StormRunner" ), packJvmOpts := Map( "gear" -> Seq("-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}"), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/README.md ---------------------------------------------------------------------- diff --git a/services/README.md b/services/README.md index 478fa27..ca35301 100644 --- a/services/README.md +++ b/services/README.md @@ -11,7 +11,7 @@ cd ~/gearpump sbt clean publishLocal assembly pack target/pack/bin/local target/pack/bin/services -target/pack/bin/gear app -jar ./examples/complexdag/target/scala-2.11/gepump-examples-complexdag_2.11-0.2.4-SNAPSHOT.jar io.gearpump.streaming.examples.complexdag.Dag +target/pack/bin/gear app -jar ./examples/complexdag/target/scala-2.11/gepump-examples-complexdag_2.11-0.2.4-SNAPSHOT.jar org.apache.gearpump.streaming.examples.complexdag.Dag ``` Launch your browser at http://localhost:8090 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/dashboard/dashboard.js ---------------------------------------------------------------------- diff --git a/services/dashboard/dashboard.js b/services/dashboard/dashboard.js index 37af2bf..87ac50f 100644 --- a/services/dashboard/dashboard.js +++ b/services/dashboard/dashboard.js @@ -22,7 +22,7 @@ 'cfp.loadingBarInterceptor', 'ngFileUpload', 'dashing', - 'io.gearpump.models' + 'org.apache.gearpump.models' ]) // configure routes http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/dashboard/services/models/dag.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/models/dag.js b/services/dashboard/services/models/dag.js index bff12db..84d2492 100644 --- a/services/dashboard/services/models/dag.js +++ b/services/dashboard/services/models/dag.js @@ -3,7 +3,7 @@ * See accompanying LICENSE file. */ -angular.module('io.gearpump.models') +angular.module('org.apache.gearpump.models') .service('Dag', function () { 'use strict'; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/dashboard/services/models/metrics.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/models/metrics.js b/services/dashboard/services/models/metrics.js index 00b2305..3a48f1f 100644 --- a/services/dashboard/services/models/metrics.js +++ b/services/dashboard/services/models/metrics.js @@ -3,7 +3,7 @@ * See accompanying LICENSE file. */ -angular.module('io.gearpump.models') +angular.module('org.apache.gearpump.models') /** TODO: to be absorbed as scalajs (#458) */ .factory('Metrics', [function () { @@ -54,11 +54,11 @@ angular.module('io.gearpump.models') /** automatically guess metric type and decode or return null */ $auto: function (data) { switch (data.value.$type) { - case 'io.gearpump.metrics.Metrics.Meter': + case 'org.apache.gearpump.metrics.Metrics.Meter': return decoder.meter(data); - case 'io.gearpump.metrics.Metrics.Histogram': + case 'org.apache.gearpump.metrics.Metrics.Histogram': return decoder.histogram(data); - case 'io.gearpump.metrics.Metrics.Gauge': + case 'org.apache.gearpump.metrics.Metrics.Gauge': return decoder.gauge(data); default: console.warn({message: 'Unknown metric type', type: data.value.$type}); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/dashboard/services/models/metrics_provider.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/models/metrics_provider.js b/services/dashboard/services/models/metrics_provider.js index cddd1f1..57c6f7c 100644 --- a/services/dashboard/services/models/metrics_provider.js +++ b/services/dashboard/services/models/metrics_provider.js @@ -3,7 +3,7 @@ * See accompanying LICENSE file. */ -angular.module('io.gearpump.models') +angular.module('org.apache.gearpump.models') .service('MetricsProvider', function () { 'use strict'; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/dashboard/services/models/models.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/models/models.js b/services/dashboard/services/models/models.js index 15755bc..8265bac 100644 --- a/services/dashboard/services/models/models.js +++ b/services/dashboard/services/models/models.js @@ -3,7 +3,7 @@ * See accompanying LICENSE file. */ -angular.module('io.gearpump.models', []) +angular.module('org.apache.gearpump.models', []) /** TODO: to be absorbed as scalajs */ .factory('models', ['$timeout', 'conf', 'restapi', 'locator', 'StreamingAppDag', 'Metrics', @@ -388,7 +388,7 @@ angular.module('io.gearpump.models', []) return getter._appMetrics(appId, {all: 'latest'}); }, _appMetrics: function (appId, args) { - args.aggregator = 'io.gearpump.streaming.metrics.ProcessorAggregator'; + args.aggregator = 'org.apache.gearpump.streaming.metrics.ProcessorAggregator'; args.decoder = decoder.appMetrics; return getter._metrics('appmaster/' + appId + '/metrics/app' + appId, '', args); }, @@ -397,7 +397,7 @@ angular.module('io.gearpump.models', []) '&startTask=' + range.start + '&endTask=' + (range.stop + 1) : ''; var args = { all: 'latest', - aggregator: 'io.gearpump.streaming.metrics.TaskFilterAggregator' + + aggregator: 'org.apache.gearpump.streaming.metrics.TaskFilterAggregator' + '&startProcessor=' + processorId + '&endProcessor=' + (processorId + 1) + taskRangeArgs, decoder: decoder.appTaskLatestMetricValues }; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/dashboard/services/models/streamingapp_dag.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/models/streamingapp_dag.js b/services/dashboard/services/models/streamingapp_dag.js index 7fd5800..c882b89 100644 --- a/services/dashboard/services/models/streamingapp_dag.js +++ b/services/dashboard/services/models/streamingapp_dag.js @@ -3,7 +3,7 @@ * See accompanying LICENSE file. */ -angular.module('io.gearpump.models') +angular.module('org.apache.gearpump.models') .service('StreamingAppDag', ['Dag', 'StreamingAppMetricsProvider', function (Dag, StreamingAppMetricsProvider) { 'use strict'; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/dashboard/services/models/streamingapp_metrics_provider.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/models/streamingapp_metrics_provider.js b/services/dashboard/services/models/streamingapp_metrics_provider.js index 2a9f655..3a2d37b 100644 --- a/services/dashboard/services/models/streamingapp_metrics_provider.js +++ b/services/dashboard/services/models/streamingapp_metrics_provider.js @@ -3,7 +3,7 @@ * See accompanying LICENSE file. */ -angular.module('io.gearpump.models') +angular.module('org.apache.gearpump.models') .service('StreamingAppMetricsProvider', ['MetricsProvider', function (MetricsProvider) { 'use strict'; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/dashboard/services/restapi.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/restapi.js b/services/dashboard/services/restapi.js index 29a1683..3bad10c 100644 --- a/services/dashboard/services/restapi.js +++ b/services/dashboard/services/restapi.js @@ -220,7 +220,7 @@ angular.module('dashboard') replaceDagProcessor: function (files, formFormNames, appId, oldProcessorId, newProcessorDescription, onComplete) { var url = restapiV1Root + 'appmaster/' + appId + '/dynamicdag'; var args = { - "$type": 'io.gearpump.streaming.appmaster.DagManager.ReplaceProcessor', + "$type": 'org.apache.gearpump.streaming.appmaster.DagManager.ReplaceProcessor', oldProcessorId: oldProcessorId, newProcessorDescription: angular.merge({ id: oldProcessorId http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala ---------------------------------------------------------------------- diff --git a/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala b/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala deleted file mode 100644 index 3d80c4e..0000000 --- a/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.dashboard - -import scala.scalajs.js.JSApp -import scala.scalajs.js.annotation.JSExport - -@JSExport -object DashboardApp extends JSApp { - override def main(): Unit = { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/js/src/main/scala/org/apache/gearpump/dashboard/DashboardApp.scala ---------------------------------------------------------------------- diff --git a/services/js/src/main/scala/org/apache/gearpump/dashboard/DashboardApp.scala b/services/js/src/main/scala/org/apache/gearpump/dashboard/DashboardApp.scala new file mode 100644 index 0000000..f4e64f2 --- /dev/null +++ b/services/js/src/main/scala/org/apache/gearpump/dashboard/DashboardApp.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.dashboard + +import scala.scalajs.js.JSApp +import scala.scalajs.js.annotation.JSExport + +@JSExport +object DashboardApp extends JSApp { + override def main(): Unit = { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala deleted file mode 100644 index eb6531f..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import akka.actor.ActorSystem -import akka.http.scaladsl.model._ -import akka.http.scaladsl.server.Directives._ -import akka.stream.Materializer - -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ - -/** - * AdminService is for cluster-wide managements. it is not related with - * specific application. - * - * For example: - * - Security management: Add user, remove user. - * - Configuration management: Change configurations. - * - Machine management: Add worker machines, remove worker machines, and add masters. - */ - -// TODO: Add YARN resource manager capacities to add/remove machines. -class AdminService(override val system: ActorSystem) - extends BasicService { - - protected override def prefix = Neutral - - protected override def doRoute(implicit mat: Materializer) = { - path("terminate") { - post { - system.terminate() - complete(StatusCodes.NotFound) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala deleted file mode 100644 index 060e780..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import scala.util.{Failure, Success, Try} - -import akka.actor.{ActorRef, ActorSystem} -import akka.http.scaladsl.model.{FormData, Multipart} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.Route -import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet -import akka.stream.Materializer -import upickle.default.{read, write} - -import io.gearpump.cluster.AppMasterToMaster.{AppMasterSummary, GeneralAppMasterSummary} -import io.gearpump.cluster.ClientToMaster._ -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest} -import io.gearpump.cluster.MasterToClient._ -import io.gearpump.jarstore.JarStoreService -import io.gearpump.services.AppMasterService.Status -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ -import io.gearpump.streaming.AppMasterToMaster.StallingTasks -import io.gearpump.streaming.appmaster.DagManager._ -import io.gearpump.streaming.appmaster.StreamAppMasterSummary -import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} -import io.gearpump.util.ActorUtil.{askActor, askAppMaster} -import io.gearpump.util.FileDirective._ -import io.gearpump.util.{Constants, Util} - -/** - * Management service for AppMaster - */ -class AppMasterService(val master: ActorRef, - val jarStore: JarStoreService, override val system: ActorSystem) - extends BasicService { - - private val systemConfig = system.settings.config - private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) - - protected override def doRoute(implicit mat: Materializer) = pathPrefix("appmaster" / IntNumber) { - appId => { - path("dynamicdag") { - parameters(ParamMagnet("args")) { args: String => - def replaceProcessor(dagOperation: DAGOperation): Route = { - onComplete(askAppMaster[DAGOperationResult](master, appId, dagOperation)) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) - } - } - - val msg = java.net.URLDecoder.decode(args, "UTF-8") - val dagOperation = read[DAGOperation](msg) - (post & entity(as[Multipart.FormData])) { _ => - uploadFile { form => - val jar = form.getFile("jar").map(_.file) - - if (jar.nonEmpty) { - dagOperation match { - case replace: ReplaceProcessor => - val description = replace.newProcessorDescription.copy(jar = - Util.uploadJar(jar.get, jarStore)) - val dagOperationWithJar = replace.copy(newProcessorDescription = description) - replaceProcessor(dagOperationWithJar) - } - } else { - replaceProcessor(dagOperation) - } - } - } ~ (post & entity(as[FormData])) { _ => - replaceProcessor(dagOperation) - } - } - } ~ - path("stallingtasks") { - onComplete(askAppMaster[StallingTasks](master, appId, GetStallingTasks(appId))) { - case Success(value) => - complete(write(value)) - case Failure(ex) => failWith(ex) - } - } ~ - path("errors") { - onComplete(askAppMaster[LastFailure](master, appId, GetLastFailure(appId))) { - case Success(value) => - complete(write(value)) - case Failure(ex) => failWith(ex) - } - } ~ - path("restart") { - post { - onComplete(askActor[SubmitApplicationResult](master, RestartApplication(appId))) { - case Success(_) => - complete(write(Status(true))) - case Failure(ex) => - complete(write(Status(false, ex.getMessage))) - } - } - } ~ - path("config") { - onComplete(askActor[AppMasterConfig](master, QueryAppMasterConfig(appId))) { - case Success(value: AppMasterConfig) => - val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}") - complete(config) - case Failure(ex) => - failWith(ex) - } - } ~ - pathPrefix("executor" / Segment) { executorIdString => - path("config") { - val executorId = Integer.parseInt(executorIdString) - onComplete(askAppMaster[ExecutorConfig](master, appId, QueryExecutorConfig(executorId))) { - case Success(value) => - val config = Option(value.config).map(ClusterConfig.render(_, concise)) - .getOrElse("{}") - complete(config) - case Failure(ex) => - failWith(ex) - } - } ~ - pathEnd { - get { - val executorId = Integer.parseInt(executorIdString) - onComplete(askAppMaster[ExecutorSummary](master, appId, - GetExecutorSummary(executorId))) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) - } - } - } - } ~ - path("metrics" / RestPath) { path => - parameterMap { optionMap => - parameter("aggregator" ? "") { aggregator => - parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption => - val query = QueryHistoryMetrics(path.head.toString, readOption, aggregator, optionMap) - onComplete(askAppMaster[HistoryMetrics](master, appId, query)) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) - } - } - } - } - } ~ - pathEnd { - get { - parameter("detail" ? "false") { detail => - val queryDetails = Try(detail.toBoolean).getOrElse(false) - val request = AppMasterDataDetailRequest(appId) - queryDetails match { - case true => - onComplete(askAppMaster[AppMasterSummary](master, appId, request)) { - case Success(value) => - value match { - case data: GeneralAppMasterSummary => - complete(write(data)) - case data: StreamAppMasterSummary => - complete(write(data)) - } - case Failure(ex) => - failWith(ex) - } - - case false => - onComplete(askActor[AppMasterData](master, AppMasterDataRequest(appId))) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) - } - } - } - } - } ~ - pathEnd { - delete { - val writer = (result: ShutdownApplicationResult) => { - val output = if (result.appId.isSuccess) { - Map("status" -> "success", "info" -> null) - } else { - Map("status" -> "fail", "info" -> result.appId.failed.get.toString) - } - write(output) - } - onComplete(askActor[ShutdownApplicationResult](master, ShutdownApplication(appId))) { - case Success(result) => - val output = if (result.appId.isSuccess) { - Map("status" -> "success", "info" -> null) - } else { - Map("status" -> "fail", "info" -> result.appId.failed.get.toString) - } - complete(write(output)) - case Failure(ex) => - failWith(ex) - } - } - } - } - } -} - -object AppMasterService { - case class Status(success: Boolean, reason: String = null) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala b/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala deleted file mode 100644 index 75f033b..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import scala.concurrent.ExecutionContext - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.headers.CacheDirectives.{`max-age`, `no-cache`} -import akka.http.scaladsl.model.headers.`Cache-Control` -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.Route -import akka.stream.Materializer - -import io.gearpump.util.{Constants, LogUtil} -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ - -trait RouteService { - def route: Route -} - -/** - * Wraps the cache behavior, and some common utils. - */ -trait BasicService extends RouteService { - - implicit def system: ActorSystem - - implicit def timeout: akka.util.Timeout = Constants.FUTURE_TIMEOUT - - implicit def ec: ExecutionContext = system.dispatcher - - protected def doRoute(implicit mat: Materializer): Route - - protected def prefix = Slash ~ "api" / s"$REST_VERSION" - - protected val LOG = LogUtil.getLogger(getClass) - - protected def cache = false - private val noCacheHeader = `Cache-Control`(`no-cache`, `max-age`(0L)) - - def route: Route = encodeResponse { - extractMaterializer { implicit mat => - rawPathPrefix(prefix) { - if (cache) { - doRoute(mat) - } else { - respondWithHeader(noCacheHeader) { - doRoute(mat) - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala deleted file mode 100644 index 6ca0f98..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import java.io.{File, IOException} -import java.nio.charset.StandardCharsets.UTF_8 -import java.nio.file.Files -import java.nio.file.StandardOpenOption.{APPEND, WRITE} -import scala.collection.JavaConverters._ -import scala.concurrent.Future -import scala.util.{Failure, Success} - -import akka.actor.{ActorRef, ActorSystem} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet -import akka.http.scaladsl.unmarshalling.Unmarshaller._ -import akka.stream.Materializer -import com.typesafe.config.Config - -import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} -import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ReadOption} -import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, WorkerList} -import io.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig, SubmitApplicationResultValue} -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.worker.WorkerSummary -import io.gearpump.cluster.{ClusterConfig, UserConfig} -import io.gearpump.jarstore.JarStoreService -import io.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription} -import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ -import io.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication} -import io.gearpump.util.ActorUtil._ -import io.gearpump.util.FileDirective._ -import io.gearpump.util.{Constants, Graph, Util} - -/** Manages service for master node */ -class MasterService(val master: ActorRef, - val jarStore: JarStoreService, override val system: ActorSystem) - extends BasicService { - - import upickle.default.{read, write} - - private val systemConfig = system.settings.config - private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) - - protected override def doRoute(implicit mat: Materializer) = pathPrefix("master") { - pathEnd { - get { - onComplete(askActor[MasterData](master, GetMasterData)) { - case Success(value: MasterData) => complete(write(value)) - case Failure(ex) => failWith(ex) - } - } - } ~ - path("applist") { - onComplete(askActor[AppMastersData](master, AppMastersDataRequest)) { - case Success(value: AppMastersData) => - complete(write(value)) - case Failure(ex) => failWith(ex) - } - } ~ - path("workerlist") { - def future: Future[List[WorkerSummary]] = askActor[WorkerList](master, GetAllWorkers) - .flatMap { workerList => - val workers = workerList.workers - val workerDataList = List.empty[WorkerSummary] - - Future.fold(workers.map { workerId => - askWorker[WorkerData](master, workerId, GetWorkerData(workerId)) - })(workerDataList) { (workerDataList, workerData) => - workerDataList :+ workerData.workerDescription - } - } - onComplete(future) { - case Success(result: List[WorkerSummary]) => complete(write(result)) - case Failure(ex) => failWith(ex) - } - } ~ - path("config") { - onComplete(askActor[MasterConfig](master, QueryMasterConfig)) { - case Success(value: MasterConfig) => - val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}") - complete(config) - case Failure(ex) => - failWith(ex) - } - } ~ - path("metrics" / RestPath) { path => - parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { readOption: String => - val query = QueryHistoryMetrics(path.head.toString, readOption) - onComplete(askActor[HistoryMetrics](master, query)) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) - } - } - } ~ - path("submitapp") { - post { - uploadFile { form => - val jar = form.getFile("jar").map(_.file) - val configFile = form.getFile("configfile").map(_.file) - val configString = form.getValue("configstring").getOrElse("") - val executorCount = form.getValue("executorcount").getOrElse("1").toInt - val args = form.getValue("args").getOrElse("") - - val mergedConfigFile = mergeConfig(configFile, configString) - - onComplete(Future( - MasterService.submitGearApp(jar, executorCount, args, systemConfig, mergedConfigFile) - )) { - case Success(success) => - val response = MasterService.AppSubmissionResult(success) - complete(write(response)) - case Failure(ex) => - failWith(ex) - } - } - } - } ~ - path("submitstormapp") { - post { - uploadFile { form => - val jar = form.getFile("jar").map(_.file) - val configFile = form.getFile("configfile").map(_.file) - val args = form.getValue("args").getOrElse("") - onComplete(Future( - MasterService.submitStormApp(jar, configFile, args, systemConfig) - )) { - case Success(success) => - val response = MasterService.AppSubmissionResult(success) - complete(write(response)) - case Failure(ex) => - failWith(ex) - } - } - } - } ~ - path("submitdag") { - post { - entity(as[String]) { request => - val msg = java.net.URLDecoder.decode(request, "UTF-8") - val submitApplicationRequest = read[SubmitApplicationRequest](msg) - import submitApplicationRequest.{appName, dag, processors, userconfig} - val context = ClientContext(system.settings.config, system, master) - - val graph = dag.mapVertex { processorId => - processors(processorId) - }.mapEdge { (node1, edge, node2) => - PartitionerDescription(new PartitionerByClassName(edge)) - } - - val effectiveConfig = if (userconfig == null) UserConfig.empty else userconfig - val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph)) - - import upickle.default.write - val submitApplicationResultValue = SubmitApplicationResultValue(appId) - val jsonData = write(submitApplicationResultValue) - complete(jsonData) - } - } - } ~ - path("uploadjar") { - uploadFile { form => - val jar = form.getFile("jar").map(_.file) - if (jar.isEmpty) { - complete(write( - MasterService.Status(success = false, reason = "Jar file not found"))) - } else { - val jarFile = Util.uploadJar(jar.get, jarStore) - complete(write(jarFile)) - } - } - } ~ - path("partitioners") { - get { - complete(write(BuiltinPartitioners(Constants.BUILTIN_PARTITIONERS.map(_.getName)))) - } - } - } - - private def mergeConfig(configFile: Option[File], configString: String): Option[File] = { - if (configString == null || configString.isEmpty) { - configFile - } else { - configFile match { - case Some(file) => - Files.write(file.toPath, ("\n" + configString).getBytes(UTF_8), APPEND) - Some(file) - case None => - val file = File.createTempFile("\"userfile_configstring_", ".conf") - Files.write(file.toPath, configString.getBytes(UTF_8), WRITE) - Some(file) - } - } - } -} - -object MasterService { - - case class BuiltinPartitioners(partitioners: Array[String]) - - case class AppSubmissionResult(success: Boolean) - - case class Status(success: Boolean, reason: String = null) - - /** - * Submits Native Application. - */ - def submitGearApp( - jar: Option[File], executorNum: Int, args: String, - systemConfig: Config, userConfigFile: Option[File]): Boolean = { - submitAndDeleteTempFiles( - "io.gearpump.cluster.main.AppSubmitter", - argsArray = Array("-executors", executorNum.toString) ++ spaceSeparatedArgumentsToArray(args), - fileMap = Map("jar" -> jar).filter(_._2.isDefined).mapValues(_.get), - classPath = getUserApplicationClassPath, - systemConfig, - userConfigFile - ) - } - - /** - * Submits Storm application. - */ - def submitStormApp( - jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = { - submitAndDeleteTempFiles( - "io.gearpump.experiments.storm.main.GearpumpStormClient", - argsArray = spaceSeparatedArgumentsToArray(args), - fileMap = Map("jar" -> jar, "config" -> stormConf).filter(_._2.isDefined).mapValues(_.get), - classPath = getStormApplicationClassPath, - systemConfig, - userConfigFile = None - ) - } - - private def submitAndDeleteTempFiles( - mainClass: String, argsArray: Array[String], fileMap: Map[String, File], - classPath: Array[String], systemConfig: Config, - userConfigFile: Option[File] = None): Boolean = { - try { - val jar = fileMap.get("jar") - if (jar.isEmpty) { - throw new IOException("JAR file not supplied") - } - - val process = Util.startProcess( - clusterOptions(systemConfig, userConfigFile), - classPath, - mainClass, - arguments = createFilePathArgArray(fileMap) ++ argsArray - ) - - val retval = process.exitValue() - if (retval != 0) { - throw new IOException(s"Process exit abnormally with exit code $retval.\n" + - s"Error message: ${process.logger.error}") - } - true - } finally { - fileMap.values.foreach(_.delete) - if (userConfigFile.isDefined) { - userConfigFile.get.delete() - } - } - } - - /** - * Returns Java options for gearpump cluster - */ - private def clusterOptions(systemConfig: Config, userConfigFile: Option[File]): Array[String] = { - var options = Array( - s"-D${Constants.GEARPUMP_HOME}=${systemConfig.getString(Constants.GEARPUMP_HOME)}", - s"-D${Constants.GEARPUMP_HOSTNAME}=${systemConfig.getString(Constants.GEARPUMP_HOSTNAME)}", - s"-D${Constants.PREFER_IPV4}=true" - ) - - val masters = systemConfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala - .toList.flatMap(Util.parseHostList) - options ++= masters.zipWithIndex.map { case (master, index) => - s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.$index=${master.host}:${master.port}" - }.toArray[String] - - if (userConfigFile.isDefined) { - options :+= s"-D${Constants.GEARPUMP_CUSTOM_CONFIG_FILE}=${userConfigFile.get.getPath}" - } - options - } - - /** - * Filter all defined file paths and store their config key and path into an array. - */ - private def createFilePathArgArray(fileMap: Map[String, File]): Array[String] = { - var args = Array.empty[String] - fileMap.foreach({ case (key, path) => - args ++= Array(s"-$key", path.getPath) - }) - args - } - - /** - * Returns a space separated arguments as an array. - */ - private def spaceSeparatedArgumentsToArray(str: String): Array[String] = { - str.split(" +").filter(_.nonEmpty) - } - - private val homeDir = System.getProperty(Constants.GEARPUMP_HOME) + "/" - private val libHomeDir = homeDir + "lib/" - - private def getUserApplicationClassPath: Array[String] = { - Array( - homeDir + "conf", - libHomeDir + "daemon/*", - libHomeDir + "yarn/*", - libHomeDir + "*" - ) - } - - private def getStormApplicationClassPath: Array[String] = { - getUserApplicationClassPath ++ Array( - libHomeDir + "storm/*" - ) - } - - case class SubmitApplicationRequest( - appName: String, - processors: Map[ProcessorId, ProcessorDescription], - dag: Graph[Int, String], - userconfig: UserConfig) -}
