[ 
https://issues.apache.org/jira/browse/BEAM-3881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412301#comment-16412301
 ] 

Kevin Peterson commented on BEAM-3881:
--------------------------------------

I added a log statement 
[here|https://github.com/apache/beam/blob/master/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L163],
 and ran the pipeline again.

It looks like the {{CountSince}} value passed in is nonsensical: it is hundreds of thousands of years in the past.
{noformat}
BEAM-3881 --- CountSince: -290308-12-21T19:59:05.225Z, CountTo: 2018-03-24T00:07:12.818Z
{noformat}
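
As a rough illustration of why that {{CountSince}} leads to the "Value cannot fit in an int" error in the trace, here is a minimal sketch using only {{java.time}} and plain long arithmetic. It does not call Joda-Time or Beam. The class and method names are hypothetical, and the constant assumes that the logged minimum timestamp corresponds to {{Long.MIN_VALUE}} microseconds expressed in milliseconds, which is consistent with the -290308-12-21T19:59:05.225Z value in the log above.

```java
import java.time.Instant;

// Hypothetical illustration class; not part of Beam or Joda-Time.
final class BacklogWindowOverflow {

    // Assumption: the minimum watermark is Long.MIN_VALUE microseconds,
    // expressed in milliseconds since the epoch.
    static final long MIN_WATERMARK_MILLIS = Long.MIN_VALUE / 1000;

    // Whole minutes between two epoch-millisecond timestamps, kept as a long.
    static long minutesBetween(long sinceMillis, long toMillis) {
        return (toMillis - sinceMillis) / 60_000L;
    }

    // True when the value can be narrowed to an int without losing information.
    static boolean fitsInInt(long value) {
        return value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long countTo = Instant.parse("2018-03-24T00:07:12.818Z").toEpochMilli();
        long minutes = minutesBetween(MIN_WATERMARK_MILLIS, countTo);
        System.out.println("minutes = " + minutes + ", fits in int: " + fitsInInt(minutes));

        try {
            // Narrowing with an overflow check fails, much like the
            // int conversion reported in the stack trace.
            Math.toIntExact(minutes);
        } catch (ArithmeticException e) {
            System.out.println("ArithmeticException: " + e.getMessage());
        }
    }
}
```

The minute count comes out around 1.5e11, the same order of magnitude as the 153748225435 in the reported exception and far above {{Integer.MAX_VALUE}}.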
Digging in a bit more, I think this was actually fixed in 
[https://github.com/apache/beam/commit/994c7f3f714d51270f71234a4f5dba752a70766a],
 which changed how the watermark was initialized:
{noformat}
-  private Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  private Instant lastWatermark = Instant.now().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
{noformat}

That change should be in v2.4, so I'll update the pipeline and close the ticket 
if it works.
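
For comparison, a small sketch of what the changed initialization does to the minute count, again with {{java.time}} rather than Joda-Time. The 7-day value is an assumption standing in for {{MAX_KINESIS_STREAM_RETENTION_PERIOD}}, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration class; not part of Beam.
final class WatermarkInit {

    // Assumption: 7 days stands in for MAX_KINESIS_STREAM_RETENTION_PERIOD.
    static final Duration ASSUMED_MAX_RETENTION = Duration.ofDays(7);

    // Start the watermark a bounded distance behind "now" instead of at the
    // minimum representable timestamp.
    static Instant initialWatermark(Instant now) {
        return now.minus(ASSUMED_MAX_RETENTION);
    }

    // Whole minutes between two instants, narrowed to int with an overflow check.
    static int minutesBetween(Instant since, Instant to) {
        return Math.toIntExact(Duration.between(since, to).toMinutes());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-03-24T00:07:12.818Z");
        Instant watermark = initialWatermark(now);
        // 7 days * 24 hours * 60 minutes = 10080, which fits in an int.
        System.out.println(minutesBetween(watermark, now));
    }
}
```

With a bounded starting point the difference stays small, so the int conversion no longer overflows.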

> Failure reading backlog in KinesisIO
> ------------------------------------
>
>                 Key: BEAM-3881
>                 URL: https://issues.apache.org/jira/browse/BEAM-3881
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>            Reporter: Kevin Peterson
>            Assignee: Alexey Romanenko
>            Priority: Major
>
> I'm getting an error when reading from Kinesis in my pipeline. Using Beam 
> v2.3, running on Google Cloud Dataflow.
> I'm constructing the source via:
> {code:java}
> KinesisIO.Read read = KinesisIO
>                 .read()
>                 .withAWSClientsProvider(
>                     configuration.getAwsAccessKeyId(),
>                     configuration.getAwsSecretAccessKey(),
>                     region)
>                 .withStreamName(configuration.getKinesisStream())
>                 .withUpToDateThreshold(Duration.standardMinutes(30))
>                 .withInitialTimestampInStream(configuration.getStartTime());
> {code}
> The exception is:
> {noformat}
> Mar 19, 2018 12:54:41 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
> SEVERE: 2018-03-19T19:54:53.010Z: (2896b8774de760ec): java.lang.RuntimeException: Unknown kinesis failure, when trying to reach kinesis
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:223)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:161)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:150)
> org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:200)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:398)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1199)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:940)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArithmeticException: Value cannot fit in an int: 153748225435
> org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:206)
> org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
> org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
> org.joda.time.Minutes.minutesBetween(Minutes.java:101)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:163)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:205)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:161)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:150)
> org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:200)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:398)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1199)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:940)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745){noformat}



