[
https://issues.apache.org/jira/browse/APEXMALHAR-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15203071#comment-15203071
]
ASF GitHub Bot commented on APEXMALHAR-1897:
--------------------------------------------
Github user chandnisingh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r56758813
--- Diff:
library/src/main/java/com/datatorrent/lib/state/managed/Bucket.java ---
@@ -0,0 +1,534 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A bucket that groups events.
+ */
+public interface Bucket extends ManagedStateComponent
+{
+ /**
+ * @return bucket id
+ */
+ long getBucketId();
+
+ /**
+ *
+ * @return size of bucket in memory.
+ */
+ long getSizeInBytes();
+
+ /**
+ * Get value of a key.
+ *
+ * @param key key.
+ * @param timeBucket time bucket of the key if known; -1 otherwise.
+ * @param source source to read from
+ * @return value of the key.
+ */
+ Slice get(Slice key, long timeBucket, ReadSource source);
+
+ /**
+ * Set value of a key.
+ *
+ * @param key key.
+ * @param timeBucket timeBucket of the key.
+ * @param value value of the key.
+ */
+ void put(Slice key, long timeBucket, Slice value);
+
+ /**
+ * Triggers the bucket to checkpoint. Returns the non checkpointed data so far.
+ *
+ * @return non checkpointed data.
+ */
+ Map<Slice, BucketedValue> checkpoint(long windowId);
+
+ /**
+ * Triggers the bucket to commit data till provided window id.
+ *
+ * @param windowId window id
+ */
+ void committed(long windowId);
+
+ /**
+ * Triggers bucket to free memory which is already persisted in bucket data files.
+ *
+ * @return amount of memory freed in bytes.
+ * @throws IOException
+ */
+ long freeMemory() throws IOException;
+
+ /**
+ * Allows the bucket to process/cache data which is recovered (from window files) after failure.
+ *
+ * @param windowId largest recovery window
+ * @param recoveredData recovered data
+ */
+ void recoveredData(long windowId, Map<Slice, Bucket.BucketedValue>
recoveredData);
+
+ enum ReadSource
+ {
+ MEMORY, //state in memory in key/value form
+ READERS, //these are streams in which the key will be searched and serialized.
+ ALL //both the above states.
+ }
+
+ /**
+ * Holder pairing a value with the time bucket recorded for it.
+ * Both fields are mutable; the value may be null (the no-arg constructor
+ * leaves it unset).
+ */
+ class BucketedValue
+ {
+ private long timeBucket;
+ private Slice value;
+
+ /**
+ * Creates an instance with the default time bucket (0) and a null value.
+ */
+ protected BucketedValue()
+ {
+ }
+
+ /**
+ * @param timeBucket time bucket to record for the value.
+ * @param value the value; may be null.
+ */
+ protected BucketedValue(long timeBucket, Slice value)
+ {
+ this.timeBucket = timeBucket;
+ this.value = value;
+ }
+
+ /**
+ * @return time bucket recorded for the value.
+ */
+ protected long getTimeBucket()
+ {
+ return timeBucket;
+ }
+
+ /**
+ * @param timeBucket time bucket to record for the value.
+ */
+ protected void setTimeBucket(long timeBucket)
+ {
+ this.timeBucket = timeBucket;
+ }
+
+ /**
+ * @return the value; may be null.
+ */
+ public Slice getValue()
+ {
+ return value;
+ }
+
+ /**
+ * @param value the value; may be null.
+ */
+ public void setValue(Slice value)
+ {
+ this.value = value;
+ }
+
+ /**
+ * Two instances are equal when their time buckets match and their values
+ * are equal; two null values are considered equal.
+ */
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof BucketedValue)) {
+ return false;
+ }
+
+ BucketedValue that = (BucketedValue)o;
+
+ // Objects.equals tolerates a null value (left unset by the no-arg constructor),
+ // keeping equals consistent with the null-tolerant hashCode below.
+ return timeBucket == that.timeBucket && Objects.equals(value, that.value);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(timeBucket, value);
+ }
+ }
+
+ /**
+ * Default bucket.<br/>
+ * Not thread-safe.
+ */
+ class DefaultBucket implements Bucket
+ {
+ private final long bucketId;
+
+ //Key -> Ordered values
+ private Map<Slice, BucketedValue> flash = Maps.newHashMap();
+
+ //Data persisted in write ahead logs. window -> bucket
+ private final transient TreeMap<Long, Map<Slice, BucketedValue>>
checkpointedData = Maps.newTreeMap();
+
+ //Data persisted in bucket data files
+ private final transient Map<Slice, BucketedValue> committedData =
Maps.newHashMap();
+
+ //Data recovered
+ private final transient TreeMap<Long, Map<Slice, BucketedValue>>
recoveredData = Maps.newTreeMap();
--- End diff --
Yes. I think it can be. I will make the change.
> Create ManagedState
> -------------------
>
> Key: APEXMALHAR-1897
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1897
> Project: Apache Apex Malhar
> Issue Type: Sub-task
> Reporter: Chandni Singh
> Assignee: Chandni Singh
> Fix For: 3.4.0
>
>
> ManagedState is described in the document below:
> https://docs.google.com/document/d/1gRWN9ufKSZSZD0N-pthlhpC9TZ8KwJ6hJlAX6nxl5f8/edit#heading=h.z87ti1fwyt0t
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)