robertwb commented on a change in pull request #15983:
URL: https://github.com/apache/beam/pull/15983#discussion_r751528307
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -1151,10 +1151,21 @@ def extend(
         # When a corrupt value made it into the cache, we have to fail.
         raise Exception("Unexpected cached value: %s" % cached_value)
     # Write to state handler
+    futures = []
     out = coder_impl.create_OutputStream()
     for element in elements:
       coder.encode_to_stream(element, out, True)
-    return self._underlying.append_raw(state_key, out.get())
+      if out.size() > data_plane._DEFAULT_SIZE_FLUSH_THRESHOLD:
+        futures.append(self._underlying.append_raw(state_key, out.get()))
+        out = coder_impl.create_OutputStream()
+    if out.size():
+      futures.append(self._underlying.append_raw(state_key, out.get()))
+    return _DeferredCall(
Review comment:
I thought about that, but I think the savings are negligible, and this way
the base case exercises the same code path as the exceptional one.
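
A minimal sketch of the point above, not the actual Beam implementation: the buffer is flushed whenever it passes a threshold, and whatever remains is flushed at the end, so a small input goes through exactly the same loop and final flush as a large one. `FakeStateHandler`, `extend_chunked`, and the `FLUSH_THRESHOLD` value are hypothetical stand-ins, and elements are assumed to be already-encoded byte strings.

```python
from concurrent.futures import Future

# Hypothetical stand-in for data_plane._DEFAULT_SIZE_FLUSH_THRESHOLD.
FLUSH_THRESHOLD = 16


class FakeStateHandler:
  """Hypothetical in-memory stand-in for the underlying state handler."""
  def __init__(self):
    self.appended = []

  def append_raw(self, state_key, data):
    # Record the write and return an already-completed future.
    self.appended.append((state_key, data))
    future = Future()
    future.set_result(None)
    return future


def extend_chunked(handler, state_key, encoded_elements):
  """Appends elements in chunks; returns one future per append call."""
  futures = []
  out = bytearray()
  for encoded in encoded_elements:
    out += encoded
    # The size is checked only after the element is in the buffer.
    if len(out) > FLUSH_THRESHOLD:
      futures.append(handler.append_raw(state_key, bytes(out)))
      out = bytearray()
  # Flush the remainder; for small inputs this is the only append.
  if out:
    futures.append(handler.append_raw(state_key, bytes(out)))
  return futures
```

With a small input the function issues a single append, and with a large input it issues several; in both cases the caller gets back a list of futures to wait on.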
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -1151,10 +1151,21 @@ def extend(
         # When a corrupt value made it into the cache, we have to fail.
         raise Exception("Unexpected cached value: %s" % cached_value)
     # Write to state handler
+    futures = []
     out = coder_impl.create_OutputStream()
     for element in elements:
       coder.encode_to_stream(element, out, True)
-    return self._underlying.append_raw(state_key, out.get())
+      if out.size() > data_plane._DEFAULT_SIZE_FLUSH_THRESHOLD:
Review comment:
There is not (in general; it depends on the coder) a cheap way to get the
size of an encoded element before actually encoding it. Given that the flush
limit << the strict limit, I don't think this delta is worth optimizing for.
One is already in risky territory when getting close to the strict limit anyway.
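
To make the size argument concrete, here is a rough sketch under assumed numbers. Because the check runs after an element is encoded, a flushed chunk can exceed the flush threshold by at most the size of the last element added. `chunk_sizes`, `FLUSH_THRESHOLD`, and `STRICT_LIMIT` are hypothetical names with illustrative values, not Beam's actual limits.

```python
def chunk_sizes(element_sizes, flush_threshold):
  """Returns the size of each chunk produced by check-after-encode flushing."""
  sizes = []
  current = 0
  for size in element_sizes:
    # The element's encoded size is only known once it is in the buffer.
    current += size
    if current > flush_threshold:
      sizes.append(current)
      current = 0
  if current:
    sizes.append(current)
  return sizes


# Illustrative values only; the real limits are not stated in this thread.
FLUSH_THRESHOLD = 100
STRICT_LIMIT = 1000

element_sizes = [40, 40, 40, 90, 90, 5]
sizes = chunk_sizes(element_sizes, FLUSH_THRESHOLD)

# Each chunk overshoots the threshold by at most one element's size.
worst_case = FLUSH_THRESHOLD + max(element_sizes)
```

As long as individual elements are small relative to the gap between the two limits, the overshoot stays far below the strict limit; an element large enough to close that gap would be a problem regardless of when the check runs.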