Repository: bahir Updated Branches: refs/heads/master e0e49e23c -> 0601698c3
[BAHIR-103] New module with common utilities and test classes Closes #73 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/0601698c Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/0601698c Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/0601698c Branch: refs/heads/master Commit: 0601698c3721fb3db58431683e556af28ffc0d6a Parents: e0e49e2 Author: Lukasz Antoniak <[email protected]> Authored: Mon Dec 3 08:52:10 2018 -0800 Committer: Luciano Resende <[email protected]> Committed: Tue Dec 11 23:04:51 2018 -0300 ---------------------------------------------------------------------- common/pom.xml | 78 ++++++++++++++++++++ .../org/apache/bahir/utils/FileHelper.scala | 46 ++++++++++++ .../scala/org/apache/bahir/utils/Logging.scala | 24 ++++++ .../scala/org/apache/bahir/utils/Retry.scala | 51 +++++++++++++ .../apache/spark/ConditionalSparkFunSuite.scala | 45 +++++++++++ .../streaming/LocalJavaStreamingContext.java | 44 +++++++++++ distribution/pom.xml | 22 +++++- pom.xml | 10 +-- sql-cloudant/pom.xml | 12 +++ .../bahir/cloudant/ClientSparkFunSuite.scala | 31 ++------ .../bahir/cloudant/CloudantAllDocsDFSuite.scala | 14 ++-- .../bahir/cloudant/CloudantChangesDFSuite.scala | 19 ++--- .../bahir/cloudant/CloudantOptionSuite.scala | 16 ++-- .../bahir/cloudant/CloudantSparkSQLSuite.scala | 8 +- .../org/apache/bahir/cloudant/TestUtils.scala | 14 +--- sql-streaming-akka/pom.xml | 5 ++ .../org/apache/bahir/utils/BahirUtils.scala | 47 ------------ .../scala/org/apache/bahir/utils/Logging.scala | 24 ------ .../streaming/akka/AkkaStreamSourceSuite.scala | 4 +- sql-streaming-mqtt/pom.xml | 5 ++ .../sql/streaming/mqtt/CachedMQTTClient.scala | 1 + .../sql/streaming/mqtt/MQTTStreamSink.scala | 1 + .../bahir/sql/streaming/mqtt/MQTTUtils.scala | 33 --------- .../org/apache/bahir/utils/BahirUtils.scala | 48 ------------ .../scala/org/apache/bahir/utils/Logging.scala | 25 ------- .../streaming/mqtt/LocalMessageStoreSuite.scala | 4 +- .../streaming/mqtt/MQTTStreamSinkSuite.scala | 4 +- .../streaming/mqtt/MQTTStreamSourceSuite.scala | 4 +- streaming-akka/pom.xml | 7 ++ .../streaming/akka/JavaAkkaUtilsSuite.java | 57 +++++++------- streaming-mqtt/pom.xml | 7 ++ .../streaming/LocalJavaStreamingContext.java | 44 ----------- .../spark/streaming/mqtt/MQTTStreamSuite.scala | 1 + streaming-pubnub/pom.xml | 7 ++ .../streaming/LocalJavaStreamingContext.java | 43 ----------- streaming-pubsub/pom.xml | 7 ++ .../streaming/LocalJavaStreamingContext.java | 44 ----------- .../spark/streaming/pubsub/PubsubFunSuite.scala | 46 ------------ .../streaming/pubsub/PubsubStreamSuite.scala | 9 ++- .../streaming/pubsub/PubsubTestUtils.scala | 5 +- streaming-twitter/pom.xml | 7 ++ .../streaming/LocalJavaStreamingContext.java | 44 ----------- streaming-zeromq/pom.xml | 7 ++ .../streaming/LocalJavaStreamingContext.java | 44 ----------- 44 files changed, 457 insertions(+), 561 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml new file mode 100644 index 0000000..d7757bb --- /dev/null +++ b/common/pom.xml @@ -0,0 +1,78 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-parent_2.11</artifactId> + <version>2.4.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_2.11</artifactId> + <properties> + <sbt.project.name>bahir-common</sbt.project.name> + </properties> + <packaging>jar</packaging> + <name>Apache Bahir - Common</name> + <url>http://bahir.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-tags_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala ---------------------------------------------------------------------- diff --git a/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala b/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala new file mode 100644 index 0000000..1800871 --- /dev/null +++ b/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.utils + +import java.io.{File, IOException} +import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor} +import java.nio.file.attribute.BasicFileAttributes + +object FileHelper extends Logging { + def deleteFileQuietly(file: File): Path = { + Files.walkFileTree(file.toPath, new SimpleFileVisitor[Path]() { + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + try { + Files.delete(file) + } catch { + case t: Throwable => log.warn("Failed to delete", t) + } + FileVisitResult.CONTINUE + } + + override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = { + try { + Files.delete(dir) + } catch { + case t: Throwable => log.warn("Failed to delete", t) + } + FileVisitResult.CONTINUE + } + }) + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/main/scala/org/apache/bahir/utils/Logging.scala ---------------------------------------------------------------------- diff --git a/common/src/main/scala/org/apache/bahir/utils/Logging.scala b/common/src/main/scala/org/apache/bahir/utils/Logging.scala new file mode 100644 index 0000000..776ed5a --- /dev/null +++ b/common/src/main/scala/org/apache/bahir/utils/Logging.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.utils + +import org.slf4j.LoggerFactory + +trait Logging { + final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) +} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/main/scala/org/apache/bahir/utils/Retry.scala ---------------------------------------------------------------------- diff --git a/common/src/main/scala/org/apache/bahir/utils/Retry.scala b/common/src/main/scala/org/apache/bahir/utils/Retry.scala new file mode 100644 index 0000000..a5c429a --- /dev/null +++ b/common/src/main/scala/org/apache/bahir/utils/Retry.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.utils + +object Retry { + /** + * Retry invocation of given code. + * @param attempts Number of attempts to try executing given code. -1 represents infinity. + * @param pauseMs Number of backoff milliseconds. + * @param retryExceptions Types of exceptions to retry. + * @param code Function to execute. + * @tparam A Type parameter. + * @return Returns result of function execution or exception in case of failure. + */ + def apply[A](attempts: Int, pauseMs: Long, retryExceptions: Class[_]*)(code: => A): A = { + var result: Option[A] = None + var success = false + var remaining = attempts + while (!success) { + try { + remaining -= 1 + result = Some(code) + success = true + } + catch { + case e: Exception => + if (retryExceptions.contains(e.getClass) && (attempts == -1 || remaining > 0)) { + Thread.sleep(pauseMs) + } else { + throw e + } + } + } + result.get + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala b/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala new file mode 100644 index 0000000..922ec5f --- /dev/null +++ b/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +trait ConditionalSparkFunSuite extends SparkFunSuite { + /** + * Run test if given predicate is satisfied. + * @param testName Test name + * @param condition If satisfied, test will be executed + * @param testBody Test body + */ + def testIf(testName: String, condition: () => Boolean)(testBody: => Unit) { + if (condition()) { + test(testName)(testBody) + } else { + ignore(testName)(testBody) + } + } + + /** + * Run given code only if predicate has been satisfied. + * @param condition If satisfied, run code block + * @param body Code block + */ + def runIf(condition: () => Boolean)(body: => Unit): Unit = { + if (condition()) { + body + } + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000..012c7fb --- /dev/null +++ b/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + final SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + } +} + http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/pom.xml b/distribution/pom.xml index ea1ed49..18ba854 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -34,6 +34,21 @@ <dependencies> <dependency> <groupId>org.apache.bahir</groupId> + <artifactId>spark-sql-cloudant_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-sql-streaming-akka_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-sql-streaming-mqtt_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.bahir</groupId> <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> @@ -44,7 +59,12 @@ </dependency> <dependency> <groupId>org.apache.bahir</groupId> - <artifactId>spark-sql-streaming-mqtt_${scala.binary.version}</artifactId> + <artifactId>spark-streaming-pubnub_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-streaming-pubsub_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 52fb22c..34862ee 100644 --- a/pom.xml +++ b/pom.xml @@ -75,15 +75,16 @@ </mailingLists> <modules> + <module>common</module> <module>sql-cloudant</module> - <module>streaming-akka</module> <module>sql-streaming-akka</module> - <module>streaming-mqtt</module> <module>sql-streaming-mqtt</module> + <module>streaming-akka</module> + <module>streaming-mqtt</module> + <module>streaming-pubnub</module> + <module>streaming-pubsub</module> <module>streaming-twitter</module> <module>streaming-zeromq</module> - <module>streaming-pubsub</module> - <module>streaming-pubnub</module> </modules> <properties> @@ -107,7 +108,6 @@ <!-- Streaming Akka connector --> <akka.group>com.typesafe.akka</akka.group> <akka.version>2.5.12</akka.version> - <akka_zeromq.version>2.3.16</akka_zeromq.version> <protobuf.version>2.5.0</protobuf.version> <jsr305.version>3.0.1</jsr305.version> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/pom.xml ---------------------------------------------------------------------- diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml index d1a3be7..d81232a 100644 --- a/sql-cloudant/pom.xml +++ b/sql-cloudant/pom.xml @@ -36,6 +36,18 @@ <dependencies> <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.cloudant</groupId> <artifactId>cloudant-client</artifactId> <version>2.11.0</version> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala index aa8a48e..ed14f17 100644 --- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala +++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala @@ -26,12 +26,12 @@ import com.cloudant.client.api.CloudantClient import com.google.gson.{Gson, JsonArray, JsonObject} import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{ConditionalSparkFunSuite, SparkConf} import org.apache.spark.sql.SparkSession -import org.apache.bahir.cloudant.TestUtils.shouldRunTests +import org.apache.bahir.utils.FileHelper -class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter { +class ClientSparkFunSuite extends ConditionalSparkFunSuite with BeforeAndAfter { private val tempDir: File = new File(System.getProperty("java.io.tmpdir") + "/sql-cloudant/") var client: CloudantClient = _ @@ -40,7 +40,7 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter { var spark: SparkSession = _ override def beforeAll() { - runIfTestsEnabled("Prepare Cloudant test databases") { + runIf(TestUtils.shouldRunTest) { tempDir.mkdirs() tempDir.deleteOnExit() setupClient() @@ -49,7 +49,7 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter { } override def afterAll() { - TestUtils.deleteRecursively(tempDir) + FileHelper.deleteFileQuietly(tempDir) deleteTestDbs() teardownClient() spark.close() @@ -119,25 +119,4 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter { def deleteTestDb(dbName: String) { client.deleteDB(dbName) } - - /** Run the test if environment variable is set or ignore the test */ - def testIfEnabled(testName: String)(testBody: => Unit) { - if (shouldRunTests) { - test(testName)(testBody) - } else { - ignore(s"$testName [enable by setting env var CLOUDANT_USER and " + - s"CLOUDANT_PASSWORD]")(testBody) - } - } - - - /** Run the body of code only if tests are enabled */ - def runIfTestsEnabled(message: String)(body: => Unit): Unit = { - if (shouldRunTests) { - body - } else { - ignore(s"$message [enable by setting env var CLOUDANT_USER and " + - s"CLOUDANT_PASSWORD]")(()) - } - } } http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala index 982bbf9..635fa32 100644 --- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala +++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala @@ -35,7 +35,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite { .getOrCreate() } - testIfEnabled("load and save data from Cloudant database") { + testIf("load and save data from Cloudant database", TestUtils.shouldRunTest) { // Loading data from Cloudant db val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight") // Caching df in memory to speed computations @@ -45,7 +45,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite { assert(df.count() == 1967) } - testIfEnabled("load and count data from Cloudant search index") { + testIf("load and count data from Cloudant search index", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant") .option("index", "_design/view/_search/n_flights").load("n_flight") val total = df.filter(df("flightSegmentId") >"AA9") @@ -54,7 +54,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite { assert(total == 50) } - testIfEnabled("load data and count rows in filtered dataframe") { + testIf("load data and count rows in filtered dataframe", TestUtils.shouldRunTest) { // Loading data from Cloudant db val df = spark.read.format("org.apache.bahir.cloudant") .load("n_airportcodemapping") @@ -63,7 +63,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite { } // save data to Cloudant test - testIfEnabled("save filtered dataframe to database") { + testIf("save filtered dataframe to database", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight") // Saving data frame with filter to Cloudant db @@ -80,7 +80,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite { } // createDBOnSave option test - testIfEnabled("save dataframe to database using createDBOnSave=true option") { + testIf("save dataframe to database using createDBOnSave=true option", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant") .load("n_airportcodemapping") @@ -106,13 +106,13 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite { } // view option tests - testIfEnabled("load and count data from view") { + testIf("load and count data from view", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant") .option("view", "_design/view/_view/AA0").load("n_flight") assert(df.count() == 5) } - testIfEnabled("load data from view with MapReduce function") { + testIf("load data from view with MapReduce function", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant") .option("view", "_design/view/_view/AAreduce?reduce=true") .load("n_flight") http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala index 5e8f6f6..4210566 100644 --- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala +++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala @@ -39,7 +39,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite { spark.close() } - testIfEnabled("load and save data from Cloudant database") { + testIf("load and save data from Cloudant database", TestUtils.shouldRunTest) { // Loading data from Cloudant db val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight") // Caching df in memory to speed computations @@ -50,7 +50,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite { assert(df.count() == 1967) } - testIfEnabled("load and count data from Cloudant search index") { + testIf("load and count data from Cloudant search index", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant") .option("index", "_design/view/_search/n_flights").load("n_flight") val total = df.filter(df("flightSegmentId") >"AA9") @@ -59,7 +59,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite { assert(total == 50) } - testIfEnabled("load data and verify deleted doc is not in results") { + testIf("load data and verify deleted doc is not in results", TestUtils.shouldRunTest) { val db = client.database("n_flight", false) // delete a saved doc to verify it's not included when loading data db.remove(deletedDoc.get("_id").getAsString, deletedDoc.get("_rev").getAsString) @@ -71,7 +71,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite { assert(!df.columns.contains("_deleted")) } - testIfEnabled("load data and count rows in filtered dataframe") { + testIf("load data and count rows in filtered dataframe", TestUtils.shouldRunTest) { // Loading data from Cloudant db val df = spark.read.format("org.apache.bahir.cloudant") .load("n_airportcodemapping") @@ -80,7 +80,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite { } // save data to Cloudant test - testIfEnabled("save filtered dataframe to database") { + testIf("save filtered dataframe to database", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight") // Saving data frame with filter to Cloudant db @@ -97,7 +97,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite { } // createDBOnSave option test - testIfEnabled("save dataframe to database using createDBOnSave=true option") { + testIf("save dataframe to database using createDBOnSave=true option", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant") .load("n_airportcodemapping") @@ -127,20 +127,21 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite { } // view option tests - testIfEnabled("load and count data from view") { + testIf("load and count data from view", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant") .option("view", "_design/view/_view/AA0").load("n_flight") assert(df.count() == 5) } - testIfEnabled("load data from view with MapReduce function") { + testIf("load data from view with MapReduce function", TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant") .option("view", "_design/view/_view/AAreduce?reduce=true") .load("n_flight") assert(df.count() == 1) } - testIfEnabled("load data and verify total count of selector, filter, and view option") { + testIf("load data and verify total count of selector, filter, and view option", + TestUtils.shouldRunTest) { val df = spark.read.format("org.apache.bahir.cloudant") .option("selector", "{\"flightSegmentId\": {\"$eq\": \"AA2\"}}") .load("n_flight") http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala index c487937..4bc66e0 100644 --- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala +++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala @@ -29,7 +29,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { spark.close() } - testIfEnabled("invalid api receiver option throws an error message") { + testIf("invalid api receiver option throws an error message", TestUtils.shouldRunTest) { spark = SparkSession.builder().config(conf) .config("cloudant.host", TestUtils.getHost) .config("cloudant.username", TestUtils.getUsername) @@ -44,7 +44,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { s"is invalid. Please supply the valid option '_all_docs' or '_changes'.") } - testIfEnabled("empty username option throws an error message") { + testIf("empty username option throws an error message", TestUtils.shouldRunTest) { spark = SparkSession.builder().config(conf) .config("cloudant.host", TestUtils.getHost) .config("cloudant.username", "") @@ -58,7 +58,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { s"is empty. Please supply the required value.") } - testIfEnabled("empty password option throws an error message") { + testIf("empty password option throws an error message", TestUtils.shouldRunTest) { spark = SparkSession.builder().config(conf) .config("cloudant.host", TestUtils.getHost) .config("cloudant.username", TestUtils.getUsername) @@ -72,7 +72,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { s"is empty. Please supply the required value.") } - testIfEnabled("empty databaseName throws an error message") { + testIf("empty databaseName throws an error message", TestUtils.shouldRunTest) { spark = SparkSession.builder().config(conf) .config("cloudant.host", TestUtils.getHost) .config("cloudant.username", TestUtils.getUsername) @@ -86,7 +86,8 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { s"Please supply the required value.") } - testIfEnabled("incorrect password throws an error message for changes receiver") { + testIf("incorrect password throws an error message for changes receiver", + TestUtils.shouldRunTest) { spark = SparkSession.builder().config(conf) .config("cloudant.protocol", TestUtils.getProtocol) .config("cloudant.host", TestUtils.getHost) @@ -103,7 +104,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { "\"reason\":\"Name or password is incorrect.\"}") } - testIfEnabled("string with valid value for cloudant.numberOfRetries option") { + testIf("string with valid value for cloudant.numberOfRetries option", TestUtils.shouldRunTest) { spark = SparkSession.builder().config(conf) .config("cloudant.host", TestUtils.getHost) .config("cloudant.username", TestUtils.getUsername) @@ -115,7 +116,8 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter { assert(df.count() === 2) } - testIfEnabled("invalid value for cloudant.numberOfRetries option throws an error message") { + testIf("invalid value for cloudant.numberOfRetries option throws an error message", + TestUtils.shouldRunTest) { spark = SparkSession.builder().config(conf) .config("cloudant.host", TestUtils.getHost) .config("cloudant.username", TestUtils.getUsername) http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala index 41e5e89..9b70314 100644 --- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala +++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala @@ -25,8 +25,6 @@ class CloudantSparkSQLSuite extends ClientSparkFunSuite { protected override def _sqlContext: SQLContext = spark.sqlContext } - import testImplicits._ - val endpoint = "_all_docs" override def beforeAll() { @@ -40,8 +38,8 @@ class CloudantSparkSQLSuite extends ClientSparkFunSuite { .getOrCreate() } - testIfEnabled("verify results from temp view of database n_airportcodemapping") { - + testIf("verify results from temp view of database n_airportcodemapping", + TestUtils.shouldRunTest) { // create a temp table from Cloudant db and query it using sql syntax val sparkSql = spark.sql( s""" @@ -69,7 +67,7 @@ class CloudantSparkSQLSuite extends ClientSparkFunSuite { assert(df2count == airportData.count()) } - testIfEnabled("verify results from temp view of index in n_flight") { + testIf("verify results from temp view of index in n_flight", TestUtils.shouldRunTest) { // create a temp table from Cloudant index and query it using sql syntax val sparkSql = spark.sql( s""" http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala index dee6542..2904c25 100644 --- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala +++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala @@ -17,8 +17,6 @@ package org.apache.bahir.cloudant -import java.io.File - object TestUtils { // Set CouchDB/Cloudant host, username and password for local testing private val host = System.getenv("CLOUDANT_HOST") @@ -37,15 +35,6 @@ object TestUtils { "n_flightsegment" ) - def deleteRecursively(file: File): Unit = { - if (file.isDirectory) { - file.listFiles.foreach(deleteRecursively) - } - if (file.exists && !file.delete) { - throw new Exception(s"Unable to delete ${file.getAbsolutePath}") - } - } - // default value is https for cloudant.com accounts def getProtocol: String = { if (protocol != null && !protocol.isEmpty) { @@ -71,12 +60,11 @@ object TestUtils { password } - lazy val shouldRunTests = { + def shouldRunTest(): Boolean = { val isEnvSet = (username != null && !username.isEmpty) && (password != null && !password.isEmpty) if (isEnvSet) { // scalastyle:off println - // Print this so that they are easily visible on the console and not hidden in the log4j logs. println( s""" |Sql-cloudant tests that require Cloudant databases have been enabled by http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/pom.xml ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/pom.xml b/sql-streaming-akka/pom.xml index 028b719..98586c7 100644 --- a/sql-streaming-akka/pom.xml +++ b/sql-streaming-akka/pom.xml @@ -36,6 +36,11 @@ <dependencies> <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala deleted file mode 100644 index 996a0a1..0000000 --- a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.bahir.utils - -import java.io.{File, IOException} -import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor} -import java.nio.file.attribute.BasicFileAttributes - -object BahirUtils extends Logging { - - def recursiveDeleteDir(dir: File): Path = { - Files.walkFileTree(dir.toPath, new SimpleFileVisitor[Path]() { - override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { - try { - Files.delete(file) - } catch { - case t: Throwable => log.warn("Failed to delete", t) - } - FileVisitResult.CONTINUE - } - - override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = { - try { - Files.delete(dir) - } catch { - case t: Throwable => log.warn("Failed to delete", t) - } - FileVisitResult.CONTINUE - } - }) - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala deleted file mode 100644 index 776ed5a..0000000 --- a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.bahir.utils - -import org.slf4j.LoggerFactory - -trait Logging { - final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala index f61b067..b04ed3c 100644 --- a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala +++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.types.StructType -import org.apache.bahir.utils.BahirUtils +import org.apache.bahir.utils.FileHelper class AkkaStreamSourceSuite extends SparkFunSuite with BeforeAndAfter { @@ -50,7 +50,7 @@ class AkkaStreamSourceSuite extends SparkFunSuite with BeforeAndAfter { after { Persistence.close() - BahirUtils.recursiveDeleteDir(tempDir) + FileHelper.deleteFileQuietly(tempDir) } protected val tmpDir: String = tempDir.getAbsolutePath http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/pom.xml b/sql-streaming-mqtt/pom.xml index 242f24b..63497dc 100644 --- a/sql-streaming-mqtt/pom.xml +++ b/sql-streaming-mqtt/pom.xml @@ -36,6 +36,11 @@ <dependencies> <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala index f825eea..fed2601 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala @@ -27,6 +27,7 @@ import scala.util.control.NonFatal import org.apache.spark.SparkEnv import org.apache.bahir.utils.Logging +import org.apache.bahir.utils.Retry private[mqtt] object CachedMQTTClient extends Logging { http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala index 8654b88..f449e57 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.bahir.utils.Logging +import org.apache.bahir.utils.Retry class MQTTStreamWriter (schema: StructType, parameters: DataSourceOptions) http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala index 79fe7a2..a615d28 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala @@ -82,36 +82,3 @@ private[mqtt] object MQTTUtils extends Logging { (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos) } } - -private[mqtt] object Retry { - /** - * Retry invocation of given code. - * @param attempts Number of attempts to try executing given code. -1 represents infinity. - * @param pauseMs Number of backoff milliseconds. - * @param retryExceptions Types of exceptions to retry. - * @param code Function to execute. - * @tparam A Type parameter. - * @return Returns result of function execution or exception in case of failure. - */ - def apply[A](attempts: Int, pauseMs: Long, retryExceptions: Class[_]*)(code: => A): A = { - var result: Option[A] = None - var success = false - var remaining = attempts - while ( ! success ) { - try { - remaining -= 1 - result = Some( code ) - success = true - } - catch { - case e: Exception => - if (retryExceptions.contains(e.getClass) && (attempts == -1 || remaining > 0)) { - Thread.sleep(pauseMs) - } else { - throw e - } - } - } - result.get - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala deleted file mode 100644 index 3d27b06..0000000 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.bahir.utils - -import java.io.{File, IOException} -import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor} -import java.nio.file.attribute.BasicFileAttributes - -object BahirUtils extends Logging { - - def recursiveDeleteDir(dir: File): Path = { - Files.walkFileTree(dir.toPath, new SimpleFileVisitor[Path]() { - override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { - try { - Files.delete(file) - } catch { - case t: Throwable => log.warn("Failed to delete", t) - } - FileVisitResult.CONTINUE - } - - override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = { - try { - Files.delete(dir) - } catch { - case t: Throwable => log.warn("Failed to delete", t) - } - FileVisitResult.CONTINUE - } - }) - } - -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala deleted file mode 100644 index cbe97e9..0000000 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.bahir.utils - -import org.slf4j.LoggerFactory - - -trait Logging { - final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala index d1bbe18..0b6b80b 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala @@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite -import org.apache.bahir.utils.BahirUtils +import org.apache.bahir.utils.FileHelper class LocalMessageStoreSuite extends SparkFunSuite with BeforeAndAfter { @@ -48,7 +48,7 @@ class LocalMessageStoreSuite extends SparkFunSuite with BeforeAndAfter { after { persistence.clear() persistence.close() - BahirUtils.recursiveDeleteDir(tempDir) + FileHelper.deleteFileQuietly(tempDir) } test("serialize and deserialize") { http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala index 14ea962..d72ba17 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -import org.apache.bahir.utils.BahirUtils +import org.apache.bahir.utils.FileHelper class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter { @@ -57,7 +57,7 @@ class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with Bef testClient.disconnect() testClient.close() mqttTestUtils.teardown() - BahirUtils.recursiveDeleteDir(tempDir) + FileHelper.deleteFileQuietly(tempDir) } protected def createContextAndDF(messages: String*): (SQLContext, DataFrame) = { http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala index bb82715..a7eb770 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery} -import org.apache.bahir.utils.BahirUtils +import org.apache.bahir.utils.FileHelper class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter { @@ -50,7 +50,7 @@ class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with B after { mqttTestUtils.teardown() - BahirUtils.recursiveDeleteDir(tempDir) + FileHelper.deleteFileQuietly(tempDir) } protected val tmpDir: String = tempDir.getAbsolutePath http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-akka/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-akka/pom.xml b/streaming-akka/pom.xml index 7929f33..5b94c7a 100644 --- a/streaming-akka/pom.xml +++ b/streaming-akka/pom.xml @@ -36,6 +36,13 @@ <dependencies> <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java ---------------------------------------------------------------------- diff --git a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java index 4a6d578..1c8fd78 100644 --- a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java +++ b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java @@ -21,50 +21,45 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.util.Timeout; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.Test; import org.apache.spark.api.java.function.Function0; import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import java.util.concurrent.TimeUnit; -public class JavaAkkaUtilsSuite { - - @Test // tests the API, does not actually test data receiving - public void testAkkaUtils() { - JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - try { - JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream( - jsc, Props.create(JavaTestActor.class), "test"); - JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream( - jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream( - jsc, - Props.create(JavaTestActor.class), - "test", StorageLevel.MEMORY_AND_DISK_SER_2(), - new ActorSystemCreatorForTest(), - SupervisorStrategy.defaultStrategy()); - } finally { - jsc.stop(); +public class JavaAkkaUtilsSuite extends LocalJavaStreamingContext { + @Test + public void testAkkaUtils() { + // tests the API, does not actually test data receiving + JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream( + ssc, Props.create(JavaTestActor.class), "test" + ); + JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream( + ssc, Props.create(JavaTestActor.class), "test", + StorageLevel.MEMORY_AND_DISK_SER_2() + ); + JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream( + ssc, Props.create(JavaTestActor.class), "test", + StorageLevel.MEMORY_AND_DISK_SER_2(), new ActorSystemCreatorForTest(), + SupervisorStrategy.defaultStrategy() + ); } - } } class ActorSystemCreatorForTest implements Function0<ActorSystem> { - @Override - public ActorSystem call() { - return null; - } + @Override + public ActorSystem call() { + return null; + } } - class JavaTestActor extends JavaActorReceiver { - @Override - public void onReceive(Object message) throws Exception { - store((String) message); - store((String) message, new Timeout(1000, TimeUnit.MILLISECONDS)); - } + @Override + public void onReceive(Object message) throws Exception { + store((String) message); + store((String) message, new Timeout(1000, TimeUnit.MILLISECONDS)); + } } http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-mqtt/pom.xml b/streaming-mqtt/pom.xml index f23aa83..0f6d809 100644 --- a/streaming-mqtt/pom.xml +++ b/streaming-mqtt/pom.xml @@ -36,6 +36,13 @@ <dependencies> <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java deleted file mode 100644 index cfedb5a..0000000 --- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.After; -import org.junit.Before; - -public abstract class LocalJavaStreamingContext { - - protected transient JavaStreamingContext ssc; - - @Before - public void setUp() { - SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 6ef551b..d86aa98 100644 --- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -77,6 +77,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter } ssc.stop() } + test("mqtt input stream2") { val sendMessage1 = "MQTT demo for spark streaming1" val sendMessage2 = "MQTT demo for spark streaming2" http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubnub/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-pubnub/pom.xml b/streaming-pubnub/pom.xml index ac0b925..464cfce 100644 --- a/streaming-pubnub/pom.xml +++ b/streaming-pubnub/pom.xml @@ -35,6 +35,13 @@ <dependencies> <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java deleted file mode 100644 index 448fb5e..0000000 --- a/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.After; -import org.junit.Before; - -public abstract class LocalJavaStreamingContext { - protected transient JavaStreamingContext ssc; - - @Before - public void setUp() { - SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml index 0885152..f6ecd37 100644 --- a/streaming-pubsub/pom.xml +++ b/streaming-pubsub/pom.xml @@ -36,6 +36,13 @@ <dependencies> <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java deleted file mode 100644 index cfedb5a..0000000 --- a/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.After; -import org.junit.Before; - -public abstract class LocalJavaStreamingContext { - - protected transient JavaStreamingContext ssc; - - @Before - public void setUp() { - SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala deleted file mode 100644 index acdceb7..0000000 --- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.pubsub - -import org.apache.spark.SparkFunSuite - -/** - * Helper class that runs Google Cloud Pub/Sub real data transfer tests of - * ignores them based on env variable is set or not. - */ -trait PubsubFunSuite extends SparkFunSuite { - import PubsubTestUtils._ - - /** Run the test if environment variable is set or ignore the test */ - def testIfEnabled(testName: String)(testBody: => Unit) { - if (shouldRunTests) { - test(testName)(testBody) - } else { - ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody) - } - } - - /** Run the give body of code only if Kinesis tests are enabled */ - def runIfTestsEnabled(message: String)(body: => Unit): Unit = { - if (shouldRunTests) { - body - } else { - ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(()) - } - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala index d91c7e6..8f499cb 100644 --- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala +++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala @@ -25,11 +25,12 @@ import scala.language.postfixOps import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.Eventually +import org.apache.spark.ConditionalSparkFunSuite import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext -class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAfter { +class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with BeforeAndAfter { val batchDuration = Seconds(1) @@ -50,7 +51,7 @@ class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAft private var subForCreateFullName: String = null override def beforeAll(): Unit = { - runIfTestsEnabled("Prepare PubsubTestUtils") { + runIf(PubsubTestUtils.shouldRunTest) { pubsubTestUtils = new PubsubTestUtils topicFullName = pubsubTestUtils.getFullTopicPath(topicName) subscriptionFullName = pubsubTestUtils.getFullSubscriptionPath(subscriptionName) @@ -88,7 +89,7 @@ class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAft PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2) } - testIfEnabled("pubsub input stream") { + testIf("pubsub input stream", PubsubTestUtils.shouldRunTest) { val receiveStream = PubsubUtils.createStream( ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName, PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2) @@ -112,7 +113,7 @@ class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAft } } - testIfEnabled("pubsub input stream, create pubsub") { + testIf("pubsub input stream, create pubsub", PubsubTestUtils.shouldRunTest) { val receiveStream = PubsubUtils.createStream( ssc, PubsubTestUtils.projectId, Some(topicName), subForCreateName, PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2) http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala ---------------------------------------------------------------------- diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala index 9dd719a..39597ca 100644 --- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala +++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala @@ -101,11 +101,10 @@ private[pubsub] object PubsubTestUtils { val envVarNameForP12KeyPath = "GCP_TEST_P12_KEY_PATH" val envVarNameForAccount = "GCP_TEST_ACCOUNT" - lazy val shouldRunTests = { - val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1") + def shouldRunTest(): Boolean = { + val isEnvSet = sys.env.get(envVarNameForEnablingTests).contains("1") if (isEnvSet) { // scalastyle:off println - // Print this so that they are easily visible on the console and not hidden in the log4j logs. println( s""" |Google Pub/Sub tests that actually send data has been enabled by setting the environment http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-twitter/pom.xml b/streaming-twitter/pom.xml index 1f18100..2bf29b5 100644 --- a/streaming-twitter/pom.xml +++ b/streaming-twitter/pom.xml @@ -36,6 +36,13 @@ <dependencies> <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java deleted file mode 100644 index cfedb5a..0000000 --- a/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.After; -import org.junit.Before; - -public abstract class LocalJavaStreamingContext { - - protected transient JavaStreamingContext ssc; - - @Before - public void setUp() { - SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - } -} http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-zeromq/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml index 92115f2..5aaf2fa 100644 --- a/streaming-zeromq/pom.xml +++ b/streaming-zeromq/pom.xml @@ -36,6 +36,13 @@ <dependencies> <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-common_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java deleted file mode 100644 index f9cee96..0000000 --- a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.After; -import org.junit.Before; - -public abstract class LocalJavaStreamingContext { - protected transient JavaStreamingContext ssc; - - @Before - public void setUp() { - final SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - } -} -
