[ 
https://issues.apache.org/jira/browse/SPARK-31236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083327#comment-17083327
 ] 

Thukarama Prabhu commented on SPARK-31236:
------------------------------------------

I raised this issue with AWS Support and received the response below. It looks 
like the Spark code needs a fix.

The Kinesis Spark code currently uses the "AmazonKinesisClient" class to create 
the Kinesis client from the credentials.

- https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala#L142

Unfortunately, the service team confirmed that the "AmazonKinesisClient" class 
is deprecated and that we no longer support it.

- https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html

The Kinesis Spark code constructs the Kinesis client from the deprecated class 
above, which does not accept a region; instead, it parses the region from the 
endpoint.
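
To illustrate why the direct endpoint yields 'ae1', here is a minimal sketch. It assumes the deprecated client guesses the signing region by looking for "kinesis.<region>." or "kinesis-<region>." in the host name; the real SDK logic may differ in detail. RegionGuessSketch, guessRegionFromHost, and credentialScope are hypothetical names used only for illustration and are not part of Spark or the AWS SDK.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not Spark or AWS SDK code.
final class RegionGuessSketch {

    // Assumption: the region is whatever follows "<service>." or "<service>-"
    // in the host name, up to the next dot.
    static Optional<String> guessRegionFromHost(String host, String serviceHint) {
        Pattern pattern = Pattern.compile(
                "^(?:.+\\.)?" + Pattern.quote(serviceHint) + "[.-]([a-z0-9-]+)\\.");
        Matcher matcher = pattern.matcher(host);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    // Signature Version 4 credential scope: <yyyyMMdd>/<region>/<service>/aws4_request.
    static String credentialScope(String date, String region, String service) {
        return date + "/" + region + "/" + service + "/aws4_request";
    }

    public static void main(String[] args) {
        String[] hosts = {
            "kinesis.us-east-1.amazonaws.com", // default endpoint from the issue
            "kinesis-ae1.hdw.r53.deap.tv"      // direct endpoint from the issue
        };
        for (String host : hosts) {
            String region = guessRegionFromHost(host, "kinesis").orElse("<unknown>");
            System.out.println(host + " -> " + credentialScope("20200324", region, "kinesis"));
        }
    }
}
```

Under this assumption the default endpoint is scoped to "us-east-1", while the direct endpoint is scoped to "ae1", which matches the region named in the InvalidSignatureException. Passing the region explicitly, rather than deriving it from the host name, avoids the mismatch.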

 

We currently use the 'AmazonKinesisClientBuilder' class instead of 
'AmazonKinesisClient' in the AWS Kinesis SDK; the builder has an option to set 
the region:

- https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClientBuilder.html

 

Since Spark Streaming and Kinesis integration code is not owned by AWS, the 
service team recommended that you reach out to the library maintainer to fix 
the issue.

- https://github.com/apache/spark/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis

 

> Spark error while consuming data from Kinesis direct end point
> --------------------------------------------------------------
>
>                 Key: SPARK-31236
>                 URL: https://issues.apache.org/jira/browse/SPARK-31236
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Java API
>    Affects Versions: 2.4.5
>            Reporter: Thukarama Prabhu
>            Priority: Critical
>
> Here is a summary of the issue I am experiencing when using a Kinesis direct 
> URL to consume data with Spark.
> *Kinesis direct URL:* https://kinesis-ae1.hdw.r53.deap.tv 
> (fails with "Credential should be scoped to a valid region, not 'ae1'")
> *Kinesis default URL:* https://kinesis.us-east-1.amazonaws.com 
> (works)
> Spark code for consuming data:
> SparkAWSCredentials credentials = commonService.getSparkAWSCredentials(kinApp.propConfig);
> KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
>         .streamingContext(jssc)
>         .checkpointAppName(applicationName)
>         .streamName(streamName)
>         .endpointUrl(endpointURL)
>         .regionName(regionName)
>         .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initPosition))
>         .checkpointInterval(checkpointInterval)
>         .kinesisCredentials(credentials)
>         .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
>         .build();
>  
> Spark version 2.4.4
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
>     <version>2.4.5</version>
> </dependency>
> <dependency>
>     <groupId>com.amazonaws</groupId>
>     <artifactId>amazon-kinesis-client</artifactId>
>     <version>1.13.3</version>
> </dependency>
> <dependency>
>     <groupId>com.amazonaws</groupId>
>     <artifactId>aws-java-sdk</artifactId>
>     <version>1.11.747</version>
> </dependency>
>  
> The Spark application works fine with the default URL but fails with the 
> error below when I switch to the direct URL. Publishing to the direct Kinesis 
> URL works; the issue occurs only when I try to consume data.
>  
> 2020-03-24 08:43:40,650 ERROR - Caught exception while sync'ing Kinesis shards and leases
> com.amazonaws.services.kinesis.model.AmazonKinesisException: Credential should be scoped to a valid region, not 'ae1'.  (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidSignatureException; Request ID: fb43b636-8ce2-ec77-adb7-a8ead9e038c2)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1557)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
>     at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.listShards(KinesisProxy.java:326)
>     at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.getShardList(KinesisProxy.java:441)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.getShardList(KinesisShardSyncer.java:349)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.syncShardLeases(KinesisShardSyncer.java:159)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.checkAndCreateLeasesForNewShards(KinesisShardSyncer.java:112)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask.call(ShardSyncTask.java:84)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:683)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:614)
>     at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:191)


