This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3e26876 [SPARK-30964][CORE][WEBUI] Accelerate InMemoryStore with a
new index
3e26876 is described below
commit 3e268763c345076801f1ff6f75bab67ecab87e8f
Author: Gengliang Wang <[email protected]>
AuthorDate: Mon Mar 2 15:48:48 2020 +0800
[SPARK-30964][CORE][WEBUI] Accelerate InMemoryStore with a new index
### What changes were proposed in this pull request?
Spark uses the class `InMemoryStore` as the KV storage for live UI and
history server(by default if no LevelDB file path is provided).
In `InMemoryStore`, all the task data in one application is stored in a
hashmap, whose key is the task ID and whose value is the task data. This is fine for
getting or deleting with a provided task ID.
However, Spark stage UI always shows all the task data in one stage and the
current implementation is to look up all the values in the hashmap. The time
complexity is O(numOfTasks).
Also, when there are too many stages (>spark.ui.retainedStages), Spark will
linearly try to look up all the task data of the stages to be deleted as well.
This can be very bad for a large application with many stages and tasks. We
can improve it by allowing the natural key of an entity to have a real parent
index. So that on each lookup with parent node provided, Spark can look up all
the natural keys(in our case, the task IDs) first, and then find the data with
the natural keys in the hashmap.
### Why are the changes needed?
The in-memory KV store becomes really slow for large applications. We can
improve it with a new index. The performance can be 10 times, 100 times, even
1000 times faster.
This can also make the Spark driver more stable for large
applications.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing unit tests.
Also, I ran a benchmark with the following code:
```
val store = new InMemoryStore()
val numberOfTasksPerStage = 10000
(0 until 1000).map { sId =>
(0 until numberOfTasksPerStage).map { taskId =>
val task = newTaskData(sId * numberOfTasksPerStage + taskId,
"SUCCESS", sId)
store.write(task)
}
}
val appStatusStore = new AppStatusStore(store)
var start = System.nanoTime()
appStatusStore.taskSummary(2, attemptId, Array(0, 0.25, 0.5, 0.75, 1))
println("task summary run time: " + ((System.nanoTime() - start) /
1000000))
val stageIds = Seq(1, 11, 66, 88)
val stageKeys = stageIds.map(Array(_, attemptId))
start = System.nanoTime()
store.removeAllByIndexValues(classOf[TaskDataWrapper],
TaskIndexNames.STAGE,
stageKeys.asJavaCollection)
println("clean up tasks run time: " + ((System.nanoTime() - start) /
1000000))
```
Task summary before the changes: 98642ms
Task summary after the changes: 120ms
Task clean up before the changes: 4900ms
Task clean up after the changes: 4ms
It's 800x faster after the changes in the micro-benchmark.
Closes #27716 from gengliangwang/liveUIStore.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 6b641430c37e0115bee781fed7360187b988313d)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/util/kvstore/InMemoryStore.java | 116 +++++++++++++++++----
.../org/apache/spark/util/kvstore/KVTypeInfo.java | 7 +-
.../apache/spark/util/kvstore/LevelDBTypeInfo.java | 5 +-
.../scala/org/apache/spark/status/storeTypes.scala | 2 +-
4 files changed, 105 insertions(+), 25 deletions(-)
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
index b33c538..db08740 100644
---
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
+++
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
@@ -163,6 +163,12 @@ public class InMemoryStore implements KVStore {
}
}
+ /**
+ * An alias class for the type "ConcurrentHashMap<Comparable<Object>,
Boolean>", which is used
+ * as a concurrent hashset for storing natural keys and the boolean value
doesn't matter.
+ */
+ private static class NaturalKeys extends
ConcurrentHashMap<Comparable<Object>, Boolean> {}
+
private static class InstanceList<T> {
/**
@@ -205,11 +211,19 @@ public class InMemoryStore implements KVStore {
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor naturalKey;
private final ConcurrentMap<Comparable<Object>, T> data;
+ private final String naturalParentIndexName;
+ private final Boolean hasNaturalParentIndex;
+ // A mapping from parent to the natural keys of its children.
+ // For example, a mapping from a stage ID to all the task IDs in the stage.
+ private final ConcurrentMap<Comparable<Object>, NaturalKeys>
parentToChildrenMap;
private InstanceList(Class<?> klass) {
this.ti = new KVTypeInfo(klass);
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
this.data = new ConcurrentHashMap<>();
+ this.naturalParentIndexName =
ti.getParentIndexName(KVIndex.NATURAL_INDEX_NAME);
+ this.parentToChildrenMap = new ConcurrentHashMap<>();
+ this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
}
KVTypeInfo.Accessor getIndexAccessor(String indexName) {
@@ -217,11 +231,30 @@ public class InMemoryStore implements KVStore {
}
int countingRemoveAllByIndexValues(String index, Collection<?>
indexValues) {
- Predicate<? super T> filter = getPredicate(ti.getAccessor(index),
indexValues);
- CountingRemoveIfForEach<T> callback = new
CountingRemoveIfForEach<>(data, filter);
+ if (hasNaturalParentIndex && naturalParentIndexName.equals(index)) {
+ // If there is a parent index for the natural index and `index`
happens to be it,
+ // Spark can use the `parentToChildrenMap` to get the related natural
keys, and then
+ // delete them from `data`.
+ int count = 0;
+ for (Object indexValue : indexValues) {
+ Comparable<Object> parentKey = asKey(indexValue);
+ NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey,
new NaturalKeys());
+ for (Comparable<Object> naturalKey : children.keySet()) {
+ data.remove(naturalKey);
+ count ++;
+ }
+ parentToChildrenMap.remove(parentKey);
+ }
+ return count;
+ } else {
+ Predicate<? super T> filter = getPredicate(ti.getAccessor(index),
indexValues);
+ CountingRemoveIfForEach<T> callback = new
CountingRemoveIfForEach<>(data, filter);
- data.forEach(callback);
- return callback.count();
+ // Go through all the values in `data` and delete objects that meets
the predicate `filter`.
+ // This can be slow when there is a large number of entries in `data`.
+ data.forEach(callback);
+ return callback.count();
+ }
}
public T get(Object key) {
@@ -230,10 +263,27 @@ public class InMemoryStore implements KVStore {
public void put(T value) throws Exception {
data.put(asKey(naturalKey.get(value)), value);
+ if (hasNaturalParentIndex) {
+ Comparable<Object> parentKey =
asKey(getIndexAccessor(naturalParentIndexName).get(value));
+ NaturalKeys children =
+ parentToChildrenMap.computeIfAbsent(parentKey, k -> new
NaturalKeys());
+ children.put(asKey(naturalKey.get(value)), true);
+ }
}
public void delete(Object key) {
data.remove(asKey(key));
+ if (hasNaturalParentIndex) {
+ for (NaturalKeys v : parentToChildrenMap.values()) {
+ if (v.remove(asKey(key))) {
+ // `v` can be empty after removing the natural key and we can
remove it from
+ // `parentToChildrenMap`. However, `parentToChildrenMap` is a
ConcurrentMap and such
+ // checking and deleting can be slow.
+ // This method is to delete one object with certain key, let's
make it simple here.
+ break;
+ }
+ }
+ }
}
public int size() {
@@ -241,7 +291,7 @@ public class InMemoryStore implements KVStore {
}
public InMemoryView<T> view() {
- return new InMemoryView<>(data.values(), ti);
+ return new InMemoryView<>(data, ti, naturalParentIndexName,
parentToChildrenMap);
}
private static <T> Predicate<? super T> getPredicate(
@@ -271,22 +321,32 @@ public class InMemoryStore implements KVStore {
private static class InMemoryView<T> extends KVStoreView<T> {
private static final InMemoryView<?> EMPTY_VIEW =
- new InMemoryView<>(Collections.emptyList(), null);
+ new InMemoryView<>(new ConcurrentHashMap<>(), null, "", new
ConcurrentHashMap<>());
- private final Collection<T> elements;
+ private final ConcurrentMap<Comparable<Object>, T> data;
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor natural;
-
- InMemoryView(Collection<T> elements, KVTypeInfo ti) {
- this.elements = elements;
+ private final ConcurrentMap<Comparable<Object>, NaturalKeys>
parentToChildrenMap;
+ private final String naturalParentIndexName;
+ private final Boolean hasNaturalParentIndex;
+
+ InMemoryView(
+ ConcurrentMap<Comparable<Object>, T> data,
+ KVTypeInfo ti,
+ String naturalParentIndexName,
+ ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap) {
+ this.data = data;
this.ti = ti;
this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) :
null;
+ this.naturalParentIndexName = naturalParentIndexName;
+ this.parentToChildrenMap = parentToChildrenMap;
+ this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
}
@Override
public Iterator<T> iterator() {
- if (elements.isEmpty()) {
- return new InMemoryIterator<>(elements.iterator());
+ if (data.isEmpty()) {
+ return new InMemoryIterator<>(Collections.emptyIterator());
}
KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) :
null;
@@ -322,15 +382,31 @@ public class InMemoryStore implements KVStore {
*/
private List<T> copyElements() {
if (parent != null) {
- KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
- Preconditions.checkArgument(parentGetter != null, "Parent filter for
non-child index.");
- Comparable<?> parentKey = asKey(parent);
-
- return elements.stream()
- .filter(e -> compare(e, parentGetter, parentKey) == 0)
- .collect(Collectors.toList());
+ Comparable<Object> parentKey = asKey(parent);
+ if (hasNaturalParentIndex &&
naturalParentIndexName.equals(ti.getParentIndexName(index))) {
+ // If there is a parent index for the natural index and the parent
of `index` happens to
+ // be it, Spark can use the `parentToChildrenMap` to get the related
natural keys, and
+ // then copy them from `data`.
+ NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey,
new NaturalKeys());
+ ArrayList<T> elements = new ArrayList<>();
+ for (Comparable<Object> naturalKey : children.keySet()) {
+ data.computeIfPresent(naturalKey, (k, v) -> {
+ elements.add(v);
+ return v;
+ });
+ }
+ return elements;
+ } else {
+ // Go through all the values in `data` and collect all the objects
has certain parent
+ // value. This can be slow when there is a large number of entries
in `data`.
+ KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
+ Preconditions.checkArgument(parentGetter != null, "Parent filter for
non-child index.");
+ return data.values().stream()
+ .filter(e -> compare(e, parentGetter, parentKey) == 0)
+ .collect(Collectors.toList());
+ }
} else {
- return new ArrayList<>(elements);
+ return new ArrayList<>(data.values());
}
}
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
index d2a2698..5404d33 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
@@ -68,8 +68,6 @@ public class KVTypeInfo {
Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
"No natural index defined for type %s.", type.getName());
-
Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
- "Natural index of %s cannot have a parent.", type.getName());
for (KVIndex idx : indices.values()) {
if (!idx.parent().isEmpty()) {
@@ -117,6 +115,11 @@ public class KVTypeInfo {
return index.parent().isEmpty() ? null : getAccessor(index.parent());
}
+ String getParentIndexName(String indexName) {
+ KVIndex index = indices.get(indexName);
+ return index.parent();
+ }
+
/**
* Abstracts the difference between invoking a Field and a Method.
*/
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
index f4d3592..d742353 100644
---
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
+++
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
@@ -133,12 +133,13 @@ class LevelDBTypeInfo {
// First create the parent indices, then the child indices.
ti.indices().forEach(idx -> {
- if (idx.parent().isEmpty()) {
+ // In LevelDB, there is no parent index for the NATURAL INDEX.
+ if (idx.parent().isEmpty() ||
idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
null));
}
});
ti.indices().forEach(idx -> {
- if (!idx.parent().isEmpty()) {
+ if (!idx.parent().isEmpty() &&
!idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
indices.get(idx.parent())));
}
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index f0a94d8..c957ff7 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -154,7 +154,7 @@ private[spark] object TaskIndexNames {
private[spark] class TaskDataWrapper(
// Storing this as an object actually saves memory; it's also used as the
key in the in-memory
// store, so in that case you'd save the extra copy of the value here.
- @KVIndexParam
+ @KVIndexParam(parent = TaskIndexNames.STAGE)
val taskId: JLong,
@KVIndexParam(value = TaskIndexNames.TASK_INDEX, parent =
TaskIndexNames.STAGE)
val index: Int,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]