[ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396104#comment-15396104 ]
ASF GitHub Bot commented on APEXMALHAR-2130: -------------------------------------------- Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/345#discussion_r72494611 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java --- @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowedStorage; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures + * + * @param <T> Type of the value per window + */ +public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T> +{ + private SpillableStateStore store; + private transient SpillableComplexComponentImpl sccImpl; + private long bucket; + private Serde<Window, Slice> windowSerde; + private Serde<T, Slice> valueSerde; + + protected transient Spillable.SpillableByteMap<Window, T> internMap; + + public SpillableWindowedPlainStorage() + { + } + + public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde) + { + this.bucket = bucket; + this.windowSerde = windowSerde; + this.valueSerde = valueSerde; + } + + public void setStore(SpillableStateStore store) + { + this.store = store; + } + + public void setBucket(long bucket) + { + this.bucket = bucket; + } + + public void setWindowSerde(Serde<Window, Slice> windowSerde) + { + this.windowSerde = windowSerde; + } + + public void setValueSerde(Serde<T, Slice> valueSerde) + { + this.valueSerde = valueSerde; + } + + @Override + public void put(Window window, T value) + { + internMap.put(window, value); + } + + @Override 
+ public T get(Window window) + { + return internMap.get(window); + } + + @Override + public Iterable<Map.Entry<Window, T>> entrySet() + { + return internMap.entrySet(); + } + + @Override + public Iterator<Map.Entry<Window, T>> iterator() + { + return internMap.entrySet().iterator(); + } + + @Override + public boolean containsWindow(Window window) + { + return internMap.containsKey(window); + } + + @Override + public long size() + { + return internMap.size(); + } + + @Override + public void remove(Window window) + { + internMap.remove(window); + } + + @Override + public void migrateWindow(Window fromWindow, Window toWindow) + { + internMap.put(toWindow, internMap.remove(fromWindow)); + } + + @Override + public void setup(Context.OperatorContext context) + { + if (store == null) { + // provide a default store + store = new ManagedStateSpillableStateStore(); + } + if (bucket == 0) { + // choose a bucket that is almost guaranteed to be unique + bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode(); + } + // set default serdes + if (windowSerde == null) { + windowSerde = new SerdeKryoSlice<>(); + } + if (valueSerde == null) { + valueSerde = new SerdeKryoSlice<>(); + } + sccImpl = new SpillableComplexComponentImpl(store); + sccImpl.setup(context); + internMap = sccImpl.newSpillableByteMap(bucket, windowSerde, valueSerde); --- End diff -- @ilooner Wouldn't that mean we should make the SpillableByteMapImpl non-transient and make it serializable by Kryo? 
When I did that, it had this error: ``` com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: internMap (org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505) at com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:125) at com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:145) at org.apache.apex.malhar.lib.window.SpillableWindowedStorageTest.testWindowedPlainStorage(SpillableWindowedStorageTest.java:62) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at com.intellij.junit4.JUnit4TestRunnerUtil$IgnoreIgnoredTestJUnit4ClassRunner.runChild(JUnit4TestRunnerUtil.java:365) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Caused by: java.lang.UnsupportedOperationException at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.entrySet(SpillableByteMapImpl.java:161) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ... 33 more ``` > implement scalable windowed storage > ----------------------------------- > > Key: APEXMALHAR-2130 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: bright chen > Assignee: David Yan > > This feature is used for supporting windowing. > The storage needs to have the following features: > 1. Spillable key-value storage (integrate with APEXMALHAR-2026) > 2. Upon checkpoint, it saves a snapshot of the entire data set with the > checkpointing window ID. 
This should be done incrementally (using ManagedState) to > avoid wasting space with unchanged data > 3. When recovering, it takes the recovery window ID and restores to that > snapshot > 4. When a window is committed, all windows with a lower ID should be purged > from the store. > 5. It should implement the WindowedStorage and WindowedKeyedStorage > interfaces, and because of 2 and 3, we may want to add methods to the > WindowedStorage interface so that the implementation of WindowedOperator can > notify the storage when a window is checkpointed, recovered, or committed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)