Repository: incubator-gobblin Updated Branches: refs/heads/master 5d343e318 -> 55689d284
[GOBBLIN-325] Add a Source and Extractor for stress testing * Configurable sleep time per record * Configurable compute time per record * Run duration or record count limit per extractor Closes #2177 from htran1/stress_test_extractor Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/55689d28 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/55689d28 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/55689d28 Branch: refs/heads/master Commit: 55689d284dacd6a60c019d1f52fd208ea0746537 Parents: 5d343e3 Author: Hung Tran <[email protected]> Authored: Wed Nov 29 14:15:32 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Wed Nov 29 14:15:32 2017 -0800 ---------------------------------------------------------------------- .../gobblin/util/test/StressTestingSource.java | 181 +++++++++++++++++++ .../util/test/TestStressTestingSource.java | 168 +++++++++++++++++ 2 files changed, 349 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55689d28/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java new file mode 100644 index 0000000..5d70219 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Charsets;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.Extract.TableType;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * A {@link Source} to be used for stress testing.
+ *
+ * <p>This source uses an extractor that can be configured to have sleep and computation time before returning a
+ * record. The size of the returned record can also be configured.
+ *
+ * <p>Every extractor stops either after a configured run duration ({@link #RUN_DURATION_KEY}) or, when no duration
+ * is configured, after a configured number of records ({@link #NUM_RECORDS_KEY}).
+ */
+public class StressTestingSource implements Source<String, byte[]> {
+  /** Prefix shared by all configuration keys of this source. */
+  public static final String CONFIG_NAMESPACE = "stressTest";
+
+  /** Number of work units returned by {@link #getWorkunits(SourceState)}. */
+  public static final String NUM_WORK_UNITS_KEY = CONFIG_NAMESPACE + "." + "numWorkUnits";
+  public static final int DEFAULT_NUM_WORK_UNITS = 1;
+
+  /** Run duration of each extractor in seconds; a value {@code <= 0} disables the time limit. */
+  public static final String RUN_DURATION_KEY = CONFIG_NAMESPACE + "." + "runDurationSecs";
+  public static final int DEFAULT_RUN_DURATION = 0;
+
+  /** Time in microseconds spent busy-computing before each record is returned. */
+  public static final String COMPUTE_TIME_MICRO_KEY = CONFIG_NAMESPACE + "." + "computeTimeMicro";
+  public static final int DEFAULT_COMPUTE_TIME_MICRO = 0;
+
+  /** Time in microseconds spent sleeping before each record is returned. */
+  public static final String SLEEP_TIME_MICRO_KEY = CONFIG_NAMESPACE + "." + "sleepTimeMicro";
+  public static final int DEFAULT_SLEEP_TIME = 0;
+
+  /** Number of records emitted per extractor; only honored when no run duration is configured. */
+  public static final String NUM_RECORDS_KEY = CONFIG_NAMESPACE + "." + "numRecords";
+  public static final int DEFAULT_NUM_RECORDS = 1;
+
+  /** Size in bytes of every record returned by the extractor. */
+  public static final String MEM_ALLOC_BYTES_KEY = CONFIG_NAMESPACE + "." + "memAllocBytes";
+  public static final int DEFAULT_MEM_ALLOC_BYTES = 8;
+
+  /** Sentinel end time meaning "no run duration configured". */
+  private static final long INVALID_TIME = -1;
+
+  /**
+   * Creates {@link #NUM_WORK_UNITS_KEY} work units that all share a single append-only {@link Extract}.
+   *
+   * @param state source state holding the stress test configuration
+   * @return the list of created work units
+   */
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    int numWorkUnits = state.getPropAsInt(NUM_WORK_UNITS_KEY, DEFAULT_NUM_WORK_UNITS);
+
+    Extract extract = new Extract(TableType.APPEND_ONLY,
+        StressTestingSource.class.getPackage().getName(),
+        StressTestingSource.class.getSimpleName());
+
+    List<WorkUnit> wus = new ArrayList<>(numWorkUnits);
+
+    for (int i = 1; i <= numWorkUnits; ++i) {
+      WorkUnit wu = new WorkUnit(extract);
+      wus.add(wu);
+    }
+
+    return wus;
+  }
+
+  /**
+   * @param state work unit state holding the stress test configuration
+   * @return a new {@link ExtractorImpl} configured from {@code state}
+   */
+  @Override
+  public Extractor<String, byte[]> getExtractor(WorkUnitState state) {
+    return new ExtractorImpl(state);
+  }
+
+  @Override
+  public void shutdown(SourceState state) {
+    // Nothing to do
+  }
+
+  /**
+   * {@link Extractor} that emits fixed-size byte array records, optionally burning CPU and/or sleeping before each
+   * record.
+   */
+  public static class ExtractorImpl implements Extractor<String, byte[]> {
+    private int recordsEmitted = 0;
+    private final long startTime;
+    private final long endTime;
+    // Kept as a long: the configured microseconds converted to nanoseconds do not fit in an int beyond ~2.1 seconds.
+    private final long computeTimeNano;
+    private final int sleepTimeMicro;
+    private final int numRecords;
+    private final int memAllocBytes;
+    private final Random random;
+
+    /**
+     * Reads the stress test configuration from {@code state}. The run duration, if configured, is measured from the
+     * time this constructor is invoked.
+     *
+     * @param state work unit state holding the stress test configuration
+     */
+    public ExtractorImpl(WorkUnitState state) {
+      this.random = new Random();
+      this.startTime = System.currentTimeMillis();
+
+      int runDuration = state.getPropAsInt(RUN_DURATION_KEY, DEFAULT_RUN_DURATION);
+
+      // set the end time based on the configured duration; the conversion is done in long arithmetic so that large
+      // durations do not overflow
+      if (runDuration > 0) {
+        this.endTime = this.startTime + TimeUnit.SECONDS.toMillis(runDuration);
+      } else {
+        this.endTime = INVALID_TIME;
+      }
+
+      this.computeTimeNano =
+          TimeUnit.MICROSECONDS.toNanos(state.getPropAsInt(COMPUTE_TIME_MICRO_KEY, DEFAULT_COMPUTE_TIME_MICRO));
+      this.sleepTimeMicro = state.getPropAsInt(SLEEP_TIME_MICRO_KEY, DEFAULT_SLEEP_TIME);
+      // num records only takes effect if the duration is not specified
+      this.numRecords = this.endTime == INVALID_TIME ? state.getPropAsInt(NUM_RECORDS_KEY, DEFAULT_NUM_RECORDS) : 0;
+      this.memAllocBytes = state.getPropAsInt(MEM_ALLOC_BYTES_KEY, DEFAULT_MEM_ALLOC_BYTES);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to do
+    }
+
+    /**
+     * @return the fixed schema name {@code "string"}
+     */
+    @Override
+    public String getSchema() throws IOException {
+      return "string";
+    }
+
+    /**
+     * Read a record with configurable idle and compute time.
+     *
+     * <p>If an end time is configured it is used as the stopping point, otherwise the record count limit is used.
+     * With no end time and a non-positive record count limit this method never returns {@code null}.
+     *
+     * @param reuse ignored; a new array is allocated for every record
+     * @return a new record of the configured size, or {@code null} once the stopping point has been reached
+     **/
+    @Override
+    public byte[] readRecord(byte[] reuse) throws DataRecordException, IOException {
+
+      // If an end time is configured then it is used as the stopping point otherwise the record count limit is used
+      if ((this.endTime != INVALID_TIME && System.currentTimeMillis() > this.endTime) ||
+          (this.numRecords > 0 && this.recordsEmitted >= this.numRecords)) {
+        return null;
+      }
+
+      // spend time computing
+      if (this.computeTimeNano > 0) {
+        final long startComputeNanoTime = System.nanoTime();
+        final byte[] bytes = new byte[100];
+
+        // busy loop so that the configured time is spent on CPU rather than idle
+        while (System.nanoTime() - startComputeNanoTime < this.computeTimeNano) {
+          random.nextBytes(bytes);
+        }
+      }
+
+      // sleep
+      if (this.sleepTimeMicro > 0) {
+        try {
+          TimeUnit.MICROSECONDS.sleep(this.sleepTimeMicro);
+        } catch (InterruptedException e) {
+          // Cut the sleep short but restore the interrupt status so that the calling thread can still observe the
+          // interruption instead of having it silently dropped here.
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      this.recordsEmitted++;
+
+      return newMessage(this.memAllocBytes);
+    }
+
+    /**
+     * @return the configured record count limit, or 0 when the extractor is limited by run duration instead
+     */
+    @Override
+    public long getExpectedRecordCount() {
+      return this.numRecords;
+    }
+
+    @Override
+    public long getHighWatermark() {
+      return 0;
+    }
+
+    /**
+     * Create a message of numBytes size.
+     *
+     * <p>The message starts with the UTF-8 encoded decimal count of records emitted so far and is zero-padded, or
+     * truncated, to exactly {@code numBytes} bytes.
+     *
+     * @param numBytes number of bytes to allocate for the message
+     * @return a newly allocated array of length {@code numBytes}
+     */
+    private byte[] newMessage(int numBytes) {
+      byte[] stringBytes = String.valueOf(this.recordsEmitted).getBytes(Charsets.UTF_8);
+
+      return Arrays.copyOf(stringBytes, numBytes);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55689d28/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
new file mode 100644
index 0000000..aad8a3a
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * Unit tests for {@link StressTestingSource}
+ */
+public class TestStressTestingSource {
+  /** Record size configured in every test; each returned record must have exactly this length. */
+  private static final int MEM_ALLOC_BYTES = 100;
+  private static final long MICROS_PER_SECOND = 1000000L;
+
+  /**
+   * Checks that the configured number of work units is created and that each extractor reports the configured
+   * record count and returns records of the configured size.
+   */
+  @Test
+  public void testSourceExtractor() throws DataRecordException, IOException {
+    final int NUM_WORK_UNITS = 10;
+    final int COMPUTE_TIME_MICRO = 10;
+    final int NUM_RECORDS = 10000;
+
+    SourceState state = newState(NUM_WORK_UNITS, NUM_RECORDS);
+    state.setProp(StressTestingSource.COMPUTE_TIME_MICRO_KEY, COMPUTE_TIME_MICRO);
+
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), NUM_WORK_UNITS);
+
+    for (WorkUnit wu : wus) {
+      WorkUnitState wuState = new WorkUnitState(wu, state);
+      Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+      Assert.assertEquals(extractor.getExpectedRecordCount(), NUM_RECORDS);
+      Assert.assertEquals(extractor.readRecord(null).length, MEM_ALLOC_BYTES);
+    }
+  }
+
+  /**
+   * Checks that the total extraction time matches the configured per-record compute time.
+   */
+  @Test
+  public void testComputeTime() throws DataRecordException, IOException {
+    final int COMPUTE_TIME_MICRO = 10000;
+    final int NUM_RECORDS = 500;
+
+    SourceState state = newState(1, NUM_RECORDS);
+    state.setProp(StressTestingSource.COMPUTE_TIME_MICRO_KEY, COMPUTE_TIME_MICRO);
+
+    Extractor<String, byte[]> extractor = newSingleExtractor(state);
+
+    long startTimeNano = System.nanoTime();
+    int recordsRead = readAllRecords(extractor);
+    long timeSpentMicro = (System.nanoTime() - startTimeNano) / 1000;
+
+    Assert.assertEquals(recordsRead, NUM_RECORDS);
+    // check that there is less than 2 second difference between expected and actual time spent
+    assertTimeSpent(timeSpentMicro, (long) COMPUTE_TIME_MICRO * NUM_RECORDS, 2 * MICROS_PER_SECOND);
+  }
+
+  /**
+   * Checks that the total extraction time matches the configured per-record sleep time.
+   */
+  @Test
+  public void testSleepTime() throws DataRecordException, IOException {
+    final int SLEEP_TIME_MICRO = 10000;
+    final int NUM_RECORDS = 500;
+
+    SourceState state = newState(1, NUM_RECORDS);
+    state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
+
+    Extractor<String, byte[]> extractor = newSingleExtractor(state);
+
+    long startTimeNano = System.nanoTime();
+    int recordsRead = readAllRecords(extractor);
+    long timeSpentMicro = (System.nanoTime() - startTimeNano) / 1000;
+
+    Assert.assertEquals(recordsRead, NUM_RECORDS);
+    // check that there is less than 2 second difference between expected and actual time spent
+    assertTimeSpent(timeSpentMicro, (long) SLEEP_TIME_MICRO * NUM_RECORDS, 2 * MICROS_PER_SECOND);
+  }
+
+  /**
+   * Checks that a configured run duration bounds the extraction time and overrides the record count limit.
+   */
+  @Test
+  public void testRunDuration() throws DataRecordException, IOException {
+    final int SLEEP_TIME_MICRO = 1000;
+    final int NUM_RECORDS = 30; // this config is ignored since the duration is set
+    final int RUN_DURATION_SECS = 5;
+
+    SourceState state = newState(1, NUM_RECORDS);
+    state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
+    state.setProp(StressTestingSource.RUN_DURATION_KEY, RUN_DURATION_SECS);
+
+    Extractor<String, byte[]> extractor = newSingleExtractor(state);
+
+    long startTimeNano = System.nanoTime();
+    int recordsRead = readAllRecords(extractor);
+    long timeSpentMicro = (System.nanoTime() - startTimeNano) / 1000;
+
+    // the record limit must not stop the extractor when a run duration is configured
+    Assert.assertTrue(recordsRead > NUM_RECORDS,
+        "Expected more than " + NUM_RECORDS + " records in duration mode but read " + recordsRead);
+    // check that there is less than 1 second difference between expected and actual time spent
+    assertTimeSpent(timeSpentMicro, RUN_DURATION_SECS * MICROS_PER_SECOND, MICROS_PER_SECOND);
+  }
+
+  /** Builds a source state with the settings that are common to all tests. */
+  private static SourceState newState(int numWorkUnits, int numRecords) {
+    SourceState state = new SourceState();
+    state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, numWorkUnits);
+    state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
+    state.setProp(StressTestingSource.NUM_RECORDS_KEY, numRecords);
+    return state;
+  }
+
+  /** Creates the extractor for the only work unit produced from {@code state}. */
+  private static Extractor<String, byte[]> newSingleExtractor(SourceState state) {
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), 1);
+
+    return source.getExtractor(new WorkUnitState(wus.get(0), state));
+  }
+
+  /** Reads records until the extractor returns null, checking the size of each one; returns the record count. */
+  private static int readAllRecords(Extractor<String, byte[]> extractor) throws DataRecordException, IOException {
+    int recordsRead = 0;
+    byte[] record;
+    while ((record = extractor.readRecord(null)) != null) {
+      Assert.assertEquals(record.length, MEM_ALLOC_BYTES);
+      recordsRead++;
+    }
+    return recordsRead;
+  }
+
+  /** Asserts that the actual time is strictly within {@code toleranceMicro} of the expected time. */
+  private static void assertTimeSpent(long actualMicro, long expectedMicro, long toleranceMicro) {
+    Assert.assertTrue(Math.abs(actualMicro - expectedMicro) < toleranceMicro,
+        "Expected about " + expectedMicro + " us (tolerance " + toleranceMicro + " us) but spent " + actualMicro
+            + " us");
+  }
+}
