zhengbuqian commented on code in PR #23492:
URL: https://github.com/apache/beam/pull/23492#discussion_r1090262269


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1721,20 +1729,20 @@ public Iterable<V> read() {
             return Collections.emptyList();
           }
 
-          if (localRemovals.contains(structuralKey)) {
+          if (keyState.removedLocally) {
             // this key has been removed locally but the removal hasn't been 
sent to windmill,
             // thus values in windmill(if any) are obsolete, and we only care 
about local values.
-            return 
Iterables.unmodifiableIterable(localAdditions.get(structuralKey));
+            return Iterables.unmodifiableIterable(keyState.localAdditions);
           }
           if (keyState.valuesCached || complete) {
             return Iterables.unmodifiableIterable(
-                Iterables.concat(keyState.values, 
localAdditions.get(structuralKey)));
+                Iterables.concat(keyState.values, keyState.localAdditions));
           }
           Future<Iterable<V>> persistedData = getFutureForKey(key);
           try (Closeable scope = scopedReadState()) {
             final Iterable<V> persistedValues = persistedData.get();
             if (Iterables.isEmpty(persistedValues)) {
-              Collection<V> local = localAdditions.get(structuralKey);
+              Collection<V> local = keyState.localAdditions;
               if (local.isEmpty()) {
                 // empty in both cache and windmill, remove key from cache.
                 keyStateMap.remove(structuralKey);

Review Comment:
   Done. 
   
   In the current implementation, calling `get()` on nonexistent keys leaves tombstones in keyStateMap,
and those are cleared in `persistDirectly`. Note, however, that if the user never
adds to or removes from the multimap, `persistDirectly` will not try to clear those
tombstones, because we iterate through the keys only if `(hasLocalRemovals ||
hasLocalAdditions)`. So if the user never adds/removes and keeps calling `get()` on
different nonexistent keys, memory could grow unbounded. I think this is a
rare edge case that we can live with. WDYT?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1771,52 +1779,54 @@ public ReadableState<Iterable<V>> readLater() {
     @Override
     protected WorkItemCommitRequest 
persistDirectly(WindmillStateCache.ForKeyAndFamily cache)
         throws IOException {
-      if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+      if (!cleared && !hasLocalAdditions && !hasLocalRemovals) {
+        cache.put(namespace, address, this, 1);
         return WorkItemCommitRequest.newBuilder().buildPartial();
       }
       WorkItemCommitRequest.Builder commitBuilder = 
WorkItemCommitRequest.newBuilder();
-      Windmill.TagMultimapUpdateRequest.Builder builder = null;
+      Windmill.TagMultimapUpdateRequest.Builder builder = 
commitBuilder.addMultimapUpdatesBuilder();
+      builder.setTag(stateKey).setStateFamily(stateFamily);
+
       if (cleared) {
-        builder = commitBuilder.addMultimapUpdatesBuilder();
         builder.setDeleteAll(true);
-        cleared = false;
-      }
-      Set<Object> keysWithUpdates = Sets.newHashSet();
-      keysWithUpdates.addAll(localRemovals);
-      keysWithUpdates.addAll(localAdditions.keySet());
-      if (!keysWithUpdates.isEmpty() && builder == null) {
-        builder = commitBuilder.addMultimapUpdatesBuilder();
-        builder.setTag(stateKey).setStateFamily(stateFamily);
       }
-      for (Object structuralKey : keysWithUpdates) {
-        KeyState keyState = keyStateMap.get(structuralKey);
+      if (hasLocalRemovals || hasLocalAdditions) {
         ByteStringOutputStream keyStream = new ByteStringOutputStream();
-        keyCoder.encode(keyState.originalKey, keyStream);
-        ByteString encodedKey = keyStream.toByteString();
-        Windmill.TagMultimapEntry.Builder entryBuilder = 
builder.addUpdatesBuilder();
-        entryBuilder.setEntryName(encodedKey);
-        entryBuilder.setDeleteAll(localRemovals.contains(structuralKey));
-        for (V value : localAdditions.get(structuralKey)) {
-          ByteStringOutputStream valueStream = new ByteStringOutputStream();
-          valueCoder.encode(value, valueStream);
-          ByteString encodedValue = valueStream.toByteString();
-          entryBuilder.addValues(encodedValue);
-        }
-        // Move newly added values from localAdditions to cachedEntries as 
those new values now are
-        // also persisted in Windmill. If a key now has no more values and is 
not KNOWN_EXIST,
-        // remove it from cache.
-        if (keyState.valuesCached) {
-          keyState.values.extendWith(localAdditions.get(structuralKey));
-        } else {
-          if (keyState.existence != KeyExistence.KNOWN_EXIST) 
keyStateMap.remove(structuralKey);
+        ByteStringOutputStream valueStream = new ByteStringOutputStream();
+        List<Object> keyStateToBeRemoved = Lists.newArrayList();
+        for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+          KeyState keyState = entry.getValue();
+          if (!keyState.removedLocally && keyState.localAdditions.isEmpty()) 
continue;
+          keyCoder.encode(keyState.originalKey, keyStream);
+          ByteString encodedKey = keyStream.toByteStringAndReset();
+          Windmill.TagMultimapEntry.Builder entryBuilder = 
builder.addUpdatesBuilder();
+          entryBuilder.setEntryName(encodedKey);
+          entryBuilder.setDeleteAll(keyState.removedLocally);
+          keyState.removedLocally = false;
+          for (V value : keyState.localAdditions) {
+            valueCoder.encode(value, valueStream);
+            ByteString encodedValue = valueStream.toByteStringAndReset();
+            entryBuilder.addValues(encodedValue);
+          }
+          // Move newly added values from localAdditions to keyState.values as 
those new values now
+          // are also persisted in Windmill. If a key now has no more values 
and is not KNOWN_EXIST,
+          // remove it from cache.
+          if (keyState.valuesCached) {
+            keyState.values.extendWith(keyState.localAdditions);
+          } else {
+            if (keyState.existence != KeyExistence.KNOWN_EXIST)
+              keyStateToBeRemoved.add(entry.getKey());
+          }
+          keyState.localAdditions = Lists.newArrayList();

Review Comment:
   Done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1937,11 +1946,10 @@ public ReadableState<Iterable<K>> readLater() {
 
     private MultimapIterables<K, V> mergedCachedEntries() {
       MultimapIterables<K, V> result = new MultimapIterables<>();
-      for (Entry<Object, Collection<V>> entry : 
localAdditions.asMap().entrySet()) {
-        K key = keyStateMap.get(entry.getKey()).originalKey;
-        result.extendWith(key, entry.getValue());
-      }
       for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+        if (!entry.getValue().localAdditions.isEmpty()) {

Review Comment:
   Done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -2029,7 +2037,7 @@ public Iterable<Entry<K, V>> read() {
                   .removeIf(
                       entry ->
                           entry.getValue().existence == 
KeyExistence.KNOWN_NONEXISTENT
-                              && !localRemovals.contains(entry.getKey()));
+                              && !entry.getValue().removedLocally);
               return Iterables.unmodifiableIterable(mergedCachedEntries());
             } else {
               MultimapIterables<K, V> local = mergedCachedEntries();

Review Comment:
   Ah, that makes a lot of sense. I updated the code so that if the entries are not
weighted, they are not evaluated immediately.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1771,52 +1779,54 @@ public ReadableState<Iterable<V>> readLater() {
     @Override
     protected WorkItemCommitRequest 
persistDirectly(WindmillStateCache.ForKeyAndFamily cache)
         throws IOException {
-      if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+      if (!cleared && !hasLocalAdditions && !hasLocalRemovals) {
+        cache.put(namespace, address, this, 1);
         return WorkItemCommitRequest.newBuilder().buildPartial();
       }
       WorkItemCommitRequest.Builder commitBuilder = 
WorkItemCommitRequest.newBuilder();
-      Windmill.TagMultimapUpdateRequest.Builder builder = null;
+      Windmill.TagMultimapUpdateRequest.Builder builder = 
commitBuilder.addMultimapUpdatesBuilder();
+      builder.setTag(stateKey).setStateFamily(stateFamily);
+
       if (cleared) {
-        builder = commitBuilder.addMultimapUpdatesBuilder();
         builder.setDeleteAll(true);
-        cleared = false;
-      }
-      Set<Object> keysWithUpdates = Sets.newHashSet();
-      keysWithUpdates.addAll(localRemovals);
-      keysWithUpdates.addAll(localAdditions.keySet());
-      if (!keysWithUpdates.isEmpty() && builder == null) {
-        builder = commitBuilder.addMultimapUpdatesBuilder();
-        builder.setTag(stateKey).setStateFamily(stateFamily);
       }
-      for (Object structuralKey : keysWithUpdates) {
-        KeyState keyState = keyStateMap.get(structuralKey);
+      if (hasLocalRemovals || hasLocalAdditions) {
         ByteStringOutputStream keyStream = new ByteStringOutputStream();
-        keyCoder.encode(keyState.originalKey, keyStream);
-        ByteString encodedKey = keyStream.toByteString();
-        Windmill.TagMultimapEntry.Builder entryBuilder = 
builder.addUpdatesBuilder();
-        entryBuilder.setEntryName(encodedKey);
-        entryBuilder.setDeleteAll(localRemovals.contains(structuralKey));
-        for (V value : localAdditions.get(structuralKey)) {
-          ByteStringOutputStream valueStream = new ByteStringOutputStream();
-          valueCoder.encode(value, valueStream);
-          ByteString encodedValue = valueStream.toByteString();
-          entryBuilder.addValues(encodedValue);
-        }
-        // Move newly added values from localAdditions to cachedEntries as 
those new values now are
-        // also persisted in Windmill. If a key now has no more values and is 
not KNOWN_EXIST,
-        // remove it from cache.
-        if (keyState.valuesCached) {
-          keyState.values.extendWith(localAdditions.get(structuralKey));
-        } else {
-          if (keyState.existence != KeyExistence.KNOWN_EXIST) 
keyStateMap.remove(structuralKey);
+        ByteStringOutputStream valueStream = new ByteStringOutputStream();
+        List<Object> keyStateToBeRemoved = Lists.newArrayList();
+        for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+          KeyState keyState = entry.getValue();
+          if (!keyState.removedLocally && keyState.localAdditions.isEmpty()) 
continue;
+          keyCoder.encode(keyState.originalKey, keyStream);
+          ByteString encodedKey = keyStream.toByteStringAndReset();
+          Windmill.TagMultimapEntry.Builder entryBuilder = 
builder.addUpdatesBuilder();
+          entryBuilder.setEntryName(encodedKey);
+          entryBuilder.setDeleteAll(keyState.removedLocally);
+          keyState.removedLocally = false;
+          for (V value : keyState.localAdditions) {
+            valueCoder.encode(value, valueStream);
+            ByteString encodedValue = valueStream.toByteStringAndReset();
+            entryBuilder.addValues(encodedValue);
+          }
+          // Move newly added values from localAdditions to keyState.values as 
those new values now
+          // are also persisted in Windmill. If a key now has no more values 
and is not KNOWN_EXIST,
+          // remove it from cache.
+          if (keyState.valuesCached) {
+            keyState.values.extendWith(keyState.localAdditions);
+          } else {
+            if (keyState.existence != KeyExistence.KNOWN_EXIST)
+              keyStateToBeRemoved.add(entry.getKey());
+          }
+          keyState.localAdditions = Lists.newArrayList();
         }
+        for (Object key : keyStateToBeRemoved) keyStateMap.remove(key);

Review Comment:
   Done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1830,22 +1840,21 @@ public void remove(K key) {
       }
       if (keyState.valuesCached || !complete) {
         // there may be data in windmill that need to be removed.
-        localRemovals.add(structuralKey);
+        hasLocalRemovals = true;
+        keyState.removedLocally = true;
         keyState.values = new ConcatIterables<>();
         keyState.valuesCached = false;
         keyState.existence = KeyExistence.KNOWN_NONEXISTENT;
       } else {
         // no data in windmill, deleting from local cache is sufficient.
         keyStateMap.remove(structuralKey);
       }
-      localAdditions.removeAll(structuralKey);
+      keyState.localAdditions = Lists.newArrayList();

Review Comment:
   Done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1771,52 +1779,54 @@ public ReadableState<Iterable<V>> readLater() {
     @Override
     protected WorkItemCommitRequest 
persistDirectly(WindmillStateCache.ForKeyAndFamily cache)
         throws IOException {
-      if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+      if (!cleared && !hasLocalAdditions && !hasLocalRemovals) {
+        cache.put(namespace, address, this, 1);
         return WorkItemCommitRequest.newBuilder().buildPartial();
       }
       WorkItemCommitRequest.Builder commitBuilder = 
WorkItemCommitRequest.newBuilder();
-      Windmill.TagMultimapUpdateRequest.Builder builder = null;
+      Windmill.TagMultimapUpdateRequest.Builder builder = 
commitBuilder.addMultimapUpdatesBuilder();
+      builder.setTag(stateKey).setStateFamily(stateFamily);
+
       if (cleared) {
-        builder = commitBuilder.addMultimapUpdatesBuilder();
         builder.setDeleteAll(true);
-        cleared = false;
-      }
-      Set<Object> keysWithUpdates = Sets.newHashSet();
-      keysWithUpdates.addAll(localRemovals);
-      keysWithUpdates.addAll(localAdditions.keySet());
-      if (!keysWithUpdates.isEmpty() && builder == null) {
-        builder = commitBuilder.addMultimapUpdatesBuilder();
-        builder.setTag(stateKey).setStateFamily(stateFamily);
       }
-      for (Object structuralKey : keysWithUpdates) {
-        KeyState keyState = keyStateMap.get(structuralKey);
+      if (hasLocalRemovals || hasLocalAdditions) {
         ByteStringOutputStream keyStream = new ByteStringOutputStream();
-        keyCoder.encode(keyState.originalKey, keyStream);
-        ByteString encodedKey = keyStream.toByteString();
-        Windmill.TagMultimapEntry.Builder entryBuilder = 
builder.addUpdatesBuilder();
-        entryBuilder.setEntryName(encodedKey);
-        entryBuilder.setDeleteAll(localRemovals.contains(structuralKey));
-        for (V value : localAdditions.get(structuralKey)) {
-          ByteStringOutputStream valueStream = new ByteStringOutputStream();
-          valueCoder.encode(value, valueStream);
-          ByteString encodedValue = valueStream.toByteString();
-          entryBuilder.addValues(encodedValue);
-        }
-        // Move newly added values from localAdditions to cachedEntries as 
those new values now are
-        // also persisted in Windmill. If a key now has no more values and is 
not KNOWN_EXIST,
-        // remove it from cache.
-        if (keyState.valuesCached) {
-          keyState.values.extendWith(localAdditions.get(structuralKey));
-        } else {
-          if (keyState.existence != KeyExistence.KNOWN_EXIST) 
keyStateMap.remove(structuralKey);
+        ByteStringOutputStream valueStream = new ByteStringOutputStream();
+        List<Object> keyStateToBeRemoved = Lists.newArrayList();
+        for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+          KeyState keyState = entry.getValue();
+          if (!keyState.removedLocally && keyState.localAdditions.isEmpty()) 
continue;
+          keyCoder.encode(keyState.originalKey, keyStream);
+          ByteString encodedKey = keyStream.toByteStringAndReset();
+          Windmill.TagMultimapEntry.Builder entryBuilder = 
builder.addUpdatesBuilder();
+          entryBuilder.setEntryName(encodedKey);
+          entryBuilder.setDeleteAll(keyState.removedLocally);
+          keyState.removedLocally = false;
+          for (V value : keyState.localAdditions) {
+            valueCoder.encode(value, valueStream);
+            ByteString encodedValue = valueStream.toByteStringAndReset();
+            entryBuilder.addValues(encodedValue);
+          }
+          // Move newly added values from localAdditions to keyState.values as 
those new values now
+          // are also persisted in Windmill. If a key now has no more values 
and is not KNOWN_EXIST,
+          // remove it from cache.
+          if (keyState.valuesCached) {
+            keyState.values.extendWith(keyState.localAdditions);
+          } else {
+            if (keyState.existence != KeyExistence.KNOWN_EXIST)

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to