[https://issues.apache.org/jira/browse/SPARK-31236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084419#comment-17084419]
Thukarama Prabhu edited comment on SPARK-31236 at 4/15/20, 10:45 PM:
---------------------------------------------------------------------
Looks like this requires major code changes. The Spark API currently uses the
KCL 1.x implementation (not currently supported by AWS). We need to migrate to
KCL 2.x.
[https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration]
was (Author: pthukar):
Looks like this requires major code changes. The Spark API currently uses the
KCL 1.x implementation. We need to migrate to KCL 2.x.
[https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration]
> Spark error while consuming data from Kinesis direct end point
> --------------------------------------------------------------
>
> Key: SPARK-31236
> URL: https://issues.apache.org/jira/browse/SPARK-31236
> Project: Spark
> Issue Type: Bug
> Components: DStreams, Java API
> Affects Versions: 2.4.5
> Reporter: Thukarama Prabhu
> Priority: Critical
>
> Here is a summary of the issue I am experiencing when using a Kinesis direct
> URL to consume data with Spark.
> *Kinesis direct URL:*
> [https://kinesis-ae1.hdw.r53.deap.tv|https://kinesis-ae1.hdw.r53.deap.tv/]
> (Fails with "Credential should be scoped to a valid region, not 'ae1'")
> *Kinesis default URL:*
> [https://kinesis.us-east-1.amazonaws.com|https://kinesis.us-east-1.amazonaws.com/]
> (Works)
> Spark code for consuming data:
> SparkAWSCredentials credentials =
>     commonService.getSparkAWSCredentials(kinApp.propConfig);
> // endpointURL is either the direct URL (fails) or the default URL (works)
> KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
>     .streamingContext(jssc)
>     .checkpointAppName(applicationName)
>     .streamName(streamName)
>     .endpointUrl(endpointURL)
>     .regionName(regionName)
>     .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initPosition))
>     .checkpointInterval(checkpointInterval)
>     .kinesisCredentials(credentials)
>     .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
>     .build();
>
> Spark version 2.4.4
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
>   <version>2.4.5</version>
> </dependency>
> <dependency>
>   <groupId>com.amazonaws</groupId>
>   <artifactId>amazon-kinesis-client</artifactId>
>   <version>1.13.3</version>
> </dependency>
> <dependency>
>   <groupId>com.amazonaws</groupId>
>   <artifactId>aws-java-sdk</artifactId>
>   <version>1.11.747</version>
> </dependency>
>
> The Spark application works fine with the default URL but fails with the error
> below when I switch to the direct URL. Publishing to the direct Kinesis URL
> works; the issue occurs only when consuming data.
>
> 2020-03-24 08:43:40,650 ERROR - Caught exception while sync'ing Kinesis shards and leases
> com.amazonaws.services.kinesis.model.AmazonKinesisException: Credential should be scoped to a valid region, not 'ae1'. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidSignatureException; Request ID: fb43b636-8ce2-ec77-adb7-a8ead9e038c2)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1557)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
>     at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.listShards(KinesisProxy.java:326)
>     at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.getShardList(KinesisProxy.java:441)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.getShardList(KinesisShardSyncer.java:349)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.syncShardLeases(KinesisShardSyncer.java:159)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.checkAndCreateLeasesForNewShards(KinesisShardSyncer.java:112)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask.call(ShardSyncTask.java:84)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:683)
>     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:614)
>     at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:191)
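A possible explanation for the 'ae1' in the error above: when an SDK 1.x client is given a custom endpoint and no explicit signing region, the AWS SDK for Java 1.x guesses the SigV4 signing region from the endpoint's hostname. The sketch below is a simplified reimplementation of that guess. It assumes the SDK's hostname parsing (in `AwsHostNameUtils`) works roughly like this: for `*.amazonaws.com` hosts it takes the label before the suffix, and for other hosts it treats the label after `<service>-` or `<service>.` as the region, otherwise falling back to us-east-1. `SigningRegionDemo` and `inferSigningRegion` are hypothetical names for illustration, not SDK or Spark APIs.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified sketch (an assumption, not the SDK's actual source) of how the
// AWS SDK for Java 1.x may infer a SigV4 signing region from an endpoint host.
public class SigningRegionDemo {

    private static final String AWS_SUFFIX = ".amazonaws.com";

    // Hypothetical helper: returns the region the SDK would likely sign with.
    public static String inferSigningRegion(String host, String service) {
        // Standard endpoints: <service>.<region>.amazonaws.com -> "<region>".
        if (host.endsWith(AWS_SUFFIX)) {
            String fragment = host.substring(0, host.length() - AWS_SUFFIX.length());
            int dot = fragment.lastIndexOf('.');
            return dot == -1 ? "us-east-1" : fragment.substring(dot + 1);
        }
        // Custom endpoints: "<service>-<x>." or "<service>.<x>." -> "<x>".
        Pattern pattern = Pattern.compile(
            "^(?:.+\\.)?" + Pattern.quote(service) + "[.-]([a-z0-9-]+)\\.");
        Matcher matcher = pattern.matcher(host);
        if (matcher.find()) {
            return matcher.group(1);
        }
        // Nothing recognizable in the hostname: fall back to us-east-1.
        return "us-east-1";
    }

    public static void main(String[] args) {
        String[] hosts = {
            "kinesis.us-east-1.amazonaws.com", // default URL from the report
            "kinesis-ae1.hdw.r53.deap.tv"      // direct URL from the report
        };
        for (String host : hosts) {
            System.out.println(host + " -> " + inferSigningRegion(host, "kinesis"));
        }
    }
}
```

Running `main` prints `kinesis.us-east-1.amazonaws.com -> us-east-1` and `kinesis-ae1.hdw.r53.deap.tv -> ae1`. The second result matches the region named in the error, which suggests the signing region is being derived from the direct endpoint's hostname rather than from `regionName`. In SDK 1.x, a client's signing region can be pinned with `AmazonWebServiceClient.setSignerRegionOverride`, but whether KCL 1.x or the Spark 2.4 receiver exposes a way to set it is not established here.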
--
This message was sent by Atlassian Jira
(v8.3.4#803005)