Repository: apex-malhar Updated Branches: refs/heads/master bb3dca1b4 -> 91767c589
APEXMALHAR-2339 #resolve #comment Windowed Operator benchmarking Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7e22686b Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7e22686b Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7e22686b Branch: refs/heads/master Commit: 7e22686b1041ad537e633a380db4ee9d7459435f Parents: 70154f6 Author: brightchen <[email protected]> Authored: Wed Nov 16 11:25:26 2016 -0800 Committer: brightchen <[email protected]> Committed: Wed Jan 4 10:42:10 2017 -0800 ---------------------------------------------------------------------- .../benchmark/window/AbstractGenerator.java | 86 ++++++++ .../AbstractWindowedOperatorBenchmarkApp.java | 215 +++++++++++++++++++ .../KeyedWindowedOperatorBenchmarkApp.java | 138 ++++++++++++ .../window/WindowedOperatorBenchmarkApp.java | 123 +++++++++++ .../KeyedWindowedOperatorBenchmarkAppTest.java | 75 +++++++ .../WindowedOperatorBenchmarkAppTest.java | 75 +++++++ benchmark/src/test/resources/log4j.properties | 2 +- .../apex/malhar/lib/state/managed/Bucket.java | 14 +- 8 files changed, 722 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java new file mode 100644 index 0000000..c5b1594 --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.window; + +import java.nio.ByteBuffer; +import java.util.Random; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +public abstract class AbstractGenerator<T> extends BaseOperator implements InputOperator +{ + public final transient DefaultOutputPort<T> data = new DefaultOutputPort<T>(); + + protected int emitBatchSize = 1000; + protected byte[] val = ByteBuffer.allocate(1000).putLong(1234).array(); + protected int rate = 20000; + protected int emitCount = 0; + protected final Random random = new Random(); + protected int range = 1000 * 60; // one minute range of hot keys + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + emitCount = 0; + } + + public int getEmitBatchSize() + { + return emitBatchSize; + } + + public void setEmitBatchSize(int emitBatchSize) + { + this.emitBatchSize = emitBatchSize; + } + + public int getRate() + { + return rate; + } + + public void setRate(int rate) + { + this.rate = rate; + } + + public int getRange() + { + return range; + } + + public void setRange(int range) + { + this.range = range; + } + + @Override + public void emitTuples() + { + for (int i = 0; i < emitBatchSize && emitCount < rate; i++) { + data.emit(generateNextTuple()); + emitCount++; + } + } + + protected abstract T generateNextTuple(); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java new file mode 100644 index 0000000..09f7653 --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.window; + +import java.io.IOException; +import java.io.Serializable; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedTimeUnifiedStateSpillableStateStore; +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.WindowedStorage; +import org.apache.apex.malhar.lib.window.impl.AbstractWindowedOperator; +import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Stats.OperatorStats; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.benchmark.window.WindowedOperatorBenchmarkApp.WindowedGenerator; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.lib.stream.DevNull; + +public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O extends AbstractWindowedOperator> implements StreamingApplication +{ + protected static final String PROP_STORE_PATH = "dt.application.WindowedOperatorBenchmark.storeBasePath"; + protected static final String DEFAULT_BASE_PATH = "WindowedOperatorBenchmark/Store"; + protected static final int ALLOWED_LATENESS = 19000; + + protected int timeRange = 1000 * 60; // one minute range of hot keys + + protected Class<G> generatorClass; + protected Class<O> windowedOperatorClass; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TestStatsListener sl = new TestStatsListener(); + sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false); + + G generator = createGenerator(); + dag.addOperator("Generator", generator); + //generator.setRange(timeRange); + dag.setAttribute(generator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl)); + + O windowedOperator = createWindowedOperator(conf); + dag.addOperator("windowedOperator", windowedOperator); + dag.setAttribute(windowedOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl)); + //dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL); + connectGeneratorToWindowedOperator(dag, generator, windowedOperator); + +// WatermarkGenerator watermarkGenerator = new WatermarkGenerator(); +// dag.addOperator("WatermarkGenerator", watermarkGenerator); +// dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput).setLocality(Locality.CONTAINER_LOCAL); + + DevNull output = dag.addOperator("output", new DevNull()); + dag.addStream("output", windowedOperator.output, output.data).setLocality(Locality.CONTAINER_LOCAL); + } + + protected G createGenerator() + { + try { + return generatorClass.newInstance(); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + protected O createWindowedOperator(Configuration conf) + { + SpillableStateStore store = createStore(conf); + try { + O windowedOperator = this.windowedOperatorClass.newInstance(); + SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(store); + windowedOperator.addComponent("SpillableComplexComponent", sccImpl); + + windowedOperator.setDataStorage(createDataStorage(sccImpl)); + windowedOperator.setRetractionStorage(createRetractionStorage(sccImpl)); + windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage()); + windowedOperator.setAccumulation(createAccumulation()); + + windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS)); + windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1))); + //accumulating mode + windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes()); + windowedOperator.setFixedWatermark(30000); + //windowedOperator.setTriggerOption(TriggerOption.AtWatermark()); + + return windowedOperator; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + protected abstract WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl); + + protected abstract WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl); + + protected abstract Accumulation createAccumulation(); + + protected abstract void connectGeneratorToWindowedOperator(DAG dag, G generator, O windowedOperator); + + protected SpillableStateStore createStore(Configuration conf) + { + String basePath = getStoreBasePath(conf); + ManagedTimeUnifiedStateSpillableStateStore store = new ManagedTimeUnifiedStateSpillableStateStore(); + store.getTimeBucketAssigner().setBucketSpan(Duration.millis(10000)); + ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath); + + return store; + } + + public String getStoreBasePath(Configuration conf) + { + String basePath = conf.get(PROP_STORE_PATH); + if (basePath == null || basePath.isEmpty()) { + basePath = DEFAULT_BASE_PATH; + } + return basePath; + } + + + public static class TestStatsListener implements StatsListener, Serializable + { + private static final Logger LOG = LoggerFactory.getLogger(TestStatsListener.class); + private static final long serialVersionUID = 1L; + SetPropertyRequest cmd = new SetPropertyRequest(); + + long uwId; + long dwId; + long resumewid; + int rate; + int queueSize; + boolean adjustRate; + + @Override + public Response processStats(BatchedOperatorStats stats) + { + if (!stats.getLastWindowedStats().isEmpty()) { + OperatorStats os = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1); + if (os.inputPorts != null && !os.inputPorts.isEmpty()) { + dwId = os.windowId; + queueSize = os.inputPorts.get(0).queueSize; + if (uwId - dwId < 5) { + // keep operator busy + rate = Math.max(1000, rate); + rate += rate / 10; + } else if (uwId - dwId > 20) { + // operator is behind + if (resumewid < dwId) { + resumewid = uwId - 15; + rate -= rate / 10; + } + } + } else { + //LOG.debug("uwid-dwid {} skip {} rate {}, queueSize {}", uwId - dwId, resumewid - dwId, rate, queueSize); + // upstream operator + uwId = os.windowId; + if (adjustRate) { + Response rsp = new Response(); + cmd.rate = resumewid < dwId ? rate : 0; + rsp.operatorRequests = Lists.newArrayList(cmd); + return rsp; + } + } + } + return null; + } + + public static class SetPropertyRequest implements OperatorRequest, Serializable + { + private static final long serialVersionUID = 1L; + int rate; + + @Override + public OperatorResponse execute(Operator oper, int arg1, long arg2) throws IOException + { + if (oper instanceof WindowedGenerator) { + LOG.debug("Setting rate to {}", rate); + ((WindowedGenerator)oper).rate = rate; + } + return null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java new file mode 100644 index 0000000..7b2085d --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.window; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Tuple.TimestampedTuple; +import org.apache.apex.malhar.lib.window.WindowedStorage; +import org.apache.apex.malhar.lib.window.accumulation.Count; +import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; +import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.lib.util.KeyValPair; + +public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator> +{ + public KeyedWindowedOperatorBenchmarkApp() + { + generatorClass = KeyedWindowedGenerator.class; + windowedOperatorClass = KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator.class; + } + + @SuppressWarnings("unchecked") + @Override + protected void connectGeneratorToWindowedOperator(DAG dag, KeyedWindowedGenerator generator, + KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator windowedOperator) + { + dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL); + } + + protected static class MyKeyedWindowedOperator extends KeyedWindowedOperatorImpl + { + private static final Logger logger = LoggerFactory.getLogger(MyKeyedWindowedOperator.class); + + private long logWindows = 20; + private long windowCount = 0; + private long beginTime = System.currentTimeMillis(); + private long tupleCount = 0; + private long totalBeginTime = System.currentTimeMillis(); + private long totalCount = 0; + + private long droppedCount = 0; + @Override + public void dropTuple(Tuple input) + { + droppedCount++; + } + + @Override + public void endWindow() + { + super.endWindow(); + if (++windowCount == logWindows) { + long endTime = System.currentTimeMillis(); + tupleCount -= droppedCount; + totalCount += tupleCount; + logger.info("total: count: {}; time: {}; average: {}; period: count: {}; dropped: {}; time: {}; average: {}", + totalCount, endTime - totalBeginTime, totalCount * 1000 / (endTime - totalBeginTime), + tupleCount, droppedCount, endTime - beginTime, tupleCount * 1000 / (endTime - beginTime)); + windowCount = 0; + beginTime = System.currentTimeMillis(); + tupleCount = 0; + droppedCount = 0; + } + } + + @Override + public void processTuple(Tuple tuple) + { + super.processTuple(tuple); + ++tupleCount; + } + } + + protected static class KeyedWindowedGenerator extends AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>> + { + @Override + protected TimestampedTuple<KeyValPair<String, Long>> generateNextTuple() + { + return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(System.currentTimeMillis() - random.nextInt(60000), + new KeyValPair<String, Long>("" + random.nextInt(100000), (long)random.nextInt(100))); + } + } + + @Override + protected Accumulation createAccumulation() + { + return new Count(); + } + + private boolean useInMemoryStorage = false; + + @Override + protected WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl) + { + if (useInMemoryStorage) { + return new InMemoryWindowedKeyedStorage(); + } + SpillableWindowedKeyedStorage dataStorage = new SpillableWindowedKeyedStorage(); + dataStorage.setSpillableComplexComponent(sccImpl); + return dataStorage; + } + + @Override + protected WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl) + { + if (useInMemoryStorage) { + return new InMemoryWindowedKeyedStorage(); + } + SpillableWindowedKeyedStorage retractionStorage = new SpillableWindowedKeyedStorage(); + retractionStorage.setSpillableComplexComponent(sccImpl); + return retractionStorage; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java new file mode 100644 index 0000000..98275ce --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.window; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Tuple.TimestampedTuple; +import org.apache.apex.malhar.lib.window.WindowedStorage; +import org.apache.apex.malhar.lib.window.accumulation.Count; +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage; +import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +@ApplicationAnnotation(name = "WindowedOperatorBenchmark") +public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<WindowedOperatorBenchmarkApp.WindowedGenerator, WindowedOperatorBenchmarkApp.MyWindowedOperator> +{ + public WindowedOperatorBenchmarkApp() + { + generatorClass = WindowedGenerator.class; + windowedOperatorClass = WindowedOperatorBenchmarkApp.MyWindowedOperator.class; + } + + + protected static class WindowedGenerator extends AbstractGenerator<Tuple.TimestampedTuple<Long>> + { + @Override + protected TimestampedTuple<Long> generateNextTuple() + { + return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - random.nextInt(120000), (long)random.nextInt(100)); + } + } + + protected static class MyWindowedOperator extends WindowedOperatorImpl + { + private static final Logger logger = LoggerFactory.getLogger(MyWindowedOperator.class); + + private long logWindows = 20; + private long windowCount = 0; + private long beginTime = System.currentTimeMillis(); + private long tupleCount = 0; + private long totalBeginTime = System.currentTimeMillis(); + private long totalCount = 0; + + @Override + public void endWindow() + { + super.endWindow(); + if (++windowCount == logWindows) { + long endTime = System.currentTimeMillis(); + totalCount += tupleCount; + logger.info("total: count: {}; time: {}; average: {}; period: count: {}; time: {}; average: {}", + totalCount, endTime - totalBeginTime, totalCount * 1000 / (endTime - totalBeginTime), + tupleCount, endTime - beginTime, tupleCount * 1000 / (endTime - beginTime)); + windowCount = 0; + beginTime = System.currentTimeMillis(); + tupleCount = 0; + } + } + + @Override + public void processTuple(Tuple tuple) + { + super.processTuple(tuple); + ++tupleCount; + } + } + + @SuppressWarnings("unchecked") + @Override + protected void connectGeneratorToWindowedOperator(DAG dag, WindowedGenerator generator, + MyWindowedOperator windowedOperator) + { + dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL); + } + + + @Override + protected Accumulation createAccumulation() + { + return new Count(); + } + + + @Override + protected WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl) + { + SpillableWindowedPlainStorage plainDataStorage = new SpillableWindowedPlainStorage(); + plainDataStorage.setSpillableComplexComponent(sccImpl); + return plainDataStorage; + } + + + @Override + protected WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl) + { + SpillableWindowedPlainStorage plainRetractionStorage = new SpillableWindowedPlainStorage(); + plainRetractionStorage.setSpillableComplexComponent(sccImpl); + return plainRetractionStorage; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java new file mode 100644 index 0000000..2bc9335 --- /dev/null +++ b/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.window; + +import java.io.File; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; + +public class KeyedWindowedOperatorBenchmarkAppTest extends KeyedWindowedOperatorBenchmarkApp +{ + public static final String basePath = "target/temp"; + + @Before + public void before() + { + FileUtil.fullyDelete(new File(basePath)); + } + + @Test + public void test() throws Exception + { + Configuration conf = new Configuration(false); + + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + super.populateDAG(dag, conf); + + StreamingApplication app = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + } + }; + + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.run(3000000); + + lc.shutdown(); + } + + @Override + public String getStoreBasePath(Configuration conf) + { + return basePath; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java new file mode 100644 index 0000000..4a16396 --- /dev/null +++ b/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.window; + +import java.io.File; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; + +public class WindowedOperatorBenchmarkAppTest extends WindowedOperatorBenchmarkApp +{ + public static final String basePath = "target/temp"; + + @Before + public void before() + { + FileUtil.fullyDelete(new File(basePath)); + } + + @Test + public void test() throws Exception + { + Configuration conf = new Configuration(false); + + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + super.populateDAG(dag, conf); + + StreamingApplication app = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + } + }; + + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.run(3000000); + + lc.shutdown(); + } + + @Override + public String getStoreBasePath(Configuration conf) + { + return basePath; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties index 3fc0120..92e48b7 100644 --- a/benchmark/src/test/resources/log4j.properties +++ b/benchmark/src/test/resources/log4j.properties @@ -23,7 +23,7 @@ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG +test.log.console.threshold=INFO log4j.appender.RFA=org.apache.log4j.RollingFileAppender log4j.appender.RFA.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java index 3c18b2f..4f2cefd 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java @@ -302,7 +302,7 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide if (timeBucket != -1) { BucketedValue bucketedValue = getValueFromTimeBucketReader(key, timeBucket); if (bucketedValue != null) { - if (timeBucket == cachedBucketMetas.firstKey()) { + if (!cachedBucketMetas.isEmpty() && timeBucket == cachedBucketMetas.firstKey()) { //if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache. //Since the size of the whole time-bucket is added to total size, there is no need to add the size of //entries in file cache. @@ -493,9 +493,11 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide memoryFreed += originSize - (keyStream.size() + valueStream.size()); } - //release the free memory immediately - keyStream.releaseAllFreeMemory(); - valueStream.releaseAllFreeMemory(); + if (memoryFreed > 0) { + //release the free memory immediately + keyStream.releaseAllFreeMemory(); + valueStream.releaseAllFreeMemory(); + } return memoryFreed; } @@ -540,7 +542,9 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide //so it will be re-written by BucketsDataManager try { BucketsFileSystem.TimeBucketMeta tbm = cachedBucketMetas.get(bucketedValue.getTimeBucket()); - memoryFreed += tbm.getSizeInBytes(); + if (tbm != null) { + memoryFreed += tbm.getSizeInBytes(); + } LOG.debug("closing reader {} {}", bucketId, bucketedValue.getTimeBucket()); reader.close(); } catch (IOException e) {
