Repository: spark Updated Branches: refs/heads/master be03d3ad7 -> 0e6833006
[SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specific timestamp ## What changes were proposed in this pull request? The Kinesis client can resume from a specified timestamp when creating a stream. We should have an option to pass a timestamp in the config so that Kinesis can resume from the given timestamp. The patch introduces a new `KinesisInitialPosition` interface, with the implementations `KinesisInitialPositions.Latest`, `KinesisInitialPositions.TrimHorizon` and `KinesisInitialPositions.AtTimestamp`, which wraps an `InitialPositionInStream` together with the `timestamp` information (for `AtTimestamp`) used to resume Kinesis fetches from the provided timestamp. ## How was this patch tested? Unit tests. cc: budde brkyvz Author: Yash Sharma <ysha...@atlassian.com> Closes #18029 from yssharma/ysharma/kcl_resume. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e683300 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e683300 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e683300 Branch: refs/heads/master Commit: 0e6833006d28df426eb132bb8fc82917b8e2aedd Parents: be03d3a Author: Yash Sharma <ysha...@atlassian.com> Authored: Tue Dec 26 09:50:39 2017 +0200 Committer: Burak Yavuz <brk...@gmail.com> Committed: Tue Dec 26 09:50:39 2017 +0200 ---------------------------------------------------------------------- .../kinesis/KinesisInitialPositions.java | 91 ++++++++++++++++++++ .../streaming/KinesisWordCountASL.scala | 5 +- .../streaming/kinesis/KinesisInputDStream.scala | 31 +++++-- .../streaming/kinesis/KinesisReceiver.scala | 45 ++++++---- .../spark/streaming/kinesis/KinesisUtils.scala | 15 ++-- .../JavaKinesisInputDStreamBuilderSuite.java | 47 ++++++++-- .../KinesisInputDStreamBuilderSuite.scala | 68 +++++++++++++-- .../streaming/kinesis/KinesisStreamSuite.scala | 11 +-- 8 files changed, 264 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java b/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java new file mode 100644 index 0000000..206e1e4 --- /dev/null +++ b/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; + +import java.io.Serializable; +import java.util.Date; + +/** + * A java wrapper for exposing [[InitialPositionInStream]] + * to the corresponding Kinesis readers. 
+ */ +interface KinesisInitialPosition { + InitialPositionInStream getPosition(); +} + +public class KinesisInitialPositions { + public static class Latest implements KinesisInitialPosition, Serializable { + public Latest() {} + + @Override + public InitialPositionInStream getPosition() { + return InitialPositionInStream.LATEST; + } + } + + public static class TrimHorizon implements KinesisInitialPosition, Serializable { + public TrimHorizon() {} + + @Override + public InitialPositionInStream getPosition() { + return InitialPositionInStream.TRIM_HORIZON; + } + } + + public static class AtTimestamp implements KinesisInitialPosition, Serializable { + private Date timestamp; + + public AtTimestamp(Date timestamp) { + this.timestamp = timestamp; + } + + @Override + public InitialPositionInStream getPosition() { + return InitialPositionInStream.AT_TIMESTAMP; + } + + public Date getTimestamp() { + return timestamp; + } + } + + + /** + * Returns instance of [[KinesisInitialPosition]] based on the passed [[InitialPositionInStream]]. + * This method is used in KinesisUtils for translating the InitialPositionInStream + * to InitialPosition. This function would be removed when we deprecate the KinesisUtils. + * + * @return [[InitialPosition]] + */ + public static KinesisInitialPosition fromKinesisInitialPosition( + InitialPositionInStream initialPositionInStream) throws UnsupportedOperationException { + if (initialPositionInStream == InitialPositionInStream.LATEST) { + return new Latest(); + } else if (initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) { + return new TrimHorizon(); + } else { + // InitialPositionInStream.AT_TIMESTAMP is not supported. + // Use InitialPosition.atTimestamp(timestamp) instead. + throw new UnsupportedOperationException( + "Only InitialPositionInStream.LATEST and InitialPositionInStream.TRIM_HORIZON " + + "supported in initialPositionInStream(). 
Please use the initialPosition() from " + + "builder API in KinesisInputDStream for using InitialPositionInStream.AT_TIMESTAMP"); + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala index cde2c4b..fcb790e 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala @@ -24,7 +24,6 @@ import scala.util.Random import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.amazonaws.services.kinesis.AmazonKinesisClient -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.model.PutRecordRequest import org.apache.log4j.{Level, Logger} @@ -33,9 +32,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions +import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisInputDStream - /** * Consumes messages from a Amazon Kinesis streams and does wordcount. 
* @@ -139,7 +138,7 @@ object KinesisWordCountASL extends Logging { .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) - .initialPositionInStream(InitialPositionInStream.LATEST) + .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala index f61e398..1ffec01 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -28,6 +28,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.streaming.{Duration, StreamingContext, Time} import org.apache.spark.streaming.api.java.JavaStreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler.ReceivedBlockInfo @@ -36,7 +37,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag]( val streamName: String, val endpointUrl: String, val regionName: String, - val initialPositionInStream: InitialPositionInStream, + val initialPosition: KinesisInitialPosition, val checkpointAppName: String, val checkpointInterval: Duration, val _storageLevel: StorageLevel, @@ -77,7 +78,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag]( } override def getReceiver(): Receiver[T] = { - new KinesisReceiver(streamName, 
endpointUrl, regionName, initialPositionInStream, + new KinesisReceiver(streamName, endpointUrl, regionName, initialPosition, checkpointAppName, checkpointInterval, _storageLevel, messageHandler, kinesisCreds, dynamoDBCreds, cloudWatchCreds) } @@ -100,7 +101,7 @@ object KinesisInputDStream { // Params with defaults private var endpointUrl: Option[String] = None private var regionName: Option[String] = None - private var initialPositionInStream: Option[InitialPositionInStream] = None + private var initialPosition: Option[KinesisInitialPosition] = None private var checkpointInterval: Option[Duration] = None private var storageLevel: Option[StorageLevel] = None private var kinesisCredsProvider: Option[SparkAWSCredentials] = None @@ -182,14 +183,30 @@ object KinesisInputDStream { /** * Sets the initial position data is read from in the Kinesis stream. Defaults to + * [[KinesisInitialPositions.Latest]] if no custom value is specified. + * + * @param initialPosition [[KinesisInitialPosition]] value specifying where Spark Streaming + * will start reading records in the Kinesis stream from + * @return Reference to this [[KinesisInputDStream.Builder]] + */ + def initialPosition(initialPosition: KinesisInitialPosition): Builder = { + this.initialPosition = Option(initialPosition) + this + } + + /** + * Sets the initial position data is read from in the Kinesis stream. Defaults to * [[InitialPositionInStream.LATEST]] if no custom value is specified. + * This function would be removed when we deprecate the KinesisUtils. 
* * @param initialPosition InitialPositionInStream value specifying where Spark Streaming * will start reading records in the Kinesis stream from * @return Reference to this [[KinesisInputDStream.Builder]] */ + @deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0") def initialPositionInStream(initialPosition: InitialPositionInStream): Builder = { - initialPositionInStream = Option(initialPosition) + this.initialPosition = Option( + KinesisInitialPositions.fromKinesisInitialPosition(initialPosition)) this } @@ -266,7 +283,7 @@ object KinesisInputDStream { getRequiredParam(streamName, "streamName"), endpointUrl.getOrElse(DEFAULT_KINESIS_ENDPOINT_URL), regionName.getOrElse(DEFAULT_KINESIS_REGION_NAME), - initialPositionInStream.getOrElse(DEFAULT_INITIAL_POSITION_IN_STREAM), + initialPosition.getOrElse(DEFAULT_INITIAL_POSITION), getRequiredParam(checkpointAppName, "checkpointAppName"), checkpointInterval.getOrElse(ssc.graph.batchDuration), storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL), @@ -293,7 +310,6 @@ object KinesisInputDStream { * Creates a [[KinesisInputDStream.Builder]] for constructing [[KinesisInputDStream]] instances. 
* * @since 2.2.0 - * * @return [[KinesisInputDStream.Builder]] instance */ def builder: Builder = new Builder @@ -309,7 +325,6 @@ object KinesisInputDStream { private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String = "https://kinesis.us-east-1.amazonaws.com" private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1" - private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: InitialPositionInStream = - InitialPositionInStream.LATEST + private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest() private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2 } http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 1026d0f..fa0de62 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -24,12 +24,13 @@ import scala.collection.mutable import scala.util.control.NonFatal import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory} -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker} +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker} import com.amazonaws.services.kinesis.model.Record import org.apache.spark.internal.Logging import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.Duration +import 
org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} import org.apache.spark.util.Utils @@ -56,12 +57,13 @@ import org.apache.spark.util.Utils * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) * @param regionName Region name used by the Kinesis Client Library for * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). + * @param initialPosition Instance of [[KinesisInitialPosition]] + * In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * ([[KinesisInitialPositions.TrimHorizon]]) or + * the tip of the stream ([[KinesisInitialPositions.Latest]]). * @param checkpointAppName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams * by the Kinesis Client Library. If you change the App name or Stream name, * the KCL will throw errors. 
This usually requires deleting the backing @@ -83,7 +85,7 @@ private[kinesis] class KinesisReceiver[T]( val streamName: String, endpointUrl: String, regionName: String, - initialPositionInStream: InitialPositionInStream, + initialPosition: KinesisInitialPosition, checkpointAppName: String, checkpointInterval: Duration, storageLevel: StorageLevel, @@ -148,18 +150,29 @@ private[kinesis] class KinesisReceiver[T]( kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId) val kinesisProvider = kinesisCreds.provider - val kinesisClientLibConfiguration = new KinesisClientLibConfiguration( - checkpointAppName, - streamName, - kinesisProvider, - dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider), - cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider), - workerId) + + val kinesisClientLibConfiguration = { + val baseClientLibConfiguration = new KinesisClientLibConfiguration( + checkpointAppName, + streamName, + kinesisProvider, + dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider), + cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider), + workerId) .withKinesisEndpoint(endpointUrl) - .withInitialPositionInStream(initialPositionInStream) + .withInitialPositionInStream(initialPosition.getPosition) .withTaskBackoffTimeMillis(500) .withRegionName(regionName) + // Update the Kinesis client lib config with timestamp + // if InitialPositionInStream.AT_TIMESTAMP is passed + initialPosition match { + case ts: AtTimestamp => + baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts.getTimestamp) + case _ => baseClientLibConfiguration + } + } + /* * RecordProcessorFactory creates impls of IRecordProcessor. 
* IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala index 1298463..2500460 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -73,7 +73,8 @@ object KinesisUtils { // Setting scope to override receiver stream's scope of "receiver stream" ssc.withNamedScope("kinesis stream") { new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, + KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), + kinesisAppName, checkpointInterval, storageLevel, cleanedHandler, DefaultCredentials, None, None) } } @@ -129,7 +130,8 @@ object KinesisUtils { awsAccessKeyId = awsAccessKeyId, awsSecretKey = awsSecretKey) new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, + KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), + kinesisAppName, checkpointInterval, storageLevel, cleanedHandler, kinesisCredsProvider, None, None) } } @@ -198,7 +200,8 @@ object KinesisUtils { awsAccessKeyId = awsAccessKeyId, awsSecretKey = awsSecretKey)) new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, + 
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), + kinesisAppName, checkpointInterval, storageLevel, cleanedHandler, kinesisCredsProvider, None, None) } } @@ -243,7 +246,8 @@ object KinesisUtils { // Setting scope to override receiver stream's scope of "receiver stream" ssc.withNamedScope("kinesis stream") { new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, + KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), + kinesisAppName, checkpointInterval, storageLevel, KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None) } } @@ -293,7 +297,8 @@ object KinesisUtils { awsAccessKeyId = awsAccessKeyId, awsSecretKey = awsSecretKey) new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, + KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), + kinesisAppName, checkpointInterval, storageLevel, KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None) } } http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java index be6d549..03becd7 100644 --- a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java +++ b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java @@ -17,14 +17,13 @@ package 
org.apache.spark.streaming.kinesis; -import org.junit.Test; - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - +import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.Seconds; +import org.junit.Test; public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingContext { /** @@ -35,7 +34,41 @@ public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingConte String streamName = "a-very-nice-stream-name"; String endpointUrl = "https://kinesis.us-west-2.amazonaws.com"; String region = "us-west-2"; - InitialPositionInStream initialPosition = InitialPositionInStream.TRIM_HORIZON; + KinesisInitialPosition initialPosition = new TrimHorizon(); + String appName = "a-very-nice-kinesis-app"; + Duration checkpointInterval = Seconds.apply(30); + StorageLevel storageLevel = StorageLevel.MEMORY_ONLY(); + + KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder() + .streamingContext(ssc) + .streamName(streamName) + .endpointUrl(endpointUrl) + .regionName(region) + .initialPosition(initialPosition) + .checkpointAppName(appName) + .checkpointInterval(checkpointInterval) + .storageLevel(storageLevel) + .build(); + assert(kinesisDStream.streamName() == streamName); + assert(kinesisDStream.endpointUrl() == endpointUrl); + assert(kinesisDStream.regionName() == region); + assert(kinesisDStream.initialPosition().getPosition() == initialPosition.getPosition()); + assert(kinesisDStream.checkpointAppName() == appName); + assert(kinesisDStream.checkpointInterval() == checkpointInterval); + assert(kinesisDStream._storageLevel() == storageLevel); + ssc.stop(); + } + + /** + * Test to ensure that the old API for InitialPositionInStream + * is supported in 
KinesisDStream.Builder. + * This test would be removed when we deprecate the KinesisUtils. + */ + @Test + public void testJavaKinesisDStreamBuilderOldApi() { + String streamName = "a-very-nice-stream-name"; + String endpointUrl = "https://kinesis.us-west-2.amazonaws.com"; + String region = "us-west-2"; String appName = "a-very-nice-kinesis-app"; Duration checkpointInterval = Seconds.apply(30); StorageLevel storageLevel = StorageLevel.MEMORY_ONLY(); @@ -45,7 +78,7 @@ public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingConte .streamName(streamName) .endpointUrl(endpointUrl) .regionName(region) - .initialPositionInStream(initialPosition) + .initialPositionInStream(InitialPositionInStream.LATEST) .checkpointAppName(appName) .checkpointInterval(checkpointInterval) .storageLevel(storageLevel) @@ -53,7 +86,7 @@ public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingConte assert(kinesisDStream.streamName() == streamName); assert(kinesisDStream.endpointUrl() == endpointUrl); assert(kinesisDStream.regionName() == region); - assert(kinesisDStream.initialPositionInStream() == initialPosition); + assert(kinesisDStream.initialPosition().getPosition() == InitialPositionInStream.LATEST); assert(kinesisDStream.checkpointAppName() == appName); assert(kinesisDStream.checkpointInterval() == checkpointInterval); assert(kinesisDStream._storageLevel() == storageLevel); http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala index afa1a7f..e0e2684 100644 --- 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala @@ -17,12 +17,15 @@ package org.apache.spark.streaming.kinesis +import java.util.Calendar + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import org.scalatest.BeforeAndAfterEach import org.scalatest.mockito.MockitoSugar import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext, TestSuiteBase} +import org.apache.spark.streaming.{Duration, Seconds, StreamingContext, TestSuiteBase} +import org.apache.spark.streaming.kinesis.KinesisInitialPositions.{AtTimestamp, TrimHorizon} class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach with MockitoSugar { @@ -69,7 +72,7 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE val dstream = builder.build() assert(dstream.endpointUrl == DEFAULT_KINESIS_ENDPOINT_URL) assert(dstream.regionName == DEFAULT_KINESIS_REGION_NAME) - assert(dstream.initialPositionInStream == DEFAULT_INITIAL_POSITION_IN_STREAM) + assert(dstream.initialPosition == DEFAULT_INITIAL_POSITION) assert(dstream.checkpointInterval == batchDuration) assert(dstream._storageLevel == DEFAULT_STORAGE_LEVEL) assert(dstream.kinesisCreds == DefaultCredentials) @@ -80,7 +83,7 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE test("should propagate custom non-auth values to KinesisInputDStream") { val customEndpointUrl = "https://kinesis.us-west-2.amazonaws.com" val customRegion = "us-west-2" - val customInitialPosition = InitialPositionInStream.TRIM_HORIZON + val customInitialPosition = new TrimHorizon() val customAppName = "a-very-nice-kinesis-app" val customCheckpointInterval = Seconds(30) val customStorageLevel = StorageLevel.MEMORY_ONLY @@ -91,7 +94,7 @@ class 
KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE val dstream = builder .endpointUrl(customEndpointUrl) .regionName(customRegion) - .initialPositionInStream(customInitialPosition) + .initialPosition(customInitialPosition) .checkpointAppName(customAppName) .checkpointInterval(customCheckpointInterval) .storageLevel(customStorageLevel) @@ -101,12 +104,67 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE .build() assert(dstream.endpointUrl == customEndpointUrl) assert(dstream.regionName == customRegion) - assert(dstream.initialPositionInStream == customInitialPosition) + assert(dstream.initialPosition == customInitialPosition) assert(dstream.checkpointAppName == customAppName) assert(dstream.checkpointInterval == customCheckpointInterval) assert(dstream._storageLevel == customStorageLevel) assert(dstream.kinesisCreds == customKinesisCreds) assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds)) assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds)) + + // Testing with AtTimestamp + val cal = Calendar.getInstance() + cal.add(Calendar.DATE, -1) + val timestamp = cal.getTime() + val initialPositionAtTimestamp = new AtTimestamp(timestamp) + + val dstreamAtTimestamp = builder + .endpointUrl(customEndpointUrl) + .regionName(customRegion) + .initialPosition(initialPositionAtTimestamp) + .checkpointAppName(customAppName) + .checkpointInterval(customCheckpointInterval) + .storageLevel(customStorageLevel) + .kinesisCredentials(customKinesisCreds) + .dynamoDBCredentials(customDynamoDBCreds) + .cloudWatchCredentials(customCloudWatchCreds) + .build() + assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl) + assert(dstreamAtTimestamp.regionName == customRegion) + assert(dstreamAtTimestamp.initialPosition.getPosition + == initialPositionAtTimestamp.getPosition) + assert( + dstreamAtTimestamp.initialPosition.asInstanceOf[AtTimestamp].getTimestamp.equals(timestamp)) + 
assert(dstreamAtTimestamp.checkpointAppName == customAppName) + assert(dstreamAtTimestamp.checkpointInterval == customCheckpointInterval) + assert(dstreamAtTimestamp._storageLevel == customStorageLevel) + assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds) + assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds)) + assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds)) + } + + test("old Api should throw UnsupportedOperationExceptionexception with AT_TIMESTAMP") { + val streamName: String = "a-very-nice-stream-name" + val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com" + val region: String = "us-west-2" + val appName: String = "a-very-nice-kinesis-app" + val checkpointInterval: Duration = Seconds.apply(30) + val storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY + + // This should not build. + // InitialPositionInStream.AT_TIMESTAMP is not supported in old Api. + // The builder Api in KinesisInputDStream should be used. + intercept[UnsupportedOperationException] { + val kinesisDStream: KinesisInputDStream[Array[Byte]] = KinesisInputDStream.builder + .streamingContext(ssc) + .streamName(streamName) + .endpointUrl(endpointUrl) + .regionName(region) + .initialPositionInStream(InitialPositionInStream.AT_TIMESTAMP) + .checkpointAppName(appName) + .checkpointInterval(checkpointInterval) + .storageLevel(storageLevel) + .build + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 7e5bda9..a7a68eb 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisReadConfigurations._ import org.apache.spark.streaming.kinesis.KinesisTestUtils._ import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult @@ -178,7 +179,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun .streamName(testUtils.streamName) .endpointUrl(testUtils.endpointUrl) .regionName(testUtils.regionName) - .initialPositionInStream(InitialPositionInStream.LATEST) + .initialPosition(new Latest()) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .build() @@ -209,7 +210,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun .streamName(testUtils.streamName) .endpointUrl(testUtils.endpointUrl) .regionName(testUtils.regionName) - .initialPositionInStream(InitialPositionInStream.LATEST) + .initialPosition(new Latest()) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive(_)) @@ -245,7 +246,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun .streamName("dummyStream") .endpointUrl(dummyEndpointUrl) .regionName(dummyRegionName) - .initialPositionInStream(InitialPositionInStream.LATEST) + .initialPosition(new Latest()) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .build() @@ -293,7 +294,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun .streamName(localTestUtils.streamName) .endpointUrl(localTestUtils.endpointUrl) .regionName(localTestUtils.regionName) - 
.initialPositionInStream(InitialPositionInStream.LATEST) + .initialPosition(new Latest()) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .build() @@ -369,7 +370,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun .streamName(testUtils.streamName) .endpointUrl(testUtils.endpointUrl) .regionName(testUtils.regionName) - .initialPositionInStream(InitialPositionInStream.LATEST) + .initialPosition(new Latest()) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .build() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org