[ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396349#comment-15396349 ]
ASF GitHub Bot commented on APEXMALHAR-2130:
--------------------------------------------

Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r72521168

    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.window;
    +
    +import org.junit.Assert;
    +import org.junit.Ignore;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
    +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
    +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
    +
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +
    +/**
    + * Unit tests for Spillable Windowed Storage
    + */
    +public class SpillableWindowedStorageTest
    +{
    +  @Rule
    +  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
    +
    +  @Ignore
    +  @Test
    +  public void testWindowedPlainStorage()
    +  {
    +    SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
    +    Window window1 = new Window.TimeWindow<>(1000, 10);
    +    Window window2 = new Window.TimeWindow<>(1010, 10);
    +    Window window3 = new Window.TimeWindow<>(1020, 10);
    +    storage.setStore(testMeta.store);
    +    storage.setup(testMeta.operatorContext);
    +    storage.beginApexWindow(1000);
    +    storage.put(window1, 1);
    +    storage.put(window2, 2);
    +    storage.put(window3, 3);
    +    storage.endApexWindow();
    +    storage.beginApexWindow(1001);
    +    storage.put(window1, 4);
    +    storage.put(window2, 5);
    +    storage.endApexWindow();
    +    storage.beforeCheckpoint(1001);
    +    storage.checkpointed(1001);
    +
    +    SpillableWindowedPlainStorage<Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
    +
    +    storage.beginApexWindow(1002);
    +    storage.put(window1, 6);
    +    storage.put(window2, 7);
    +    storage.endApexWindow();
    +
    +    Assert.assertEquals(6L, storage.get(window1).longValue());
    +    Assert.assertEquals(7L, storage.get(window2).longValue());
    +    Assert.assertEquals(3L, storage.get(window3).longValue());
    +
    +    storage.beginApexWindow(1003);
    +    storage.put(window1, 8);
    +    storage.put(window2, 9);
    +    storage.endApexWindow();
    +
    +    // simulating crash here
    +    storage.teardown();
    +
    +    storage = clonedStorage;
    +    storage.setup(testMeta.operatorContext);
    +
    --- End diff --

    @ilooner I made both SpillableComplexComponentImpl and SpillableByteMap non-transient and made the necessary changes to have them serializable by kryo. Take a look at the commit a71d2cc. But it looks like the data is still not fetched from disk after the recovery.


> implement scalable windowed storage
> -----------------------------------
>
>                 Key: APEXMALHAR-2130
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: bright chen
>            Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot for the entire data set with the
> checkpointing window id. This should be done incrementally (ManagedState) to
> avoid wasting space with unchanged data
> 3. When recovering, it takes the recovery window id and restores to that
> snapshot
> 4. When a window is committed, all windows with a lower ID should be purged
> from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage
> interfaces, and because of 2 and 3, we may want to add methods to the
> WindowedStorage interface so that the implementation of WindowedOperator can
> notify the storage of checkpointing, recovering and committing of a window.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
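
Requirements 2 to 4 in the issue description (snapshot on checkpoint, restore on recovery, purge on commit) can be illustrated with a small in-memory sketch. This is not the Malhar implementation: `SnapshotStore` and all of its method names are hypothetical, it copies the full map on every checkpoint rather than saving incrementally as the issue asks, and it reads "purge all windows with a lower ID" as dropping snapshots taken at a lower checkpoint window id, which is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy store for illustration only; not part of Apex Malhar. */
class SnapshotStore<K, V> {
  // Current (uncheckpointed) view of the data.
  private Map<K, V> live = new HashMap<>();
  // Checkpoint window id -> full copy of the data at that checkpoint.
  // A real implementation would store incremental changes instead.
  private final TreeMap<Long, Map<K, V>> snapshots = new TreeMap<>();

  void put(K key, V value) {
    live.put(key, value);
  }

  V get(K key) {
    return live.get(key);
  }

  /** Requirement 2: save a snapshot tagged with the checkpoint window id. */
  void checkpoint(long windowId) {
    snapshots.put(windowId, new HashMap<>(live));
  }

  /** Requirement 3: restore the data to the snapshot of the recovery window. */
  void recover(long windowId) {
    Map<K, V> snapshot = snapshots.get(windowId);
    if (snapshot == null) {
      throw new IllegalStateException("no snapshot for window " + windowId);
    }
    live = new HashMap<>(snapshot);
    // Snapshots taken after the recovery point describe state that was lost.
    snapshots.tailMap(windowId, false).clear();
  }

  /** Requirement 4 (as assumed here): drop snapshots older than the committed window. */
  void committed(long windowId) {
    snapshots.headMap(windowId, false).clear();
  }

  int snapshotCount() {
    return snapshots.size();
  }
}
```

Replaying the sequence from the test above against this sketch (write 1, 2, 3, then 4, 5, checkpoint at 1001, write 6, 7 and 8, 9, then recover to 1001) leaves the values 4, 5 and 3, which is the post-recovery state the spillable storage would be expected to read back from disk.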