[ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396349#comment-15396349 ]
ASF GitHub Bot commented on APEXMALHAR-2130:
--------------------------------------------
Github user davidyan74 commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/345#discussion_r72521168
--- Diff: library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/**
+ * Unit tests for Spillable Windowed Storage
+ */
+public class SpillableWindowedStorageTest
+{
+ @Rule
+ public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+ @Ignore
+ @Test
+ public void testWindowedPlainStorage()
+ {
+ SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
+ Window window1 = new Window.TimeWindow<>(1000, 10);
+ Window window2 = new Window.TimeWindow<>(1010, 10);
+ Window window3 = new Window.TimeWindow<>(1020, 10);
+ storage.setStore(testMeta.store);
+ storage.setup(testMeta.operatorContext);
+ storage.beginApexWindow(1000);
+ storage.put(window1, 1);
+ storage.put(window2, 2);
+ storage.put(window3, 3);
+ storage.endApexWindow();
+ storage.beginApexWindow(1001);
+ storage.put(window1, 4);
+ storage.put(window2, 5);
+ storage.endApexWindow();
+ storage.beforeCheckpoint(1001);
+ storage.checkpointed(1001);
+
+ SpillableWindowedPlainStorage<Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
+
+ storage.beginApexWindow(1002);
+ storage.put(window1, 6);
+ storage.put(window2, 7);
+ storage.endApexWindow();
+
+ Assert.assertEquals(6L, storage.get(window1).longValue());
+ Assert.assertEquals(7L, storage.get(window2).longValue());
+ Assert.assertEquals(3L, storage.get(window3).longValue());
+
+ storage.beginApexWindow(1003);
+ storage.put(window1, 8);
+ storage.put(window2, 9);
+ storage.endApexWindow();
+
+ // simulating crash here
+ storage.teardown();
+
+ storage = clonedStorage;
+ storage.setup(testMeta.operatorContext);
+
--- End diff --
@ilooner I made both SpillableComplexComponentImpl and SpillableByteMap
non-transient and made the changes needed for Kryo to serialize them.
Take a look at commit a71d2cc. However, it looks like the data is still not
fetched from disk after recovery.
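
For readers following the thread, here is a minimal sketch of why a transient field does not survive a serialization-based clone. It is an illustration under stated assumptions, not Malhar code: it uses plain java.io serialization as a stand-in for Kryo (whose default field serializer, as far as I know, also skips transient fields), and the names TransientCloneSketch, Holder, skipped and kept are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; java.io serialization stands in for Kryo here.
class TransientCloneSketch
{
  static class Holder implements Serializable
  {
    private static final long serialVersionUID = 1L;
    // Transient: not written to the stream, so it is null in the clone.
    transient Map<String, Integer> skipped = new HashMap<>();
    // Non-transient: written to the stream and restored in the clone.
    Map<String, Integer> kept = new HashMap<>();
  }

  // Clones a Holder by serializing it to bytes and reading it back.
  static Holder roundTrip(Holder original)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Holder)in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }
}
```

Making a field non-transient only ensures it is carried into the clone. Whether the restored object then reads its data back from disk is a separate concern, which is the open problem described above.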
> implement scalable windowed storage
> -----------------------------------
>
> Key: APEXMALHAR-2130
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: bright chen
> Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot for the entire data set with the
> checkpointing window id. This should be done incrementally (ManagedState) to
> avoid wasting space with unchanged data
> 3. When recovering, it takes the recovery window id and restores to that
> snapshot
> 4. When a window is committed, all windows with a lower ID should be purged
> from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage
> interfaces, and because of 2 and 3, we may want to add methods to the
> WindowedStorage interface so that the implementation of WindowedOperator can
> notify the storage of checkpointing, recovering and committing of a window.
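
The checkpoint, recovery and commit behaviour in items 2 to 4 above can be sketched roughly as follows. This is a simplified in-memory illustration, not the WindowedStorage interface or any Malhar implementation: the class SnapshotStoreSketch and its methods are hypothetical, each snapshot is a full copy rather than the incremental one item 2 asks for, and item 4 is interpreted here as dropping snapshots older than the committed window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not the Malhar WindowedStorage API.
class SnapshotStoreSketch<K, V>
{
  private Map<K, V> current = new HashMap<>();
  // Checkpoint window id -> full copy of the data at that checkpoint.
  // A real implementation would store only the changes (incremental).
  private final TreeMap<Long, Map<K, V>> snapshots = new TreeMap<>();

  void put(K key, V value)
  {
    current.put(key, value);
  }

  V get(K key)
  {
    return current.get(key);
  }

  // Item 2: save a snapshot tagged with the checkpointing window id.
  void checkpoint(long windowId)
  {
    snapshots.put(windowId, new HashMap<>(current));
  }

  // Item 3: restore the state saved for the recovery window id.
  void recover(long windowId)
  {
    Map<K, V> snapshot = snapshots.get(windowId);
    if (snapshot == null) {
      throw new IllegalArgumentException("no snapshot for window " + windowId);
    }
    current = new HashMap<>(snapshot);
    // Snapshots taken after the recovery point are no longer valid.
    snapshots.tailMap(windowId, false).clear();
  }

  // Item 4 (as interpreted here): purge snapshots with a lower window id.
  void committed(long windowId)
  {
    snapshots.headMap(windowId, false).clear();
  }

  int snapshotCount()
  {
    return snapshots.size();
  }
}
```

The three callbacks correspond to the notifications item 5 suggests adding, so that the operator can tell the storage about checkpointing, recovering and committing of a window.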
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)