sijie closed pull request #1713: [tools][perf] Add a `segread` perf command to
read distributedlog streams in segment splits
URL: https://github.com/apache/bookkeeper/pull/1713
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
index 75adc4b292..d38c8399b3 100644
---
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
+++
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
@@ -18,6 +18,7 @@
import org.apache.bookkeeper.tools.framework.CliCommandGroup;
import org.apache.bookkeeper.tools.framework.CliSpec;
import org.apache.bookkeeper.tools.perf.dlog.ReadCommand;
+import org.apache.bookkeeper.tools.perf.dlog.SegmentReadCommand;
import org.apache.bookkeeper.tools.perf.dlog.WriteCommand;
/**
@@ -34,6 +35,7 @@
.withParent(BKPerf.NAME)
.addCommand(new WriteCommand())
.addCommand(new ReadCommand())
+ .addCommand(new SegmentReadCommand())
.build();
public DlogPerfCommandGroup() {
diff --git
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
index 7497d7df3b..ba0ea7ba61 100644
---
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
+++
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
@@ -28,128 +28,35 @@
package org.apache.bookkeeper.tools.perf.dlog;
-import com.beust.jcommander.Parameter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.Recorder;
import org.apache.bookkeeper.common.net.ServiceURI;
-import org.apache.bookkeeper.tools.framework.CliFlags;
-import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.namespace.Namespace;
-import org.apache.distributedlog.api.namespace.NamespaceBuilder;
/**
- * A perf writer to evaluate write performance.
+ * A perf reader to evaluate read performance.
*/
@Slf4j
-public class PerfReader implements Runnable {
-
- /**
- * Flags for the write command.
- */
- public static class Flags extends CliFlags {
-
- @Parameter(
- names = {
- "-ln", "--log-name"
- },
- description = "Log name or log name pattern if more than 1 log is
specified at `--num-logs`")
- public String logName = "test-log-%06d";
-
- @Parameter(
- names = {
- "-l", "--num-logs"
- },
- description = "Number of log streams")
- public int numLogs = 1;
-
- @Parameter(
- names = {
- "-t", "--threads"
- },
- description = "Number of threads reading")
- public int numThreads = 1;
-
- @Parameter(
- names = {
- "-mr", "--max-readahead-records"
- },
- description = "Max readhead records")
- public int maxReadAheadRecords = 1000000;
-
- @Parameter(
- names = {
- "-bs", "--readahead-batch-size"
- },
- description = "ReadAhead Batch Size, in entries"
- )
- public int readAheadBatchSize = 4;
-
- }
-
-
- // stats
- private final LongAdder recordsRead = new LongAdder();
- private final LongAdder bytesRead = new LongAdder();
-
- private final ServiceURI serviceURI;
- private final Flags flags;
- private final Recorder recorder = new Recorder(
- TimeUnit.SECONDS.toMillis(120000), 5
- );
- private final Recorder cumulativeRecorder = new Recorder(
- TimeUnit.SECONDS.toMillis(120000), 5
- );
- private final AtomicBoolean isDone = new AtomicBoolean(false);
+public class PerfReader extends PerfReaderBase {
PerfReader(ServiceURI serviceURI, Flags flags) {
- this.serviceURI = serviceURI;
- this.flags = flags;
+ super(serviceURI, flags);
}
@Override
- public void run() {
- try {
- execute();
- } catch (Exception e) {
- log.error("Encountered exception at running dlog perf writer", e);
- }
- }
-
- void execute() throws Exception {
- ObjectMapper m = new ObjectMapper();
- ObjectWriter w = m.writerWithDefaultPrettyPrinter();
- log.info("Starting dlog perf reader with config : {}",
w.writeValueAsString(flags));
-
- DistributedLogConfiguration conf = newDlogConf(flags);
- try (Namespace namespace = NamespaceBuilder.newBuilder()
- .conf(conf)
- .uri(serviceURI.getUri())
- .build()) {
- execute(namespace);
- }
- }
-
- void execute(Namespace namespace) throws Exception {
+ protected void execute(Namespace namespace) throws Exception {
List<Pair<Integer, DistributedLogManager>> managers = new
ArrayList<>(flags.numLogs);
for (int i = 0; i < flags.numLogs; i++) {
String logName = String.format(flags.logName, i);
@@ -218,73 +125,8 @@ void read(List<DistributedLogManager> logs) throws
Exception {
}
}
- void reportStats() {
- // Print report stats
- long oldTime = System.nanoTime();
- Histogram reportHistogram = null;
- while (true) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- break;
- }
-
- if (isDone.get()) {
- break;
- }
-
- long now = System.nanoTime();
- double elapsed = (now - oldTime) / 1e9;
- double rate = recordsRead.sumThenReset() / elapsed;
- double throughput = bytesRead.sumThenReset() / elapsed / 1024 /
1024;
-
- reportHistogram = recorder.getIntervalHistogram(reportHistogram);
-
- log.info("Throughput read : {} records/s --- {} MB/s --- Latency:
mean:"
- + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct:
{} - 99.99pct: {} - Max: {}",
- throughputFormat.format(rate),
throughputFormat.format(throughput),
- dec.format(reportHistogram.getMean() / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(50) /
1000.0),
- dec.format(reportHistogram.getValueAtPercentile(95) /
1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99) /
1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.9) /
1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.99) /
1000.0),
- dec.format(reportHistogram.getMaxValue() / 1000.0));
-
- reportHistogram.reset();
-
- oldTime = now;
- }
-
- }
-
- private static DistributedLogConfiguration newDlogConf(Flags flags) {
- return new DistributedLogConfiguration()
- .setReadAheadBatchSize(flags.readAheadBatchSize)
- .setReadAheadMaxRecords(flags.maxReadAheadRecords)
- .setReadAheadWaitTime(200);
- }
-
-
- private static final DecimalFormat throughputFormat = new
PaddingDecimalFormat("0.0", 8);
- private static final DecimalFormat dec = new PaddingDecimalFormat("0.000",
7);
-
- private static void printAggregatedStats(Recorder recorder) {
- Histogram reportHistogram = recorder.getIntervalHistogram();
-
- log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {}
- 95pct: {} - 99pct: {}"
- + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
- dec.format(reportHistogram.getMean() / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.9) /
1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.99) /
1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.999) /
1000.0),
- dec.format(reportHistogram.getMaxValue() / 1000.0));
- }
}
diff --git
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReaderBase.java
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReaderBase.java
new file mode 100644
index 0000000000..9520ef9cff
--- /dev/null
+++
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReaderBase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.text.DecimalFormat;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+
+/**
+ * Shared scaffolding for the dlog read perf tools.
+ *
+ * <p>Holds the command flags, the read counters and latency recorders, opens the
+ * {@link Namespace} for the configured service uri, and prints periodic and
+ * aggregated stats. Subclasses provide the read loop via {@link #execute(Namespace)}.
+ */
+@Slf4j
+abstract class PerfReaderBase implements Runnable {
+
+    /**
+     * Flags for the read commands.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-ln", "--log-name"
+            },
+            description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
+        public String logName = "test-log-%06d";
+
+        @Parameter(
+            names = {
+                "-l", "--num-logs"
+            },
+            description = "Number of log streams")
+        public int numLogs = 1;
+
+        @Parameter(
+            names = {
+                "-t", "--threads"
+            },
+            description = "Number of threads reading")
+        public int numThreads = 1;
+
+        @Parameter(
+            names = {
+                "-mr", "--max-readahead-records"
+            },
+            description = "Max readahead records")
+        public int maxReadAheadRecords = 1000000;
+
+        @Parameter(
+            names = {
+                "-ns", "--num-splits-per-segment"
+            },
+            description = "Num splits per segment")
+        public int numSplitsPerSegment = 1;
+
+        @Parameter(
+            names = {
+                "-bs", "--readahead-batch-size"
+            },
+            description = "ReadAhead Batch Size, in entries"
+        )
+        public int readAheadBatchSize = 4;
+
+    }
+
+
+    // stats: counters are reset on every report interval (see reportStats)
+    protected final LongAdder recordsRead = new LongAdder();
+    protected final LongAdder bytesRead = new LongAdder();
+
+    protected final ServiceURI serviceURI;
+    protected final Flags flags;
+    // interval recorder, drained by reportStats()
+    protected final Recorder recorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    // recorder handed to printAggregatedStats() by subclasses at shutdown
+    protected final Recorder cumulativeRecorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    // set by subclasses (e.g. from a shutdown hook) to stop the reporting loop
+    protected final AtomicBoolean isDone = new AtomicBoolean(false);
+
+    PerfReaderBase(ServiceURI serviceURI, Flags flags) {
+        this.serviceURI = serviceURI;
+        this.flags = flags;
+    }
+
+    /**
+     * Logs the effective flags, builds the namespace for {@link #serviceURI} and runs
+     * {@link #execute(Namespace)} against it. The namespace is closed when the run ends.
+     *
+     * @throws Exception if the flags cannot be serialized, the namespace cannot be
+     *     built, or the read loop fails
+     */
+    protected void execute() throws Exception {
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting dlog perf reader with config : {}", w.writeValueAsString(flags));
+
+        DistributedLogConfiguration conf = newDlogConf(flags);
+        try (Namespace namespace = NamespaceBuilder.newBuilder()
+            .conf(conf)
+            .uri(serviceURI.getUri())
+            .build()) {
+            execute(namespace);
+        }
+    }
+
+    /**
+     * Prints throughput and latency every 10 seconds until the calling thread is
+     * interrupted or {@link #isDone} is set. Blocks the calling thread.
+     */
+    protected void reportStats() {
+        // Print report stats
+        long oldTime = System.nanoTime();
+
+        Histogram reportHistogram = null;
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                // interruption is the signal to stop reporting
+                break;
+            }
+
+            if (isDone.get()) {
+                break;
+            }
+
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+
+            // counters are reset so each report covers only the last interval
+            double rate = recordsRead.sumThenReset() / elapsed;
+            double throughput = bytesRead.sumThenReset() / elapsed / 1024 / 1024;
+
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+            // NOTE(review): values are divided by 1000 and labelled ms, so the recorder
+            // presumably holds microseconds - confirm against the recording site.
+            log.info("Throughput read : {} records/s --- {} MB/s --- Latency: mean:"
+                    + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                THROUGHPUT_FORMAT.format(rate), THROUGHPUT_FORMAT.format(throughput),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getMean() / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getMaxValue() / 1000.0));
+
+            reportHistogram.reset();
+
+            oldTime = now;
+        }
+
+    }
+
+    /**
+     * Runs the read workload against an already opened namespace.
+     *
+     * @param namespace namespace to open logs from
+     * @throws Exception if the workload fails
+     */
+    protected abstract void execute(Namespace namespace) throws Exception;
+
+    /**
+     * Runs {@link #execute()} and logs, rather than propagates, any failure.
+     */
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Exception e) {
+            log.error("Encountered exception at running dlog perf reader", e);
+        }
+    }
+
+    /**
+     * Builds the dlog configuration from the readahead flags.
+     */
+    private static DistributedLogConfiguration newDlogConf(Flags flags) {
+        DistributedLogConfiguration conf = new DistributedLogConfiguration()
+            .setReadAheadBatchSize(flags.readAheadBatchSize)
+            .setReadAheadMaxRecords(flags.maxReadAheadRecords)
+            .setReadAheadWaitTime(200);
+        conf.setProperty("bkc.numChannelsPerBookie", 8);
+        return conf;
+    }
+
+    protected static final DecimalFormat THROUGHPUT_FORMAT = new PaddingDecimalFormat("0.0", 8);
+    protected static final DecimalFormat PADDING_DECIMAL_FORMAT = new PaddingDecimalFormat("0.000", 7);
+
+    /**
+     * Logs mean, percentile and max latencies from the interval histogram of the
+     * given recorder.
+     *
+     * @param recorder recorder to take the histogram from
+     */
+    protected static void printAggregatedStats(Recorder recorder) {
+        Histogram reportHistogram = recorder.getIntervalHistogram();
+
+        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
+                + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getMean() / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getMaxValue() / 1000.0));
+    }
+
+}
diff --git
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.java
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.java
new file mode 100644
index 0000000000..1d77b8d4c9
--- /dev/null
+++
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.Entry;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
+import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
+import org.apache.distributedlog.namespace.NamespaceDriver.Role;
+
+/**
+ * A perf reader that evaluates read performance by reading log segments directly,
+ * optionally breaking each segment into several entry-id ranges (splits).
+ */
+@Slf4j
+public class PerfSegmentReader extends PerfReaderBase {
+
+    /**
+     * A contiguous range of entries of one log segment.
+     *
+     * <p>{@code endEntryId} is inclusive; a negative value means "read until the end
+     * of the segment".
+     */
+    @Data
+    static class Split {
+        final DistributedLogManager manager;
+        final LogSegmentMetadata segment;
+        final long startEntryId;
+        final long endEntryId;
+    }
+
+    PerfSegmentReader(ServiceURI serviceURI, Flags flags) {
+        super(serviceURI, flags);
+    }
+
+    /**
+     * Opens the configured logs, computes the splits of all their segments, spreads the
+     * splits round-robin over {@code flags.numThreads} reader threads and then blocks
+     * in {@link #reportStats()}.
+     *
+     * @param namespace namespace to open the logs from
+     * @throws Exception if a log cannot be opened or its segments cannot be listed
+     */
+    @Override
+    protected void execute(Namespace namespace) throws Exception {
+        List<DistributedLogManager> managers = new ArrayList<>(flags.numLogs);
+        for (int i = 0; i < flags.numLogs; i++) {
+            String logName = String.format(flags.logName, i);
+            managers.add(namespace.openLog(logName));
+        }
+        log.info("Successfully open {} logs", managers.size());
+
+        // Get all the log segments
+        final List<Pair<DistributedLogManager, LogSegmentMetadata>> segments = managers.stream()
+            .flatMap(manager -> {
+                try {
+                    return manager.getLogSegments().stream().map(segment -> Pair.of(manager, segment));
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            })
+            .collect(Collectors.toList());
+
+        final List<Split> splits = segments.stream()
+            .flatMap(entry -> getNumSplits(entry.getLeft(), entry.getRight()).stream())
+            .collect(Collectors.toList());
+
+        // register shutdown hook to aggregate stats
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            isDone.set(true);
+            printAggregatedStats(cumulativeRecorder);
+        }));
+
+        ExecutorService executor = Executors.newFixedThreadPool(flags.numThreads);
+        try {
+            for (int i = 0; i < flags.numThreads; i++) {
+                // round-robin by position: thread i takes splits i, i + numThreads, ...
+                final List<Split> splitsThisThread = new ArrayList<>();
+                for (int j = i; j < splits.size(); j += flags.numThreads) {
+                    splitsThisThread.add(splits.get(j));
+                }
+                executor.submit(() -> {
+                    try {
+                        read(splitsThisThread);
+                    } catch (Exception e) {
+                        log.error("Encountered error at reading records", e);
+                    }
+                });
+            }
+            log.info("Started {} read threads", flags.numThreads);
+            reportStats();
+        } finally {
+            executor.shutdown();
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+            managers.forEach(manager -> manager.asyncClose());
+        }
+    }
+
+    /**
+     * Reads the given splits one after another on the calling thread.
+     *
+     * @param splits splits assigned to this thread
+     * @throws Exception if reading any split fails; remaining splits are not read
+     */
+    void read(List<Split> splits) throws Exception {
+        log.info("Read thread started with : splits = {}",
+            splits.stream()
+                .map(l -> "(log = " + l.manager.getStreamName() + ", segment = "
+                    + l.segment.getLogSegmentSequenceNumber() + " [" + l.startEntryId + ", " + l.endEntryId + "])")
+                .collect(Collectors.toList()));
+
+        for (Split split : splits) {
+            readSegmentSplit(split);
+        }
+    }
+
+    /**
+     * Reads all records of one split and adds them to the read counters.
+     *
+     * <p>Reading stops once an entry at or beyond the split's end entry id has been
+     * seen, or when the end of the log segment is reached. Entries that were fetched
+     * in the same batch but lie beyond the end entry id are released without being
+     * counted, so neighbouring splits do not count the same entries twice.
+     *
+     * @param split split to read
+     * @throws Exception if the segment reader cannot be opened or a read fails
+     */
+    void readSegmentSplit(Split split) throws Exception {
+        LogSegmentEntryReader reader = result(
+            split.manager.getNamespaceDriver().getLogSegmentEntryStore(Role.READER)
+                .openReader(split.segment, split.getStartEntryId()));
+        reader.start();
+
+        final long endEntryId = split.getEndEntryId();
+        try {
+            // named differently from the inherited isDone flag to avoid shadowing it
+            MutableBoolean reachedEnd = new MutableBoolean(false);
+            while (!reachedEnd.booleanValue()) {
+                // 100 is just an indicator
+                List<Entry.Reader> entries = result(reader.readNext(100));
+                entries.forEach(entry -> {
+                    final long entryId = entry.getEntryId();
+                    final boolean bounded = endEntryId >= 0;
+                    try {
+                        // entries past the end belong to the next split: skip, but still release
+                        if (!bounded || entryId <= endEntryId) {
+                            LogRecordWithDLSN record;
+                            while ((record = entry.nextRecord()) != null) {
+                                recordsRead.increment();
+                                bytesRead.add(record.getPayloadBuf().readableBytes());
+                            }
+                        }
+                    } catch (IOException ioe) {
+                        throw new UncheckedIOException(ioe);
+                    } finally {
+                        entry.release();
+                    }
+                    if (bounded && entryId >= endEntryId) {
+                        reachedEnd.setValue(true);
+                    }
+                });
+            }
+        } catch (EndOfLogSegmentException e) {
+            // we reached end of log segment
+            return;
+        } finally {
+            reader.asyncClose();
+        }
+
+    }
+
+    /**
+     * Breaks a log segment into at most {@code flags.numSplitsPerSegment} splits of
+     * (nearly) equal entry count; the last split absorbs the remainder.
+     *
+     * <p>When no splitting is requested a single unbounded split (end entry id
+     * {@code -1}) is returned. The number of splits is capped by the number of
+     * entries so that no split has an empty or inverted range.
+     *
+     * @param manager log the segment belongs to
+     * @param segment segment to split
+     * @return the splits covering the segment, in entry-id order
+     */
+    List<Split> getNumSplits(DistributedLogManager manager, LogSegmentMetadata segment) {
+        if (flags.numSplitsPerSegment <= 1) {
+            // no split requested: read the whole segment as one unbounded range
+            return Lists.newArrayList(
+                new Split(
+                    manager,
+                    segment,
+                    0L,
+                    -1L)
+            );
+        } else {
+            long lastEntryId = segment.getLastEntryId();
+            long numEntries = lastEntryId + 1;
+            // never create more splits than there are entries (and always at least one)
+            int numSplits = (int) Math.max(1L, Math.min((long) flags.numSplitsPerSegment, numEntries));
+            long numEntriesPerSplit = numEntries / numSplits;
+            long nextEntryId = 0L;
+            List<Split> splitsInSegment = new ArrayList<>(numSplits);
+            for (int i = 0; i < numSplits; i++) {
+                long startEntryId = nextEntryId;
+                long endEntryId;
+                if (i == numSplits - 1) {
+                    // last split takes whatever is left, including the division remainder
+                    endEntryId = lastEntryId;
+                } else {
+                    endEntryId = nextEntryId + numEntriesPerSplit - 1;
+                }
+                splitsInSegment.add(new Split(
+                    manager,
+                    segment,
+                    startEntryId,
+                    endEntryId
+                ));
+                nextEntryId = endEntryId + 1;
+            }
+            return splitsInSegment;
+        }
+    }
+
+}
diff --git
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
index e29dc6441b..946616f30b 100644
---
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
+++
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
@@ -109,6 +109,13 @@
description = "Number of records to write in total. If 0, it will
keep writing")
public long numRecords = 0;
+ @Parameter(
+ names = {
+ "-mlss", "--max-log-segment-size"
+ },
+ description = "Max log segment size")
+ public int maxLogSegmentSize = 64 * 1024 * 1024;
+
@Parameter(
names = {
"-b", "--num-bytes"
@@ -387,7 +394,8 @@ private static DistributedLogConfiguration
newDlogConf(Flags flags) {
.setOutputBufferSize(512 * 1024)
.setPeriodicFlushFrequencyMilliSeconds(2)
.setWriteLockEnabled(false)
- .setMaxLogSegmentBytes(512 * 1024 * 1024) // 512MB
+ .setMaxLogSegmentBytes(flags.maxLogSegmentSize)
+ .setLogSegmentRollingIntervalMinutes(1)
.setExplicitTruncationByApplication(true);
}
diff --git
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
index 001cacba3d..3ec8e9cd9d 100644
---
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
+++
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
@@ -19,7 +19,7 @@
import org.apache.bookkeeper.tools.common.BKCommand;
import org.apache.bookkeeper.tools.common.BKFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.bookkeeper.tools.perf.dlog.PerfReader.Flags;
+import org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase.Flags;
import org.apache.commons.configuration.CompositeConfiguration;
/**
diff --git
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/SegmentReadCommand.java
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/SegmentReadCommand.java
new file mode 100644
index 0000000000..f75955a245
--- /dev/null
+++
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/SegmentReadCommand.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase.Flags;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Command to read log records from distributedlog streams, segment by segment.
+ */
+@Slf4j
+public class SegmentReadCommand extends BKCommand<Flags> {
+
+ private static final String NAME = "segread";
+ private static final String DESC = "Read log records from distributedlog
streams by breaking it down to segments";
+
+ public SegmentReadCommand() {
+ super(CliSpec.<Flags>newBuilder()
+ .withName(NAME)
+ .withDescription(DESC)
+ .withFlags(new Flags())
+ .build());
+ }
+
+ @Override
+ protected boolean apply(ServiceURI serviceURI,
+ CompositeConfiguration conf,
+ BKFlags globalFlags, Flags cmdFlags) {
+ if (serviceURI == null) {
+ log.warn("No service uri is provided. Use default
'distributedlog://localhost/distributedlog'.");
+ serviceURI =
ServiceURI.create("distributedlog://localhost/distributedlog");
+ }
+
+ PerfSegmentReader reader = new PerfSegmentReader(serviceURI, cmdFlags);
+ reader.run();
+ return true;
+ }
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services