[ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395155#comment-15395155 ]
ASF GitHub Bot commented on APEXMALHAR-2130:
--------------------------------------------
Github user ilooner commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/345#discussion_r72389548
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
+ *
+ * @param <T> Type of the value per window
+ */
+public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
+{
+  private SpillableStateStore store;
+  private transient SpillableComplexComponentImpl sccImpl;
+  private long bucket;
+  private Serde<Window, Slice> windowSerde;
+  private Serde<T, Slice> valueSerde;
+
+  protected transient Spillable.SpillableByteMap<Window, T> internMap;
+
+  public SpillableWindowedPlainStorage()
+  {
+  }
+
+  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
+  {
+    this.bucket = bucket;
+    this.windowSerde = windowSerde;
+    this.valueSerde = valueSerde;
+  }
+
+  public void setStore(SpillableStateStore store)
+  {
+    this.store = store;
+  }
+
+  public void setBucket(long bucket)
+  {
+    this.bucket = bucket;
+  }
+
+  public void setWindowSerde(Serde<Window, Slice> windowSerde)
+  {
+    this.windowSerde = windowSerde;
+  }
+
+  public void setValueSerde(Serde<T, Slice> valueSerde)
+  {
+    this.valueSerde = valueSerde;
+  }
+
+  @Override
+  public void put(Window window, T value)
+  {
+    internMap.put(window, value);
+  }
+
+  @Override
+  public T get(Window window)
+  {
+    return internMap.get(window);
+  }
+
+  @Override
+  public Iterable<Map.Entry<Window, T>> entrySet()
+  {
+    return internMap.entrySet();
+  }
+
+  @Override
+  public Iterator<Map.Entry<Window, T>> iterator()
+  {
+    return internMap.entrySet().iterator();
+  }
+
+  @Override
+  public boolean containsWindow(Window window)
+  {
+    return internMap.containsKey(window);
+  }
+
+  @Override
+  public long size()
+  {
+    return internMap.size();
+  }
+
+  @Override
+  public void remove(Window window)
+  {
+    internMap.remove(window);
+  }
+
+  @Override
+  public void migrateWindow(Window fromWindow, Window toWindow)
+  {
+    internMap.put(toWindow, internMap.remove(fromWindow));
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    if (store == null) {
+      // provide a default store
+      store = new ManagedStateSpillableStateStore();
+    }
+    if (bucket == 0) {
+      // choose a bucket that is almost guaranteed to be unique
+      bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
+    }
+    // set default serdes
+    if (windowSerde == null) {
+      windowSerde = new SerdeKryoSlice<>();
+    }
+    if (valueSerde == null) {
+      valueSerde = new SerdeKryoSlice<>();
+    }
+    sccImpl = new SpillableComplexComponentImpl(store);
+    sccImpl.setup(context);
+    internMap = sccImpl.newSpillableByteMap(bucket, windowSerde, valueSerde);
--- End diff --
@davidyan74 I think you are getting a size of zero because you are
allocating a new spillable data structure here. You should check whether
sccImpl is null and, only if it is null, initialize it and all the
Spillable data structures.
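
To illustrate the guard suggested in the comment above, here is a minimal, self-contained sketch. It does not use the Malhar classes: `GuardedSetupSketch`, `Storage`, and `sizeAfterSecondSetup` are hypothetical names, and a plain `HashMap` stands in for the spillable map. It only models the suspected cause, namely that a second call to setup() replaces a populated structure with an empty one unless the allocation is guarded by a null check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in; none of these names belong to the Apex Malhar API.
public class GuardedSetupSketch
{
  static class Storage
  {
    // Plays the role of internMap in the diff.
    private Map<String, Integer> internMap;
    private final boolean guarded;

    Storage(boolean guarded)
    {
      this.guarded = guarded;
    }

    void setup()
    {
      // Unguarded: always allocate, discarding whatever was stored before.
      // Guarded: allocate only when nothing has been initialized yet.
      if (!guarded || internMap == null) {
        internMap = new HashMap<>();
      }
    }

    void put(String window, int value)
    {
      internMap.put(window, value);
    }

    long size()
    {
      return internMap.size();
    }
  }

  // Calls setup(), stores one entry, calls setup() again, and reports the size.
  static long sizeAfterSecondSetup(boolean guarded)
  {
    Storage storage = new Storage(guarded);
    storage.setup();
    storage.put("w1", 1);
    storage.setup();
    return storage.size();
  }

  public static void main(String[] args)
  {
    System.out.println("unguarded: " + sizeAfterSecondSetup(false));
    System.out.println("guarded: " + sizeAfterSecondSetup(true));
  }
}
```

In this simplified model the unguarded variant reports a size of 0 after the second setup() and the guarded variant keeps its entry. Whether the same guard is sufficient for the real class depends on how the operator instance and its transient fields are restored, which this sketch does not model.
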
> implement scalable windowed storage
> -----------------------------------
>
> Key: APEXMALHAR-2130
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: bright chen
> Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot for the entire data set with the
> checkpointing window id. This should be done incrementally (ManagedState) to
> avoid wasting space with unchanged data
> 3. When recovering, it takes the recovery window id and restores to that
> snapshot
> 4. When a window is committed, all windows with a lower ID should be purged
> from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage
> interfaces, and because of 2 and 3, we may want to add methods to the
> WindowedStorage interface so that the implementation of WindowedOperator can
> notify the storage of checkpointing, recovering and committing of a window.
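
As a rough illustration of requirements 2 to 4 in the description above, the following sketch keeps one snapshot per checkpointed window id, restores from a snapshot on recovery, and purges snapshots with lower ids on commit. Everything here is an assumption made for illustration: the class `SnapshotStorageSketch` and the methods `checkpointed`, `recover`, and `committed` are hypothetical and are not the methods eventually added to WindowedStorage. It also takes full in-memory copies, whereas the issue asks for incremental snapshots via ManagedState, and it reads "purged" as applying to snapshots of earlier window ids.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model; not part of the Apex Malhar API.
public class SnapshotStorageSketch
{
  // Current (live) key/value data.
  private Map<String, Long> live = new HashMap<>();
  // Snapshots keyed by the checkpointing window id, in ascending order.
  private final TreeMap<Long, Map<String, Long>> snapshots = new TreeMap<>();

  public void put(String key, long value)
  {
    live.put(key, value);
  }

  public Long get(String key)
  {
    return live.get(key);
  }

  // Requirement 2: save a snapshot under the checkpointing window id.
  // This is a full copy, not the incremental scheme the issue asks for.
  public void checkpointed(long windowId)
  {
    snapshots.put(windowId, new HashMap<>(live));
  }

  // Requirement 3: restore the live data from the snapshot for windowId.
  public void recover(long windowId)
  {
    Map<String, Long> snapshot = snapshots.get(windowId);
    if (snapshot == null) {
      throw new IllegalStateException("no snapshot for window " + windowId);
    }
    live = new HashMap<>(snapshot);
  }

  // Requirement 4: drop every snapshot with an id strictly lower than windowId.
  public void committed(long windowId)
  {
    snapshots.headMap(windowId).clear();
  }

  public int snapshotCount()
  {
    return snapshots.size();
  }
}
```

A WindowedOperator implementation would call such hooks from its own checkpoint, recovery, and commit callbacks; how that wiring is done is left open by the issue description.
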
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)