zhengbuqian commented on code in PR #23492:
URL: https://github.com/apache/beam/pull/23492#discussion_r1090262269
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1721,20 +1729,20 @@ public Iterable<V> read() {
return Collections.emptyList();
}
- if (localRemovals.contains(structuralKey)) {
+ if (keyState.removedLocally) {
// this key has been removed locally but the removal hasn't been
sent to windmill,
// thus values in windmill(if any) are obsolete, and we only care
about local values.
- return
Iterables.unmodifiableIterable(localAdditions.get(structuralKey));
+ return Iterables.unmodifiableIterable(keyState.localAdditions);
}
if (keyState.valuesCached || complete) {
return Iterables.unmodifiableIterable(
- Iterables.concat(keyState.values,
localAdditions.get(structuralKey)));
+ Iterables.concat(keyState.values, keyState.localAdditions));
}
Future<Iterable<V>> persistedData = getFutureForKey(key);
try (Closeable scope = scopedReadState()) {
final Iterable<V> persistedValues = persistedData.get();
if (Iterables.isEmpty(persistedValues)) {
- Collection<V> local = localAdditions.get(structuralKey);
+ Collection<V> local = keyState.localAdditions;
if (local.isEmpty()) {
// empty in both cache and windmill, remove key from cache.
keyStateMap.remove(structuralKey);
Review Comment:
Done.
In the current implementation, calling `get()` on nonexistent keys leaves tombstones in
keyStateMap, and those are cleared in `persistDirectly`. Note, however, that if the user
never adds to or removes from the multimap, `persistDirectly` will not try to clear those
tombstones, because we iterate through the keys only if `(hasLocalRemovals ||
hasLocalAdditions)`. So if the user never adds or removes anything and keeps calling
`get()` on different nonexistent keys, memory usage could grow without bound. I think this
is a rare edge case that we can live with. What do you think?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1771,52 +1779,54 @@ public ReadableState<Iterable<V>> readLater() {
@Override
protected WorkItemCommitRequest
persistDirectly(WindmillStateCache.ForKeyAndFamily cache)
throws IOException {
- if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+ if (!cleared && !hasLocalAdditions && !hasLocalRemovals) {
+ cache.put(namespace, address, this, 1);
return WorkItemCommitRequest.newBuilder().buildPartial();
}
WorkItemCommitRequest.Builder commitBuilder =
WorkItemCommitRequest.newBuilder();
- Windmill.TagMultimapUpdateRequest.Builder builder = null;
+ Windmill.TagMultimapUpdateRequest.Builder builder =
commitBuilder.addMultimapUpdatesBuilder();
+ builder.setTag(stateKey).setStateFamily(stateFamily);
+
if (cleared) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
builder.setDeleteAll(true);
- cleared = false;
- }
- Set<Object> keysWithUpdates = Sets.newHashSet();
- keysWithUpdates.addAll(localRemovals);
- keysWithUpdates.addAll(localAdditions.keySet());
- if (!keysWithUpdates.isEmpty() && builder == null) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
- builder.setTag(stateKey).setStateFamily(stateFamily);
}
- for (Object structuralKey : keysWithUpdates) {
- KeyState keyState = keyStateMap.get(structuralKey);
+ if (hasLocalRemovals || hasLocalAdditions) {
ByteStringOutputStream keyStream = new ByteStringOutputStream();
- keyCoder.encode(keyState.originalKey, keyStream);
- ByteString encodedKey = keyStream.toByteString();
- Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
- entryBuilder.setEntryName(encodedKey);
- entryBuilder.setDeleteAll(localRemovals.contains(structuralKey));
- for (V value : localAdditions.get(structuralKey)) {
- ByteStringOutputStream valueStream = new ByteStringOutputStream();
- valueCoder.encode(value, valueStream);
- ByteString encodedValue = valueStream.toByteString();
- entryBuilder.addValues(encodedValue);
- }
- // Move newly added values from localAdditions to cachedEntries as
those new values now are
- // also persisted in Windmill. If a key now has no more values and is
not KNOWN_EXIST,
- // remove it from cache.
- if (keyState.valuesCached) {
- keyState.values.extendWith(localAdditions.get(structuralKey));
- } else {
- if (keyState.existence != KeyExistence.KNOWN_EXIST)
keyStateMap.remove(structuralKey);
+ ByteStringOutputStream valueStream = new ByteStringOutputStream();
+ List<Object> keyStateToBeRemoved = Lists.newArrayList();
+ for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+ KeyState keyState = entry.getValue();
+ if (!keyState.removedLocally && keyState.localAdditions.isEmpty())
continue;
+ keyCoder.encode(keyState.originalKey, keyStream);
+ ByteString encodedKey = keyStream.toByteStringAndReset();
+ Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
+ entryBuilder.setEntryName(encodedKey);
+ entryBuilder.setDeleteAll(keyState.removedLocally);
+ keyState.removedLocally = false;
+ for (V value : keyState.localAdditions) {
+ valueCoder.encode(value, valueStream);
+ ByteString encodedValue = valueStream.toByteStringAndReset();
+ entryBuilder.addValues(encodedValue);
+ }
+ // Move newly added values from localAdditions to keyState.values as
those new values now
+ // are also persisted in Windmill. If a key now has no more values
and is not KNOWN_EXIST,
+ // remove it from cache.
+ if (keyState.valuesCached) {
+ keyState.values.extendWith(keyState.localAdditions);
+ } else {
+ if (keyState.existence != KeyExistence.KNOWN_EXIST)
+ keyStateToBeRemoved.add(entry.getKey());
+ }
+ keyState.localAdditions = Lists.newArrayList();
Review Comment:
Done
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1937,11 +1946,10 @@ public ReadableState<Iterable<K>> readLater() {
private MultimapIterables<K, V> mergedCachedEntries() {
MultimapIterables<K, V> result = new MultimapIterables<>();
- for (Entry<Object, Collection<V>> entry :
localAdditions.asMap().entrySet()) {
- K key = keyStateMap.get(entry.getKey()).originalKey;
- result.extendWith(key, entry.getValue());
- }
for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+ if (!entry.getValue().localAdditions.isEmpty()) {
Review Comment:
Done
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -2029,7 +2037,7 @@ public Iterable<Entry<K, V>> read() {
.removeIf(
entry ->
entry.getValue().existence ==
KeyExistence.KNOWN_NONEXISTENT
- && !localRemovals.contains(entry.getKey()));
+ && !entry.getValue().removedLocally);
return Iterables.unmodifiableIterable(mergedCachedEntries());
} else {
MultimapIterables<K, V> local = mergedCachedEntries();
Review Comment:
Ah, that makes a lot of sense. I updated the code so that if `entries` is not
weighted, it is not evaluated immediately.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1771,52 +1779,54 @@ public ReadableState<Iterable<V>> readLater() {
@Override
protected WorkItemCommitRequest
persistDirectly(WindmillStateCache.ForKeyAndFamily cache)
throws IOException {
- if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+ if (!cleared && !hasLocalAdditions && !hasLocalRemovals) {
+ cache.put(namespace, address, this, 1);
return WorkItemCommitRequest.newBuilder().buildPartial();
}
WorkItemCommitRequest.Builder commitBuilder =
WorkItemCommitRequest.newBuilder();
- Windmill.TagMultimapUpdateRequest.Builder builder = null;
+ Windmill.TagMultimapUpdateRequest.Builder builder =
commitBuilder.addMultimapUpdatesBuilder();
+ builder.setTag(stateKey).setStateFamily(stateFamily);
+
if (cleared) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
builder.setDeleteAll(true);
- cleared = false;
- }
- Set<Object> keysWithUpdates = Sets.newHashSet();
- keysWithUpdates.addAll(localRemovals);
- keysWithUpdates.addAll(localAdditions.keySet());
- if (!keysWithUpdates.isEmpty() && builder == null) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
- builder.setTag(stateKey).setStateFamily(stateFamily);
}
- for (Object structuralKey : keysWithUpdates) {
- KeyState keyState = keyStateMap.get(structuralKey);
+ if (hasLocalRemovals || hasLocalAdditions) {
ByteStringOutputStream keyStream = new ByteStringOutputStream();
- keyCoder.encode(keyState.originalKey, keyStream);
- ByteString encodedKey = keyStream.toByteString();
- Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
- entryBuilder.setEntryName(encodedKey);
- entryBuilder.setDeleteAll(localRemovals.contains(structuralKey));
- for (V value : localAdditions.get(structuralKey)) {
- ByteStringOutputStream valueStream = new ByteStringOutputStream();
- valueCoder.encode(value, valueStream);
- ByteString encodedValue = valueStream.toByteString();
- entryBuilder.addValues(encodedValue);
- }
- // Move newly added values from localAdditions to cachedEntries as
those new values now are
- // also persisted in Windmill. If a key now has no more values and is
not KNOWN_EXIST,
- // remove it from cache.
- if (keyState.valuesCached) {
- keyState.values.extendWith(localAdditions.get(structuralKey));
- } else {
- if (keyState.existence != KeyExistence.KNOWN_EXIST)
keyStateMap.remove(structuralKey);
+ ByteStringOutputStream valueStream = new ByteStringOutputStream();
+ List<Object> keyStateToBeRemoved = Lists.newArrayList();
+ for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+ KeyState keyState = entry.getValue();
+ if (!keyState.removedLocally && keyState.localAdditions.isEmpty())
continue;
+ keyCoder.encode(keyState.originalKey, keyStream);
+ ByteString encodedKey = keyStream.toByteStringAndReset();
+ Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
+ entryBuilder.setEntryName(encodedKey);
+ entryBuilder.setDeleteAll(keyState.removedLocally);
+ keyState.removedLocally = false;
+ for (V value : keyState.localAdditions) {
+ valueCoder.encode(value, valueStream);
+ ByteString encodedValue = valueStream.toByteStringAndReset();
+ entryBuilder.addValues(encodedValue);
+ }
+ // Move newly added values from localAdditions to keyState.values as
those new values now
+ // are also persisted in Windmill. If a key now has no more values
and is not KNOWN_EXIST,
+ // remove it from cache.
+ if (keyState.valuesCached) {
+ keyState.values.extendWith(keyState.localAdditions);
+ } else {
+ if (keyState.existence != KeyExistence.KNOWN_EXIST)
+ keyStateToBeRemoved.add(entry.getKey());
+ }
+ keyState.localAdditions = Lists.newArrayList();
}
+ for (Object key : keyStateToBeRemoved) keyStateMap.remove(key);
Review Comment:
Done
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1830,22 +1840,21 @@ public void remove(K key) {
}
if (keyState.valuesCached || !complete) {
// there may be data in windmill that need to be removed.
- localRemovals.add(structuralKey);
+ hasLocalRemovals = true;
+ keyState.removedLocally = true;
keyState.values = new ConcatIterables<>();
keyState.valuesCached = false;
keyState.existence = KeyExistence.KNOWN_NONEXISTENT;
} else {
// no data in windmill, deleting from local cache is sufficient.
keyStateMap.remove(structuralKey);
}
- localAdditions.removeAll(structuralKey);
+ keyState.localAdditions = Lists.newArrayList();
Review Comment:
Done
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1771,52 +1779,54 @@ public ReadableState<Iterable<V>> readLater() {
@Override
protected WorkItemCommitRequest
persistDirectly(WindmillStateCache.ForKeyAndFamily cache)
throws IOException {
- if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+ if (!cleared && !hasLocalAdditions && !hasLocalRemovals) {
+ cache.put(namespace, address, this, 1);
return WorkItemCommitRequest.newBuilder().buildPartial();
}
WorkItemCommitRequest.Builder commitBuilder =
WorkItemCommitRequest.newBuilder();
- Windmill.TagMultimapUpdateRequest.Builder builder = null;
+ Windmill.TagMultimapUpdateRequest.Builder builder =
commitBuilder.addMultimapUpdatesBuilder();
+ builder.setTag(stateKey).setStateFamily(stateFamily);
+
if (cleared) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
builder.setDeleteAll(true);
- cleared = false;
- }
- Set<Object> keysWithUpdates = Sets.newHashSet();
- keysWithUpdates.addAll(localRemovals);
- keysWithUpdates.addAll(localAdditions.keySet());
- if (!keysWithUpdates.isEmpty() && builder == null) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
- builder.setTag(stateKey).setStateFamily(stateFamily);
}
- for (Object structuralKey : keysWithUpdates) {
- KeyState keyState = keyStateMap.get(structuralKey);
+ if (hasLocalRemovals || hasLocalAdditions) {
ByteStringOutputStream keyStream = new ByteStringOutputStream();
- keyCoder.encode(keyState.originalKey, keyStream);
- ByteString encodedKey = keyStream.toByteString();
- Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
- entryBuilder.setEntryName(encodedKey);
- entryBuilder.setDeleteAll(localRemovals.contains(structuralKey));
- for (V value : localAdditions.get(structuralKey)) {
- ByteStringOutputStream valueStream = new ByteStringOutputStream();
- valueCoder.encode(value, valueStream);
- ByteString encodedValue = valueStream.toByteString();
- entryBuilder.addValues(encodedValue);
- }
- // Move newly added values from localAdditions to cachedEntries as
those new values now are
- // also persisted in Windmill. If a key now has no more values and is
not KNOWN_EXIST,
- // remove it from cache.
- if (keyState.valuesCached) {
- keyState.values.extendWith(localAdditions.get(structuralKey));
- } else {
- if (keyState.existence != KeyExistence.KNOWN_EXIST)
keyStateMap.remove(structuralKey);
+ ByteStringOutputStream valueStream = new ByteStringOutputStream();
+ List<Object> keyStateToBeRemoved = Lists.newArrayList();
+ for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+ KeyState keyState = entry.getValue();
+ if (!keyState.removedLocally && keyState.localAdditions.isEmpty())
continue;
+ keyCoder.encode(keyState.originalKey, keyStream);
+ ByteString encodedKey = keyStream.toByteStringAndReset();
+ Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
+ entryBuilder.setEntryName(encodedKey);
+ entryBuilder.setDeleteAll(keyState.removedLocally);
+ keyState.removedLocally = false;
+ for (V value : keyState.localAdditions) {
+ valueCoder.encode(value, valueStream);
+ ByteString encodedValue = valueStream.toByteStringAndReset();
+ entryBuilder.addValues(encodedValue);
+ }
+ // Move newly added values from localAdditions to keyState.values as
those new values now
+ // are also persisted in Windmill. If a key now has no more values
and is not KNOWN_EXIST,
+ // remove it from cache.
+ if (keyState.valuesCached) {
+ keyState.values.extendWith(keyState.localAdditions);
+ } else {
+ if (keyState.existence != KeyExistence.KNOWN_EXIST)
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]