scwhittle commented on code in PR #23492:
URL: https://github.com/apache/beam/pull/23492#discussion_r1071903078
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1721,20 +1729,20 @@ public Iterable<V> read() {
return Collections.emptyList();
}
- if (localRemovals.contains(structuralKey)) {
+ if (keyState.removedLocally) {
// this key has been removed locally but the removal hasn't been
sent to windmill,
// thus values in windmill(if any) are obsolete, and we only care
about local values.
- return
Iterables.unmodifiableIterable(localAdditions.get(structuralKey));
+ return Iterables.unmodifiableIterable(keyState.localAdditions);
}
if (keyState.valuesCached || complete) {
return Iterables.unmodifiableIterable(
- Iterables.concat(keyState.values,
localAdditions.get(structuralKey)));
+ Iterables.concat(keyState.values, keyState.localAdditions));
}
Future<Iterable<V>> persistedData = getFutureForKey(key);
try (Closeable scope = scopedReadState()) {
final Iterable<V> persistedValues = persistedData.get();
if (Iterables.isEmpty(persistedValues)) {
- Collection<V> local = localAdditions.get(structuralKey);
+ Collection<V> local = keyState.localAdditions;
if (local.isEmpty()) {
// empty in both cache and windmill, remove key from cache.
keyStateMap.remove(structuralKey);
Review Comment:
Instead of removing the key, it seems we could remember that it is empty, to
avoid future reads.
Could the decision to remove it be deferred until we move it to the
cross-bundle cache? We could reduce memory usage for empty keys at that point.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1771,52 +1779,54 @@ public ReadableState<Iterable<V>> readLater() {
@Override
protected WorkItemCommitRequest
persistDirectly(WindmillStateCache.ForKeyAndFamily cache)
throws IOException {
- if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+ if (!cleared && !hasLocalAdditions && !hasLocalRemovals) {
+ cache.put(namespace, address, this, 1);
return WorkItemCommitRequest.newBuilder().buildPartial();
}
WorkItemCommitRequest.Builder commitBuilder =
WorkItemCommitRequest.newBuilder();
- Windmill.TagMultimapUpdateRequest.Builder builder = null;
+ Windmill.TagMultimapUpdateRequest.Builder builder =
commitBuilder.addMultimapUpdatesBuilder();
+ builder.setTag(stateKey).setStateFamily(stateFamily);
+
if (cleared) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
builder.setDeleteAll(true);
- cleared = false;
- }
- Set<Object> keysWithUpdates = Sets.newHashSet();
- keysWithUpdates.addAll(localRemovals);
- keysWithUpdates.addAll(localAdditions.keySet());
- if (!keysWithUpdates.isEmpty() && builder == null) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
- builder.setTag(stateKey).setStateFamily(stateFamily);
}
- for (Object structuralKey : keysWithUpdates) {
- KeyState keyState = keyStateMap.get(structuralKey);
+ if (hasLocalRemovals || hasLocalAdditions) {
ByteStringOutputStream keyStream = new ByteStringOutputStream();
- keyCoder.encode(keyState.originalKey, keyStream);
- ByteString encodedKey = keyStream.toByteString();
- Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
- entryBuilder.setEntryName(encodedKey);
- entryBuilder.setDeleteAll(localRemovals.contains(structuralKey));
- for (V value : localAdditions.get(structuralKey)) {
- ByteStringOutputStream valueStream = new ByteStringOutputStream();
- valueCoder.encode(value, valueStream);
- ByteString encodedValue = valueStream.toByteString();
- entryBuilder.addValues(encodedValue);
- }
- // Move newly added values from localAdditions to cachedEntries as
those new values now are
- // also persisted in Windmill. If a key now has no more values and is
not KNOWN_EXIST,
- // remove it from cache.
- if (keyState.valuesCached) {
- keyState.values.extendWith(localAdditions.get(structuralKey));
- } else {
- if (keyState.existence != KeyExistence.KNOWN_EXIST)
keyStateMap.remove(structuralKey);
+ ByteStringOutputStream valueStream = new ByteStringOutputStream();
+ List<Object> keyStateToBeRemoved = Lists.newArrayList();
+ for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+ KeyState keyState = entry.getValue();
+ if (!keyState.removedLocally && keyState.localAdditions.isEmpty())
continue;
+ keyCoder.encode(keyState.originalKey, keyStream);
+ ByteString encodedKey = keyStream.toByteStringAndReset();
+ Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
+ entryBuilder.setEntryName(encodedKey);
+ entryBuilder.setDeleteAll(keyState.removedLocally);
+ keyState.removedLocally = false;
+ for (V value : keyState.localAdditions) {
+ valueCoder.encode(value, valueStream);
+ ByteString encodedValue = valueStream.toByteStringAndReset();
+ entryBuilder.addValues(encodedValue);
+ }
+ // Move newly added values from localAdditions to keyState.values as
those new values now
+ // are also persisted in Windmill. If a key now has no more values
and is not KNOWN_EXIST,
+ // remove it from cache.
+ if (keyState.valuesCached) {
+ keyState.values.extendWith(keyState.localAdditions);
+ } else {
+ if (keyState.existence != KeyExistence.KNOWN_EXIST)
+ keyStateToBeRemoved.add(entry.getKey());
+ }
+ keyState.localAdditions = Lists.newArrayList();
Review Comment:
// Create a new localAdditions so that the cached values are unaffected.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1830,22 +1840,21 @@ public void remove(K key) {
}
if (keyState.valuesCached || !complete) {
// there may be data in windmill that need to be removed.
- localRemovals.add(structuralKey);
+ hasLocalRemovals = true;
+ keyState.removedLocally = true;
keyState.values = new ConcatIterables<>();
keyState.valuesCached = false;
keyState.existence = KeyExistence.KNOWN_NONEXISTENT;
} else {
// no data in windmill, deleting from local cache is sufficient.
keyStateMap.remove(structuralKey);
}
- localAdditions.removeAll(structuralKey);
+ keyState.localAdditions = Lists.newArrayList();
Review Comment:
Could do this reassignment only if localAdditions is non-empty.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1771,52 +1779,54 @@ public ReadableState<Iterable<V>> readLater() {
@Override
protected WorkItemCommitRequest
persistDirectly(WindmillStateCache.ForKeyAndFamily cache)
throws IOException {
- if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+ if (!cleared && !hasLocalAdditions && !hasLocalRemovals) {
+ cache.put(namespace, address, this, 1);
return WorkItemCommitRequest.newBuilder().buildPartial();
}
WorkItemCommitRequest.Builder commitBuilder =
WorkItemCommitRequest.newBuilder();
- Windmill.TagMultimapUpdateRequest.Builder builder = null;
+ Windmill.TagMultimapUpdateRequest.Builder builder =
commitBuilder.addMultimapUpdatesBuilder();
+ builder.setTag(stateKey).setStateFamily(stateFamily);
+
if (cleared) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
builder.setDeleteAll(true);
- cleared = false;
- }
- Set<Object> keysWithUpdates = Sets.newHashSet();
- keysWithUpdates.addAll(localRemovals);
- keysWithUpdates.addAll(localAdditions.keySet());
- if (!keysWithUpdates.isEmpty() && builder == null) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
- builder.setTag(stateKey).setStateFamily(stateFamily);
}
- for (Object structuralKey : keysWithUpdates) {
- KeyState keyState = keyStateMap.get(structuralKey);
+ if (hasLocalRemovals || hasLocalAdditions) {
ByteStringOutputStream keyStream = new ByteStringOutputStream();
- keyCoder.encode(keyState.originalKey, keyStream);
- ByteString encodedKey = keyStream.toByteString();
- Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
- entryBuilder.setEntryName(encodedKey);
- entryBuilder.setDeleteAll(localRemovals.contains(structuralKey));
- for (V value : localAdditions.get(structuralKey)) {
- ByteStringOutputStream valueStream = new ByteStringOutputStream();
- valueCoder.encode(value, valueStream);
- ByteString encodedValue = valueStream.toByteString();
- entryBuilder.addValues(encodedValue);
- }
- // Move newly added values from localAdditions to cachedEntries as
those new values now are
- // also persisted in Windmill. If a key now has no more values and is
not KNOWN_EXIST,
- // remove it from cache.
- if (keyState.valuesCached) {
- keyState.values.extendWith(localAdditions.get(structuralKey));
- } else {
- if (keyState.existence != KeyExistence.KNOWN_EXIST)
keyStateMap.remove(structuralKey);
+ ByteStringOutputStream valueStream = new ByteStringOutputStream();
+ List<Object> keyStateToBeRemoved = Lists.newArrayList();
+ for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+ KeyState keyState = entry.getValue();
+ if (!keyState.removedLocally && keyState.localAdditions.isEmpty())
continue;
+ keyCoder.encode(keyState.originalKey, keyStream);
+ ByteString encodedKey = keyStream.toByteStringAndReset();
+ Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
+ entryBuilder.setEntryName(encodedKey);
+ entryBuilder.setDeleteAll(keyState.removedLocally);
+ keyState.removedLocally = false;
+ for (V value : keyState.localAdditions) {
+ valueCoder.encode(value, valueStream);
+ ByteString encodedValue = valueStream.toByteStringAndReset();
+ entryBuilder.addValues(encodedValue);
+ }
+ // Move newly added values from localAdditions to keyState.values as
those new values now
+ // are also persisted in Windmill. If a key now has no more values
and is not KNOWN_EXIST,
+ // remove it from cache.
+ if (keyState.valuesCached) {
+ keyState.values.extendWith(keyState.localAdditions);
+ } else {
+ if (keyState.existence != KeyExistence.KNOWN_EXIST)
Review Comment:
nit: combine the nested `if` into an `else if`.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1771,52 +1779,54 @@ public ReadableState<Iterable<V>> readLater() {
@Override
protected WorkItemCommitRequest
persistDirectly(WindmillStateCache.ForKeyAndFamily cache)
throws IOException {
- if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+ if (!cleared && !hasLocalAdditions && !hasLocalRemovals) {
+ cache.put(namespace, address, this, 1);
return WorkItemCommitRequest.newBuilder().buildPartial();
}
WorkItemCommitRequest.Builder commitBuilder =
WorkItemCommitRequest.newBuilder();
- Windmill.TagMultimapUpdateRequest.Builder builder = null;
+ Windmill.TagMultimapUpdateRequest.Builder builder =
commitBuilder.addMultimapUpdatesBuilder();
+ builder.setTag(stateKey).setStateFamily(stateFamily);
+
if (cleared) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
builder.setDeleteAll(true);
- cleared = false;
- }
- Set<Object> keysWithUpdates = Sets.newHashSet();
- keysWithUpdates.addAll(localRemovals);
- keysWithUpdates.addAll(localAdditions.keySet());
- if (!keysWithUpdates.isEmpty() && builder == null) {
- builder = commitBuilder.addMultimapUpdatesBuilder();
- builder.setTag(stateKey).setStateFamily(stateFamily);
}
- for (Object structuralKey : keysWithUpdates) {
- KeyState keyState = keyStateMap.get(structuralKey);
+ if (hasLocalRemovals || hasLocalAdditions) {
ByteStringOutputStream keyStream = new ByteStringOutputStream();
- keyCoder.encode(keyState.originalKey, keyStream);
- ByteString encodedKey = keyStream.toByteString();
- Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
- entryBuilder.setEntryName(encodedKey);
- entryBuilder.setDeleteAll(localRemovals.contains(structuralKey));
- for (V value : localAdditions.get(structuralKey)) {
- ByteStringOutputStream valueStream = new ByteStringOutputStream();
- valueCoder.encode(value, valueStream);
- ByteString encodedValue = valueStream.toByteString();
- entryBuilder.addValues(encodedValue);
- }
- // Move newly added values from localAdditions to cachedEntries as
those new values now are
- // also persisted in Windmill. If a key now has no more values and is
not KNOWN_EXIST,
- // remove it from cache.
- if (keyState.valuesCached) {
- keyState.values.extendWith(localAdditions.get(structuralKey));
- } else {
- if (keyState.existence != KeyExistence.KNOWN_EXIST)
keyStateMap.remove(structuralKey);
+ ByteStringOutputStream valueStream = new ByteStringOutputStream();
+ List<Object> keyStateToBeRemoved = Lists.newArrayList();
+ for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+ KeyState keyState = entry.getValue();
+ if (!keyState.removedLocally && keyState.localAdditions.isEmpty())
continue;
+ keyCoder.encode(keyState.originalKey, keyStream);
+ ByteString encodedKey = keyStream.toByteStringAndReset();
+ Windmill.TagMultimapEntry.Builder entryBuilder =
builder.addUpdatesBuilder();
+ entryBuilder.setEntryName(encodedKey);
+ entryBuilder.setDeleteAll(keyState.removedLocally);
+ keyState.removedLocally = false;
+ for (V value : keyState.localAdditions) {
+ valueCoder.encode(value, valueStream);
+ ByteString encodedValue = valueStream.toByteStringAndReset();
+ entryBuilder.addValues(encodedValue);
+ }
+ // Move newly added values from localAdditions to keyState.values as
those new values now
+ // are also persisted in Windmill. If a key now has no more values
and is not KNOWN_EXIST,
+ // remove it from cache.
+ if (keyState.valuesCached) {
+ keyState.values.extendWith(keyState.localAdditions);
+ } else {
+ if (keyState.existence != KeyExistence.KNOWN_EXIST)
+ keyStateToBeRemoved.add(entry.getKey());
+ }
+ keyState.localAdditions = Lists.newArrayList();
}
+ for (Object key : keyStateToBeRemoved) keyStateMap.remove(key);
Review Comment:
If you iterate over the entrySet with an explicit Iterator, you can call
iterator.remove() instead of collecting the keys in a separate list and
removing them in a second loop.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -1937,11 +1946,10 @@ public ReadableState<Iterable<K>> readLater() {
private MultimapIterables<K, V> mergedCachedEntries() {
MultimapIterables<K, V> result = new MultimapIterables<>();
- for (Entry<Object, Collection<V>> entry :
localAdditions.asMap().entrySet()) {
- K key = keyStateMap.get(entry.getKey()).originalKey;
- result.extendWith(key, entry.getValue());
- }
for (Entry<Object, KeyState> entry : keyStateMap.entrySet()) {
+ if (!entry.getValue().localAdditions.isEmpty()) {
Review Comment:
nit: introduce a local variable
KeyState keyState = entry.getValue();
to improve readability.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java:
##########
@@ -2029,7 +2037,7 @@ public Iterable<Entry<K, V>> read() {
.removeIf(
entry ->
entry.getValue().existence ==
KeyExistence.KNOWN_NONEXISTENT
- && !localRemovals.contains(entry.getKey()));
+ && !entry.getValue().removedLocally);
return Iterables.unmodifiableIterable(mergedCachedEntries());
} else {
MultimapIterables<K, V> local = mergedCachedEntries();
Review Comment:
I'm worried about memory usage in this case. IIUC, if entries is not weighted,
it is a paginating iterable backed by Windmill. However, the for-each loop above
at line 2003 (in the new version) reads the full iterable and adds all the keys
to keyStateMap. It seems we should instead return a new iterator that wraps the
existing iterable, so that the keys are added lazily as iteration occurs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]