[
https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396104#comment-15396104
]
ASF GitHub Bot commented on APEXMALHAR-2130:
--------------------------------------------
Github user davidyan74 commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/345#discussion_r72494611
--- Diff:
library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import
org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import
org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of WindowedPlainStorage that makes use of
{@link Spillable} data structures
+ *
+ * @param <T> Type of the value per window
+ */
+public class SpillableWindowedPlainStorage<T> implements
WindowedStorage.WindowedPlainStorage<T>
+{
+ private SpillableStateStore store;
+ private transient SpillableComplexComponentImpl sccImpl;
+ private long bucket;
+ private Serde<Window, Slice> windowSerde;
+ private Serde<T, Slice> valueSerde;
+
+ protected transient Spillable.SpillableByteMap<Window, T> internMap;
+
+ public SpillableWindowedPlainStorage()
+ {
+ }
+
+ public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice>
windowSerde, Serde<T, Slice> valueSerde)
+ {
+ this.bucket = bucket;
+ this.windowSerde = windowSerde;
+ this.valueSerde = valueSerde;
+ }
+
+ public void setStore(SpillableStateStore store)
+ {
+ this.store = store;
+ }
+
+ public void setBucket(long bucket)
+ {
+ this.bucket = bucket;
+ }
+
+ public void setWindowSerde(Serde<Window, Slice> windowSerde)
+ {
+ this.windowSerde = windowSerde;
+ }
+
+ public void setValueSerde(Serde<T, Slice> valueSerde)
+ {
+ this.valueSerde = valueSerde;
+ }
+
+ @Override
+ public void put(Window window, T value)
+ {
+ internMap.put(window, value);
+ }
+
+ @Override
+ public T get(Window window)
+ {
+ return internMap.get(window);
+ }
+
+ @Override
+ public Iterable<Map.Entry<Window, T>> entrySet()
+ {
+ return internMap.entrySet();
+ }
+
+ @Override
+ public Iterator<Map.Entry<Window, T>> iterator()
+ {
+ return internMap.entrySet().iterator();
+ }
+
+ @Override
+ public boolean containsWindow(Window window)
+ {
+ return internMap.containsKey(window);
+ }
+
+ @Override
+ public long size()
+ {
+ return internMap.size();
+ }
+
+ @Override
+ public void remove(Window window)
+ {
+ internMap.remove(window);
+ }
+
+ @Override
+ public void migrateWindow(Window fromWindow, Window toWindow)
+ {
+ internMap.put(toWindow, internMap.remove(fromWindow));
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ if (store == null) {
+ // provide a default store
+ store = new ManagedStateSpillableStateStore();
+ }
+ if (bucket == 0) {
+ // choose a bucket that is almost guaranteed to be unique
+ bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) +
"#" + context.getId()).hashCode();
+ }
+ // set default serdes
+ if (windowSerde == null) {
+ windowSerde = new SerdeKryoSlice<>();
+ }
+ if (valueSerde == null) {
+ valueSerde = new SerdeKryoSlice<>();
+ }
+ sccImpl = new SpillableComplexComponentImpl(store);
+ sccImpl.setup(context);
+ internMap = sccImpl.newSpillableByteMap(bucket, windowSerde,
valueSerde);
--- End diff --
@ilooner Wouldn't that mean we should make the SpillableByteMapImpl non-transient
and make it serializable by Kryo? When I tried that, I got this error:
```
com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
internMap
(org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
at
com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:125)
at
com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:145)
at
org.apache.apex.malhar.lib.window.SpillableWindowedStorageTest.testWindowedPlainStorage(SpillableWindowedStorageTest.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at
com.intellij.junit4.JUnit4TestRunnerUtil$IgnoreIgnoredTestJUnit4ClassRunner.runChild(JUnit4TestRunnerUtil.java:365)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253)
at
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.UnsupportedOperationException
at
org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.entrySet(SpillableByteMapImpl.java:161)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
... 33 more
```
> implement scalable windowed storage
> -----------------------------------
>
> Key: APEXMALHAR-2130
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: bright chen
> Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key-value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot of the entire data set, tagged with the
> checkpointing window ID. This should be done incrementally (via ManagedState) to
> avoid wasting space on unchanged data.
> 3. When recovering, it takes the recovery window ID and restores the snapshot
> saved for that window.
> 4. When a window is committed, all windows with a lower ID should be purged
> from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage
> interfaces. Because of 2 and 3, we may want to add methods to the
> WindowedStorage interface so that the WindowedOperator implementation can
> notify the storage when a window is checkpointed, recovered, or committed.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)