[
https://issues.apache.org/jira/browse/BEAM-4285?focusedWorklogId=116937&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116937
]
ASF GitHub Bot logged work on BEAM-4285:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Jun/18 18:25
Start Date: 28/Jun/18 18:25
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5814:
[BEAM-4285] Extend side input handlers to handle multiple access patterns.
URL: https://github.com/apache/beam/pull/5814#discussion_r198940278
##########
File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java
##########
@@ -50,56 +51,61 @@
public class StateRequestHandlers {
/**
- * A handler for multimap side inputs.
+ * A handler for side inputs.
*
* <p>Note that this handler is expected to be thread safe as it will be invoked concurrently.
*/
@ThreadSafe
- public interface MultimapSideInputHandler<K, V, W extends BoundedWindow> {
+ public interface SideInputHandler<V, W extends BoundedWindow> {
/**
* Returns an {@link Iterable} of values representing the side input for the given key and
* window.
*
+ * <p>The key is interpreted according to the access pattern of side input.
+ *
* <p>TODO: Add support for side input chunking and caching if a {@link Reiterable} is returned.
*/
- Iterable<V> get(K key, W window);
+ Iterable<V> get(byte[] key, W window);
+
+ /** Returns the {@link Coder} to use for the elements of the resulting values iterable. */
+ Coder<V> resultCoder();
}
/**
- * A factory which constructs {@link MultimapSideInputHandler}s.
+ * A factory which constructs {@link SideInputHandler}s.
*
* <p>Note that this factory should be thread safe because it will be invoked concurrently.
*/
@ThreadSafe
- public interface MultimapSideInputHandlerFactory {
+ public interface SideInputHandlerFactory {
/**
- * Returns a {@link MultimapSideInputHandler} for the given {@code pTransformId} and {@code
- * sideInputId}. The supplied {@code keyCoder}, {@code valueCoder}, and {@code windowCoder}
- * should be used to encode/decode their respective values.
+ * Returns a {@link SideInputHandler} for the given {@code pTransformId}, {@code sideInputId},
+ * and {@code accessPattern}. The supplied {@code elementCoder} and {@code windowCoder} should
+ * be used to encode/decode their respective values.
*/
- <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> forSideInput(
+ <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput(
Review comment:
I agree with this but I suggest we do it in an immediately-following-up
cleanup PR.
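
For readers following the diff above, here is a minimal, self-contained sketch of what a handler with a byte[] key could look like for a multimap-style access pattern. It is not the code from the pull request: the interface SimpleSideInputHandler, the class InMemoryMultimapHandler, the put method, and the use of a String in place of a BoundedWindow are all hypothetical simplifications, and the resultCoder() method is left out to avoid a Beam dependency. The one point it tries to illustrate is that raw byte[] keys need value-based equality (here via ByteBuffer) before they can be used for lookups.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class SideInputSketch {
  /** Hypothetical stand-in for the handler in the diff; windows are plain strings here. */
  interface SimpleSideInputHandler<V> {
    Iterable<V> get(byte[] key, String window);
  }

  /** Hypothetical multimap-style handler: the encoded key selects values within a window. */
  static class InMemoryMultimapHandler<V> implements SimpleSideInputHandler<V> {
    // Concurrent collections are used because the handler may be invoked concurrently.
    // byte[] has identity-based equals/hashCode, so keys are wrapped in ByteBuffer,
    // which compares by content.
    private final Map<String, Map<ByteBuffer, List<V>>> data = new ConcurrentHashMap<>();

    void put(String window, byte[] key, V value) {
      data.computeIfAbsent(window, w -> new ConcurrentHashMap<>())
          .computeIfAbsent(ByteBuffer.wrap(key.clone()), k -> new CopyOnWriteArrayList<>())
          .add(value);
    }

    @Override
    public Iterable<V> get(byte[] key, String window) {
      // Unknown windows or keys yield an empty iterable rather than null.
      return data.getOrDefault(window, Collections.emptyMap())
          .getOrDefault(ByteBuffer.wrap(key), Collections.emptyList());
    }
  }

  public static void main(String[] args) {
    InMemoryMultimapHandler<String> handler = new InMemoryMultimapHandler<>();
    handler.put("window-1", "a".getBytes(StandardCharsets.UTF_8), "x");
    handler.put("window-1", "a".getBytes(StandardCharsets.UTF_8), "y");
    // A fresh byte[] with the same content finds the same values.
    System.out.println(handler.get("a".getBytes(StandardCharsets.UTF_8), "window-1"));
    System.out.println(handler.get("b".getBytes(StandardCharsets.UTF_8), "window-1"));
  }
}
```

An iterable-style access pattern could presumably ignore the key bytes altogether and return every value for the window, which is one reading of "the key is interpreted according to the access pattern" in the new Javadoc.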
Issue Time Tracking
-------------------
Worklog Id: (was: 116937)
Time Spent: 2h 40m (was: 2.5h)
> Flink batch state request handler
> ---------------------------------
>
> Key: BEAM-4285
> URL: https://issues.apache.org/jira/browse/BEAM-4285
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> In order to support side inputs, Flink needs a state service request handler.
> As in the non-portable runner, we can start by handling batch side inputs with
> Flink broadcast variables.
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
> can be used as a starting point.