Repository: spark
Updated Branches:
  refs/heads/master be03d3ad7 -> 0e6833006


[SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specific 
timestamp

## What changes were proposed in this pull request?

The Kinesis client can resume from a specified timestamp while creating a stream. 
We should have an option to pass a timestamp in the config to allow Kinesis to 
resume from the given timestamp.

The patch introduces a new `KinesisInitialPositions` wrapper (with a 
`KinesisInitialPosition` interface) that pairs the `InitialPositionInStream` 
with the `timestamp` information needed to resume Kinesis fetches from the 
provided timestamp.

## How was this patch tested?

Unit Tests

cc : budde brkyvz

Author: Yash Sharma <ysha...@atlassian.com>

Closes #18029 from yssharma/ysharma/kcl_resume.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e683300
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e683300
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e683300

Branch: refs/heads/master
Commit: 0e6833006d28df426eb132bb8fc82917b8e2aedd
Parents: be03d3a
Author: Yash Sharma <ysha...@atlassian.com>
Authored: Tue Dec 26 09:50:39 2017 +0200
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Tue Dec 26 09:50:39 2017 +0200

----------------------------------------------------------------------
 .../kinesis/KinesisInitialPositions.java        | 91 ++++++++++++++++++++
 .../streaming/KinesisWordCountASL.scala         |  5 +-
 .../streaming/kinesis/KinesisInputDStream.scala | 31 +++++--
 .../streaming/kinesis/KinesisReceiver.scala     | 45 ++++++----
 .../spark/streaming/kinesis/KinesisUtils.scala  | 15 ++--
 .../JavaKinesisInputDStreamBuilderSuite.java    | 47 ++++++++--
 .../KinesisInputDStreamBuilderSuite.scala       | 68 +++++++++++++--
 .../streaming/kinesis/KinesisStreamSuite.scala  | 11 +--
 8 files changed, 264 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
 
b/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
new file mode 100644
index 0000000..206e1e4
--- /dev/null
+++ 
b/external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis;
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * A java wrapper for exposing [[InitialPositionInStream]]
+ * to the corresponding Kinesis readers.
+ */
+interface KinesisInitialPosition {
+    InitialPositionInStream getPosition();
+}
+
+public class KinesisInitialPositions {
+    public static class Latest implements KinesisInitialPosition, Serializable 
{
+        public Latest() {}
+
+        @Override
+        public InitialPositionInStream getPosition() {
+            return InitialPositionInStream.LATEST;
+        }
+    }
+
+    public static class TrimHorizon implements KinesisInitialPosition, 
Serializable {
+        public TrimHorizon() {}
+
+        @Override
+        public InitialPositionInStream getPosition() {
+            return InitialPositionInStream.TRIM_HORIZON;
+        }
+    }
+
+    public static class AtTimestamp implements KinesisInitialPosition, 
Serializable {
+        private Date timestamp;
+
+        public AtTimestamp(Date timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public InitialPositionInStream getPosition() {
+            return InitialPositionInStream.AT_TIMESTAMP;
+        }
+
+        public Date getTimestamp() {
+            return timestamp;
+        }
+    }
+
+
+    /**
+     * Returns instance of [[KinesisInitialPosition]] based on the passed 
[[InitialPositionInStream]].
+     * This method is used in KinesisUtils for translating the 
InitialPositionInStream
+     * to InitialPosition. This function would be removed when we deprecate 
the KinesisUtils.
+     *
+     * @return [[InitialPosition]]
+     */
+    public static KinesisInitialPosition fromKinesisInitialPosition(
+            InitialPositionInStream initialPositionInStream) throws 
UnsupportedOperationException {
+        if (initialPositionInStream == InitialPositionInStream.LATEST) {
+            return new Latest();
+        } else if (initialPositionInStream == 
InitialPositionInStream.TRIM_HORIZON) {
+            return new TrimHorizon();
+        } else {
+            // InitialPositionInStream.AT_TIMESTAMP is not supported.
+            // Use InitialPosition.atTimestamp(timestamp) instead.
+            throw new UnsupportedOperationException(
+                    "Only InitialPositionInStream.LATEST and 
InitialPositionInStream.TRIM_HORIZON " +
+                            "supported in initialPositionInStream(). Please 
use the initialPosition() from " +
+                            "builder API in KinesisInputDStream for using 
InitialPositionInStream.AT_TIMESTAMP");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index cde2c4b..fcb790e 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -24,7 +24,6 @@ import scala.util.Random
 
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
 import com.amazonaws.services.kinesis.AmazonKinesisClient
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.PutRecordRequest
 import org.apache.log4j.{Level, Logger}
 
@@ -33,9 +32,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
 import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
-
 /**
  * Consumes messages from a Amazon Kinesis streams and does wordcount.
  *
@@ -139,7 +138,7 @@ object KinesisWordCountASL extends Logging {
         .streamName(streamName)
         .endpointUrl(endpointUrl)
         .regionName(regionName)
-        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .initialPosition(new Latest())
         .checkpointAppName(appName)
         .checkpointInterval(kinesisCheckpointInterval)
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)

http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index f61e398..1ffec01 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -28,6 +28,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.{Duration, StreamingContext, Time}
 import org.apache.spark.streaming.api.java.JavaStreamingContext
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
 
@@ -36,7 +37,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
     val streamName: String,
     val endpointUrl: String,
     val regionName: String,
-    val initialPositionInStream: InitialPositionInStream,
+    val initialPosition: KinesisInitialPosition,
     val checkpointAppName: String,
     val checkpointInterval: Duration,
     val _storageLevel: StorageLevel,
@@ -77,7 +78,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
   }
 
   override def getReceiver(): Receiver[T] = {
-    new KinesisReceiver(streamName, endpointUrl, regionName, 
initialPositionInStream,
+    new KinesisReceiver(streamName, endpointUrl, regionName, initialPosition,
       checkpointAppName, checkpointInterval, _storageLevel, messageHandler,
       kinesisCreds, dynamoDBCreds, cloudWatchCreds)
   }
@@ -100,7 +101,7 @@ object KinesisInputDStream {
     // Params with defaults
     private var endpointUrl: Option[String] = None
     private var regionName: Option[String] = None
-    private var initialPositionInStream: Option[InitialPositionInStream] = None
+    private var initialPosition: Option[KinesisInitialPosition] = None
     private var checkpointInterval: Option[Duration] = None
     private var storageLevel: Option[StorageLevel] = None
     private var kinesisCredsProvider: Option[SparkAWSCredentials] = None
@@ -182,14 +183,30 @@ object KinesisInputDStream {
 
     /**
      * Sets the initial position data is read from in the Kinesis stream. 
Defaults to
+     * [[KinesisInitialPositions.Latest]] if no custom value is specified.
+     *
+     * @param initialPosition [[KinesisInitialPosition]] value specifying 
where Spark Streaming
+     *                        will start reading records in the Kinesis stream 
from
+     * @return Reference to this [[KinesisInputDStream.Builder]]
+     */
+    def initialPosition(initialPosition: KinesisInitialPosition): Builder = {
+      this.initialPosition = Option(initialPosition)
+      this
+    }
+
+    /**
+     * Sets the initial position data is read from in the Kinesis stream. 
Defaults to
      * [[InitialPositionInStream.LATEST]] if no custom value is specified.
+     * This function would be removed when we deprecate the KinesisUtils.
      *
      * @param initialPosition InitialPositionInStream value specifying where 
Spark Streaming
      *                        will start reading records in the Kinesis stream 
from
      * @return Reference to this [[KinesisInputDStream.Builder]]
      */
+    @deprecated("use initialPosition(initialPosition: 
KinesisInitialPosition)", "2.3.0")
     def initialPositionInStream(initialPosition: InitialPositionInStream): 
Builder = {
-      initialPositionInStream = Option(initialPosition)
+      this.initialPosition = Option(
+        KinesisInitialPositions.fromKinesisInitialPosition(initialPosition))
       this
     }
 
@@ -266,7 +283,7 @@ object KinesisInputDStream {
         getRequiredParam(streamName, "streamName"),
         endpointUrl.getOrElse(DEFAULT_KINESIS_ENDPOINT_URL),
         regionName.getOrElse(DEFAULT_KINESIS_REGION_NAME),
-        initialPositionInStream.getOrElse(DEFAULT_INITIAL_POSITION_IN_STREAM),
+        initialPosition.getOrElse(DEFAULT_INITIAL_POSITION),
         getRequiredParam(checkpointAppName, "checkpointAppName"),
         checkpointInterval.getOrElse(ssc.graph.batchDuration),
         storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL),
@@ -293,7 +310,6 @@ object KinesisInputDStream {
    * Creates a [[KinesisInputDStream.Builder]] for constructing 
[[KinesisInputDStream]] instances.
    *
    * @since 2.2.0
-   *
    * @return [[KinesisInputDStream.Builder]] instance
    */
   def builder: Builder = new Builder
@@ -309,7 +325,6 @@ object KinesisInputDStream {
   private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String =
     "https://kinesis.us-east-1.amazonaws.com";
   private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
-  private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: 
InitialPositionInStream =
-    InitialPositionInStream.LATEST
+  private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new 
Latest()
   private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = 
StorageLevel.MEMORY_AND_DISK_2
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 1026d0f..fa0de62 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -24,12 +24,13 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, 
IRecordProcessorCheckpointer, IRecordProcessorFactory}
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream,
 KinesisClientLibConfiguration, Worker}
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration,
 Worker}
 import com.amazonaws.services.kinesis.model.Record
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp
 import org.apache.spark.streaming.receiver.{BlockGenerator, 
BlockGeneratorListener, Receiver}
 import org.apache.spark.util.Utils
 
@@ -56,12 +57,13 @@ import org.apache.spark.util.Utils
  * @param endpointUrl  Url of Kinesis service (e.g., 
https://kinesis.us-east-1.amazonaws.com)
  * @param regionName  Region name used by the Kinesis Client Library for
  *                    DynamoDB (lease coordination and checkpointing) and 
CloudWatch (metrics)
- * @param initialPositionInStream  In the absence of Kinesis checkpoint info, 
this is the
- *                                 worker's initial starting position in the 
stream.
- *                                 The values are either the beginning of the 
stream
- *                                 per Kinesis' limit of 24 hours
- *                                 (InitialPositionInStream.TRIM_HORIZON) or
- *                                 the tip of the stream 
(InitialPositionInStream.LATEST).
+ * @param initialPosition  Instance of [[KinesisInitialPosition]]
+ *                         In the absence of Kinesis checkpoint info, this is 
the
+ *                         worker's initial starting position in the stream.
+ *                         The values are either the beginning of the stream
+ *                         per Kinesis' limit of 24 hours
+ *                         ([[KinesisInitialPositions.TrimHorizon]]) or
+ *                         the tip of the stream 
([[KinesisInitialPositions.Latest]]).
  * @param checkpointAppName  Kinesis application name. Kinesis Apps are mapped 
to Kinesis Streams
  *                 by the Kinesis Client Library.  If you change the App name 
or Stream name,
  *                 the KCL will throw errors.  This usually requires deleting 
the backing
@@ -83,7 +85,7 @@ private[kinesis] class KinesisReceiver[T](
     val streamName: String,
     endpointUrl: String,
     regionName: String,
-    initialPositionInStream: InitialPositionInStream,
+    initialPosition: KinesisInitialPosition,
     checkpointAppName: String,
     checkpointInterval: Duration,
     storageLevel: StorageLevel,
@@ -148,18 +150,29 @@ private[kinesis] class KinesisReceiver[T](
 
     kinesisCheckpointer = new KinesisCheckpointer(receiver, 
checkpointInterval, workerId)
     val kinesisProvider = kinesisCreds.provider
-    val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
-          checkpointAppName,
-          streamName,
-          kinesisProvider,
-          dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
-          cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
-          workerId)
+
+    val kinesisClientLibConfiguration = {
+      val baseClientLibConfiguration = new KinesisClientLibConfiguration(
+        checkpointAppName,
+        streamName,
+        kinesisProvider,
+        dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
+        cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
+        workerId)
         .withKinesisEndpoint(endpointUrl)
-        .withInitialPositionInStream(initialPositionInStream)
+        .withInitialPositionInStream(initialPosition.getPosition)
         .withTaskBackoffTimeMillis(500)
         .withRegionName(regionName)
 
+      // Update the Kinesis client lib config with timestamp
+      // if InitialPositionInStream.AT_TIMESTAMP is passed
+      initialPosition match {
+        case ts: AtTimestamp =>
+          
baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts.getTimestamp)
+        case _ => baseClientLibConfiguration
+      }
+    }
+
    /*
     *  RecordProcessorFactory creates impls of IRecordProcessor.
     *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the

http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index 1298463..2500460 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -73,7 +73,8 @@ object KinesisUtils {
     // Setting scope to override receiver stream's scope of "receiver stream"
     ssc.withNamedScope("kinesis stream") {
       new KinesisInputDStream[T](ssc, streamName, endpointUrl, 
validateRegion(regionName),
-        initialPositionInStream, kinesisAppName, checkpointInterval, 
storageLevel,
+        
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
+        kinesisAppName, checkpointInterval, storageLevel,
         cleanedHandler, DefaultCredentials, None, None)
     }
   }
@@ -129,7 +130,8 @@ object KinesisUtils {
         awsAccessKeyId = awsAccessKeyId,
         awsSecretKey = awsSecretKey)
       new KinesisInputDStream[T](ssc, streamName, endpointUrl, 
validateRegion(regionName),
-        initialPositionInStream, kinesisAppName, checkpointInterval, 
storageLevel,
+        
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
+        kinesisAppName, checkpointInterval, storageLevel,
         cleanedHandler, kinesisCredsProvider, None, None)
     }
   }
@@ -198,7 +200,8 @@ object KinesisUtils {
           awsAccessKeyId = awsAccessKeyId,
           awsSecretKey = awsSecretKey))
       new KinesisInputDStream[T](ssc, streamName, endpointUrl, 
validateRegion(regionName),
-        initialPositionInStream, kinesisAppName, checkpointInterval, 
storageLevel,
+        
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
+        kinesisAppName, checkpointInterval, storageLevel,
         cleanedHandler, kinesisCredsProvider, None, None)
     }
   }
@@ -243,7 +246,8 @@ object KinesisUtils {
     // Setting scope to override receiver stream's scope of "receiver stream"
     ssc.withNamedScope("kinesis stream") {
       new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, 
validateRegion(regionName),
-        initialPositionInStream, kinesisAppName, checkpointInterval, 
storageLevel,
+        
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
+        kinesisAppName, checkpointInterval, storageLevel,
         KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, 
None)
     }
   }
@@ -293,7 +297,8 @@ object KinesisUtils {
         awsAccessKeyId = awsAccessKeyId,
         awsSecretKey = awsSecretKey)
       new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, 
validateRegion(regionName),
-        initialPositionInStream, kinesisAppName, checkpointInterval, 
storageLevel,
+        
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
+        kinesisAppName, checkpointInterval, storageLevel,
         KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, 
None)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
 
b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
index be6d549..03becd7 100644
--- 
a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
+++ 
b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
@@ -17,14 +17,13 @@
 
 package org.apache.spark.streaming.kinesis;
 
-import org.junit.Test;
-
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-
+import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Seconds;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.Seconds;
+import org.junit.Test;
 
 public class JavaKinesisInputDStreamBuilderSuite extends 
LocalJavaStreamingContext {
   /**
@@ -35,7 +34,41 @@ public class JavaKinesisInputDStreamBuilderSuite extends 
LocalJavaStreamingConte
     String streamName = "a-very-nice-stream-name";
     String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";;
     String region = "us-west-2";
-    InitialPositionInStream initialPosition = 
InitialPositionInStream.TRIM_HORIZON;
+    KinesisInitialPosition initialPosition = new TrimHorizon();
+    String appName = "a-very-nice-kinesis-app";
+    Duration checkpointInterval = Seconds.apply(30);
+    StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
+
+    KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder()
+      .streamingContext(ssc)
+      .streamName(streamName)
+      .endpointUrl(endpointUrl)
+      .regionName(region)
+      .initialPosition(initialPosition)
+      .checkpointAppName(appName)
+      .checkpointInterval(checkpointInterval)
+      .storageLevel(storageLevel)
+      .build();
+    assert(kinesisDStream.streamName() == streamName);
+    assert(kinesisDStream.endpointUrl() == endpointUrl);
+    assert(kinesisDStream.regionName() == region);
+    assert(kinesisDStream.initialPosition().getPosition() == 
initialPosition.getPosition());
+    assert(kinesisDStream.checkpointAppName() == appName);
+    assert(kinesisDStream.checkpointInterval() == checkpointInterval);
+    assert(kinesisDStream._storageLevel() == storageLevel);
+    ssc.stop();
+  }
+
+  /**
+   * Test to ensure that the old API for InitialPositionInStream
+   * is supported in KinesisDStream.Builder.
+   * This test would be removed when we deprecate the KinesisUtils.
+   */
+  @Test
+  public void testJavaKinesisDStreamBuilderOldApi() {
+    String streamName = "a-very-nice-stream-name";
+    String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";;
+    String region = "us-west-2";
     String appName = "a-very-nice-kinesis-app";
     Duration checkpointInterval = Seconds.apply(30);
     StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
@@ -45,7 +78,7 @@ public class JavaKinesisInputDStreamBuilderSuite extends 
LocalJavaStreamingConte
       .streamName(streamName)
       .endpointUrl(endpointUrl)
       .regionName(region)
-      .initialPositionInStream(initialPosition)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
       .checkpointAppName(appName)
       .checkpointInterval(checkpointInterval)
       .storageLevel(storageLevel)
@@ -53,7 +86,7 @@ public class JavaKinesisInputDStreamBuilderSuite extends 
LocalJavaStreamingConte
     assert(kinesisDStream.streamName() == streamName);
     assert(kinesisDStream.endpointUrl() == endpointUrl);
     assert(kinesisDStream.regionName() == region);
-    assert(kinesisDStream.initialPositionInStream() == initialPosition);
+    assert(kinesisDStream.initialPosition().getPosition() == 
InitialPositionInStream.LATEST);
     assert(kinesisDStream.checkpointAppName() == appName);
     assert(kinesisDStream.checkpointInterval() == checkpointInterval);
     assert(kinesisDStream._storageLevel() == storageLevel);

http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
index afa1a7f..e0e2684 100644
--- 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
+++ 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.streaming.kinesis
 
+import java.util.Calendar
+
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.{Duration, Seconds, StreamingContext, 
TestSuiteBase}
+import 
org.apache.spark.streaming.kinesis.KinesisInitialPositions.{AtTimestamp, 
TrimHorizon}
 
 class KinesisInputDStreamBuilderSuite extends TestSuiteBase with 
BeforeAndAfterEach
    with MockitoSugar {
@@ -69,7 +72,7 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase 
with BeforeAndAfterE
     val dstream = builder.build()
     assert(dstream.endpointUrl == DEFAULT_KINESIS_ENDPOINT_URL)
     assert(dstream.regionName == DEFAULT_KINESIS_REGION_NAME)
-    assert(dstream.initialPositionInStream == 
DEFAULT_INITIAL_POSITION_IN_STREAM)
+    assert(dstream.initialPosition == DEFAULT_INITIAL_POSITION)
     assert(dstream.checkpointInterval == batchDuration)
     assert(dstream._storageLevel == DEFAULT_STORAGE_LEVEL)
     assert(dstream.kinesisCreds == DefaultCredentials)
@@ -80,7 +83,7 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase 
with BeforeAndAfterE
   test("should propagate custom non-auth values to KinesisInputDStream") {
     val customEndpointUrl = "https://kinesis.us-west-2.amazonaws.com";
     val customRegion = "us-west-2"
-    val customInitialPosition = InitialPositionInStream.TRIM_HORIZON
+    val customInitialPosition = new TrimHorizon()
     val customAppName = "a-very-nice-kinesis-app"
     val customCheckpointInterval = Seconds(30)
     val customStorageLevel = StorageLevel.MEMORY_ONLY
@@ -91,7 +94,7 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase 
with BeforeAndAfterE
     val dstream = builder
       .endpointUrl(customEndpointUrl)
       .regionName(customRegion)
-      .initialPositionInStream(customInitialPosition)
+      .initialPosition(customInitialPosition)
       .checkpointAppName(customAppName)
       .checkpointInterval(customCheckpointInterval)
       .storageLevel(customStorageLevel)
@@ -101,12 +104,67 @@ class KinesisInputDStreamBuilderSuite extends 
TestSuiteBase with BeforeAndAfterE
       .build()
     assert(dstream.endpointUrl == customEndpointUrl)
     assert(dstream.regionName == customRegion)
-    assert(dstream.initialPositionInStream == customInitialPosition)
+    assert(dstream.initialPosition == customInitialPosition)
     assert(dstream.checkpointAppName == customAppName)
     assert(dstream.checkpointInterval == customCheckpointInterval)
     assert(dstream._storageLevel == customStorageLevel)
     assert(dstream.kinesisCreds == customKinesisCreds)
     assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
     assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+    // Testing with AtTimestamp
+    val cal = Calendar.getInstance()
+    cal.add(Calendar.DATE, -1)
+    val timestamp = cal.getTime()
+    val initialPositionAtTimestamp = new AtTimestamp(timestamp)
+
+    val dstreamAtTimestamp = builder
+      .endpointUrl(customEndpointUrl)
+      .regionName(customRegion)
+      .initialPosition(initialPositionAtTimestamp)
+      .checkpointAppName(customAppName)
+      .checkpointInterval(customCheckpointInterval)
+      .storageLevel(customStorageLevel)
+      .kinesisCredentials(customKinesisCreds)
+      .dynamoDBCredentials(customDynamoDBCreds)
+      .cloudWatchCredentials(customCloudWatchCreds)
+      .build()
+    assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
+    assert(dstreamAtTimestamp.regionName == customRegion)
+    assert(dstreamAtTimestamp.initialPosition.getPosition
+      == initialPositionAtTimestamp.getPosition)
+    assert(
+      
dstreamAtTimestamp.initialPosition.asInstanceOf[AtTimestamp].getTimestamp.equals(timestamp))
+    assert(dstreamAtTimestamp.checkpointAppName == customAppName)
+    assert(dstreamAtTimestamp.checkpointInterval == customCheckpointInterval)
+    assert(dstreamAtTimestamp._storageLevel == customStorageLevel)
+    assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
+    assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
+    assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds))
+  }
+
+  test("old Api should throw UnsupportedOperationExceptionexception with 
AT_TIMESTAMP") {
+    val streamName: String = "a-very-nice-stream-name"
+    val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com";
+    val region: String = "us-west-2"
+    val appName: String = "a-very-nice-kinesis-app"
+    val checkpointInterval: Duration = Seconds.apply(30)
+    val storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
+
+    // This should not build.
+    // InitialPositionInStream.AT_TIMESTAMP is not supported in old Api.
+    // The builder Api in KinesisInputDStream should be used.
+    intercept[UnsupportedOperationException] {
+      val kinesisDStream: KinesisInputDStream[Array[Byte]] = 
KinesisInputDStream.builder
+        .streamingContext(ssc)
+        .streamName(streamName)
+        .endpointUrl(endpointUrl)
+        .regionName(region)
+        .initialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
+        .checkpointAppName(appName)
+        .checkpointInterval(checkpointInterval)
+        .storageLevel(storageLevel)
+        .build
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e683300/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 7e5bda9..a7a68eb 100644
--- 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
 import org.apache.spark.streaming.kinesis.KinesisReadConfigurations._
 import org.apache.spark.streaming.kinesis.KinesisTestUtils._
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -178,7 +179,7 @@ abstract class KinesisStreamTests(aggregateTestData: 
Boolean) extends KinesisFun
       .streamName(testUtils.streamName)
       .endpointUrl(testUtils.endpointUrl)
       .regionName(testUtils.regionName)
-      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .initialPosition(new Latest())
       .checkpointInterval(Seconds(10))
       .storageLevel(StorageLevel.MEMORY_ONLY)
       .build()
@@ -209,7 +210,7 @@ abstract class KinesisStreamTests(aggregateTestData: 
Boolean) extends KinesisFun
       .streamName(testUtils.streamName)
       .endpointUrl(testUtils.endpointUrl)
       .regionName(testUtils.regionName)
-      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .initialPosition(new Latest())
       .checkpointInterval(Seconds(10))
       .storageLevel(StorageLevel.MEMORY_ONLY)
       .buildWithMessageHandler(addFive(_))
@@ -245,7 +246,7 @@ abstract class KinesisStreamTests(aggregateTestData: 
Boolean) extends KinesisFun
       .streamName("dummyStream")
       .endpointUrl(dummyEndpointUrl)
       .regionName(dummyRegionName)
-      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .initialPosition(new Latest())
       .checkpointInterval(Seconds(10))
       .storageLevel(StorageLevel.MEMORY_ONLY)
       .build()
@@ -293,7 +294,7 @@ abstract class KinesisStreamTests(aggregateTestData: 
Boolean) extends KinesisFun
         .streamName(localTestUtils.streamName)
         .endpointUrl(localTestUtils.endpointUrl)
         .regionName(localTestUtils.regionName)
-        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .initialPosition(new Latest())
         .checkpointInterval(Seconds(10))
         .storageLevel(StorageLevel.MEMORY_ONLY)
         .build()
@@ -369,7 +370,7 @@ abstract class KinesisStreamTests(aggregateTestData: 
Boolean) extends KinesisFun
       .streamName(testUtils.streamName)
       .endpointUrl(testUtils.endpointUrl)
       .regionName(testUtils.regionName)
-      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .initialPosition(new Latest())
       .checkpointInterval(Seconds(10))
       .storageLevel(StorageLevel.MEMORY_ONLY)
       .build()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to