[ https://issues.apache.org/jira/browse/APEXMALHAR-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575081#comment-15575081 ]
Francis Fernandes commented on APEXMALHAR-2299:
-----------------------------------------------
The issue lies in the getTimeBucketAndAdjustBoundaries method of the
TimeBucketAssigner class. This method treats the expiry interval as a closed
interval over the incoming timestamps and returns a bucket key.
The expiry interval is defined by two points, start and end. Currently, time
values < start are marked as expired, and values > end cause the time window
to be shifted forward.
Values equal to start or end are both marked as valid, and this causes
conflicting keys at the boundaries.
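A minimal sketch of the three-way check described above, assuming plain long timestamps. The class, enum, and method names are hypothetical and are not taken from the Malhar source; the sketch only shows that a closed window accepts both boundary values.

```java
/**
 * Hypothetical sketch of the closed-interval check described in the comment.
 * Names are illustrative only; this is not the Malhar TimeBucketAssigner code.
 */
public class ClosedIntervalCheck {

  enum Outcome { EXPIRED, VALID, SHIFT_WINDOW }

  /** Classifies a timestamp against the closed window [start, end]. */
  static Outcome classify(long time, long start, long end) {
    if (time < start) {
      return Outcome.EXPIRED;      // strictly before the window
    }
    if (time > end) {
      return Outcome.SHIFT_WINDOW; // strictly after the window: move it forward
    }
    return Outcome.VALID;          // start <= time <= end: both boundaries included
  }

  public static void main(String[] args) {
    // With start = 9 and end = 10, both boundary timestamps are accepted.
    System.out.println(classify(9, 9, 10));  // VALID
    System.out.println(classify(10, 9, 10)); // VALID
    System.out.println(classify(8, 9, 10));  // EXPIRED
    System.out.println(classify(11, 9, 10)); // SHIFT_WINDOW
  }
}
```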
Consider the following parameters:
{noformat}
ExpiryDuration = 1 second, BucketSpan = 1 second
Incoming tuples have times = 9, 10
bucketKey = time + 1; buckets holds the key values for the timestamps

Initially:       start = -1, end = 0,  buckets = [null]
After tuple 9:   start = 9,  end = 10, buckets = [10]
After tuple 10:  start = 9,  end = 10, buckets = [10], but returns key = 11
{noformat}
The end result is an attempt to fit more values than there are buckets. In
the above case, even though there is only 1 bucket, both 9 and 10 are
considered valid and are assigned keys (10 and 11). The function should
process a half-open interval, either \[start, end) or (start, end\].
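A simplified model that replays the example above with both a closed window and a half-open window that includes start but excludes end. This is a hypothetical sketch, not the Malhar source: the field names, the key origin of -1 (which yields bucketKey = time + 1 for a 1-second span), and the shift arithmetic are assumptions chosen so that the closed variant reproduces the trace in this comment. It picks the variant that excludes end; excluding start instead is the other option the comment mentions.

```java
/**
 * Hypothetical, simplified model of getTimeBucketAndAdjustBoundaries.
 * NOT the Malhar source: field names, the key origin, and the shift
 * arithmetic are assumptions that reproduce the trace in the comment.
 */
public class HalfOpenWindowDemo {

  static final class Window {
    private final long bucketSpan;
    private final long origin;      // assumed key origin: key = (time - origin) / bucketSpan
    private final boolean halfOpen; // true: start <= time < end; false: start <= time <= end
    long start;
    long end;

    Window(long start, long end, long bucketSpan, boolean halfOpen) {
      this.start = start;
      this.end = end;
      this.origin = start;
      this.bucketSpan = bucketSpan;
      this.halfOpen = halfOpen;
    }

    /** Returns the bucket key for time, or -1 if the time has expired. */
    long assign(long time) {
      if (time < start) {
        return -1; // expired
      }
      boolean pastEnd = halfOpen ? time >= end : time > end;
      if (pastEnd) {
        // Move both boundaries forward by whole bucket spans so that time fits inside.
        long move = ((time - end) / bucketSpan + 1) * bucketSpan;
        start += move;
        end += move;
      }
      return (time - origin) / bucketSpan;
    }
  }

  public static void main(String[] args) {
    for (boolean halfOpen : new boolean[] {false, true}) {
      // ExpiryDuration = BucketSpan = 1, so there is exactly one bucket.
      Window w = new Window(-1, 0, 1, halfOpen);
      long key9 = w.assign(9);
      long key10 = w.assign(10);
      long key9Again = w.assign(9); // is timestamp 9 still accepted after 10?
      System.out.printf("halfOpen=%b key9=%d key10=%d window=[%d,%d] key9Again=%d%n",
          halfOpen, key9, key10, w.start, w.end, key9Again);
    }
  }
}
```

With the closed check, the window stays at start = 9, end = 10 and both keys 10 and 11 remain live for a single bucket. With the half-open check, tuple 10 shifts the window to start = 10, end = 11, so a late tuple 9 is reported as expired (-1) instead of producing a second key.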
> TimeBasedDedupOperator throws exception during time bucket assignment in
> certain edge cases
> -------------------------------------------------------------------------------------------
>
> Key: APEXMALHAR-2299
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2299
> Project: Apache Apex Malhar
> Issue Type: Bug
> Affects Versions: 3.5.0
> Reporter: Francis Fernandes
> Assignee: Francis Fernandes
> Fix For: 3.6.0
>
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> The following exception is thrown under certain edge cases:
> {noformat}
> Stopped running due to an exception. java.lang.IllegalArgumentException: new time bucket should have a value greater than the old time bucket
>     at com.google.common.base.Preconditions.checkArgument(Preconditions.java:88)
>     at org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl.handleBucketConflict(ManagedTimeUnifiedStateImpl.java:126)
>     at org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.prepareBucket(AbstractManagedStateImpl.java:265)
>     at org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.putInBucket(AbstractManagedStateImpl.java:276)
>     at org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl.put(ManagedTimeUnifiedStateImpl.java:71)
>     at org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator.putManagedState(TimeBasedDedupOperator.java:191)
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)