scwhittle commented on code in PR #23492:
URL: https://github.com/apache/beam/pull/23492#discussion_r1100066124
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -2005,79 +2025,64 @@ public Iterable<Entry<K, V>> read() {
Future<Iterable<Entry<ByteString, Iterable<V>>>> persistedData =
getFuture(false);
try (Closeable scope = scopedReadState()) {
Iterable<Entry<ByteString, Iterable<V>>> entries =
persistedData.get();
- // If a key returned by windmill is known to be no longer exist or
is completely cached
- // locally, we can safely ignore the content of this key in
windmill. Thus, we filter
- // entries to filteredEntries which contains only keys that are
known to exist and not
- // fully cached.
- Iterable<Entry<Object, Iterable<V>>> filteredEntries =
- Iterables.filter(
- Iterables.transform(
- entries,
- entry -> {
- try {
- final K key =
keyCoder.decode(entry.getKey().newInput());
- final Object structuralKey =
keyCoder.structuralValue(key);
- KeyState keyState =
- keyStateMap.computeIfAbsent(structuralKey, k
-> new KeyState(key));
- if (keyState.existence ==
KeyExistence.UNKNOWN_EXISTENCE) {
- keyState.existence = KeyExistence.KNOWN_EXIST;
- }
- return new
AbstractMap.SimpleEntry<>(structuralKey, entry.getValue());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }),
- entry -> {
- KeyState keyState = keyStateMap.get(entry.getKey());
- return keyState.existence !=
KeyExistence.KNOWN_NONEXISTENT
- && !(keyState.existence == KeyExistence.KNOWN_EXIST
- && keyState.valuesCached);
- });
if (entries instanceof Weighted) {
// This is a known amount of data, cache them all.
- filteredEntries.forEach(
+ entries.forEach(
entry -> {
- final Object structuralKey = entry.getKey();
- KeyState keyState = keyStateMap.get(structuralKey);
- keyState.existence = KeyExistence.KNOWN_EXIST;
- keyState.values.extendWith(entry.getValue());
- // We can't set keyState.valuesCached to true here,
because there may be more
- // paginated values that should not be filtered out in
filteredEntries.
+ try {
+ final K key = keyCoder.decode(entry.getKey().newInput());
+ final Object structuralKey =
keyCoder.structuralValue(key);
+ KeyState keyState =
+ keyStateMap.computeIfAbsent(structuralKey, k -> new
KeyState(key));
+ // Ignore any key from windmill that has been marked
pending deletion, and any
+ // key that is fully cached.
+ if (keyState.existence == KeyExistence.KNOWN_NONEXISTENT
+ || (keyState.existence == KeyExistence.KNOWN_EXIST
+ && keyState.valuesCached)) return;
+ // Or else cache contents from windmill.
+ keyState.existence = KeyExistence.KNOWN_EXIST;
+ keyState.values.extendWith(entry.getValue());
+ // We can't set keyState.valuesCached to true here,
because there may be more
+ // paginated values that should not be filtered out in
above if statement.
Review Comment:
This isn't a paginated read, so can we set `valuesCached` here?
Or should the comment be updated to say that there may be additional entries for the
same key?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -2005,79 +2025,64 @@ public Iterable<Entry<K, V>> read() {
Future<Iterable<Entry<ByteString, Iterable<V>>>> persistedData =
getFuture(false);
try (Closeable scope = scopedReadState()) {
Iterable<Entry<ByteString, Iterable<V>>> entries =
persistedData.get();
- // If a key returned by windmill is known to be no longer exist or
is completely cached
- // locally, we can safely ignore the content of this key in
windmill. Thus, we filter
- // entries to filteredEntries which contains only keys that are
known to exist and not
- // fully cached.
- Iterable<Entry<Object, Iterable<V>>> filteredEntries =
- Iterables.filter(
- Iterables.transform(
- entries,
- entry -> {
- try {
- final K key =
keyCoder.decode(entry.getKey().newInput());
- final Object structuralKey =
keyCoder.structuralValue(key);
- KeyState keyState =
- keyStateMap.computeIfAbsent(structuralKey, k
-> new KeyState(key));
- if (keyState.existence ==
KeyExistence.UNKNOWN_EXISTENCE) {
- keyState.existence = KeyExistence.KNOWN_EXIST;
- }
- return new
AbstractMap.SimpleEntry<>(structuralKey, entry.getValue());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }),
- entry -> {
- KeyState keyState = keyStateMap.get(entry.getKey());
- return keyState.existence !=
KeyExistence.KNOWN_NONEXISTENT
- && !(keyState.existence == KeyExistence.KNOWN_EXIST
- && keyState.valuesCached);
- });
if (entries instanceof Weighted) {
// This is a known amount of data, cache them all.
- filteredEntries.forEach(
+ entries.forEach(
entry -> {
- final Object structuralKey = entry.getKey();
- KeyState keyState = keyStateMap.get(structuralKey);
- keyState.existence = KeyExistence.KNOWN_EXIST;
- keyState.values.extendWith(entry.getValue());
- // We can't set keyState.valuesCached to true here,
because there may be more
- // paginated values that should not be filtered out in
filteredEntries.
+ try {
+ final K key = keyCoder.decode(entry.getKey().newInput());
+ final Object structuralKey =
keyCoder.structuralValue(key);
+ KeyState keyState =
+ keyStateMap.computeIfAbsent(structuralKey, k -> new
KeyState(key));
+ // Ignore any key from windmill that has been marked
pending deletion, and any
+ // key that is fully cached.
+ if (keyState.existence == KeyExistence.KNOWN_NONEXISTENT
+ || (keyState.existence == KeyExistence.KNOWN_EXIST
+ && keyState.valuesCached)) return;
+ // Or else cache contents from windmill.
+ keyState.existence = KeyExistence.KNOWN_EXIST;
+ keyState.values.extendWith(entry.getValue());
+ // We can't set keyState.valuesCached to true here,
because there may be more
+ // paginated values that should not be filtered out in
above if statement.
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
});
allKeysKnown = true;
complete = true;
- // Unload keys that are not known exist from cache, set
valuesCached of all cached
- // entries to true.
- keyStateMap
- .entrySet()
- .removeIf(
- entry -> {
- KeyState keyState = entry.getValue();
- boolean shouldRemove =
- (keyState.existence ==
KeyExistence.KNOWN_NONEXISTENT
- && !keyState.removedLocally)
- || keyState.existence ==
KeyExistence.UNKNOWN_EXISTENCE;
- keyState.valuesCached = !shouldRemove;
- return shouldRemove;
- });
return Iterables.unmodifiableIterable(mergedCachedEntries());
} else {
- Iterable<Entry<K, V>> fromWindmill =
- Iterables.concat(
- Iterables.transform(
- Iterables.transform(
- filteredEntries,
- entry ->
- new AbstractMap.SimpleEntry<>(
-
keyStateMap.get(entry.getKey()).originalKey,
- entry.getValue())),
- entry ->
- Iterables.transform(
- entry.getValue(),
- v -> new
AbstractMap.SimpleEntry<>(entry.getKey(), v))));
- return Iterables.unmodifiableIterable(
- Iterables.concat(mergedCachedEntries(), fromWindmill));
+ MultimapIterables<K, V> cachedEntries = mergedCachedEntries();
+ entries.forEach(
Review Comment:
I think this is back to loading all the pages into memory before
iterating.
Could you instead make it lazy: first return the non-cached data from Windmill
(filtering out keys that are cached or were added to the cache), then return the rest?
For example: `Iterables.concat(Iterables.transform(<the logic in entries.forEach>),
<an iterable that lazily calls mergedCachedEntries() and iterates over it>);`
Maybe you could also add a test for this, using an iterable that returns GBs of
data in total; that data should be streamed through without causing the test to
OOM.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]