scwhittle commented on code in PR #23492:
URL: https://github.com/apache/beam/pull/23492#discussion_r1100066124


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -2005,79 +2025,64 @@ public Iterable<Entry<K, V>> read() {
           Future<Iterable<Entry<ByteString, Iterable<V>>>> persistedData = 
getFuture(false);
           try (Closeable scope = scopedReadState()) {
             Iterable<Entry<ByteString, Iterable<V>>> entries = 
persistedData.get();
-            // If a key returned by windmill is known to be no longer exist or 
is completely cached
-            // locally, we can safely ignore the content of this key in 
windmill. Thus, we filter
-            // entries to filteredEntries which contains only keys that are 
known to exist and not
-            // fully cached.
-            Iterable<Entry<Object, Iterable<V>>> filteredEntries =
-                Iterables.filter(
-                    Iterables.transform(
-                        entries,
-                        entry -> {
-                          try {
-                            final K key = 
keyCoder.decode(entry.getKey().newInput());
-                            final Object structuralKey = 
keyCoder.structuralValue(key);
-                            KeyState keyState =
-                                keyStateMap.computeIfAbsent(structuralKey, k 
-> new KeyState(key));
-                            if (keyState.existence == 
KeyExistence.UNKNOWN_EXISTENCE) {
-                              keyState.existence = KeyExistence.KNOWN_EXIST;
-                            }
-                            return new 
AbstractMap.SimpleEntry<>(structuralKey, entry.getValue());
-                          } catch (IOException e) {
-                            throw new RuntimeException(e);
-                          }
-                        }),
-                    entry -> {
-                      KeyState keyState = keyStateMap.get(entry.getKey());
-                      return keyState.existence != 
KeyExistence.KNOWN_NONEXISTENT
-                          && !(keyState.existence == KeyExistence.KNOWN_EXIST
-                              && keyState.valuesCached);
-                    });
 
             if (entries instanceof Weighted) {
               // This is a known amount of data, cache them all.
-              filteredEntries.forEach(
+              entries.forEach(
                   entry -> {
-                    final Object structuralKey = entry.getKey();
-                    KeyState keyState = keyStateMap.get(structuralKey);
-                    keyState.existence = KeyExistence.KNOWN_EXIST;
-                    keyState.values.extendWith(entry.getValue());
-                    // We can't set keyState.valuesCached to true here, 
because there may be more
-                    // paginated values that should not be filtered out in 
filteredEntries.
+                    try {
+                      final K key = keyCoder.decode(entry.getKey().newInput());
+                      final Object structuralKey = 
keyCoder.structuralValue(key);
+                      KeyState keyState =
+                          keyStateMap.computeIfAbsent(structuralKey, k -> new 
KeyState(key));
+                      // Ignore any key from windmill that has been marked 
pending deletion, and any
+                      // key that is fully cached.
+                      if (keyState.existence == KeyExistence.KNOWN_NONEXISTENT
+                          || (keyState.existence == KeyExistence.KNOWN_EXIST
+                              && keyState.valuesCached)) return;
+                      // Or else cache contents from windmill.
+                      keyState.existence = KeyExistence.KNOWN_EXIST;
+                      keyState.values.extendWith(entry.getValue());
+                      // We can't set keyState.valuesCached to true here, 
because there may be more
+                      // paginated values that should not be filtered out in 
above if statement.

Review Comment:
   This isn't a paginated read, so can we set valuesCached here?
   Or should the comment be updated to say that there may be additional
   entries for the key?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -2005,79 +2025,64 @@ public Iterable<Entry<K, V>> read() {
           Future<Iterable<Entry<ByteString, Iterable<V>>>> persistedData = 
getFuture(false);
           try (Closeable scope = scopedReadState()) {
             Iterable<Entry<ByteString, Iterable<V>>> entries = 
persistedData.get();
-            // If a key returned by windmill is known to be no longer exist or 
is completely cached
-            // locally, we can safely ignore the content of this key in 
windmill. Thus, we filter
-            // entries to filteredEntries which contains only keys that are 
known to exist and not
-            // fully cached.
-            Iterable<Entry<Object, Iterable<V>>> filteredEntries =
-                Iterables.filter(
-                    Iterables.transform(
-                        entries,
-                        entry -> {
-                          try {
-                            final K key = 
keyCoder.decode(entry.getKey().newInput());
-                            final Object structuralKey = 
keyCoder.structuralValue(key);
-                            KeyState keyState =
-                                keyStateMap.computeIfAbsent(structuralKey, k 
-> new KeyState(key));
-                            if (keyState.existence == 
KeyExistence.UNKNOWN_EXISTENCE) {
-                              keyState.existence = KeyExistence.KNOWN_EXIST;
-                            }
-                            return new 
AbstractMap.SimpleEntry<>(structuralKey, entry.getValue());
-                          } catch (IOException e) {
-                            throw new RuntimeException(e);
-                          }
-                        }),
-                    entry -> {
-                      KeyState keyState = keyStateMap.get(entry.getKey());
-                      return keyState.existence != 
KeyExistence.KNOWN_NONEXISTENT
-                          && !(keyState.existence == KeyExistence.KNOWN_EXIST
-                              && keyState.valuesCached);
-                    });
 
             if (entries instanceof Weighted) {
               // This is a known amount of data, cache them all.
-              filteredEntries.forEach(
+              entries.forEach(
                   entry -> {
-                    final Object structuralKey = entry.getKey();
-                    KeyState keyState = keyStateMap.get(structuralKey);
-                    keyState.existence = KeyExistence.KNOWN_EXIST;
-                    keyState.values.extendWith(entry.getValue());
-                    // We can't set keyState.valuesCached to true here, 
because there may be more
-                    // paginated values that should not be filtered out in 
filteredEntries.
+                    try {
+                      final K key = keyCoder.decode(entry.getKey().newInput());
+                      final Object structuralKey = 
keyCoder.structuralValue(key);
+                      KeyState keyState =
+                          keyStateMap.computeIfAbsent(structuralKey, k -> new 
KeyState(key));
+                      // Ignore any key from windmill that has been marked 
pending deletion, and any
+                      // key that is fully cached.
+                      if (keyState.existence == KeyExistence.KNOWN_NONEXISTENT
+                          || (keyState.existence == KeyExistence.KNOWN_EXIST
+                              && keyState.valuesCached)) return;
+                      // Or else cache contents from windmill.
+                      keyState.existence = KeyExistence.KNOWN_EXIST;
+                      keyState.values.extendWith(entry.getValue());
+                      // We can't set keyState.valuesCached to true here, 
because there may be more
+                      // paginated values that should not be filtered out in 
above if statement.
+                    } catch (IOException e) {
+                      throw new RuntimeException(e);
+                    }
                   });
               allKeysKnown = true;
               complete = true;
-              // Unload keys that are not known exist from cache, set 
valuesCached of all cached
-              // entries to true.
-              keyStateMap
-                  .entrySet()
-                  .removeIf(
-                      entry -> {
-                        KeyState keyState = entry.getValue();
-                        boolean shouldRemove =
-                            (keyState.existence == 
KeyExistence.KNOWN_NONEXISTENT
-                                    && !keyState.removedLocally)
-                                || keyState.existence == 
KeyExistence.UNKNOWN_EXISTENCE;
-                        keyState.valuesCached = !shouldRemove;
-                        return shouldRemove;
-                      });
               return Iterables.unmodifiableIterable(mergedCachedEntries());
             } else {
-              Iterable<Entry<K, V>> fromWindmill =
-                  Iterables.concat(
-                      Iterables.transform(
-                          Iterables.transform(
-                              filteredEntries,
-                              entry ->
-                                  new AbstractMap.SimpleEntry<>(
-                                      
keyStateMap.get(entry.getKey()).originalKey,
-                                      entry.getValue())),
-                          entry ->
-                              Iterables.transform(
-                                  entry.getValue(),
-                                  v -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), v))));
-              return Iterables.unmodifiableIterable(
-                  Iterables.concat(mergedCachedEntries(), fromWindmill));
+              MultimapIterables<K, V> cachedEntries = mergedCachedEntries();
+              entries.forEach(

Review Comment:
   I think this is back to loading all of the pages into memory before
   iterating.
   
   Could you instead make it lazy by returning the non-cached windmill data
   first (filtering out anything that is already cached or has been added to
   the cache) and then returning the rest?
   
   Iterables.concat(Iterables.transform(<logic in entries.forEach>),
      <some iterable that lazily calls mergedCachedEntries() and iterates>);
     
   Maybe you could test this by having the test use an iterable that returns
   GBs of data in total, which should be streamed through without causing the
   test to OOM?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to