ajothomas commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r787177828
##########
File path:
samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
##########
@@ -116,6 +119,60 @@ default void putAll(List<Entry<K, V>> records) {
throw new SamzaException("Not supported");
}
+ /**
+ * Asynchronously update the record with specified {@code key} and
additional arguments.
+ * This method must be thread-safe.
+ *
+ * If the update operation failed due to the an existing record missing for
the key, the implementation can return
+ * a future completed exceptionally with a {@link RecordNotFoundException}
which will
+ * allow to Put a default value if one is provided.
+ *
+ * @param key key for the table record
+ * @param update update record for the given key
+ * @return CompletableFuture for the update request
+ */
+ CompletableFuture<Void> updateAsync(K key, U update);
+
+ /**
+ * Asynchronously update the record with specified {@code key} and
additional arguments.
+ * This method must be thread-safe.
+ *
+ * If the update operation failed due to the an existing record missing for
the key, the implementation can return
+ * a future completed exceptionally with a {@link RecordNotFoundException}
which will
+ * allow to Put a default value if one is provided.
+ *
+ * @param key key for the table record
+ * @param update update record for the given key
+ * @param args additional arguments
+ * @return CompletableFuture for the update request
+ */
+ default CompletableFuture<Void> updateAsync(K key, U update, Object ...
args) {
+ throw new SamzaException("Not supported");
+ }
+
+ /**
+ * Asynchronously updates the table with {@code records} with specified
{@code keys}. This method must be thread-safe.
+ * The default implementation calls updateAsync for each entry and return a
combined future.
+ * @param records updates for the table
+ * @return CompletableFuture for the update request
+ */
+ default CompletableFuture<Void> updateAllAsync(Collection<Entry<K, U>>
records) {
+ final List<CompletableFuture<Void>> updateFutures = records.stream()
+ .map(entry -> updateAsync(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList());
+ return CompletableFuture.allOf(Iterables.toArray(updateFutures,
CompletableFuture.class));
Review comment:
Yes, bubble up the exception. quite similar to what the existing
`putAllAsync` does.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]