Repository: spark Updated Branches: refs/heads/master 8730fbb47 -> 4b7ff3092
[SPARK-7787] [STREAMING] Fix serialization issue of SerializableAWSCredentials Lack of default constructor causes deserialization to fail. This occurs only when the AWS credentials are explicitly specified through KinesisUtils. Author: Tathagata Das <[email protected]> Closes #6316 from tdas/SPARK-7787 and squashes the following commits: 248ca5c [Tathagata Das] Fixed serializability Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b7ff309 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b7ff309 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b7ff309 Branch: refs/heads/master Commit: 4b7ff3092c53827817079e0810563cbb0b9d0747 Parents: 8730fbb Author: Tathagata Das <[email protected]> Authored: Thu May 21 11:39:32 2015 -0700 Committer: Tathagata Das <[email protected]> Committed: Thu May 21 11:39:32 2015 -0700 ---------------------------------------------------------------------- .../streaming/kinesis/KinesisReceiver.scala | 5 +++- .../kinesis/KinesisReceiverSuite.scala | 30 +++++++++----------- 2 files changed, 17 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4b7ff309/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 9016449..800202e 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -31,7 +31,10 @@ import org.apache.spark.util.Utils private[kinesis] case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) - extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable + extends AWSCredentials { + override def getAWSAccessKeyId: String = accessKeyId + override def getAWSSecretKey: String = secretKey +} /** * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. http://git-wip-us.apache.org/repos/asf/spark/blob/4b7ff309/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index 7c17ee9..cd19c33 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -20,27 +20,18 @@ import java.nio.ByteBuffer import scala.collection.JavaConversions.seqAsJavaList -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.Milliseconds -import org.apache.spark.streaming.Seconds -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.TestSuiteBase -import org.apache.spark.util.{ManualClock, Clock} - -import org.mockito.Mockito._ -import org.scalatest.BeforeAndAfter -import org.scalatest.Matchers -import org.scalatest.mock.MockitoSugar - -import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException -import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException} import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfter, Matchers} +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase} +import org.apache.spark.util.{Clock, ManualClock, Utils} /** * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor @@ -99,6 +90,11 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft ssc.stop() } + test("check serializability of SerializableAWSCredentials") { + Utils.deserialize[SerializableAWSCredentials]( + Utils.serialize(new SerializableAWSCredentials("x", "y"))) + } + test("process records including store and checkpoint") { when(receiverMock.isStopped()).thenReturn(false) when(checkpointStateMock.shouldCheckpoint()).thenReturn(true) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
