[ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396104#comment-15396104 ]

ASF GitHub Bot commented on APEXMALHAR-2130:
--------------------------------------------

Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r72494611
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +
    +import org.apache.apex.malhar.lib.state.spillable.Spillable;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
    + *
    + * @param <T> Type of the value per window
    + */
    +public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
    +{
    +  private SpillableStateStore store;
    +  private transient SpillableComplexComponentImpl sccImpl;
    +  private long bucket;
    +  private Serde<Window, Slice> windowSerde;
    +  private Serde<T, Slice> valueSerde;
    +
    +  protected transient Spillable.SpillableByteMap<Window, T> internMap;
    +
    +  public SpillableWindowedPlainStorage()
    +  {
    +  }
    +
    +  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
    +  {
    +    this.bucket = bucket;
    +    this.windowSerde = windowSerde;
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  public void setStore(SpillableStateStore store)
    +  {
    +    this.store = store;
    +  }
    +
    +  public void setBucket(long bucket)
    +  {
    +    this.bucket = bucket;
    +  }
    +
    +  public void setWindowSerde(Serde<Window, Slice> windowSerde)
    +  {
    +    this.windowSerde = windowSerde;
    +  }
    +
    +  public void setValueSerde(Serde<T, Slice> valueSerde)
    +  {
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  @Override
    +  public void put(Window window, T value)
    +  {
    +    internMap.put(window, value);
    +  }
    +
    +  @Override
    +  public T get(Window window)
    +  {
    +    return internMap.get(window);
    +  }
    +
    +  @Override
    +  public Iterable<Map.Entry<Window, T>> entrySet()
    +  {
    +    return internMap.entrySet();
    +  }
    +
    +  @Override
    +  public Iterator<Map.Entry<Window, T>> iterator()
    +  {
    +    return internMap.entrySet().iterator();
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internMap.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internMap.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    internMap.remove(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    internMap.put(toWindow, internMap.remove(fromWindow));
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store == null) {
    +      // provide a default store
    +      store = new ManagedStateSpillableStateStore();
    +    }
    +    if (bucket == 0) {
    +      // choose a bucket that is almost guaranteed to be unique
    +      bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
    +    }
    +    // set default serdes
    +    if (windowSerde == null) {
    +      windowSerde = new SerdeKryoSlice<>();
    +    }
    +    if (valueSerde == null) {
    +      valueSerde = new SerdeKryoSlice<>();
    +    }
    +    sccImpl = new SpillableComplexComponentImpl(store);
    +    sccImpl.setup(context);
    +    internMap = sccImpl.newSpillableByteMap(bucket, windowSerde, valueSerde);
    --- End diff --
    
    @ilooner Wouldn't that mean we should make the SpillableByteMapImpl non-transient and make it serializable by Kryo? When I did that, I got this error:
    
    ```
    com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    internMap (org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage)
    
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
        at com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:125)
        at com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:145)
        at org.apache.apex.malhar.lib.window.SpillableWindowedStorageTest.testWindowedPlainStorage(SpillableWindowedStorageTest.java:62)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at com.intellij.junit4.JUnit4TestRunnerUtil$IgnoreIgnoredTestJUnit4ClassRunner.runChild(JUnit4TestRunnerUtil.java:365)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
    Caused by: java.lang.UnsupportedOperationException
        at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.entrySet(SpillableByteMapImpl.java:161)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
        ... 33 more
    ```
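Reading the trace bottom-up: Kryo's FieldSerializer reaches the non-transient `internMap` field, hands it to MapSerializer, and MapSerializer calls `entrySet()`, which SpillableByteMapImpl rejects (SpillableByteMapImpl.java:161). The stdlib-only sketch below reproduces that mechanism under stated assumptions: `writeFields` is a hypothetical, hand-written stand-in for a field serializer that (like Kryo's FieldSerializer by default) skips transient fields and serializes maps by iterating `entrySet()`; `EntrySetUnsupportedMap`, `TransientStorage` and `NonTransientStorage` are illustrative names, not Malhar classes.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TransientSpillableDemo
{
  // Hypothetical stand-in for SpillableByteMapImpl: lookups work,
  // but entrySet() is unsupported, as in the trace above.
  static class EntrySetUnsupportedMap<K, V> extends AbstractMap<K, V>
  {
    private final Map<K, V> backing = new HashMap<>();

    @Override
    public V put(K key, V value)
    {
      return backing.put(key, value);
    }

    @Override
    public V get(Object key)
    {
      return backing.get(key);
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet()
    {
      throw new UnsupportedOperationException();
    }
  }

  // Map field is transient and rebuilt in setup(), mirroring the diff.
  static class TransientStorage
  {
    long bucket = 42L;
    transient EntrySetUnsupportedMap<String, Integer> internMap;

    void setup()
    {
      internMap = new EntrySetUnsupportedMap<>();
    }
  }

  // Same storage with the map made non-transient, as tried in the comment.
  static class NonTransientStorage
  {
    long bucket = 42L;
    EntrySetUnsupportedMap<String, Integer> internMap;

    void setup()
    {
      internMap = new EntrySetUnsupportedMap<>();
    }
  }

  // Hypothetical field serializer: skips static and transient fields and
  // walks entrySet() of any Map field. Returns the number of fields written.
  static int writeFields(Object obj)
  {
    int written = 0;
    for (Field f : obj.getClass().getDeclaredFields()) {
      int mods = f.getModifiers();
      if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
        continue;
      }
      try {
        f.setAccessible(true);
        Object value = f.get(obj);
        if (value instanceof Map) {
          for (Map.Entry<?, ?> entry : ((Map<?, ?>)value).entrySet()) {
            entry.getKey(); // a real serializer would write key and value here
          }
        }
      } catch (IllegalAccessException e) {
        throw new IllegalStateException(e);
      }
      written++;
    }
    return written;
  }

  public static void main(String[] args)
  {
    TransientStorage ok = new TransientStorage();
    ok.setup();
    ok.internMap.put("w1", 1);
    System.out.println("fields written: " + writeFields(ok)); // prints "fields written: 1"

    NonTransientStorage bad = new NonTransientStorage();
    bad.setup();
    try {
      writeFields(bad);
    } catch (UnsupportedOperationException e) {
      System.out.println("non-transient map: UnsupportedOperationException from entrySet()");
    }
  }
}
```

In the diff, `store`, `bucket` and the serdes are non-transient fields, while `sccImpl` and `internMap` are transient and recreated in `setup()`. The sketch models only that split; it says nothing about how ManagedState actually persists the map's contents.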


> implement scalable windowed storage
> -----------------------------------
>
>                 Key: APEXMALHAR-2130
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: bright chen
>            Assignee: David Yan
>
> This feature supports windowing.
> The storage needs to have the following features:
> 1. Spillable key-value storage (integrate with APEXMALHAR-2026).
> 2. Upon checkpoint, it saves a snapshot of the entire data set with the checkpointing window id. This should be done incrementally (ManagedState) to avoid wasting space on unchanged data.
> 3. When recovering, it takes the recovery window id and restores to that snapshot.
> 4. When a window is committed, all windows with a lower ID should be purged from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage interfaces. Because of 2 and 3, we may want to add methods to the WindowedStorage interface so that the implementation of WindowedOperator can notify the storage of the checkpointing, recovery and committing of a window.
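Requirements 2 to 4 describe incremental snapshots keyed by checkpoint window id, restoring to one of them on recovery, and discarding old ones on commit. A minimal in-memory sketch of those semantics follows, assuming hypothetical callbacks `checkpointed`, `recovered` and `committed` (these are not existing WindowedStorage methods; requirement 5 only proposes adding something like them). It stores one delta per checkpoint instead of a full copy, and it deliberately ignores spilling to disk and key deletion, which a ManagedState-backed implementation would have to handle.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class IncrementalWindowStoreSketch
{
  // Hypothetical in-memory model of requirements 2-4; no spilling, no deletes.
  static class IncrementalStore<K, V>
  {
    // State folded in from all committed checkpoints.
    private final Map<K, V> committedBase = new HashMap<>();
    // Checkpoint window id -> entries changed since the previous checkpoint.
    private final TreeMap<Long, Map<K, V>> deltas = new TreeMap<>();
    // Entries changed since the last checkpoint.
    private Map<K, V> pending = new HashMap<>();
    // Current view seen by the operator.
    private Map<K, V> live = new HashMap<>();

    void put(K key, V value)
    {
      live.put(key, value);
      pending.put(key, value);
    }

    V get(K key)
    {
      return live.get(key);
    }

    // Requirement 2: persist only the changes made since the previous checkpoint.
    void checkpointed(long windowId)
    {
      deltas.put(windowId, pending);
      pending = new HashMap<>();
    }

    // Requirement 3: rebuild the state exactly as of checkpoint windowId.
    void recovered(long windowId)
    {
      live = new HashMap<>(committedBase);
      for (Map<K, V> delta : deltas.headMap(windowId, true).values()) {
        live.putAll(delta); // ascending window order, so later changes win
      }
      deltas.tailMap(windowId, false).clear(); // checkpoints after windowId are invalid
      pending = new HashMap<>();
    }

    // Requirement 4: fold checkpoints up to the committed window into the base
    // and drop them, since recovery never goes back past a committed window.
    void committed(long windowId)
    {
      Map<Long, Map<K, V>> done = deltas.headMap(windowId, true);
      for (Map<K, V> delta : done.values()) {
        committedBase.putAll(delta);
      }
      done.clear();
    }

    int retainedCheckpoints()
    {
      return deltas.size();
    }
  }

  public static void main(String[] args)
  {
    IncrementalStore<String, Integer> store = new IncrementalStore<>();
    store.put("a", 1);
    store.checkpointed(10); // delta {a=1}
    store.put("a", 2);
    store.put("b", 5);
    store.checkpointed(20); // delta {a=2, b=5}
    store.put("a", 3);      // not checkpointed yet
    store.committed(10);    // checkpoint 10 folded into the base
    store.recovered(20);    // e.g. after a failure
    // prints "2 5 1": state as of window 20, one checkpoint still retained
    System.out.println(store.get("a") + " " + store.get("b") + " " + store.retainedCheckpoints());
  }
}
```

Keeping deltas in a `TreeMap` makes both "replay everything up to window W" and "drop everything up to window W" range operations on the checkpoint ids.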



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
