steveniemitz commented on a change in pull request #13802:
URL: https://github.com/apache/beam/pull/13802#discussion_r563932210
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
##########
@@ -1075,6 +1090,456 @@ public WorkItemCommitRequest persistDirectly(ForKey
cache) throws IOException {
}
}
+ static class WindmillSet<K> extends SimpleWindmillState implements
SetState<K> {
+ WindmillMap<K, Boolean> windmillMap;
+
+ WindmillSet(
+ StateNamespace namespace,
+ StateTag<SetState<K>> spec,
+ String stateFamily,
+ Coder<K> keyCoder,
+ boolean isNewKey) {
+ this.windmillMap =
+ new WindmillMap<>(namespace, spec, stateFamily, keyCoder,
BooleanCoder.of(), isNewKey);
+ }
+
+ @Override
+ protected WorkItemCommitRequest persistDirectly(ForKey cache) throws
IOException {
+ return windmillMap.persistDirectly(cache);
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized ReadableState<
+ @UnknownKeyFor @NonNull @Initialized Boolean>
+ contains(K k) {
+ return windmillMap.getOrDefault(k, false);
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized ReadableState<
+ @UnknownKeyFor @NonNull @Initialized Boolean>
+ addIfAbsent(K k) {
+ return new ReadableState<Boolean>() {
+ ReadableState<Boolean> putState = windmillMap.putIfAbsent(k, true);
+
+ @Override
+ public @Nullable Boolean read() {
+ Boolean result = putState.read();
+ return (result != null) ? result : false;
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized ReadableState<Boolean>
readLater() {
+ putState = putState.readLater();
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public void remove(K k) {
+ windmillMap.remove(k);
+ }
+
+ @Override
+ public void add(K value) {
+ windmillMap.put(value, true);
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized ReadableState<
+ @UnknownKeyFor @NonNull @Initialized Boolean>
+ isEmpty() {
+ return windmillMap.isEmpty();
+ }
+
+ @Override
+ public @Nullable Iterable<K> read() {
+ return windmillMap.keys().read();
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized SetState<K> readLater() {
+ windmillMap.keys().readLater();
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ windmillMap.clear();
+ }
+
+ @Override
+ void initializeForWorkItem(
+ WindmillStateReader reader, Supplier<Closeable>
scopedReadStateSupplier) {
+ windmillMap.initializeForWorkItem(reader, scopedReadStateSupplier);
+ }
+
+ @Override
+ void cleanupAfterWorkItem() {
+ windmillMap.cleanupAfterWorkItem();
+ }
+ }
+
+ static class WindmillMap<K, V> extends SimpleWindmillState implements
MapState<K, V> {
+ private final ByteString stateKeyPrefix;
+ private final String stateFamily;
+ private final Coder<K> keyCoder;
+ private final Coder<V> valueCoder;
+ private boolean complete;
+
+ // TODO(reuvenlax): Should we evict items from the cache? We would have
to make sure
+ // that anything in the cache that is not committed is not evicted.
negativeCache could be
+ // evicted whenever we want.
+ private Map<K, V> cachedValues = Maps.newHashMap();
+ private Set<K> negativeCache = Sets.newHashSet();
+ private boolean cleared = false;
+
+ private Set<K> localAdditions = Sets.newHashSet();
+ private Set<K> localRemovals = Sets.newHashSet();
+
+ WindmillMap(
+ StateNamespace namespace,
+ StateTag<?> spec,
+ String stateFamily,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder,
+ boolean isNewKey) {
+ this.stateKeyPrefix = encodeKey(namespace, spec);
+ this.stateFamily = stateFamily;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ this.complete = isNewKey;
+ }
+
+ private K userKeyFromProtoKey(ByteString tag) throws IOException {
+ Preconditions.checkState(tag.startsWith(stateKeyPrefix));
+ ByteString keyBytes = tag.substring(stateKeyPrefix.size());
+ return keyCoder.decode(keyBytes.newInput(), Context.OUTER);
+ }
+
+ private ByteString protoKeyFromUserKey(K key) throws IOException {
+ ByteString.Output keyStream = ByteString.newOutput();
+ stateKeyPrefix.writeTo(keyStream);
+ keyCoder.encode(key, keyStream, Context.OUTER);
+ return keyStream.toByteString();
+ }
+
+ @Override
+ protected WorkItemCommitRequest persistDirectly(ForKey cache) throws
IOException {
+ if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+ // No changes, so return directly.
+ return WorkItemCommitRequest.newBuilder().buildPartial();
+ }
+
+ WorkItemCommitRequest.Builder commitBuilder =
WorkItemCommitRequest.newBuilder();
+
+ if (cleared) {
+ commitBuilder
+ .addTagValuePrefixDeletesBuilder()
+ .setStateFamily(stateFamily)
+ .setTagPrefix(stateKeyPrefix);
+ }
+ cleared = false;
+
+ for (K key : localAdditions) {
Review comment:
Can we avoid committing keys added to `localAdditions` if they already
exist in the Map (and the values are equal)? Particularly for SetState, this
could significantly reduce the commit volume.
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
##########
@@ -1075,6 +1090,456 @@ public WorkItemCommitRequest persistDirectly(ForKey
cache) throws IOException {
}
}
+ static class WindmillSet<K> extends SimpleWindmillState implements
SetState<K> {
+ WindmillMap<K, Boolean> windmillMap;
+
+ WindmillSet(
+ StateNamespace namespace,
+ StateTag<SetState<K>> spec,
+ String stateFamily,
+ Coder<K> keyCoder,
+ boolean isNewKey) {
+ this.windmillMap =
+ new WindmillMap<>(namespace, spec, stateFamily, keyCoder,
BooleanCoder.of(), isNewKey);
+ }
+
+ @Override
+ protected WorkItemCommitRequest persistDirectly(ForKey cache) throws
IOException {
+ return windmillMap.persistDirectly(cache);
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized ReadableState<
+ @UnknownKeyFor @NonNull @Initialized Boolean>
+ contains(K k) {
+ return windmillMap.getOrDefault(k, false);
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized ReadableState<
+ @UnknownKeyFor @NonNull @Initialized Boolean>
+ addIfAbsent(K k) {
+ return new ReadableState<Boolean>() {
+ ReadableState<Boolean> putState = windmillMap.putIfAbsent(k, true);
+
+ @Override
+ public @Nullable Boolean read() {
+ Boolean result = putState.read();
+ return (result != null) ? result : false;
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized ReadableState<Boolean>
readLater() {
+ putState = putState.readLater();
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public void remove(K k) {
+ windmillMap.remove(k);
+ }
+
+ @Override
+ public void add(K value) {
+ windmillMap.put(value, true);
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized ReadableState<
+ @UnknownKeyFor @NonNull @Initialized Boolean>
+ isEmpty() {
+ return windmillMap.isEmpty();
+ }
+
+ @Override
+ public @Nullable Iterable<K> read() {
+ return windmillMap.keys().read();
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized SetState<K> readLater() {
+ windmillMap.keys().readLater();
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ windmillMap.clear();
+ }
+
+ @Override
+ void initializeForWorkItem(
+ WindmillStateReader reader, Supplier<Closeable>
scopedReadStateSupplier) {
+ windmillMap.initializeForWorkItem(reader, scopedReadStateSupplier);
+ }
+
+ @Override
+ void cleanupAfterWorkItem() {
+ windmillMap.cleanupAfterWorkItem();
+ }
+ }
+
+ static class WindmillMap<K, V> extends SimpleWindmillState implements
MapState<K, V> {
+ private final ByteString stateKeyPrefix;
+ private final String stateFamily;
+ private final Coder<K> keyCoder;
+ private final Coder<V> valueCoder;
+ private boolean complete;
+
+ // TODO(reuvenlax): Should we evict items from the cache? We would have
to make sure
+ // that anything in the cache that is not committed is not evicted.
negativeCache could be
+ // evicted whenever we want.
+ private Map<K, V> cachedValues = Maps.newHashMap();
+ private Set<K> negativeCache = Sets.newHashSet();
+ private boolean cleared = false;
+
+ private Set<K> localAdditions = Sets.newHashSet();
+ private Set<K> localRemovals = Sets.newHashSet();
+
+ WindmillMap(
+ StateNamespace namespace,
+ StateTag<?> spec,
+ String stateFamily,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder,
+ boolean isNewKey) {
+ this.stateKeyPrefix = encodeKey(namespace, spec);
+ this.stateFamily = stateFamily;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ this.complete = isNewKey;
+ }
+
+ private K userKeyFromProtoKey(ByteString tag) throws IOException {
+ Preconditions.checkState(tag.startsWith(stateKeyPrefix));
+ ByteString keyBytes = tag.substring(stateKeyPrefix.size());
+ return keyCoder.decode(keyBytes.newInput(), Context.OUTER);
+ }
+
+ private ByteString protoKeyFromUserKey(K key) throws IOException {
+ ByteString.Output keyStream = ByteString.newOutput();
+ stateKeyPrefix.writeTo(keyStream);
+ keyCoder.encode(key, keyStream, Context.OUTER);
+ return keyStream.toByteString();
+ }
+
+ @Override
+ protected WorkItemCommitRequest persistDirectly(ForKey cache) throws
IOException {
+ if (!cleared && localAdditions.isEmpty() && localRemovals.isEmpty()) {
+ // No changes, so return directly.
+ return WorkItemCommitRequest.newBuilder().buildPartial();
+ }
+
+ WorkItemCommitRequest.Builder commitBuilder =
WorkItemCommitRequest.newBuilder();
+
+ if (cleared) {
+ commitBuilder
+ .addTagValuePrefixDeletesBuilder()
+ .setStateFamily(stateFamily)
+ .setTagPrefix(stateKeyPrefix);
+ }
+ cleared = false;
+
+ for (K key : localAdditions) {
Review comment:
Can we avoid committing keys added to `localAdditions` if they already
exist in the cached Map (and the values are equal)? Particularly for SetState,
this could significantly reduce the commit volume.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]