m-trieu commented on code in PR #28755:
URL: https://github.com/apache/beam/pull/28755#discussion_r1362745211
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -292,41 +213,54 @@ protected SideInputReader getSideInputReaderForViews(
* Fetches the requested sideInput, and maintains a view of the cache that
doesn't remove items
* until the active work item is finished.
*
- * <p>If the side input was not ready, throws {@code IllegalStateException}
if the state is
- * {@literal CACHED_IN_WORKITEM} or returns null otherwise.
- *
- * <p>If the side input was ready and null, returns {@literal
Optional.absent()}. If the side
- * input was ready and non-null returns {@literal Optional.present(...)}.
+ * <p>If the side input cached, throws {@code IllegalStateException} if the
state is {@literal
+ * CACHED_IN_WORK_ITEM} or returns {@link SideInput<T>} which contains
{@link Optional<T>}.
*/
- private @Nullable <T> Optional<T> fetchSideInput(
+ @SuppressWarnings({"deprecation", "unchecked"})
+ private <T> SideInput<T> fetchSideInput(
+ PCollectionView<T> view,
+ BoundedWindow sideInputWindow,
+ @Nullable String stateFamily,
+ SideInputState state,
+ @Nullable Supplier<Closeable> scopedReadStateSupplier) {
+ Map<BoundedWindow, SideInput<?>> tagCache =
+ sideInputCache.computeIfAbsent(view.getTagInternal(), k -> new
HashMap<>());
+
+ Optional<SideInput<T>> seenSideInput =
+ Optional.ofNullable((SideInput<T>) tagCache.get(sideInputWindow));
+
+ if (state == SideInputState.CACHED_IN_WORK_ITEM &&
!seenSideInput.isPresent()) {
+ throw new IllegalStateException(
+ "Expected side input to be cached. Tag: " +
view.getTagInternal().getId());
+ }
+
+ return seenSideInput.orElseGet(
Review Comment:
done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -292,41 +213,54 @@ protected SideInputReader getSideInputReaderForViews(
* Fetches the requested sideInput, and maintains a view of the cache that
doesn't remove items
* until the active work item is finished.
*
- * <p>If the side input was not ready, throws {@code IllegalStateException}
if the state is
- * {@literal CACHED_IN_WORKITEM} or returns null otherwise.
- *
- * <p>If the side input was ready and null, returns {@literal
Optional.absent()}. If the side
- * input was ready and non-null returns {@literal Optional.present(...)}.
+ * <p>If the side input cached, throws {@code IllegalStateException} if the
state is {@literal
+ * CACHED_IN_WORK_ITEM} or returns {@link SideInput<T>} which contains
{@link Optional<T>}.
*/
- private @Nullable <T> Optional<T> fetchSideInput(
+ @SuppressWarnings({"deprecation", "unchecked"})
+ private <T> SideInput<T> fetchSideInput(
+ PCollectionView<T> view,
+ BoundedWindow sideInputWindow,
+ @Nullable String stateFamily,
+ SideInputState state,
+ @Nullable Supplier<Closeable> scopedReadStateSupplier) {
+ Map<BoundedWindow, SideInput<?>> tagCache =
+ sideInputCache.computeIfAbsent(view.getTagInternal(), k -> new
HashMap<>());
+
+ Optional<SideInput<T>> seenSideInput =
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]