Denis Chudov created IGNITE-19700:
-------------------------------------
Summary: RocksDB scan time spikes up at some moments
Key: IGNITE-19700
URL: https://issues.apache.org/jira/browse/IGNITE-19700
Project: Ignite
Issue Type: Task
Reporter: Denis Chudov
Prerequisites:
RocksDBKeyValueStorage filled with 1000 random keys and values, 5000 random
tombstones, 100 values with prefix "tables", 5000 tombstones with prefix
"tables".
Load profile:
* Thread1: performs storage.invoke with the same key and value of size 500k
bytes, once per 100 ms.
* Thread2: performs storage.range for prefix "tables" once per 200 ms,
collects all entries from cursor.
* Thread3: performs storage.get with random key once per 3 ms.
Each operation performed by Thread2 mostly takes 20-50 ms, but sometimes this
time spikes up to hundreds of milliseconds (or even seconds): and this lasts
for some time (I observed up to half of a minute), after that the time returns
to normal values:
{code:java}
2023-06-09 17:09:05:971 +0300 [INFO][Thread-5][RocksDBLoadTest] time 31, size
100
2023-06-09 17:09:06:223 +0300 [INFO][Thread-5][RocksDBLoadTest] time 50, size
100
2023-06-09 17:09:06:471 +0300 [INFO][Thread-5][RocksDBLoadTest] time 47, size
100
2023-06-09 17:09:06:715 +0300 [INFO][Thread-5][RocksDBLoadTest] time 44, size
100
2023-06-09 17:09:07:483 +0300 [INFO][Thread-5][RocksDBLoadTest] time 566, size
100
2023-06-09 17:09:08:228 +0300 [INFO][Thread-5][RocksDBLoadTest] time 543, size
100
2023-06-09 17:09:09:000 +0300 [INFO][Thread-5][RocksDBLoadTest] time 571, size
100
2023-06-09 17:09:09:774 +0300 [INFO][Thread-5][RocksDBLoadTest] time 572, size
100
2023-06-09 17:09:10:570 +0300 [INFO][Thread-5][RocksDBLoadTest] time 596, size
100
2023-06-09 17:09:11:323 +0300 [INFO][Thread-5][RocksDBLoadTest] time 552, size
100
2023-06-09 17:09:12:103 +0300 [INFO][Thread-5][RocksDBLoadTest] time 579, size
100
2023-06-09 17:09:12:861 +0300 [INFO][Thread-5][RocksDBLoadTest] time 556, size
100{code}
On teamcity it was even over 6 seconds:
[https://ci.ignite.apache.org/buildConfiguration/ApacheIgnite3xGradle_Test_RunUnitTests/7283421?buildTab=log&focusLine=46540&expandAll=true&logFilter=debug&logView=flowAware]
Reproducer:
{code:java}
package org.apache.ignite.internal.metastorage;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
import java.io.ObjectStreamException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite.internal.metastorage.dsl.ConditionType;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.server.AndCondition;
import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
import org.apache.ignite.internal.metastorage.server.OrCondition;
import org.apache.ignite.internal.metastorage.server.RevisionCondition;
import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
import org.apache.ignite.internal.metastorage.server.ValueCondition;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(WorkDirectoryExtension.class)
public class RocksDBLoadTest {
private static final IgniteLogger LOG =
Loggers.forClass(RocksDBLoadTest.class);
private byte[] randomBytes() {
return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
}
private byte[] randomBytes(String prefix) {
return (prefix + UUID.randomUUID()).getBytes(StandardCharsets.UTF_8);
}
/**
* Increments the last character of the given string.
*/
private static String incrementLastChar(String str) {
char lastChar = str.charAt(str.length() - 1);
return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
}
@Test
public void test(@WorkDirectory Path path) throws InterruptedException {
System.out.println("start");
HybridClock clock = new HybridClockImpl();
RocksDbKeyValueStorage storage = new RocksDbKeyValueStorage("asd",
path.resolve("rocksdbtest"));
storage.start();
for (int i = 0; i < 1000; i++) {
storage.put(randomBytes(), randomBytes(), clock.now());
}
for (int i = 0; i < 5000; i++) {
storage.put(randomBytes(), TOMBSTONE, clock.now());
}
for (int i = 0; i < 100; i++) {
storage.put(randomBytes("tables"), randomBytes(), clock.now());
}
for (int i = 0; i < 5000; i++) {
storage.put(randomBytes("tables"), TOMBSTONE, clock.now());
}
ByteArray leaseKey = ByteArray.fromString("placementdriver.leases");
AtomicBoolean leasesStopped = new AtomicBoolean();
AtomicBoolean rangeStopped = new AtomicBoolean();
Thread leases = new Thread(() -> {
byte[] leaseRaw = new byte[500_000];
byte a = 0;
while (!leasesStopped.get()) {
byte[] renewedLease = new byte[500_000];
renewedLease[0] = ++a;
storage.invoke(
toCondition(or(notExists(leaseKey),
value(leaseKey).eq(leaseRaw))),
List.of(put(leaseKey, renewedLease)),
List.of(noop()),
clock.now()
);
leaseRaw = renewedLease;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
leases.start();
Thread range = new Thread(() -> {
while (!rangeStopped.get()) {
long start = System.currentTimeMillis();
Cursor<Entry> cursor =
storage.range("tables".getBytes(StandardCharsets.UTF_8),
incrementLastChar("tables").getBytes(StandardCharsets.UTF_8));
List<Object> list = new ArrayList<>();
for(Entry e : cursor) {
if (!e.tombstone()) {
list.add(e.value());
}
}
LOG.info("time " + (System.currentTimeMillis() - start) + ",
size " + list.size());
cursor.close();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
range.start();
for (int i = 0; i < 180_000; i++) {
storage.get(randomBytes());
Thread.sleep(3);
}
leasesStopped.set(true);
rangeStopped.set(true);
leases.join();
range.join();
}
private static Condition
toCondition(org.apache.ignite.internal.metastorage.dsl.Condition condition) {
if (condition instanceof SimpleCondition.ValueCondition) {
var valueCondition = (SimpleCondition.ValueCondition) condition;
return new ValueCondition(
toValueConditionType(valueCondition.type()),
valueCondition.key(),
valueCondition.value()
);
} else if (condition instanceof SimpleCondition.RevisionCondition) {
var revisionCondition = (SimpleCondition.RevisionCondition)
condition;
return new RevisionCondition(
toRevisionConditionType(revisionCondition.type()),
revisionCondition.key(),
revisionCondition.revision()
);
} else if (condition instanceof SimpleCondition) {
var simpleCondition = (SimpleCondition) condition;
switch (simpleCondition.type()) {
case KEY_EXISTS:
return new
ExistenceCondition(ExistenceCondition.Type.EXISTS, simpleCondition.key());
case KEY_NOT_EXISTS:
return new
ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, simpleCondition.key());
case TOMBSTONE:
return new TombstoneCondition(simpleCondition.key());
default:
throw new IllegalArgumentException("Unexpected simple
condition type " + simpleCondition.type());
}
} else if (condition instanceof CompoundCondition) {
CompoundCondition compoundCondition = (CompoundCondition) condition;
Condition leftCondition =
toCondition(compoundCondition.leftCondition());
Condition rightCondition =
toCondition(compoundCondition.rightCondition());
switch (compoundCondition.type()) {
case AND:
return new AndCondition(leftCondition, rightCondition);
case OR:
return new OrCondition(leftCondition, rightCondition);
default:
throw new IllegalArgumentException("Unexpected compound
condition type " + compoundCondition.type());
}
} else {
throw new IllegalArgumentException("Unknown condition " +
condition);
}
}
private static ValueCondition.Type toValueConditionType(ConditionType type)
{
switch (type) {
case VAL_EQUAL:
return ValueCondition.Type.EQUAL;
case VAL_NOT_EQUAL:
return ValueCondition.Type.NOT_EQUAL;
case VAL_GREATER:
return ValueCondition.Type.GREATER;
case VAL_GREATER_OR_EQUAL:
return ValueCondition.Type.GREATER_OR_EQUAL;
case VAL_LESS:
return ValueCondition.Type.LESS;
case VAL_LESS_OR_EQUAL:
return ValueCondition.Type.LESS_OR_EQUAL;
default:
throw new IllegalArgumentException("Unexpected value condition
type " + type);
}
}
private static RevisionCondition.Type toRevisionConditionType(ConditionType
type) {
switch (type) {
case REV_EQUAL:
return RevisionCondition.Type.EQUAL;
case REV_NOT_EQUAL:
return RevisionCondition.Type.NOT_EQUAL;
case REV_GREATER:
return RevisionCondition.Type.GREATER;
case REV_GREATER_OR_EQUAL:
return RevisionCondition.Type.GREATER_OR_EQUAL;
case REV_LESS:
return RevisionCondition.Type.LESS;
case REV_LESS_OR_EQUAL:
return RevisionCondition.Type.LESS_OR_EQUAL;
default:
throw new IllegalArgumentException("Unexpected revision
condition type " + type);
}
}
} {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)