[
https://issues.apache.org/jira/browse/IGNITE-19700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Denis Chudov updated IGNITE-19700:
----------------------------------
Description:
Motivation:
Prerequisites:
RocksDbKeyValueStorage is filled with 1000 random keys and values, 5000 random
tombstones, 100 values with the prefix "tables", and 5000 tombstones with the
prefix "tables".
Load profile:
* Thread1: performs storage.invoke with the same key and value of size 500k
bytes, once per 100 ms.
* Thread2: performs storage.range for prefix "tables" once per 200 ms,
collects all entries from cursor.
* Thread3: performs storage.get with random key once per 3 ms.
Each operation performed by Thread2 usually takes 20-50 ms, but sometimes the
time spikes to hundreds of milliseconds (or even seconds). A spike lasts for
some time (I observed up to half a minute), after which the time returns to
normal values:
{code:java}
2023-06-09 17:09:05:971 +0300 [INFO][Thread-5][RocksDBLoadTest] time 31, size
100
2023-06-09 17:09:06:223 +0300 [INFO][Thread-5][RocksDBLoadTest] time 50, size
100
2023-06-09 17:09:06:471 +0300 [INFO][Thread-5][RocksDBLoadTest] time 47, size
100
2023-06-09 17:09:06:715 +0300 [INFO][Thread-5][RocksDBLoadTest] time 44, size
100
2023-06-09 17:09:07:483 +0300 [INFO][Thread-5][RocksDBLoadTest] time 566, size
100
2023-06-09 17:09:08:228 +0300 [INFO][Thread-5][RocksDBLoadTest] time 543, size
100
2023-06-09 17:09:09:000 +0300 [INFO][Thread-5][RocksDBLoadTest] time 571, size
100
2023-06-09 17:09:09:774 +0300 [INFO][Thread-5][RocksDBLoadTest] time 572, size
100
2023-06-09 17:09:10:570 +0300 [INFO][Thread-5][RocksDBLoadTest] time 596, size
100
2023-06-09 17:09:11:323 +0300 [INFO][Thread-5][RocksDBLoadTest] time 552, size
100
2023-06-09 17:09:12:103 +0300 [INFO][Thread-5][RocksDBLoadTest] time 579, size
100
2023-06-09 17:09:12:861 +0300 [INFO][Thread-5][RocksDBLoadTest] time 556, size
100{code}
On TeamCity it even exceeded 6 seconds:
[https://ci.ignite.apache.org/buildConfiguration/ApacheIgnite3xGradle_Test_RunUnitTests/7283421?buildTab=log&focusLine=46540&expandAll=true&logFilter=debug&logView=flowAware]
Reproducer:
{code:java}
package org.apache.ignite.internal.metastorage;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
import java.io.ObjectStreamException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite.internal.metastorage.dsl.ConditionType;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.server.AndCondition;
import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
import org.apache.ignite.internal.metastorage.server.OrCondition;
import org.apache.ignite.internal.metastorage.server.RevisionCondition;
import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
import org.apache.ignite.internal.metastorage.server.ValueCondition;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(WorkDirectoryExtension.class)
public class RocksDBLoadTest {
private static final IgniteLogger LOG =
Loggers.forClass(RocksDBLoadTest.class);
/**
 * Generates a random byte sequence: the UTF-8 encoding of the textual form of a freshly created random UUID.
 *
 * @return UTF-8 bytes of a random UUID string.
 */
private byte[] randomBytes() {
    String uuidText = UUID.randomUUID().toString();

    return uuidText.getBytes(StandardCharsets.UTF_8);
}
/**
 * Generates a random byte sequence that begins with the given prefix: the UTF-8 encoding of the prefix
 * concatenated with a freshly created random UUID.
 *
 * @param prefix Text placed in front of the random UUID.
 * @return UTF-8 bytes of {@code prefix} followed by a random UUID string.
 */
private byte[] randomBytes(String prefix) {
    String prefixedUuid = prefix + UUID.randomUUID();

    return prefixedUuid.getBytes(StandardCharsets.UTF_8);
}
/**
 * Builds a copy of the given string in which the final character is replaced by the character with the
 * next code unit value.
 *
 * @param str Source string; an empty string makes {@code charAt} throw.
 * @return The string with its last character incremented by one.
 */
private static String incrementLastChar(String str) {
    int lastIndex = str.length() - 1;

    char incremented = (char) (str.charAt(lastIndex) + 1);

    return str.substring(0, lastIndex) + incremented;
}
@Test
public void test(@WorkDirectory Path path) throws InterruptedException {
System.out.println("start");
HybridClock clock = new HybridClockImpl();
RocksDbKeyValueStorage storage = new RocksDbKeyValueStorage("asd",
path.resolve("rocksdbtest"));
storage.start();
for (int i = 0; i < 1000; i++) {
storage.put(randomBytes(), randomBytes(), clock.now());
}
for (int i = 0; i < 5000; i++) {
storage.put(randomBytes(), TOMBSTONE, clock.now());
}
for (int i = 0; i < 100; i++) {
storage.put(randomBytes("tables"), randomBytes(), clock.now());
}
for (int i = 0; i < 5000; i++) {
storage.put(randomBytes("tables"), TOMBSTONE, clock.now());
}
ByteArray leaseKey = ByteArray.fromString("placementdriver.leases");
AtomicBoolean leasesStopped = new AtomicBoolean();
AtomicBoolean rangeStopped = new AtomicBoolean();
Thread leases = new Thread(() -> {
byte[] leaseRaw = new byte[500_000];
byte a = 0;
while (!leasesStopped.get()) {
byte[] renewedLease = new byte[500_000];
renewedLease[0] = ++a;
storage.invoke(
toCondition(or(notExists(leaseKey),
value(leaseKey).eq(leaseRaw))),
List.of(put(leaseKey, renewedLease)),
List.of(noop()),
clock.now()
);
leaseRaw = renewedLease;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
leases.start();
Thread range = new Thread(() -> {
while (!rangeStopped.get()) {
long start = System.currentTimeMillis();
Cursor<Entry> cursor =
storage.range("tables".getBytes(StandardCharsets.UTF_8),
incrementLastChar("tables").getBytes(StandardCharsets.UTF_8));
List<Object> list = new ArrayList<>();
for(Entry e : cursor) {
if (!e.tombstone()) {
list.add(e.value());
}
}
LOG.info("time " + (System.currentTimeMillis() - start) + ",
size " + list.size());
cursor.close();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
range.start();
for (int i = 0; i < 180_000; i++) {
storage.get(randomBytes());
Thread.sleep(3);
}
leasesStopped.set(true);
rangeStopped.set(true);
leases.join();
range.join();
}
private static Condition
toCondition(org.apache.ignite.internal.metastorage.dsl.Condition condition) {
if (condition instanceof SimpleCondition.ValueCondition) {
var valueCondition = (SimpleCondition.ValueCondition) condition;
return new ValueCondition(
toValueConditionType(valueCondition.type()),
valueCondition.key(),
valueCondition.value()
);
} else if (condition instanceof SimpleCondition.RevisionCondition) {
var revisionCondition = (SimpleCondition.RevisionCondition)
condition;
return new RevisionCondition(
toRevisionConditionType(revisionCondition.type()),
revisionCondition.key(),
revisionCondition.revision()
);
} else if (condition instanceof SimpleCondition) {
var simpleCondition = (SimpleCondition) condition;
switch (simpleCondition.type()) {
case KEY_EXISTS:
return new
ExistenceCondition(ExistenceCondition.Type.EXISTS, simpleCondition.key());
case KEY_NOT_EXISTS:
return new
ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, simpleCondition.key());
case TOMBSTONE:
return new TombstoneCondition(simpleCondition.key());
default:
throw new IllegalArgumentException("Unexpected simple
condition type " + simpleCondition.type());
}
} else if (condition instanceof CompoundCondition) {
CompoundCondition compoundCondition = (CompoundCondition) condition;
Condition leftCondition =
toCondition(compoundCondition.leftCondition());
Condition rightCondition =
toCondition(compoundCondition.rightCondition());
switch (compoundCondition.type()) {
case AND:
return new AndCondition(leftCondition, rightCondition);
case OR:
return new OrCondition(leftCondition, rightCondition);
default:
throw new IllegalArgumentException("Unexpected compound
condition type " + compoundCondition.type());
}
} else {
throw new IllegalArgumentException("Unknown condition " +
condition);
}
}
private static ValueCondition.Type toValueConditionType(ConditionType type)
{
switch (type) {
case VAL_EQUAL:
return ValueCondition.Type.EQUAL;
case VAL_NOT_EQUAL:
return ValueCondition.Type.NOT_EQUAL;
case VAL_GREATER:
return ValueCondition.Type.GREATER;
case VAL_GREATER_OR_EQUAL:
return ValueCondition.Type.GREATER_OR_EQUAL;
case VAL_LESS:
return ValueCondition.Type.LESS;
case VAL_LESS_OR_EQUAL:
return ValueCondition.Type.LESS_OR_EQUAL;
default:
throw new IllegalArgumentException("Unexpected value condition
type " + type);
}
}
private static RevisionCondition.Type toRevisionConditionType(ConditionType
type) {
switch (type) {
case REV_EQUAL:
return RevisionCondition.Type.EQUAL;
case REV_NOT_EQUAL:
return RevisionCondition.Type.NOT_EQUAL;
case REV_GREATER:
return RevisionCondition.Type.GREATER;
case REV_GREATER_OR_EQUAL:
return RevisionCondition.Type.GREATER_OR_EQUAL;
case REV_LESS:
return RevisionCondition.Type.LESS;
case REV_LESS_OR_EQUAL:
return RevisionCondition.Type.LESS_OR_EQUAL;
default:
throw new IllegalArgumentException("Unexpected revision
condition type " + type);
}
}
} {code}
was:
Prerequisites:
RocksDbKeyValueStorage is filled with 1000 random keys and values, 5000 random
tombstones, 100 values with the prefix "tables", and 5000 tombstones with the
prefix "tables".
Load profile:
* Thread1: performs storage.invoke with the same key and value of size 500k
bytes, once per 100 ms.
* Thread2: performs storage.range for prefix "tables" once per 200 ms,
collects all entries from cursor.
* Thread3: performs storage.get with random key once per 3 ms.
Each operation performed by Thread2 usually takes 20-50 ms, but sometimes the
time spikes to hundreds of milliseconds (or even seconds). A spike lasts for
some time (I observed up to half a minute), after which the time returns to
normal values:
{code:java}
2023-06-09 17:09:05:971 +0300 [INFO][Thread-5][RocksDBLoadTest] time 31, size
100
2023-06-09 17:09:06:223 +0300 [INFO][Thread-5][RocksDBLoadTest] time 50, size
100
2023-06-09 17:09:06:471 +0300 [INFO][Thread-5][RocksDBLoadTest] time 47, size
100
2023-06-09 17:09:06:715 +0300 [INFO][Thread-5][RocksDBLoadTest] time 44, size
100
2023-06-09 17:09:07:483 +0300 [INFO][Thread-5][RocksDBLoadTest] time 566, size
100
2023-06-09 17:09:08:228 +0300 [INFO][Thread-5][RocksDBLoadTest] time 543, size
100
2023-06-09 17:09:09:000 +0300 [INFO][Thread-5][RocksDBLoadTest] time 571, size
100
2023-06-09 17:09:09:774 +0300 [INFO][Thread-5][RocksDBLoadTest] time 572, size
100
2023-06-09 17:09:10:570 +0300 [INFO][Thread-5][RocksDBLoadTest] time 596, size
100
2023-06-09 17:09:11:323 +0300 [INFO][Thread-5][RocksDBLoadTest] time 552, size
100
2023-06-09 17:09:12:103 +0300 [INFO][Thread-5][RocksDBLoadTest] time 579, size
100
2023-06-09 17:09:12:861 +0300 [INFO][Thread-5][RocksDBLoadTest] time 556, size
100{code}
On TeamCity it even exceeded 6 seconds:
[https://ci.ignite.apache.org/buildConfiguration/ApacheIgnite3xGradle_Test_RunUnitTests/7283421?buildTab=log&focusLine=46540&expandAll=true&logFilter=debug&logView=flowAware]
Reproducer:
{code:java}
package org.apache.ignite.internal.metastorage;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
import java.io.ObjectStreamException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite.internal.metastorage.dsl.ConditionType;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.server.AndCondition;
import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
import org.apache.ignite.internal.metastorage.server.OrCondition;
import org.apache.ignite.internal.metastorage.server.RevisionCondition;
import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
import org.apache.ignite.internal.metastorage.server.ValueCondition;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(WorkDirectoryExtension.class)
public class RocksDBLoadTest {
private static final IgniteLogger LOG =
Loggers.forClass(RocksDBLoadTest.class);
/**
 * Generates a random byte sequence: the UTF-8 encoding of the textual form of a freshly created random UUID.
 *
 * @return UTF-8 bytes of a random UUID string.
 */
private byte[] randomBytes() {
    String uuidText = UUID.randomUUID().toString();

    return uuidText.getBytes(StandardCharsets.UTF_8);
}
/**
 * Generates a random byte sequence that begins with the given prefix: the UTF-8 encoding of the prefix
 * concatenated with a freshly created random UUID.
 *
 * @param prefix Text placed in front of the random UUID.
 * @return UTF-8 bytes of {@code prefix} followed by a random UUID string.
 */
private byte[] randomBytes(String prefix) {
    String prefixedUuid = prefix + UUID.randomUUID();

    return prefixedUuid.getBytes(StandardCharsets.UTF_8);
}
/**
 * Builds a copy of the given string in which the final character is replaced by the character with the
 * next code unit value.
 *
 * @param str Source string; an empty string makes {@code charAt} throw.
 * @return The string with its last character incremented by one.
 */
private static String incrementLastChar(String str) {
    int lastIndex = str.length() - 1;

    char incremented = (char) (str.charAt(lastIndex) + 1);

    return str.substring(0, lastIndex) + incremented;
}
@Test
public void test(@WorkDirectory Path path) throws InterruptedException {
System.out.println("start");
HybridClock clock = new HybridClockImpl();
RocksDbKeyValueStorage storage = new RocksDbKeyValueStorage("asd",
path.resolve("rocksdbtest"));
storage.start();
for (int i = 0; i < 1000; i++) {
storage.put(randomBytes(), randomBytes(), clock.now());
}
for (int i = 0; i < 5000; i++) {
storage.put(randomBytes(), TOMBSTONE, clock.now());
}
for (int i = 0; i < 100; i++) {
storage.put(randomBytes("tables"), randomBytes(), clock.now());
}
for (int i = 0; i < 5000; i++) {
storage.put(randomBytes("tables"), TOMBSTONE, clock.now());
}
ByteArray leaseKey = ByteArray.fromString("placementdriver.leases");
AtomicBoolean leasesStopped = new AtomicBoolean();
AtomicBoolean rangeStopped = new AtomicBoolean();
Thread leases = new Thread(() -> {
byte[] leaseRaw = new byte[500_000];
byte a = 0;
while (!leasesStopped.get()) {
byte[] renewedLease = new byte[500_000];
renewedLease[0] = ++a;
storage.invoke(
toCondition(or(notExists(leaseKey),
value(leaseKey).eq(leaseRaw))),
List.of(put(leaseKey, renewedLease)),
List.of(noop()),
clock.now()
);
leaseRaw = renewedLease;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
leases.start();
Thread range = new Thread(() -> {
while (!rangeStopped.get()) {
long start = System.currentTimeMillis();
Cursor<Entry> cursor =
storage.range("tables".getBytes(StandardCharsets.UTF_8),
incrementLastChar("tables").getBytes(StandardCharsets.UTF_8));
List<Object> list = new ArrayList<>();
for(Entry e : cursor) {
if (!e.tombstone()) {
list.add(e.value());
}
}
LOG.info("time " + (System.currentTimeMillis() - start) + ",
size " + list.size());
cursor.close();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
range.start();
for (int i = 0; i < 180_000; i++) {
storage.get(randomBytes());
Thread.sleep(3);
}
leasesStopped.set(true);
rangeStopped.set(true);
leases.join();
range.join();
}
private static Condition
toCondition(org.apache.ignite.internal.metastorage.dsl.Condition condition) {
if (condition instanceof SimpleCondition.ValueCondition) {
var valueCondition = (SimpleCondition.ValueCondition) condition;
return new ValueCondition(
toValueConditionType(valueCondition.type()),
valueCondition.key(),
valueCondition.value()
);
} else if (condition instanceof SimpleCondition.RevisionCondition) {
var revisionCondition = (SimpleCondition.RevisionCondition)
condition;
return new RevisionCondition(
toRevisionConditionType(revisionCondition.type()),
revisionCondition.key(),
revisionCondition.revision()
);
} else if (condition instanceof SimpleCondition) {
var simpleCondition = (SimpleCondition) condition;
switch (simpleCondition.type()) {
case KEY_EXISTS:
return new
ExistenceCondition(ExistenceCondition.Type.EXISTS, simpleCondition.key());
case KEY_NOT_EXISTS:
return new
ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, simpleCondition.key());
case TOMBSTONE:
return new TombstoneCondition(simpleCondition.key());
default:
throw new IllegalArgumentException("Unexpected simple
condition type " + simpleCondition.type());
}
} else if (condition instanceof CompoundCondition) {
CompoundCondition compoundCondition = (CompoundCondition) condition;
Condition leftCondition =
toCondition(compoundCondition.leftCondition());
Condition rightCondition =
toCondition(compoundCondition.rightCondition());
switch (compoundCondition.type()) {
case AND:
return new AndCondition(leftCondition, rightCondition);
case OR:
return new OrCondition(leftCondition, rightCondition);
default:
throw new IllegalArgumentException("Unexpected compound
condition type " + compoundCondition.type());
}
} else {
throw new IllegalArgumentException("Unknown condition " +
condition);
}
}
private static ValueCondition.Type toValueConditionType(ConditionType type)
{
switch (type) {
case VAL_EQUAL:
return ValueCondition.Type.EQUAL;
case VAL_NOT_EQUAL:
return ValueCondition.Type.NOT_EQUAL;
case VAL_GREATER:
return ValueCondition.Type.GREATER;
case VAL_GREATER_OR_EQUAL:
return ValueCondition.Type.GREATER_OR_EQUAL;
case VAL_LESS:
return ValueCondition.Type.LESS;
case VAL_LESS_OR_EQUAL:
return ValueCondition.Type.LESS_OR_EQUAL;
default:
throw new IllegalArgumentException("Unexpected value condition
type " + type);
}
}
private static RevisionCondition.Type toRevisionConditionType(ConditionType
type) {
switch (type) {
case REV_EQUAL:
return RevisionCondition.Type.EQUAL;
case REV_NOT_EQUAL:
return RevisionCondition.Type.NOT_EQUAL;
case REV_GREATER:
return RevisionCondition.Type.GREATER;
case REV_GREATER_OR_EQUAL:
return RevisionCondition.Type.GREATER_OR_EQUAL;
case REV_LESS:
return RevisionCondition.Type.LESS;
case REV_LESS_OR_EQUAL:
return RevisionCondition.Type.LESS_OR_EQUAL;
default:
throw new IllegalArgumentException("Unexpected revision
condition type " + type);
}
}
} {code}
> RocksDB scan time spikes up at some moments
> -------------------------------------------
>
> Key: IGNITE-19700
> URL: https://issues.apache.org/jira/browse/IGNITE-19700
> Project: Ignite
> Issue Type: Task
> Reporter: Denis Chudov
> Priority: Major
> Labels: ignite-3
>
> Motivation:
>
> Prerequisites:
> RocksDBKeyValueStorage filled with 1000 random keys and values, 5000 random
> tombstones, 100 values with prefix "tables", 5000 tombstones with prefix
> "tables".
> Load profile:
> * Thread1: performs storage.invoke with the same key and value of size 500k
> bytes, once per 100 ms.
> * Thread2: performs storage.range for prefix "tables" once per 200 ms,
> collects all entries from cursor.
> * Thread3: performs storage.get with random key once per 3 ms.
> Each operation performed by Thread2 mostly takes 20-50 ms, but sometimes this
> time spikes up to hundreds of milliseconds (or even seconds): and this lasts
> for some time (I observed up to half of a minute), after that the time
> returns to normal values:
> {code:java}
> 2023-06-09 17:09:05:971 +0300 [INFO][Thread-5][RocksDBLoadTest] time 31, size
> 100
> 2023-06-09 17:09:06:223 +0300 [INFO][Thread-5][RocksDBLoadTest] time 50, size
> 100
> 2023-06-09 17:09:06:471 +0300 [INFO][Thread-5][RocksDBLoadTest] time 47, size
> 100
> 2023-06-09 17:09:06:715 +0300 [INFO][Thread-5][RocksDBLoadTest] time 44, size
> 100
> 2023-06-09 17:09:07:483 +0300 [INFO][Thread-5][RocksDBLoadTest] time 566,
> size 100
> 2023-06-09 17:09:08:228 +0300 [INFO][Thread-5][RocksDBLoadTest] time 543,
> size 100
> 2023-06-09 17:09:09:000 +0300 [INFO][Thread-5][RocksDBLoadTest] time 571,
> size 100
> 2023-06-09 17:09:09:774 +0300 [INFO][Thread-5][RocksDBLoadTest] time 572,
> size 100
> 2023-06-09 17:09:10:570 +0300 [INFO][Thread-5][RocksDBLoadTest] time 596,
> size 100
> 2023-06-09 17:09:11:323 +0300 [INFO][Thread-5][RocksDBLoadTest] time 552,
> size 100
> 2023-06-09 17:09:12:103 +0300 [INFO][Thread-5][RocksDBLoadTest] time 579,
> size 100
> 2023-06-09 17:09:12:861 +0300 [INFO][Thread-5][RocksDBLoadTest] time 556,
> size 100{code}
> On teamcity it was even over 6 seconds:
> [https://ci.ignite.apache.org/buildConfiguration/ApacheIgnite3xGradle_Test_RunUnitTests/7283421?buildTab=log&focusLine=46540&expandAll=true&logFilter=debug&logView=flowAware]
>
> Reproducer:
> {code:java}
> package org.apache.ignite.internal.metastorage;
> import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
> import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
> import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
> import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
> import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
> import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
> import java.io.ObjectStreamException;
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Path;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.logging.Logger;
> import org.apache.ignite.internal.hlc.HybridClock;
> import org.apache.ignite.internal.hlc.HybridClockImpl;
> import org.apache.ignite.internal.logger.IgniteLogger;
> import org.apache.ignite.internal.logger.Loggers;
> import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
> import org.apache.ignite.internal.metastorage.dsl.ConditionType;
> import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
> import org.apache.ignite.internal.metastorage.server.AndCondition;
> import org.apache.ignite.internal.metastorage.server.Condition;
> import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
> import org.apache.ignite.internal.metastorage.server.OrCondition;
> import org.apache.ignite.internal.metastorage.server.RevisionCondition;
> import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
> import org.apache.ignite.internal.metastorage.server.ValueCondition;
> import
> org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
> import org.apache.ignite.internal.testframework.WorkDirectory;
> import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
> import org.apache.ignite.internal.util.Cursor;
> import org.apache.ignite.lang.ByteArray;
> import org.junit.jupiter.api.Test;
> import org.junit.jupiter.api.extension.ExtendWith;
> @ExtendWith(WorkDirectoryExtension.class)
> public class RocksDBLoadTest {
> private static final IgniteLogger LOG =
> Loggers.forClass(RocksDBLoadTest.class);
> private byte[] randomBytes() {
> return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
> }
> private byte[] randomBytes(String prefix) {
> return (prefix + UUID.randomUUID()).getBytes(StandardCharsets.UTF_8);
> }
> /**
> * Increments the last character of the given string.
> */
> private static String incrementLastChar(String str) {
> char lastChar = str.charAt(str.length() - 1);
> return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
> }
> @Test
> public void test(@WorkDirectory Path path) throws InterruptedException {
> System.out.println("start");
> HybridClock clock = new HybridClockImpl();
> RocksDbKeyValueStorage storage = new RocksDbKeyValueStorage("asd",
> path.resolve("rocksdbtest"));
> storage.start();
> for (int i = 0; i < 1000; i++) {
> storage.put(randomBytes(), randomBytes(), clock.now());
> }
> for (int i = 0; i < 5000; i++) {
> storage.put(randomBytes(), TOMBSTONE, clock.now());
> }
> for (int i = 0; i < 100; i++) {
> storage.put(randomBytes("tables"), randomBytes(), clock.now());
> }
> for (int i = 0; i < 5000; i++) {
> storage.put(randomBytes("tables"), TOMBSTONE, clock.now());
> }
> ByteArray leaseKey = ByteArray.fromString("placementdriver.leases");
> AtomicBoolean leasesStopped = new AtomicBoolean();
> AtomicBoolean rangeStopped = new AtomicBoolean();
> Thread leases = new Thread(() -> {
> byte[] leaseRaw = new byte[500_000];
> byte a = 0;
> while (!leasesStopped.get()) {
> byte[] renewedLease = new byte[500_000];
> renewedLease[0] = ++a;
> storage.invoke(
> toCondition(or(notExists(leaseKey),
> value(leaseKey).eq(leaseRaw))),
> List.of(put(leaseKey, renewedLease)),
> List.of(noop()),
> clock.now()
> );
> leaseRaw = renewedLease;
> try {
> Thread.sleep(100);
> } catch (InterruptedException e) {
> throw new RuntimeException(e);
> }
> }
> });
> leases.start();
> Thread range = new Thread(() -> {
> while (!rangeStopped.get()) {
> long start = System.currentTimeMillis();
> Cursor<Entry> cursor =
>
> storage.range("tables".getBytes(StandardCharsets.UTF_8),
> incrementLastChar("tables").getBytes(StandardCharsets.UTF_8));
> List<Object> list = new ArrayList<>();
> for(Entry e : cursor) {
> if (!e.tombstone()) {
> list.add(e.value());
> }
> }
> LOG.info("time " + (System.currentTimeMillis() - start) + ",
> size " + list.size());
> cursor.close();
> try {
> Thread.sleep(200);
> } catch (InterruptedException e) {
> throw new RuntimeException(e);
> }
> }
> });
> range.start();
> for (int i = 0; i < 180_000; i++) {
> storage.get(randomBytes());
> Thread.sleep(3);
> }
> leasesStopped.set(true);
> rangeStopped.set(true);
> leases.join();
> range.join();
> }
> private static Condition
> toCondition(org.apache.ignite.internal.metastorage.dsl.Condition condition) {
> if (condition instanceof SimpleCondition.ValueCondition) {
> var valueCondition = (SimpleCondition.ValueCondition) condition;
> return new ValueCondition(
> toValueConditionType(valueCondition.type()),
> valueCondition.key(),
> valueCondition.value()
> );
> } else if (condition instanceof SimpleCondition.RevisionCondition) {
> var revisionCondition = (SimpleCondition.RevisionCondition)
> condition;
> return new RevisionCondition(
> toRevisionConditionType(revisionCondition.type()),
> revisionCondition.key(),
> revisionCondition.revision()
> );
> } else if (condition instanceof SimpleCondition) {
> var simpleCondition = (SimpleCondition) condition;
> switch (simpleCondition.type()) {
> case KEY_EXISTS:
> return new
> ExistenceCondition(ExistenceCondition.Type.EXISTS, simpleCondition.key());
> case KEY_NOT_EXISTS:
> return new
> ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, simpleCondition.key());
> case TOMBSTONE:
> return new TombstoneCondition(simpleCondition.key());
> default:
> throw new IllegalArgumentException("Unexpected simple
> condition type " + simpleCondition.type());
> }
> } else if (condition instanceof CompoundCondition) {
> CompoundCondition compoundCondition = (CompoundCondition)
> condition;
> Condition leftCondition =
> toCondition(compoundCondition.leftCondition());
> Condition rightCondition =
> toCondition(compoundCondition.rightCondition());
> switch (compoundCondition.type()) {
> case AND:
> return new AndCondition(leftCondition, rightCondition);
> case OR:
> return new OrCondition(leftCondition, rightCondition);
> default:
> throw new IllegalArgumentException("Unexpected compound
> condition type " + compoundCondition.type());
> }
> } else {
> throw new IllegalArgumentException("Unknown condition " +
> condition);
> }
> }
> private static ValueCondition.Type toValueConditionType(ConditionType
> type) {
> switch (type) {
> case VAL_EQUAL:
> return ValueCondition.Type.EQUAL;
> case VAL_NOT_EQUAL:
> return ValueCondition.Type.NOT_EQUAL;
> case VAL_GREATER:
> return ValueCondition.Type.GREATER;
> case VAL_GREATER_OR_EQUAL:
> return ValueCondition.Type.GREATER_OR_EQUAL;
> case VAL_LESS:
> return ValueCondition.Type.LESS;
> case VAL_LESS_OR_EQUAL:
> return ValueCondition.Type.LESS_OR_EQUAL;
> default:
> throw new IllegalArgumentException("Unexpected value
> condition type " + type);
> }
> }
> private static RevisionCondition.Type
> toRevisionConditionType(ConditionType type) {
> switch (type) {
> case REV_EQUAL:
> return RevisionCondition.Type.EQUAL;
> case REV_NOT_EQUAL:
> return RevisionCondition.Type.NOT_EQUAL;
> case REV_GREATER:
> return RevisionCondition.Type.GREATER;
> case REV_GREATER_OR_EQUAL:
> return RevisionCondition.Type.GREATER_OR_EQUAL;
> case REV_LESS:
> return RevisionCondition.Type.LESS;
> case REV_LESS_OR_EQUAL:
> return RevisionCondition.Type.LESS_OR_EQUAL;
> default:
> throw new IllegalArgumentException("Unexpected revision
> condition type " + type);
> }
> }
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)