Repository: apex-malhar Updated Branches: refs/heads/master f006ac6f5 -> c5a12e4e7
APEXMALHAR-2205 #resolve #comment State management benchmark Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c5a12e4e Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c5a12e4e Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c5a12e4e Branch: refs/heads/master Commit: c5a12e4e747c5be840a16e4c932cbc1dbff79894 Parents: f006ac6 Author: brightchen <[email protected]> Authored: Fri Aug 26 16:09:12 2016 -0700 Committer: brightchen <[email protected]> Committed: Thu Sep 1 15:00:29 2016 -0700 ---------------------------------------------------------------------- benchmark/pom.xml | 5 + .../state/ManagedStateBenchmarkApp.java | 215 +++++++++++++++++++ .../benchmark/state/StoreOperator.java | 127 +++++++++++ .../state/ManagedStateBenchmarkAppTester.java | 70 ++++++ 4 files changed, 417 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/pom.xml ---------------------------------------------------------------------- diff --git a/benchmark/pom.xml b/benchmark/pom.xml index f09ae81..d5451b9 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -595,6 +595,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.9.4</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java new file mode 100644 index 0000000..25e3971 --- /dev/null +++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.state; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Random; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Stats.OperatorStats; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.lib.util.KeyValPair; + +@ApplicationAnnotation(name = "ManagedStateBenchmark") +public class 
ManagedStateBenchmarkApp implements StreamingApplication +{ + private static final Logger logger = LoggerFactory.getLogger(ManagedStateBenchmarkApp.class); + + protected static final String PROP_STORE_PATH = "dt.application.ManagedStateBenchmark.storeBasePath"; + protected static final String DEFAULT_BASE_PATH = "ManagedStateBenchmark/Store"; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TestStatsListener sl = new TestStatsListener(); + sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false); + TestGenerator gen = dag.addOperator("Generator", new TestGenerator()); + dag.setAttribute(gen, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl)); + + StoreOperator storeOperator = new StoreOperator(); + storeOperator.setStore(createStore(conf)); + StoreOperator store = dag.addOperator("Store", storeOperator); + + dag.setAttribute(store, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl)); + + dag.addStream("Events", gen.data, store.input).setLocality(Locality.CONTAINER_LOCAL); + } + + public ManagedStateImpl createStore(Configuration conf) + { + String basePath = getStoreBasePath(conf); + ManagedStateImpl store = new ManagedStateImpl(); + ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath); + return store; + } + + public String getStoreBasePath(Configuration conf) + { + + String basePath = conf.get(PROP_STORE_PATH); + if (basePath == null || basePath.isEmpty()) { + basePath = DEFAULT_BASE_PATH; + } + return basePath; + } + + public static class TestGenerator extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data = new DefaultOutputPort<KeyValPair<byte[], byte[]>>(); + int emitBatchSize = 1000; + byte[] val = ByteBuffer.allocate(1000).putLong(1234).array(); + int rate = 20000; + int emitCount = 0; + private final Random random = new Random(); + private int range = 1000 * 60; // one minute range of 
hot keys + + public int getEmitBatchSize() + { + return emitBatchSize; + } + + public void setEmitBatchSize(int emitBatchSize) + { + this.emitBatchSize = emitBatchSize; + } + + public int getRate() + { + return rate; + } + + public void setRate(int rate) + { + this.rate = rate; + } + + public int getRange() + { + return range; + } + + public void setRange(int range) + { + this.range = range; + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + emitCount = 0; + } + + @Override + public void emitTuples() + { + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < emitBatchSize && emitCount < rate; i++) { + byte[] key = ByteBuffer.allocate(16).putLong((timestamp - timestamp % range) + random.nextInt(range)).putLong(i) + .array(); + data.emit(new KeyValPair<byte[], byte[]>(key, val)); + emitCount++; + } + } + } + + public static class TestStatsListener implements StatsListener, Serializable + { + private static final Logger LOG = LoggerFactory.getLogger(TestStatsListener.class); + private static final long serialVersionUID = 1L; + SetPropertyRequest cmd = new SetPropertyRequest(); + + long uwId; + long dwId; + long resumewid; + int rate; + int queueSize; + boolean adjustRate; + + @Override + public Response processStats(BatchedOperatorStats stats) + { + if (!stats.getLastWindowedStats().isEmpty()) { + OperatorStats os = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1); + if (os.inputPorts != null && !os.inputPorts.isEmpty()) { + dwId = os.windowId; + queueSize = os.inputPorts.get(0).queueSize; + if (uwId - dwId < 5) { + // keep operator busy + rate = Math.max(1000, rate); + rate += rate / 10; + } else if (uwId - dwId > 20) { + // operator is behind + if (resumewid < dwId) { + resumewid = uwId - 15; + rate -= rate / 10; + } + } + } else { + LOG.debug("uwid-dwid {} skip {} rate {}, queueSize {}", uwId - dwId, resumewid - dwId, rate, queueSize); + // upstream operator + uwId = os.windowId; + 
if (adjustRate) { + Response rsp = new Response(); + cmd.rate = resumewid < dwId ? rate : 0; + rsp.operatorRequests = Lists.newArrayList(cmd); + return rsp; + } + } + } + return null; + } + + public static class SetPropertyRequest implements OperatorRequest, Serializable + { + private static final long serialVersionUID = 1L; + int rate; + + @Override + public OperatorResponse execute(Operator oper, int arg1, long arg2) throws IOException + { + if (oper instanceof TestGenerator) { + LOG.debug("Setting rate to {}", rate); + ((TestGenerator)oper).rate = rate; + } + return null; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java new file mode 100644 index 0000000..0d9c42b --- /dev/null +++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.benchmark.state; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.netlet.util.Slice; + +public class StoreOperator extends BaseOperator implements Operator.CheckpointNotificationListener +{ + private static final Logger logger = LoggerFactory.getLogger(StoreOperator.class); + + protected static final int numOfWindowPerStatistics = 10; + + protected ManagedStateImpl store; + protected long bucketId = 1; + + protected long lastCheckPointWindowId = -1; + protected long currentWindowId; + protected long tupleCount = 0; + protected int windowCountPerStatistics = 0; + protected long statisticsBeginTime = 0; + + public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>() + { + @Override + public void process(KeyValPair<byte[], byte[]> tuple) + { + processTuple(tuple); + } + }; + + @Override + public void setup(OperatorContext context) + { + store.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + store.beginWindow(windowId); + if (statisticsBeginTime <= 0) { + statisticsBeginTime = System.currentTimeMillis(); + } + } + + @Override + public void endWindow() + { + store.endWindow(); + if (++windowCountPerStatistics >= numOfWindowPerStatistics) { + logStatistics(); + windowCountPerStatistics = 0; + } + } + + protected void processTuple(KeyValPair<byte[], byte[]> tuple) + { + Slice key = new Slice(tuple.getKey()); + Slice value = new Slice(tuple.getValue()); + store.put(bucketId, key, value); + ++tupleCount; + } + + @Override + public void checkpointed(long windowId) + { + } + + @Override 
+ public void committed(long windowId) + { + store.committed(windowId); + } + + @Override + public void beforeCheckpoint(long windowId) + { + store.beforeCheckpoint(windowId); + logger.info("beforeCheckpoint {}", windowId); + } + + public ManagedStateImpl getStore() + { + return store; + } + + public void setStore(ManagedStateImpl store) + { + this.store = store; + } + + protected void logStatistics() + { + long spentTime = System.currentTimeMillis() - statisticsBeginTime; + logger.info("Time Spent: {}, Processed tuples: {}, rate: {}", spentTime, tupleCount, tupleCount / spentTime); + + statisticsBeginTime = System.currentTimeMillis(); + tupleCount = 0; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java new file mode 100644 index 0000000..ca5e245 --- /dev/null +++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.state; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; + +/** + * This is not really a unit test, but in fact a benchmark runner. + * This class is provided so that developers can conveniently run the benchmark in a local IDE environment. + * + */ +public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp +{ + public static final String basePath = "target/temp"; + + @Test + public void test() throws Exception + { + Configuration conf = new Configuration(false); + + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + super.populateDAG(dag, conf); + + StreamingApplication app = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + } + }; + + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.run(300000); + + lc.shutdown(); + } + + @Override + public String getStoreBasePath(Configuration conf) + { + return basePath; + } +}
