[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396349#comment-15396349
 ] 

ASF GitHub Bot commented on APEXMALHAR-2130:
--------------------------------------------

Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r72521168
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.window;
    +
    +import org.junit.Assert;
    +import org.junit.Ignore;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
    +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
    +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
    +
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +
    +/**
    + * Unit tests for Spillable Windowed Storage
    + */
    +public class SpillableWindowedStorageTest
    +{
    +  @Rule
    +  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
    +
    +  @Ignore
    +  @Test
    +  public void testWindowedPlainStorage()
    +  {
    +    SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
    +    Window window1 = new Window.TimeWindow<>(1000, 10);
    +    Window window2 = new Window.TimeWindow<>(1010, 10);
    +    Window window3 = new Window.TimeWindow<>(1020, 10);
    +    storage.setStore(testMeta.store);
    +    storage.setup(testMeta.operatorContext);
    +    storage.beginApexWindow(1000);
    +    storage.put(window1, 1);
    +    storage.put(window2, 2);
    +    storage.put(window3, 3);
    +    storage.endApexWindow();
    +    storage.beginApexWindow(1001);
    +    storage.put(window1, 4);
    +    storage.put(window2, 5);
    +    storage.endApexWindow();
    +    storage.beforeCheckpoint(1001);
    +    storage.checkpointed(1001);
    +
    +    SpillableWindowedPlainStorage<Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
    +
    +    storage.beginApexWindow(1002);
    +    storage.put(window1, 6);
    +    storage.put(window2, 7);
    +    storage.endApexWindow();
    +
    +    Assert.assertEquals(6L, storage.get(window1).longValue());
    +    Assert.assertEquals(7L, storage.get(window2).longValue());
    +    Assert.assertEquals(3L, storage.get(window3).longValue());
    +
    +    storage.beginApexWindow(1003);
    +    storage.put(window1, 8);
    +    storage.put(window2, 9);
    +    storage.endApexWindow();
    +
    +    // simulating crash here
    +    storage.teardown();
    +
    +    storage = clonedStorage;
    +    storage.setup(testMeta.operatorContext);
    +
    --- End diff --
    
    @ilooner I made both SpillableComplexComponentImpl and SpillableByteMap
    non-transient and made the changes needed for Kryo to serialize them.
    Take a look at commit a71d2cc. However, it looks like the data is still
    not fetched from disk after recovery.
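
For context on why the transient modifier matters for a clone-based recovery test, here is a minimal sketch of what happens to a transient field when an object is copied through serialization. It uses plain java.io serialization as a stand-in for Kryo, so it only illustrates the general effect of `transient` and not Kryo's exact behaviour. The class `TransientSketch`, its nested `Holder`, and the field names are hypothetical and do not come from Malhar.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Malhar and not using Kryo.
class TransientSketch
{
  static class Holder implements Serializable
  {
    private static final long serialVersionUID = 1L;
    // Marked transient: left out of the serialized form.
    transient Map<String, Integer> skipped = new HashMap<>();
    // Not transient: written to and read back from the serialized form.
    Map<String, Integer> kept = new HashMap<>();
  }

  // Copies a Holder by serializing it to a byte array and reading it back.
  static Holder copy(Holder original)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Holder)in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("copy failed", e);
    }
  }
}
```

With java.io serialization, the copy's `skipped` field comes back as null while `kept` retains its entries, which is why state that must survive a simulated crash cannot live in a transient field.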


> implement scalable windowed storage
> -----------------------------------
>
>                 Key: APEXMALHAR-2130
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: bright chen
>            Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key-value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot of the entire data set, tagged with the
> checkpointing window id.  This should be done incrementally (ManagedState) to
> avoid wasting space on unchanged data
> 3. When recovering, it takes the recovery window id and restores to that
> snapshot
> 4. When a window is committed, all windows with a lower ID should be purged
> from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage
> interfaces. Because of items 2 and 3, we may want to add methods to the
> WindowedStorage interface so that the implementation of WindowedOperator can
> notify the storage of checkpointing, recovering and committing of a window.
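
The checkpoint, recovery and commit requirements (items 2 to 4) can be sketched roughly as follows. This is a hypothetical in-memory illustration, not the Malhar implementation: the class `SnapshotStoreSketch` and its method names are invented here, each checkpoint takes a full copy rather than the incremental ManagedState snapshot the issue asks for, nothing spills to disk, and item 4 is interpreted as purging snapshots older than the committed window id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the checkpoint / recover / commit lifecycle.
class SnapshotStoreSketch<K, V>
{
  // Current working data.
  private Map<K, V> live = new HashMap<>();
  // Checkpoint window id -> full copy of the data at that checkpoint.
  private final TreeMap<Long, Map<K, V>> snapshots = new TreeMap<>();

  void put(K key, V value)
  {
    live.put(key, value);
  }

  V get(K key)
  {
    return live.get(key);
  }

  // Item 2: save a snapshot tagged with the checkpointing window id.
  void checkpoint(long windowId)
  {
    snapshots.put(windowId, new HashMap<>(live));
  }

  // Item 3: restore the data to the snapshot of the recovery window id.
  void recover(long windowId)
  {
    Map<K, V> snapshot = snapshots.get(windowId);
    if (snapshot == null) {
      throw new IllegalStateException("no snapshot for window " + windowId);
    }
    live = new HashMap<>(snapshot);
    // Snapshots taken after the recovery point are no longer valid.
    snapshots.tailMap(windowId, false).clear();
  }

  // Item 4 (as interpreted here): drop snapshots older than the committed window.
  void committed(long windowId)
  {
    snapshots.headMap(windowId, false).clear();
  }

  int snapshotCount()
  {
    return snapshots.size();
  }
}
```

A caller would invoke `checkpoint` at each checkpoint window, `recover` with the recovery window id after a failure, and `committed` once a window is committed, which mirrors the notifications that item 5 suggests the WindowedOperator implementation should pass to the storage.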



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
