[
https://issues.apache.org/jira/browse/APEXMALHAR-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15202270#comment-15202270
]
ASF GitHub Bot commented on APEXMALHAR-1897:
--------------------------------------------
Github user amberarrow commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r56730030
--- Diff:
library/src/main/java/com/datatorrent/lib/state/managed/TimeBucketAssigner.java
---
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.appdata.query.WindowBoundedService;
+
+/**
+ * Keeps track of time buckets.<br/>
+ *
+ * The data of a bucket is further divided into time-buckets. This
component controls the length of time buckets,
+ * which time-bucket an event falls into and sliding the time boundaries.
+ * <p/>
+ *
+ * The configuration properties {@link #expireBefore} and {@link #bucketSpan} are
used to calculate the number of time-buckets.
+ * For example, if <code>expireBefore = 1 hour</code> and <code>bucketSpan = 30
minutes</code>, then <code>
+ * numBuckets = 60 minutes / 30 minutes = 2</code>.
+ * <p/>
+ *
+ * The time boundaries, start and end, periodically move by the span of a
single time-bucket. Any event with time < start
+ * is expired. These boundaries are slid between application windows by the
expiry task asynchronously.<br/>
+ * The boundaries move only between an application window to ensure
consistency of a checkpoint. Checkpoint will happen
+ * at application window boundaries so if we do not restrict moving start
and end within an app window boundary, it may
+ * happen that old value of 'start' is saved with the new value of 'end'.
+ * <p/>
+ *
+ * The boundaries can also be moved by {@link #getTimeBucketFor(long)}.
The time which is passed as an argument to this
+ * method can be ahead of <code>end</code>. This means that the
corresponding event is a future event
+ * (with respect to TimeBucketAssigner) and cannot be ignored. Therefore it is
accounted for by sliding the boundaries further.
+ */
+public class TimeBucketAssigner implements ManagedStateComponent
+{
+ @NotNull
+ private Instant referenceInstant = new Instant();
+
+ @NotNull
+ @FieldSerializer.Bind(JavaSerializer.class)
+ private Duration expireBefore = Duration.standardDays(2);
+
+ @FieldSerializer.Bind(JavaSerializer.class)
+ private Duration bucketSpan;
+
+ private long bucketSpanMillis;
+
+ private long start;
+ private long end;
+ private int numBuckets;
+ private transient long fixedStart;
+ private transient long lowestTimeBucket;
+
+ private boolean initialized;
+
+ private transient WindowBoundedService windowBoundedService;
+
+ private transient PurgeListener purgeListener = null;
+
+ /**
+  * Task handed to the {@link WindowBoundedService} created in setup. Each run, while holding
+  * the lock, advances both boundaries by one bucket span. When a purge listener is registered
+  * it is asked to purge up to the current lowest time bucket, and the lowest time bucket is
+  * then one higher. Without a listener the lowest time bucket is left untouched.
+  */
+ private final transient Runnable expiryTask = new Runnable()
+ {
+   @Override
+   public void run()
+   {
+     synchronized (lock) {
+       final long span = bucketSpanMillis;
+       start = start + span;
+       end = end + span;
+       if (purgeListener == null) {
+         return;
+       }
+       // advance the counter before notifying, exactly as a post-increment argument would
+       final long purgeUpTo = lowestTimeBucket;
+       lowestTimeBucket = purgeUpTo + 1;
+       purgeListener.purgeTimeBucketsLessThanEqualTo(purgeUpTo);
+     }
+   }
+ };
+
+ private final transient Object lock = new Object();
+
+ /**
+  * Computes the time boundaries and sets up the service that is given the expiry task.
+  * <p/>
+  * The fixed start (reference instant minus the expiry duration) is recomputed on every call.
+  * The bucket span, start, end and number of buckets are computed only while the assigner is
+  * not yet marked as initialized. If no bucket span was configured, it is derived from the
+  * application window count multiplied by the streaming window size in milliseconds.
+  *
+  * @param managedStateContext context from which the operator context is obtained.
+  */
+ @Override
+ public void setup(@NotNull ManagedStateContext managedStateContext)
+ {
+   final Context.OperatorContext operatorContext = managedStateContext.getOperatorContext();
+   final long referenceMillis = referenceInstant.getMillis();
+   final long expiryMillis = expireBefore.getMillis();
+   fixedStart = referenceMillis - expiryMillis;
+
+   if (!initialized) {
+     if (bucketSpan == null) {
+       bucketSpan = Duration.millis(operatorContext.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
+           * operatorContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS));
+     }
+     start = fixedStart;
+     bucketSpanMillis = bucketSpan.getMillis();
+     // ceiling division: a partial trailing span still gets its own bucket
+     final long roundedUp = expiryMillis + bucketSpanMillis - 1;
+     numBuckets = (int)(roundedUp / bucketSpanMillis);
+     end = start + (numBuckets * bucketSpanMillis);
+
+     initialized = true;
+   }
+
+   // index of the bucket that begins at start, counted from the fixed start
+   lowestTimeBucket = (start - fixedStart) / bucketSpanMillis;
+   windowBoundedService = new WindowBoundedService(bucketSpanMillis, expiryTask);
+   windowBoundedService.setup(operatorContext);
+ }
+
+ /**
+  * Forwards the beginning of a window to the window bounded service created in setup.
+  *
+  * @param windowId id of the window that is starting.
+  */
+ public void beginWindow(long windowId)
+ {
+   this.windowBoundedService.beginWindow(windowId);
+ }
+
+ /**
+  * Forwards the end of the current window to the window bounded service created in setup.
+  */
+ public void endWindow()
+ {
+   this.windowBoundedService.endWindow();
+ }
+
+ /**
+  * Get the bucket key for the long value.
+  * <p/>
+  * The key is the number of whole bucket spans between the fixed start and the value.
+  * If the value lies at or beyond <code>end</code>, both boundaries are moved forward by
+  * whole bucket spans so that the value falls inside the last bucket of the new range.
+  * The computation and the move happen while holding the lock shared with the expiry task.
+  *
+  * @param value value from which bucket key is derived.
+  * @return -1 if value is already expired; bucket key otherwise.
+  */
+ public long getTimeBucketFor(long value)
+ {
+   synchronized (lock) {
+     if (value < start) {
+       return -1;
+     }
+     long diffFromStart = value - fixedStart;
+     long key = diffFromStart / bucketSpanMillis;
+     // end = start + numBuckets * bucketSpanMillis, i.e. end is exclusive: a value equal to end
+     // already belongs to the bucket after the last one, so the boundaries must slide for it too.
+     if (value >= end) {
+       long move = ((value - end) / bucketSpanMillis + 1) * bucketSpanMillis;
+       start += move;
+       end += move;
+     }
+     return key;
+   }
+ }
+
+ public void setPurgeListener(@NotNull PurgeListener purgeListener)
+ {
+ this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge
listener");
+ }
+
+ /**
+  * Tears down the window bounded service that was created in setup.
+  */
+ @Override
+ public void teardown()
+ {
+   this.windowBoundedService.teardown();
+ }
+
+ /**
+  * Returns the number of time buckets computed by setup from the expiry duration and the bucket span.
+  *
+  * @return number of buckets.
+  */
+ public int getNumBuckets()
+ {
+   return this.numBuckets;
+ }
+
+ /**
+  * Returns the instant from which setup derives the fixed start.
+  *
+  * @return reference instant
+  */
+ public Instant getReferenceInstant()
+ {
+   return this.referenceInstant;
+ }
+
+ /**
+ * Sets the reference instant (by default the instant at which this object is constructed).
+ * This instant with {@link #expireBefore} is used by setup to calculate the
{@link #start} and {@link #end}.
+ *
+ * @param referenceInstant instant from which the expiry duration is subtracted to obtain the fixed start.
+ */
+ public void setReferenceInstant(Instant referenceInstant)
+ {
+ this.referenceInstant = referenceInstant;
+ }
+
+ /**
+  * Returns the configured expiry duration.
+  *
+  * @return duration before which the data is expired.
+  */
+ public Duration getExpireBefore()
+ {
+   return this.expireBefore;
+ }
+
+ /**
+ * Sets the duration which denotes expiry. During setup this duration is subtracted from the
reference instant to obtain the initial start; any event with time before start is considered to be expired.
+ * @param expireBefore duration
+ */
+ public void setExpireBefore(Duration expireBefore)
+ {
+ this.expireBefore = expireBefore;
+ }
+
+ /**
+  * Returns the length of a single time bucket; this is null until it is either set explicitly
+  * or derived by setup.
+  *
+  * @return time-bucket span
+  */
+ public Duration getBucketSpan()
+ {
+   return this.bucketSpan;
+ }
+
+ /**
+ * Sets the length of a time bucket. Setup reads this value only while the assigner is not yet
initialized; if it is still null at that point, setup derives the span from the application window
count and the streaming window size.
+ * @param bucketSpan length of time bucket
+ */
+ public void setBucketSpan(Duration bucketSpan)
+ {
+ this.bucketSpan = bucketSpan;
+ }
+
+ /**
+ * The listeners are informed when the time slides and time buckets
which are older than the smallest time bucket
--- End diff --
listeners are => listener is
> Create ManagedState
> -------------------
>
> Key: APEXMALHAR-1897
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1897
> Project: Apache Apex Malhar
> Issue Type: Sub-task
> Reporter: Chandni Singh
> Assignee: Chandni Singh
> Fix For: 3.4.0
>
>
> ManagedState is described in the document below:
> https://docs.google.com/document/d/1gRWN9ufKSZSZD0N-pthlhpC9TZ8KwJ6hJlAX6nxl5f8/edit#heading=h.z87ti1fwyt0t
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)