[ https://issues.apache.org/jira/browse/SPARK-31236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083327#comment-17083327 ]
Thukarama Prabhu commented on SPARK-31236:
------------------------------------------
I raised this issue with AWS Support and received the response below. It looks like the Spark code requires a fix.
The Spark Kinesis code currently uses the "AmazonKinesisClient" class to create its client from the credentials:
- [https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala#L142]
Unfortunately, the service team confirmed that the "AmazonKinesisClient" class is deprecated and no longer supported:
- [https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html]
The Spark Kinesis code constructs the Kinesis client from this deprecated class, through which the region cannot be passed, so the region is parsed from the endpoint instead.
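To illustrate where 'ae1' can come from, here is a simplified sketch of the kind of hostname heuristic the AWS SDK for Java 1.x applies when a client is given only an endpoint. It is an approximation written for illustration, not the SDK's own code, and the class and method names (`RegionGuess`, `guessRegion`) are hypothetical: for `*.amazonaws.com` hosts it takes the label before the suffix; otherwise it takes the label that follows `kinesis.` or `kinesis-`.

```java
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: a simplified approximation of how the AWS SDK for Java 1.x
// guesses a signing region from an endpoint hostname. Not the SDK's actual code.
final class RegionGuess {
    private static final String AWS_SUFFIX = ".amazonaws.com";

    private RegionGuess() {}

    static String guessRegion(String host, String serviceHint) {
        // Standard endpoints, e.g. "kinesis.us-east-1.amazonaws.com":
        // take the label right before ".amazonaws.com".
        if (host.endsWith(AWS_SUFFIX)) {
            String fragment = host.substring(0, host.length() - AWS_SUFFIX.length());
            int dot = fragment.lastIndexOf('.');
            return dot == -1 ? "us-east-1" : fragment.substring(dot + 1);
        }
        // Non-standard endpoints: look for "<service>." or "<service>-" followed by
        // a label, and treat that label as the region.
        Pattern pattern = Pattern.compile(
                "^(?:.+\\.)?" + Pattern.quote(serviceHint) + "[.-]([a-z0-9-]+)\\.");
        Matcher matcher = pattern.matcher(host);
        if (matcher.find()) {
            return matcher.group(1);
        }
        // Nothing recognizable: fall back to a default region.
        return "us-east-1";
    }

    public static void main(String[] args) {
        String[] endpoints = {
            "https://kinesis.us-east-1.amazonaws.com",
            "https://kinesis-ae1.hdw.r53.deap.tv"
        };
        for (String endpoint : endpoints) {
            String host = URI.create(endpoint).getHost();
            // Prints "kinesis.us-east-1.amazonaws.com -> us-east-1",
            // then "kinesis-ae1.hdw.r53.deap.tv -> ae1".
            System.out.println(host + " -> " + guessRegion(host, "kinesis"));
        }
    }
}
```

Under this heuristic the direct URL yields 'ae1' as the signing region, which is consistent with the "Credential should be scoped to a valid region, not 'ae1'" error in the report.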
We currently use the 'AmazonKinesisClientBuilder' class in the AWS Kinesis SDK instead of 'AmazonKinesisClient'; the builder provides an option to set the region:
- [https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClientBuilder.html]
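A minimal sketch of the builder approach, assuming aws-java-sdk 1.11.x as in the reporter's pom. `AwsClientBuilder.EndpointConfiguration` pairs a custom endpoint with an explicit signing region, so the SDK does not have to derive the region from the hostname. The class name `KinesisClientFactory`, the signing region "us-east-1" and the stream name "my-stream" are assumptions for illustration; use the region the stream actually lives in. This only shows how such a client can be built; it is not a patch to Spark's Kinesis module, which creates its clients internally.

```java
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;

// Hypothetical factory: builds a Kinesis client that sends requests to a custom
// endpoint but signs them for an explicitly chosen region.
final class KinesisClientFactory {
    private KinesisClientFactory() {}

    static AmazonKinesis create(String endpointUrl, String signingRegion,
                                AWSCredentialsProvider credentials) {
        return AmazonKinesisClientBuilder.standard()
                // Endpoint + signing region together; do not also call withRegion(),
                // the builder rejects having both set.
                .withEndpointConfiguration(new EndpointConfiguration(endpointUrl, signingRegion))
                .withCredentials(credentials)
                .build();
    }

    public static void main(String[] args) {
        // Assumption: the direct endpoint fronts a stream in us-east-1.
        AmazonKinesis kinesis = create("https://kinesis-ae1.hdw.r53.deap.tv", "us-east-1",
                DefaultAWSCredentialsProviderChain.getInstance());
        // Same API call that fails in the reported stack trace (ListShards);
        // requires network access and valid credentials.
        ListShardsResult result = kinesis.listShards(
                new ListShardsRequest().withStreamName("my-stream"));
        System.out.println(result.getShards().size() + " shard(s) on the first page");
    }
}
```

Running `main` against the direct URL is one way to check, outside Spark, whether requests succeed once the signing region is set explicitly.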
Since the Spark Streaming Kinesis integration code is not owned by AWS, the service team recommends that you reach out to the library maintainers to fix the issue:
- [https://github.com/apache/spark/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis]
> Spark error while consuming data from Kinesis direct end point
> --------------------------------------------------------------
>
> Key: SPARK-31236
> URL: https://issues.apache.org/jira/browse/SPARK-31236
> Project: Spark
> Issue Type: Bug
> Components: DStreams, Java API
> Affects Versions: 2.4.5
> Reporter: Thukarama Prabhu
> Priority: Critical
>
> Here is a summary of the issue I am experiencing when using a Kinesis direct
> URL to consume data with Spark.
> *Kinesis direct URL:* [https://kinesis-ae1.hdw.r53.deap.tv|https://kinesis-ae1.hdw.r53.deap.tv/]
> (fails with "Credential should be scoped to a valid region, not 'ae1'")
> *Kinesis default URL:* [https://kinesis.us-east-1.amazonaws.com|https://kinesis.us-east-1.amazonaws.com/]
> (works)
> Spark code for consuming data:
> SparkAWSCredentials credentials =
>     commonService.getSparkAWSCredentials(kinApp.propConfig);
> KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
>     .streamingContext(jssc)
>     .checkpointAppName(applicationName)
>     .streamName(streamName)
>     .endpointUrl(endpointURL)
>     .regionName(regionName)
>     .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initPosition))
>     .checkpointInterval(checkpointInterval)
>     .kinesisCredentials(credentials)
>     .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
>     .build();
>
> Spark version 2.4.4
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
>   <version>2.4.5</version>
> </dependency>
> <dependency>
>   <groupId>com.amazonaws</groupId>
>   <artifactId>amazon-kinesis-client</artifactId>
>   <version>1.13.3</version>
> </dependency>
> <dependency>
>   <groupId>com.amazonaws</groupId>
>   <artifactId>aws-java-sdk</artifactId>
>   <version>1.11.747</version>
> </dependency>
>
> The Spark application works fine with the default URL but fails with the
> error below when I switch to the direct URL. Publishing to the direct Kinesis
> URL works; the issue occurs only when consuming data.
>
> 2020-03-24 08:43:40,650 ERROR - Caught exception while sync'ing Kinesis shards and leases
> com.amazonaws.services.kinesis.model.AmazonKinesisException: Credential should be scoped to a valid region, not 'ae1'. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidSignatureException; Request ID: fb43b636-8ce2-ec77-adb7-a8ead9e038c2)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1557)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
>     at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.listShards(KinesisProxy.java:326)
>     at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.getShardList(KinesisProxy.java:441)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.getShardList(KinesisShardSyncer.java:349)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.syncShardLeases(KinesisShardSyncer.java:159)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.checkAndCreateLeasesForNewShards(KinesisShardSyncer.java:112)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask.call(ShardSyncTask.java:84)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:683)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:614)
>     at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:191)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)