APEXMALHAR-2190 #resolve #comment Use reusable buffer for serialization in spillable data structures
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2fa1e6b1 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2fa1e6b1 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2fa1e6b1 Branch: refs/heads/master Commit: 2fa1e6b16312eecdd074520d431902f11d555221 Parents: 3799157 Author: brightchen <[email protected]> Authored: Mon Aug 15 17:46:27 2016 -0700 Committer: David Yan <[email protected]> Committed: Mon Oct 24 12:52:08 2016 -0700 ---------------------------------------------------------------------- .../spillable/SpillableBenchmarkApp.java | 69 +++++ .../spillable/SpillableTestInputOperator.java | 46 ++++ .../spillable/SpillableTestOperator.java | 189 ++++++++++++++ .../spillable/SpillableBenchmarkAppTester.java | 73 ++++++ .../spillable/SpillableDSBenchmarkTest.java | 171 +++++++++++++ .../state/ManagedStateBenchmarkAppTest.java | 101 ++++++++ .../state/ManagedStateBenchmarkAppTester.java | 101 -------- benchmark/src/test/resources/log4j.properties | 2 + .../state/managed/AbstractManagedStateImpl.java | 34 ++- .../apex/malhar/lib/state/managed/Bucket.java | 85 +++++-- .../lib/state/managed/BucketProvider.java | 40 +++ .../state/spillable/SpillableArrayListImpl.java | 17 +- .../SpillableArrayListMultimapImpl.java | 53 ++-- .../spillable/SpillableComplexComponent.java | 29 +-- .../SpillableComplexComponentImpl.java | 64 +++-- .../lib/state/spillable/SpillableMapImpl.java | 44 ++-- .../lib/state/spillable/SpillableSetImpl.java | 45 +--- .../spillable/SpillableSetMultimapImpl.java | 45 ++-- .../state/spillable/SpillableStateStore.java | 3 +- .../state/spillable/WindowBoundedMapCache.java | 5 +- .../inmem/InMemSpillableStateStore.java | 26 ++ .../utils/serde/AffixKeyValueSerdeManager.java | 76 ++++++ .../apex/malhar/lib/utils/serde/AffixSerde.java | 68 +++++ .../apex/malhar/lib/utils/serde/ArraySerde.java | 97 ++++++++ .../apex/malhar/lib/utils/serde/Block.java | 217 
++++++++++++++++ .../lib/utils/serde/BlockReleaseStrategy.java | 47 ++++ .../malhar/lib/utils/serde/BlockStream.java | 179 +++++++++++++ .../malhar/lib/utils/serde/BufferSlice.java | 100 ++++++++ .../malhar/lib/utils/serde/CollectionSerde.java | 97 ++++++++ .../serde/DefaultBlockReleaseStrategy.java | 96 +++++++ .../malhar/lib/utils/serde/GenericSerde.java | 81 ++++++ .../apex/malhar/lib/utils/serde/IntSerde.java | 45 ++++ .../utils/serde/KeyValueByteStreamProvider.java | 37 +++ .../lib/utils/serde/KeyValueSerdeManager.java | 86 +++++++ .../apex/malhar/lib/utils/serde/LongSerde.java | 45 ++++ .../apex/malhar/lib/utils/serde/PairSerde.java | 73 ++++++ .../lib/utils/serde/PassThruByteArraySerde.java | 51 ---- .../serde/PassThruByteArraySliceSerde.java | 61 ----- .../lib/utils/serde/PassThruSliceSerde.java | 32 ++- .../apex/malhar/lib/utils/serde/Serde.java | 41 +-- .../lib/utils/serde/SerdeCollectionSlice.java | 120 --------- .../malhar/lib/utils/serde/SerdeIntSlice.java | 54 ---- .../malhar/lib/utils/serde/SerdeKryoSlice.java | 100 -------- .../malhar/lib/utils/serde/SerdeLongSlice.java | 54 ---- .../malhar/lib/utils/serde/SerdePairSlice.java | 89 ------- .../lib/utils/serde/SerdeStringSlice.java | 55 ---- .../lib/utils/serde/SerializationBuffer.java | 130 ++++++++++ .../apex/malhar/lib/utils/serde/SliceUtils.java | 10 + .../malhar/lib/utils/serde/StringSerde.java | 45 ++++ .../lib/utils/serde/WindowCompleteListener.java | 29 +++ .../lib/utils/serde/WindowedBlockStream.java | 249 +++++++++++++++++++ .../impl/SpillableSessionWindowedStorage.java | 3 +- .../impl/SpillableWindowedKeyedStorage.java | 28 +-- .../impl/SpillableWindowedPlainStorage.java | 18 +- .../com/datatorrent/lib/util/TestUtils.java | 3 +- .../lib/state/managed/DefaultBucketTest.java | 48 +++- .../state/managed/ManagedStateTestUtils.java | 3 +- .../spillable/SpillableArrayListImplTest.java | 12 +- .../SpillableArrayListMultimapImplTest.java | 30 ++- .../SpillableComplexComponentImplTest.java | 6 +- 
.../state/spillable/SpillableMapImplTest.java | 39 ++- .../state/spillable/SpillableSetImplTest.java | 4 +- .../spillable/SpillableSetMultimapImplTest.java | 18 +- .../lib/state/spillable/SpillableTestUtils.java | 46 ++-- .../spillable/TimeBasedPriorityQueueTest.java | 3 - .../malhar/lib/utils/serde/AffixSerdeTest.java | 43 ++++ .../malhar/lib/utils/serde/BlockStreamTest.java | 179 +++++++++++++ .../lib/utils/serde/CollectionSerdeTest.java | 68 +++++ .../lib/utils/serde/GenericSerdeTest.java | 84 +++++++ .../malhar/lib/utils/serde/PairSerdeTest.java | 48 ++++ .../utils/serde/PassThruByteArraySerdeTest.java | 72 ------ .../utils/serde/SerdeCollectionSliceTest.java | 65 ----- .../lib/utils/serde/SerdeGeneralTest.java | 169 +++++++++++++ .../lib/utils/serde/SerdeKryoSliceTest.java | 79 ------ .../lib/utils/serde/SerdePairSliceTest.java | 44 ---- .../window/SpillableWindowedStorageTest.java | 17 +- 76 files changed, 3570 insertions(+), 1265 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java new file mode 100644 index 0000000..e2fe8bb --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.spillable; + +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.fileaccess.TFileImpl; + +@ApplicationAnnotation(name = "SpillableBenchmarkApp") +public class SpillableBenchmarkApp implements StreamingApplication +{ + protected final String PROP_STORE_PATH = "dt.application.SpillableBenchmarkApp.storeBasePath"; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Create ActiveMQStringSinglePortOutputOperator + SpillableTestInputOperator input = new SpillableTestInputOperator(); + input.batchSize = 100; + input.sleepBetweenBatch = 0; + input = dag.addOperator("input", input); + + SpillableTestOperator testOperator = new SpillableTestOperator(); + testOperator.store = createStore(conf); + testOperator.shutdownCount = -1; + testOperator = dag.addOperator("test", testOperator ); + + + // Connect ports + dag.addStream("stream", input.output, testOperator.input).setLocality(DAG.Locality.CONTAINER_LOCAL); + } + + + public ManagedStateSpillableStateStore createStore(Configuration conf) + { + String basePath = 
getStoreBasePath(conf); + ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore(); + ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath); + return store; + } + + public String getStoreBasePath(Configuration conf) + { + return Preconditions.checkNotNull(conf.get(PROP_STORE_PATH), + "base path should be specified in the properties.xml"); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java new file mode 100644 index 0000000..2e33721 --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.benchmark.spillable; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +public class SpillableTestInputOperator extends BaseOperator implements InputOperator +{ + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + public long count = 0; + public int batchSize = 100; + public int sleepBetweenBatch = 1; + + @Override + public void emitTuples() + { + for (int i = 0; i < batchSize; ++i) { + output.emit("" + ++count); + } + if (sleepBetweenBatch > 0) { + try { + Thread.sleep(sleepBetweenBatch); + } catch (InterruptedException e) { + //ignore + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java new file mode 100644 index 0000000..3c5bf71 --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.spillable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListMultimapImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.LongSerde; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ShutdownException; +import com.datatorrent.common.util.BaseOperator; + +public class SpillableTestOperator extends BaseOperator implements Operator.CheckpointNotificationListener +{ + private static final Logger logger = LoggerFactory.getLogger(SpillableTestOperator.class); + + public static final byte[] ID1 = new byte[] {(byte)1}; + public static final byte[] ID2 = new byte[] {(byte)2}; + public static final byte[] ID3 = new byte[] {(byte)3}; + + public SpillableArrayListMultimapImpl<String, String> multiMap; + + public ManagedStateSpillableStateStore store; + + public long totalCount = 0; + public transient long countInWindow; + public long minWinId = -1; + public long committedWinId = -1; + public long windowId; + + public SpillableMapImpl<Long, Long> 
windowToCount; + + public long shutdownCount = -1; + + public static Throwable errorTrace; + + public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() + { + @Override + public void process(String tuple) + { + processTuple(tuple); + } + }; + + public void processTuple(String tuple) + { + if (++totalCount == shutdownCount) { + throw new RuntimeException("Test recovery. count = " + totalCount); + } + countInWindow++; + multiMap.put("" + windowId, tuple); + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + if (windowToCount == null) { + windowToCount = createWindowToCountMap(store); + } + if (multiMap == null) { + multiMap = createMultimap(store); + } + + store.setup(context); + multiMap.setup(context); + + checkData(); + } + + public void checkData() + { + long startTime = System.currentTimeMillis(); + logger.debug("check data: totalCount: {}; minWinId: {}; committedWinId: {}; curWinId: {}", totalCount, + this.minWinId, committedWinId, this.windowId); + for (long winId = Math.max(committedWinId + 1, minWinId); winId < this.windowId; ++winId) { + Long count = this.windowToCount.get(winId); + SpillableArrayListImpl<String> datas = (SpillableArrayListImpl<String>)multiMap.get("" + winId); + String msg; + if (((datas == null && count != null) || (datas != null && count == null)) || (datas == null && count == null)) { + msg = "Invalid data/count. 
datas: " + datas + "; count: " + count; + logger.error(msg); + errorTrace = new RuntimeException(msg); + throw new ShutdownException(); + } else { + int dataSize = datas.size(); + if ((long)count != (long)dataSize) { + msg = String.format("data size not equal: window Id: %d; datas size: %d; count: %d", winId, dataSize, count); + logger.error(msg); + errorTrace = new RuntimeException(msg); + throw new ShutdownException(); + } + } + } + logger.info("check data took {} millis.", System.currentTimeMillis() - startTime); + } + + + /** + * {@inheritDoc} + */ + @Override + public void beginWindow(long windowId) + { + store.beginWindow(windowId); + multiMap.beginWindow(windowId); + if (minWinId < 0) { + minWinId = windowId; + } + + this.windowId = windowId; + countInWindow = 0; + } + + @Override + public void endWindow() + { + multiMap.endWindow(); + windowToCount.put(windowId, countInWindow); + windowToCount.endWindow(); + store.endWindow(); + + if (windowId % 10 == 0) { + checkData(); + } + } + + @Override + public void beforeCheckpoint(long windowId) + { + store.beforeCheckpoint(windowId); + } + + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + this.committedWinId = windowId; + store.committed(windowId); + } + + public static SpillableArrayListMultimapImpl<String, String> createMultimap(SpillableStateStore store) + { + return new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new StringSerde(), + new StringSerde()); + } + + public static SpillableMapImpl<String, String> createMap(SpillableStateStore store) + { + return new SpillableMapImpl<String, String>(store, ID2, 0L, new StringSerde(), + new StringSerde()); + } + + public static SpillableMapImpl<Long, Long> createWindowToCountMap(SpillableStateStore store) + { + return new SpillableMapImpl<Long, Long>(store, ID3, 0L, new LongSerde(), + new LongSerde()); + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java new file mode 100644 index 0000000..7f94079 --- /dev/null +++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.benchmark.spillable; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; + +public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp +{ + private static final Logger logger = LoggerFactory.getLogger(SpillableBenchmarkAppTester.class); + public static final String basePath = "target/temp"; + @Test + public void test() throws Exception + { + Configuration conf = new Configuration(false); + + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + super.populateDAG(dag, conf); + + StreamingApplication app = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + } + }; + + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.run(60000); + + lc.shutdown(); + + if (SpillableTestOperator.errorTrace != null) { + logger.error("Error.", SpillableTestOperator.errorTrace); + Assert.assertNull(SpillableTestOperator.errorTrace.getMessage(), SpillableTestOperator.errorTrace); + } + } + + @Override + public String getStoreBasePath(Configuration conf) + { + return basePath; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java new file mode 100644 index 0000000..7e64c5f --- /dev/null +++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.spillable; + +import java.util.Random; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; + +import com.datatorrent.lib.fileaccess.TFileImpl; + + +public class SpillableDSBenchmarkTest +{ + private static final Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class); + protected static final int loopCount = 100000000; + protected static final long oneMB = 1024 * 1024; + protected static final int keySize = 500000; + protected static final int valueSize = 100000; + protected static final int maxKeyLength = 100; + protected static final int maxValueLength = 1000; + + protected static final int tuplesPerWindow = 10000; + protected static final int checkPointWindows = 10; + protected 
static final int commitDelays = 100; + + protected final transient Random random = new Random(); + protected String[] keys; + protected String[] values; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + + @Before + public void setup() + { + keys = new String[keySize]; + for (int i = 0; i < keys.length; ++i) { + keys[i] = this.randomString(maxKeyLength); + } + + values = new String[valueSize]; + for (int i = 0; i < values.length; ++i) { + values[i] = this.randomString(maxValueLength); + } + } + + @Test + public void testSpillableMap() + { + byte[] ID1 = new byte[]{(byte)1}; + ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore(); + ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath("target/temp"); + + StringSerde keySerde = createKeySerde(); + Serde<String> valueSerde = createValueSerde(); + + SpillableMapImpl<String, String> map = new SpillableMapImpl<String, String>(store, ID1, 0L, keySerde, valueSerde); + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + final long startTime = System.currentTimeMillis(); + + long windowId = 0; + store.beginWindow(++windowId); + map.beginWindow(windowId); + + int outputTimes = 0; + for (int i = 0; i < loopCount; ++i) { + putEntry(map); + + if (i % tuplesPerWindow == 0) { + map.endWindow(); + store.endWindow(); + + if (i % (tuplesPerWindow * checkPointWindows) == 0) { + store.beforeCheckpoint(windowId); + + if (windowId > commitDelays) { + store.committed(windowId - commitDelays); + } + } + + //next window + store.beginWindow(++windowId); + map.beginWindow(windowId); + } + + long spentTime = System.currentTimeMillis() - startTime; + if (spentTime > outputTimes * 5000) { + ++outputTimes; + logger.info("Total Statistics: Spent {} mills for {} operation. 
average/second: {}", spentTime, i, i * 1000 / spentTime); + checkEnvironment(); + } + } + long spentTime = System.currentTimeMillis() - startTime; + + logger.info("Spent {} mills for {} operation. average: {}", spentTime, loopCount, + loopCount / spentTime); + } + + + public void putEntry(SpillableMapImpl<String, String> map) + { + map.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]); + } + + public static final String characters = "0123456789ABCDEFGHIJKLMNOPKRSTUVWXYZabcdefghijklmopqrstuvwxyz"; + + protected static final char[] text = new char[Math.max(maxKeyLength, maxValueLength)]; + + public String randomString(int length) + { + for (int i = 0; i < length; i++) { + text[i] = characters.charAt(random.nextInt(characters.length())); + } + return new String(text, 0, length); + } + + public void checkEnvironment() + { + Runtime runtime = Runtime.getRuntime(); + + long maxMemory = runtime.maxMemory() / oneMB; + long allocatedMemory = runtime.totalMemory() / oneMB; + long freeMemory = runtime.freeMemory() / oneMB; + + logger.info("freeMemory: {}M; allocatedMemory: {}M; maxMemory: {}M", freeMemory, + allocatedMemory, maxMemory); + + Assert.assertFalse("Run out of memory.", allocatedMemory == maxMemory && freeMemory < 10); + } + + protected StringSerde createKeySerde() + { + return new StringSerde(); + } + + protected Serde<String> createValueSerde() + { + return new StringSerde(); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java new file mode 100644 index 0000000..4792843 --- /dev/null +++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.state; + +import java.io.File; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.benchmark.state.StoreOperator.ExecMode; + +/** + * This is not a really unit test, but in fact a benchmark runner. + * Provides this class to give developers the convenience to run in local IDE environment. 
+ * + */ +public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp +{ + public static final String basePath = "target/temp"; + + @Before + public void before() + { + FileUtil.fullyDelete(new File(basePath)); + } + + @Test + public void testUpdateSync() throws Exception + { + test(ExecMode.UPDATESYNC); + } + + @Test + public void testUpdateAsync() throws Exception + { + test(ExecMode.UPDATEASYNC); + } + + @Test + public void testInsert() throws Exception + { + test(ExecMode.INSERT); + } + + public void test(ExecMode exeMode) throws Exception + { + Configuration conf = new Configuration(false); + + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + super.populateDAG(dag, conf); + storeOperator.execMode = exeMode; + + StreamingApplication app = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + } + }; + + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.run(300000); + + lc.shutdown(); + } + + + + @Override + public String getStoreBasePath(Configuration conf) + { + return basePath; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java deleted file mode 100644 index 4435aad..0000000 --- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.benchmark.state; - -import java.io.File; - -import org.junit.Before; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.benchmark.state.StoreOperator.ExecMode; - -/** - * This is not a really unit test, but in fact a benchmark runner. - * Provides this class to give developers the convenience to run in local IDE environment. 
- * - */ -public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp -{ - public static final String basePath = "target/temp"; - - @Before - public void before() - { - FileUtil.fullyDelete(new File(basePath)); - } - - @Test - public void testUpdateSync() throws Exception - { - test(ExecMode.UPDATESYNC); - } - - @Test - public void testUpdateAsync() throws Exception - { - test(ExecMode.UPDATEASYNC); - } - - @Test - public void testInsert() throws Exception - { - test(ExecMode.INSERT); - } - - public void test(ExecMode exeMode) throws Exception - { - Configuration conf = new Configuration(false); - - LocalMode lma = LocalMode.newInstance(); - DAG dag = lma.getDAG(); - - super.populateDAG(dag, conf); - storeOperator.execMode = exeMode; - - StreamingApplication app = new StreamingApplication() - { - @Override - public void populateDAG(DAG dag, Configuration conf) - { - } - }; - - lma.prepareDAG(app, conf); - - // Create local cluster - final LocalMode.Controller lc = lma.getController(); - lc.run(300000); - - lc.shutdown(); - } - - - - @Override - public String getStoreBasePath(Configuration conf) - { - return basePath; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties index cf0d19e..3fc0120 100644 --- a/benchmark/src/test/resources/log4j.properties +++ b/benchmark/src/test/resources/log4j.properties @@ -41,3 +41,5 @@ log4j.logger.org=info #log4j.logger.org.apache.commons.beanutils=warn log4j.logger.com.datatorrent=debug log4j.logger.org.apache.apex=debug +log4j.logger.org.apache.apex.malhar.lib.state.managed=info +log4j.logger.com.datatorrent.common.util.FSStorageAgent=info 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java index dd2bbab..20271b0 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java @@ -125,7 +125,7 @@ import com.datatorrent.netlet.util.Slice; */ public abstract class AbstractManagedStateImpl implements ManagedState, Component<OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext, - TimeBucketAssigner.PurgeListener + TimeBucketAssigner.PurgeListener, BucketProvider { private long maxMemorySize; @@ -319,11 +319,24 @@ public abstract class AbstractManagedStateImpl return (int)(bucketId % numBuckets); } - Bucket getBucket(long bucketId) + @Override + public Bucket getBucket(long bucketId) { return buckets[getBucketIdx(bucketId)]; } + @Override + public Bucket ensureBucket(long bucketId) + { + Bucket b = getBucket(bucketId); + if (b == null) { + b = newBucket(bucketId); + b.setup(this); + buckets[getBucketIdx(bucketId)] = b; + } + return b; + } + protected Bucket newBucket(long bucketId) { return new Bucket.DefaultBucket(bucketId); @@ -384,6 +397,22 @@ public abstract class AbstractManagedStateImpl } } + /** + * get the memory usage for each bucket + * @return The map of bucket id to memory size used by the bucket + */ + public Map<Long, Long> getBucketMemoryUsage() + { + Map<Long, Long> bucketToSize = Maps.newHashMap(); + for (Bucket bucket : buckets) { + if (bucket == null) { + continue; + } + bucketToSize.put(bucket.getBucketId(), bucket.getKeyStream().size() + 
bucket.getValueStream().size()); + } + return bucketToSize; + } + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public void teardown() @@ -476,6 +505,7 @@ public abstract class AbstractManagedStateImpl this.keyComparator = Preconditions.checkNotNull(keyComparator); } + @Override public BucketsFileSystem getBucketsFileSystem() { return bucketsFileSystem; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java index 4fc2327..cbc4e03 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; @@ -32,6 +33,10 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.utils.serde.KeyValueByteStreamProvider; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -46,7 +51,7 @@ import com.datatorrent.netlet.util.Slice; * * @since 3.4.0 */ -public interface Bucket extends ManagedStateComponent +public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvider { /** * @return bucket id @@ -218,13 +223,22 @@ public interface Bucket extends 
ManagedStateComponent private transient TreeMap<Long, BucketsFileSystem.TimeBucketMeta> cachedBucketMetas; + /** + * By default, separate keys and values into two different streams. + * key stream and value stream should be created during construction instead of setup, as the reference of the streams will be passed to the serialize method + */ + protected WindowedBlockStream keyStream = new WindowedBlockStream(); + protected WindowedBlockStream valueStream = new WindowedBlockStream(); + + protected ConcurrentLinkedQueue<Long> windowsForFreeMemory = new ConcurrentLinkedQueue<>(); + private DefaultBucket() { //for kryo bucketId = -1; } - protected DefaultBucket(long bucketId) + public DefaultBucket(long bucketId) { this.bucketId = bucketId; } @@ -321,6 +335,9 @@ public interface Bucket extends ManagedStateComponent @Override public Slice get(Slice key, long timeBucket, ReadSource readSource) { + // This call is lightweight + releaseMemory(); + key = SliceUtils.toBufferSlice(key); switch (readSource) { case MEMORY: return getFromMemory(key); @@ -392,6 +409,11 @@ public interface Bucket extends ManagedStateComponent @Override public void put(Slice key, long timeBucket, Slice value) { + // This call is lightweight + releaseMemory(); + key = SliceUtils.toBufferSlice(key); + value = SliceUtils.toBufferSlice(value); + BucketedValue bucketedValue = flash.get(key); if (bucketedValue == null) { bucketedValue = new BucketedValue(timeBucket, value); @@ -409,39 +431,45 @@ public interface Bucket extends ManagedStateComponent } } + /** + * Free memory up to the given windowId + * This method will be called by another thread. Adding concurrency control to Stream would impact the performance. 
+ * This method only calculates the size of the memory that could be released and then sends free memory request to the operator thread + */ @Override public long freeMemory(long windowId) throws IOException { - long memoryFreed = 0; - Long clearWindowId; - - while ((clearWindowId = committedData.floorKey(windowId)) != null) { - Map<Slice, BucketedValue> windowData = committedData.remove(clearWindowId); + // calculate the size first and then send the release memory request. It could reduce the chance of conflict and increase the performance. + long size = keyStream.dataSizeUpToWindow(windowId) + valueStream.dataSizeUpToWindow(windowId); + windowsForFreeMemory.add(windowId); + return size; + } - for (Map.Entry<Slice, BucketedValue> entry: windowData.entrySet()) { - memoryFreed += entry.getKey().length + entry.getValue().getSize(); - } + /** + * This operation must be called from operator thread. It won't do anything if no memory to be freed + */ + protected long releaseMemory() + { + long memoryFreed = 0; + while (!windowsForFreeMemory.isEmpty()) { + long windowId = windowsForFreeMemory.poll(); + long originSize = keyStream.size() + valueStream.size(); + keyStream.completeWindow(windowId); + valueStream.completeWindow(windowId); + memoryFreed += originSize - (keyStream.size() + valueStream.size()); } - fileCache.clear(); - if (cachedBucketMetas != null) { - - for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) { - FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId()); - if (reader != null) { - memoryFreed += tbm.getSizeInBytes(); - reader.close(); - } - } + if (memoryFreed > 0) { + LOG.debug("Total freed memory size: {}", memoryFreed); + sizeInBytes.getAndAdd(-memoryFreed); } - sizeInBytes.getAndAdd(-memoryFreed); - LOG.debug("space freed {} {}", bucketId, memoryFreed); return memoryFreed; } @Override public Map<Slice, BucketedValue> checkpoint(long windowId) { + releaseMemory(); try { //transferring the data from flash to 
check-pointed state in finally block and re-initializing the flash. return flash; @@ -548,6 +576,19 @@ public interface Bucket extends ManagedStateComponent return checkpointedData; } + + @Override + public WindowedBlockStream getKeyStream() + { + return keyStream; + } + + @Override + public WindowedBlockStream getValueStream() + { + return valueStream; + } + private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java new file mode 100644 index 0000000..bbd18ac --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.managed; + +/** + * This interface declares methods to get bucket by bucket id + * + */ +public interface BucketProvider +{ + /** + * get bucket by bucket id + * @param bucketId + * @return + */ + public Bucket getBucket(long bucketId); + + /** + * Create bucket if not exist, return the bucket + * @param bucketId + * @return + */ + public Bucket ensureBucket(long bucketId); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java index a59872c..d0ca9ff 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java @@ -26,9 +26,9 @@ import java.util.ListIterator; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.utils.serde.CollectionSerde; +import org.apache.apex.malhar.lib.utils.serde.IntSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice; -import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; import org.apache.hadoop.classification.InterfaceStability; import com.esotericsoftware.kryo.DefaultSerializer; @@ -37,7 +37,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.Slice; /** * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}. 
@@ -58,11 +57,10 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp @NotNull private SpillableStateStore store; @NotNull - private Serde<T, Slice> serde; + private Serde<T> serde; @NotNull private SpillableMapImpl<Integer, List<T>> map; - private boolean sizeCached = false; private int size; private int numBatches; @@ -86,15 +84,15 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp */ public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, @NotNull SpillableStateStore store, - @NotNull Serde<T, Slice> serde) + @NotNull Serde<T> serde) { this.bucketId = bucketId; this.prefix = Preconditions.checkNotNull(prefix); this.store = Preconditions.checkNotNull(store); this.serde = Preconditions.checkNotNull(serde); - map = new SpillableMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), - new SerdeCollectionSlice<>(serde, (Class<List<T>>)(Class)ArrayList.class)); + map = new SpillableMapImpl<>(store, prefix, bucketId, new IntSerde(), + new CollectionSerde<T, List<T>>(serde, (Class)ArrayList.class)); } /** @@ -111,7 +109,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp */ public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, @NotNull SpillableStateStore store, - @NotNull Serde<T, Slice> serde, + @NotNull Serde<T> serde, int batchSize) { this(bucketId, prefix, store, serde); @@ -328,6 +326,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp @Override public void setup(Context.OperatorContext context) { + store.ensureBucket(bucketId); map.setup(context); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java 
b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java index 0944583..d3340ce 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java @@ -26,10 +26,10 @@ import java.util.Set; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager; +import org.apache.apex.malhar.lib.utils.serde.IntSerde; import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; -import org.apache.apex.malhar.lib.utils.serde.SliceUtils; import org.apache.hadoop.classification.InterfaceStability; import com.esotericsoftware.kryo.DefaultSerializer; @@ -62,10 +62,11 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable @NotNull private SpillableMapImpl<Slice, Integer> map; private SpillableStateStore store; - private byte[] identifier; private long bucket; - private Serde<K, Slice> serdeKey; - private Serde<V, Slice> serdeValue; + private Serde<V> valueSerde; + + protected transient Context.OperatorContext context; + protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager; private SpillableArrayListMultimapImpl() { @@ -78,20 +79,20 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable * @param identifier The Id of this {@link SpillableArrayListMultimapImpl}. * @param bucket The Id of the bucket used to store this * {@link SpillableArrayListMultimapImpl} in the provided {@link SpillableStateStore}. - * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. - * @param serdeKey The {@link Serde} to use when serializing and deserializing values. 
+ * @param keySerde The {@link Serde} to use when serializing and deserializing keys. + * @param valueSerde The {@link Serde} to use when serializing and deserializing values. */ public SpillableArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, - Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue) + Serde<K> keySerde, + Serde<V> valueSerde) { this.store = Preconditions.checkNotNull(store); - this.identifier = Preconditions.checkNotNull(identifier); this.bucket = bucket; - this.serdeKey = Preconditions.checkNotNull(serdeKey); - this.serdeValue = Preconditions.checkNotNull(serdeValue); + this.valueSerde = Preconditions.checkNotNull(valueSerde); + + keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(SIZE_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde); - map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice()); + map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new IntSerde()); } public SpillableStateStore getStore() @@ -110,15 +111,12 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable SpillableArrayListImpl<V> spillableArrayList = cache.get(key); if (spillableArrayList == null) { - Slice keySlice = serdeKey.serialize(key); - Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX)); - + Integer size = map.get(keyValueSerdeManager.serializeMetaKey(key, false)); if (size == null) { return null; } - Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice); - spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue); + spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, false).toByteArray(), store, valueSerde); spillableArrayList.setSize(size); } @@ -179,8 +177,7 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable @Override public boolean 
containsKey(@Nullable Object key) { - return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), - SIZE_KEY_SUFFIX)); + return cache.contains((K)key) || map.containsKey(keyValueSerdeManager.serializeMetaKey((K)key, false)); } @Override @@ -217,9 +214,9 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable SpillableArrayListImpl<V> spillableArrayList = getHelper(key); if (spillableArrayList == null) { - Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key)); - spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue); - + Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, true); + spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, valueSerde); + spillableArrayList.setup(context); cache.put(key, spillableArrayList); } @@ -272,14 +269,19 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable @Override public void setup(Context.OperatorContext context) { + this.context = context; + map.setup(context); isRunning = true; + + keyValueSerdeManager.setup(store, bucket); } @Override public void beginWindow(long windowId) { map.beginWindow(windowId); + keyValueSerdeManager.beginWindow(windowId); isInWindow = true; } @@ -292,13 +294,14 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable SpillableArrayListImpl<V> spillableArrayList = cache.get(key); spillableArrayList.endWindow(); - Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX), - spillableArrayList.size()); + map.put(keyValueSerdeManager.serializeMetaKey(key, true), spillableArrayList.size()); } Preconditions.checkState(cache.getRemovedKeys().isEmpty()); cache.endWindow(); map.endWindow(); + + keyValueSerdeManager.resetReadBuffer(); } @Override 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java index c4462d5..542a914 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java @@ -24,7 +24,6 @@ import org.apache.apex.malhar.lib.utils.serde.Serde; import com.datatorrent.api.Component; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Operator; -import com.datatorrent.netlet.util.Slice; /** * This is a composite component containing spillable data structures. This should be used as @@ -43,7 +42,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}. * @return A {@link SpillableList}. */ - <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde); + <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde); /** * This is a method for creating a {@link SpillableList}. @@ -53,7 +52,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}. * @return A {@link SpillableList}. */ - <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde); + <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde); /** * This is a method for creating a {@link SpillableMap}. 
This method @@ -65,8 +64,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param serdeValue The Serializer/Deserializer to use for the map's values. * @return A {@link SpillableMap}. */ - <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue); + <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey, + Serde<V> serdeValue); /** * This is a method for creating a {@link SpillableMap}. @@ -79,7 +78,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @return A {@link SpillableMap}. */ <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, - Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue); + Serde<K> serdeKey, Serde<V> serdeValue); /** * This is a method for creating a {@link SpillableListMultimap}. This method @@ -91,8 +90,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists. * @return A {@link SpillableListMultimap}. */ - <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K, - Slice> serdeKey, Serde<V, Slice> serdeValue); + <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue); /** * This is a method for creating a {@link SpillableListMultimap}. @@ -105,8 +103,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @return A {@link SpillableListMultimap}. */ <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket, - Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue); + Serde<K> serdeKey, + Serde<V> serdeValue); /** * This is a method for creating a {@link SpillableSetMultimap}. 
@@ -117,8 +115,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists. * @return A {@link SpillableSetMultimap}. */ - <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K, - Slice> serdeKey, Serde<V, Slice> serdeValue); + <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue); /** * This is a method for creating a {@link SpillableMultiset}. This method @@ -128,7 +125,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}. * @return A {@link SpillableMultiset}. */ - <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde); + <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde); /** * This is a method for creating a {@link SpillableMultiset}. @@ -138,7 +135,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}. * @return A {@link SpillableMultiset}. */ - <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde); + <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde); /** * This is a method for creating a {@link SpillableQueue}. This method @@ -148,7 +145,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}. * @return A {@link SpillableQueue}. */ - <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde); + <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde); /** * This is a method for creating a {@link SpillableQueue}. 
@@ -158,5 +155,5 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}. * @return A {@link SpillableQueue}. */ - <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde); + <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java index aad219d..1a3f550 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java @@ -19,6 +19,7 @@ package org.apache.apex.malhar.lib.state.spillable; import java.util.List; +import java.util.Set; import javax.validation.constraints.NotNull; @@ -27,9 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.Slice; /** * This is a factory that is used for Spillable datastructures. This component is used by nesting it inside of an @@ -50,6 +51,11 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent @NotNull private SpillableIdentifierGenerator identifierGenerator; + /** + * need to make sure all the buckets are created during setup. 
+ */ + protected transient Set<Long> bucketIds = Sets.newHashSet(); + private SpillableComplexComponentImpl() { // for kryo @@ -66,84 +72,99 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator); } - public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde) + @Override + public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde) { SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde); componentList.add(list); return list; } - public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde) + @Override + public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde) { identifierGenerator.register(identifier); SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde); + bucketIds.add(bucket); componentList.add(list); return list; } - public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue) + @Override + public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey, + Serde<V> serdeValue) { SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(), bucket, serdeKey, serdeValue); + bucketIds.add(bucket); componentList.add(map); return map; } - public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue) + @Override + public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K> serdeKey, + Serde<V> serdeValue) { identifierGenerator.register(identifier); SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue); + bucketIds.add(bucket); componentList.add(map); return map; } - public <K, V> 
SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K, - Slice> serdeKey, Serde<V, Slice> serdeValue) + @Override + public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue) { SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store, identifierGenerator.next(), bucket, serdeKey, serdeValue); + bucketIds.add(bucket); componentList.add(map); return map; } + @Override public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket, - Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue) + Serde<K> serdeKey, + Serde<V> serdeValue) { identifierGenerator.register(identifier); SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue); + bucketIds.add(bucket); componentList.add(map); return map; } - public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K, - Slice> serdeKey, Serde<V, Slice> serdeValue) + @Override + public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue) { SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store, identifierGenerator.next(), bucket, serdeKey, serdeValue); + bucketIds.add(bucket); componentList.add(map); return map; } - public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde) + @Override + public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde) { throw new UnsupportedOperationException("Unsupported Operation"); } - public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde) + @Override + public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde) { throw new UnsupportedOperationException("Unsupported Operation"); } - public <T> SpillableQueue<T> 
newSpillableQueue(long bucket, Serde<T, Slice> serde) + @Override + public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde) { throw new UnsupportedOperationException("Unsupported Operation"); } - public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde) + @Override + public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde) { throw new UnsupportedOperationException("Unsupported Operation"); } @@ -152,6 +173,15 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent public void setup(Context.OperatorContext context) { store.setup(context); + + //ensure buckets created. + for (long bucketId : bucketIds) { + store.ensureBucket(bucketId); + } + + //the bucket ids are only for setup. We don't need bucket ids during run time. + bucketIds.clear(); + for (SpillableComponent spillableComponent: componentList) { spillableComponent.setup(context); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java index 016aeec..5fa39d7 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java @@ -26,13 +26,13 @@ import java.util.Set; import javax.validation.constraints.NotNull; import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager; +import org.apache.apex.malhar.lib.utils.serde.BufferSlice; import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SliceUtils; 
-import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.classification.InterfaceStability; import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.serializers.FieldSerializer; import com.google.common.base.Preconditions; @@ -51,21 +51,20 @@ import com.datatorrent.netlet.util.Slice; public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spillable.SpillableComponent, Serializable { + private static final long serialVersionUID = 4552547110215784584L; private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>(); - private transient MutableInt tempOffset = new MutableInt(); + private transient Input tmpInput = new Input(); @NotNull private SpillableStateStore store; @NotNull private byte[] identifier; private long bucket; - @NotNull - private Serde<K, Slice> serdeKey; - @NotNull - private Serde<V, Slice> serdeValue; private int size = 0; + protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager; + private SpillableMapImpl() { //for kryo @@ -77,17 +76,16 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi * @param identifier The Id of this {@link SpillableMapImpl}. * @param bucket The Id of the bucket used to store this * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}. - * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. - * @param serdeKey The {@link Serde} to use when serializing and deserializing values. + * @param keySerde The {@link Serde} to use when serializing and deserializing keys. + * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
*/ - public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue) + public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> keySerde, + Serde<V> valueSerde) { this.store = Preconditions.checkNotNull(store); this.identifier = Preconditions.checkNotNull(identifier); this.bucket = bucket; - this.serdeKey = Preconditions.checkNotNull(serdeKey); - this.serdeValue = Preconditions.checkNotNull(serdeValue); + keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(null, identifier, Preconditions.checkNotNull(keySerde), Preconditions.checkNotNull(valueSerde)); } public SpillableStateStore getStore() @@ -134,16 +132,17 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi return val; } - Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key))); + Slice valSlice = store.getSync(bucket, keyValueSerdeManager.serializeDataKey(key, false)); if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) { return null; } - tempOffset.setValue(0); - return serdeValue.deserialize(valSlice, tempOffset); + tmpInput.setBuffer(valSlice.buffer, valSlice.offset, valSlice.length); + return keyValueSerdeManager.deserializeValue(tmpInput); } + @Override public V put(K k, V v) { @@ -207,6 +206,8 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi @Override public void setup(Context.OperatorContext context) { + store.ensureBucket(bucket); + keyValueSerdeManager.setup(store, bucket); } @Override @@ -218,16 +219,15 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi public void endWindow() { for (K key: cache.getChangedKeys()) { - store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), - serdeValue.serialize(cache.get(key))); + store.put(bucket, keyValueSerdeManager.serializeDataKey(key, true), + 
keyValueSerdeManager.serializeValue(cache.get(key))); } for (K key: cache.getRemovedKeys()) { - store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), - new Slice(ArrayUtils.EMPTY_BYTE_ARRAY)); + store.put(this.bucket, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE); } - cache.endWindow(); + keyValueSerdeManager.resetReadBuffer(); } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java index c2741b0..0dfc411 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java @@ -26,15 +26,15 @@ import java.util.NoSuchElementException; import javax.validation.constraints.NotNull; import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.classification.InterfaceStability; import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.FieldSerializer; import com.google.common.base.Preconditions; import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.Slice; /** * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}. 
@@ -62,49 +62,30 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable T next; } - public static class SerdeListNodeSlice<T> implements Serde<ListNode<T>, Slice> + public static class ListNodeSerde<T> implements Serde<ListNode<T>> { - private Serde<T, Slice> serde; - private static Slice falseSlice = new Slice(new byte[]{0}); - private static Slice trueSlice = new Slice(new byte[]{1}); + private Serde<T> serde; - public SerdeListNodeSlice(@NotNull Serde<T, Slice> serde) + public ListNodeSerde(@NotNull Serde<T> serde) { this.serde = Preconditions.checkNotNull(serde); } @Override - public Slice serialize(ListNode<T> object) + public void serialize(ListNode<T> object, Output output) { - int size = 0; - - Slice slice1 = object.valid ? trueSlice : falseSlice; - size += 1; - Slice slice2 = serde.serialize(object.next); - size += slice2.length; - - byte[] bytes = new byte[size]; - System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length); - System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length); - - return new Slice(bytes); + output.writeBoolean(object.valid); + serde.serialize(object.next, output); } @Override - public ListNode<T> deserialize(Slice slice, MutableInt offset) + public ListNode<T> deserialize(Input input) { ListNode<T> result = new ListNode<>(); - result.valid = slice.buffer[offset.intValue()] != 0; - offset.add(1); - result.next = serde.deserialize(slice, offset); + result.valid = input.readBoolean(); + result.next = serde.deserialize(input); return result; } - - @Override - public ListNode<T> deserialize(Slice object) - { - return deserialize(object, new MutableInt(0)); - } } @NotNull @@ -135,11 +116,11 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable */ public SpillableSetImpl(long bucketId, @NotNull byte[] prefix, @NotNull SpillableStateStore store, - @NotNull Serde<T, Slice> serde) + @NotNull Serde<T> serde) { this.store = 
Preconditions.checkNotNull(store); - map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde)); + map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new ListNodeSerde(serde)); } public void setSize(int size) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java index 98f60d2..76e47f2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java @@ -27,11 +27,11 @@ import java.util.Set; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager; +import org.apache.apex.malhar.lib.utils.serde.IntSerde; +import org.apache.apex.malhar.lib.utils.serde.PairSerde; import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; -import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice; -import org.apache.apex.malhar.lib.utils.serde.SliceUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceStability; @@ -65,10 +65,11 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul private SpillableStateStore store; private byte[] identifier; private long bucket; - private Serde<K, Slice> serdeKey; - private Serde<V, Slice> serdeValue; + private Serde<V> valueSerde; private transient 
List<SpillableSetImpl<V>> removedSets = new ArrayList<>(); + protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager; + protected transient Context.OperatorContext context; private SpillableSetMultimapImpl() { // for kryo @@ -84,16 +85,15 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul * @param serdeKey The {@link Serde} to use when serializing and deserializing values. */ public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, - Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue) + Serde<K> keySerde, + Serde<V> valueSerde) { this.store = Preconditions.checkNotNull(store); - this.identifier = Preconditions.checkNotNull(identifier); this.bucket = bucket; - this.serdeKey = Preconditions.checkNotNull(serdeKey); - this.serdeValue = Preconditions.checkNotNull(serdeValue); + this.valueSerde = Preconditions.checkNotNull(valueSerde); + keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde); - map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue)); + map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new PairSerde<>(new IntSerde(), valueSerde)); } public SpillableStateStore getStore() @@ -112,17 +112,17 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul SpillableSetImpl<V> spillableSet = cache.get(key); if (spillableSet == null) { - Slice keySlice = serdeKey.serialize(key); - Pair<Integer, V> meta = map.get(SliceUtils.concatenate(keySlice, META_KEY_SUFFIX)); + Pair<Integer, V> meta = map.get(keyValueSerdeManager.serializeMetaKey(key, false)); if (meta == null) { return null; } - Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice); - spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue); + Slice keyPrefix = 
keyValueSerdeManager.serializeDataKey(key, false); + spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde); spillableSet.setSize(meta.getLeft()); spillableSet.setHead(meta.getRight()); + spillableSet.setup(context); } cache.put(key, spillableSet); @@ -166,7 +166,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul SpillableSetImpl<V> spillableSet = getHelper((K)key); if (spillableSet != null) { cache.remove((K)key); - Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX); + Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false); map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead())); spillableSet.clear(); removedSets.add(spillableSet); @@ -199,7 +199,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul if (cache.contains((K)key)) { return true; } - Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX); + Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false); Pair<Integer, V> meta = map.get(keySlice); return meta != null && meta.getLeft() > 0; } @@ -227,8 +227,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul SpillableSetImpl<V> spillableSet = getHelper(key); if (spillableSet == null) { - Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key)); - spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue); + spillableSet = new SpillableSetImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde); + spillableSet.setup(context); cache.put(key, spillableSet); } return spillableSet.add(value); @@ -284,13 +284,16 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul @Override public void setup(Context.OperatorContext context) { + this.context = context; map.setup(context); + keyValueSerdeManager.setup(store, 
bucket); } @Override public void beginWindow(long windowId) { map.beginWindow(windowId); + keyValueSerdeManager.beginWindow(windowId); } @Override @@ -301,7 +304,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul SpillableSetImpl<V> spillableSet = cache.get(key); spillableSet.endWindow(); - map.put(SliceUtils.concatenate(serdeKey.serialize(key), META_KEY_SUFFIX), + map.put(keyValueSerdeManager.serializeMetaKey(key, true), new ImmutablePair<>(spillableSet.size(), spillableSet.getHead())); } @@ -311,6 +314,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul cache.endWindow(); map.endWindow(); + + keyValueSerdeManager.resetReadBuffer(); } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java index b6ee3c0..44f003b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java @@ -19,6 +19,7 @@ package org.apache.apex.malhar.lib.state.spillable; import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.BucketProvider; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Component; @@ -32,6 +33,6 @@ import com.datatorrent.api.Operator; */ @InterfaceStability.Evolving public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>, - Operator.CheckpointNotificationListener, WindowListener + Operator.CheckpointNotificationListener, WindowListener, BucketProvider { } 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java index 0e1d55e..e80d38d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java @@ -21,6 +21,9 @@ package org.apache.apex.malhar.lib.state.spillable; import java.util.Map; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Preconditions; @@ -39,6 +42,7 @@ import com.google.common.collect.Sets; @InterfaceStability.Evolving public class WindowBoundedMapCache<K, V> { + private static final transient Logger logger = LoggerFactory.getLogger(WindowBoundedMapCache.class); public static final int DEFAULT_MAX_SIZE = 50000; private int maxSize = DEFAULT_MAX_SIZE; @@ -109,7 +113,6 @@ public class WindowBoundedMapCache<K, V> Note: beginWindow is intentionally not implemented because many users need a cache that does not require beginWindow to be called. 
*/ - public void endWindow() { int count = cache.size() - maxSize; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java index 61ab8a8..8acb044 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java @@ -23,7 +23,10 @@ import java.util.concurrent.Future; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.state.managed.Bucket; import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.BufferSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; import org.apache.hadoop.classification.InterfaceStability; import com.google.common.collect.Maps; @@ -74,6 +77,8 @@ public class InMemSpillableStateStore implements SpillableStateStore bucket = Maps.newHashMap(); store.put(bucketId, bucket); } + key = SliceUtils.toBufferSlice(key); + value = SliceUtils.toBufferSlice(value); bucket.put(key, value); } @@ -88,6 +93,10 @@ public class InMemSpillableStateStore implements SpillableStateStore store.put(bucketId, bucket); } + if (key.getClass() == Slice.class) { + //The hashCode of Slice was not correct, so correct it + key = new BufferSlice(key); + } return bucket.get(key); } @@ -117,4 +126,21 @@ public class InMemSpillableStateStore implements SpillableStateStore { return store.toString(); } + + protected Bucket.DefaultBucket bucket; + + @Override + public Bucket getBucket(long bucketId) + { + return bucket; + } + + @Override + 
public Bucket ensureBucket(long bucketId) + { + if (bucket == null) { + bucket = new Bucket.DefaultBucket(1); + } + return bucket; + } }
