Repository: spark Updated Branches: refs/heads/master 761c2d1b6 -> c35c60fa9
[SPARK-14028][STREAMING][KINESIS][TESTS] Remove deprecated methods; fix two other warnings ## What changes were proposed in this pull request? - Removed two methods that have been deprecated since 1.4 - Fixed two other compilation warnings ## How was this patch tested? existing test suites Author: proflin <[email protected]> Closes #11850 from lw-lin/streaming-kinesis-deprecates-warnings. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c35c60fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c35c60fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c35c60fa Branch: refs/heads/master Commit: c35c60fa916e92916442a98f4af123704bb9692e Parents: 761c2d1 Author: proflin <[email protected]> Authored: Mon Mar 21 08:02:06 2016 +0000 Committer: Sean Owen <[email protected]> Committed: Mon Mar 21 08:02:06 2016 +0000 ---------------------------------------------------------------------- .../spark/streaming/kinesis/KinesisUtils.scala | 86 -------------------- .../kinesis/JavaKinesisStreamSuite.java | 11 ++- .../streaming/kinesis/KinesisFunSuite.scala | 2 +- .../streaming/kinesis/KinesisStreamSuite.scala | 12 ++- 4 files changed, 13 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala index 15ac588..a0007d3 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -225,51 +225,6 @@ object KinesisUtils 
{ * Create an input stream that pulls messages from a Kinesis stream. * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. * - * Note: - * - * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets AWS credentials. - * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch. - * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name - * in [[org.apache.spark.SparkConf]]. - * - * @param ssc StreamingContext object - * @param streamName Kinesis stream name - * @param endpointUrl Endpoint url of Kinesis service - * (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects - * StorageLevel.MEMORY_AND_DISK_2 is recommended. 
- */ - @deprecated("use other forms of createStream", "1.4.0") - def createStream( - ssc: StreamingContext, - streamName: String, - endpointUrl: String, - checkpointInterval: Duration, - initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel - ): ReceiverInputDStream[Array[Byte]] = { - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, - getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName, - checkpointInterval, storageLevel, defaultMessageHandler, None) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain * gets the AWS credentials. @@ -453,47 +408,6 @@ object KinesisUtils { defaultMessageHandler(_), awsAccessKeyId, awsSecretKey) } - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets AWS credentials. - * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch. - * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in - * [[org.apache.spark.SparkConf]]. - * - * @param jssc Java StreamingContext object - * @param streamName Kinesis stream name - * @param endpointUrl Endpoint url of Kinesis service - * (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. 
- * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - */ - @deprecated("use other forms of createStream", "1.4.0") - def createStream( - jssc: JavaStreamingContext, - streamName: String, - endpointUrl: String, - checkpointInterval: Duration, - initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[Array[Byte]] = { - createStream( - jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel) - } - private def getRegionByEndpoint(endpointUrl: String): String = { RegionUtils.getRegionByEndpoint(endpointUrl).getName() } http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java index 5c2371c..f078973 100644 --- a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java +++ b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming.kinesis; +import com.amazonaws.regions.RegionUtils; import com.amazonaws.services.kinesis.model.Record; import org.junit.Test; @@ 
-34,11 +35,13 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn public class JavaKinesisStreamSuite extends LocalJavaStreamingContext { @Test public void testKinesisStream() { - // Tests the API, does not actually test data receiving - JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", new Duration(2000), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()); + String dummyEndpointUrl = KinesisTestUtils.defaultEndpointUrl(); + String dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName(); + // Tests the API, does not actually test data receiving + JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream", + dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, new Duration(2000), + StorageLevel.MEMORY_AND_DISK_2()); ssc.stop(); } http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala index ee428f3..1c81298 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite { if (shouldRunTests) { body } else { - ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")() + ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(()) } } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/c35c60fa/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 4460b6b..0e71bf9 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -99,14 +99,10 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun } test("KinesisUtils API") { - // Tests the API, does not actually test data receiving - val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream", - dummyEndpointUrl, Seconds(2), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) - val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", + val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream", dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2) - val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", + val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream", dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey) @@ -154,7 +150,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun // Verify that KinesisBackedBlockRDD is generated even when there are no blocks val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty) - emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]] + // Verify it's KinesisBackedBlockRDD[_] rather than 
KinesisBackedBlockRDD[Array[Byte]], because + // the type parameter will be erased at runtime + emptyRDD shouldBe a [KinesisBackedBlockRDD[_]] emptyRDD.partitions shouldBe empty // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
