[https://issues.apache.org/jira/browse/BEAM-4285?focusedWorklogId=116962&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116962]
ASF GitHub Bot logged work on BEAM-4285:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Jun/18 18:51
Start Date: 28/Jun/18 18:51
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #5814:
[BEAM-4285] Extend side input handlers to handle multiple access patterns.
URL: https://github.com/apache/beam/pull/5814#discussion_r198948558
##########
File path:
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java
##########
@@ -50,56 +51,61 @@
public class StateRequestHandlers {
/**
- * A handler for multimap side inputs.
+ * A handler for side inputs.
*
* <p>Note that this handler is expected to be thread safe as it will be invoked concurrently.
*/
@ThreadSafe
- public interface MultimapSideInputHandler<K, V, W extends BoundedWindow> {
+ public interface SideInputHandler<V, W extends BoundedWindow> {
/**
* Returns an {@link Iterable} of values representing the side input for the given key and
* window.
*
+ * <p>The key is interpreted according to the access pattern of side input.
+ *
* <p>TODO: Add support for side input chunking and caching if a {@link Reiterable} is returned.
*/
- Iterable<V> get(K key, W window);
+ Iterable<V> get(byte[] key, W window);
+
+ /** Returns the {@link Coder} to use for the elements of the resulting values iterable. */
+ Coder<V> resultCoder();
}
/**
- * A factory which constructs {@link MultimapSideInputHandler}s.
+ * A factory which constructs {@link SideInputHandler}s.
*
* <p>Note that this factory should be thread safe because it will be invoked concurrently.
*/
@ThreadSafe
- public interface MultimapSideInputHandlerFactory {
+ public interface SideInputHandlerFactory {
/**
- * Returns a {@link MultimapSideInputHandler} for the given {@code pTransformId} and {@code
- * sideInputId}. The supplied {@code keyCoder}, {@code valueCoder}, and {@code windowCoder}
- * should be used to encode/decode their respective values.
+ * Returns a {@link SideInputHandler} for the given {@code pTransformId}, {@code sideInputId},
+ * and {@code accessPattern}. The supplied {@code elementCoder} and {@code windowCoder} should
+ * be used to encode/decode their respective values.
*/
- <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> forSideInput(
+ <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput(
Review comment:
I actually started down this road, but found myself duplicating a huge
amount of code due to the lack of a common interface (short of passing the full
StateRequest, which seemed to break the abstraction). That's not to say it
wouldn't be worth exploring more, but it seems every access pattern
(geospatial, range-based, ...) falls naturally into the pattern of having a
key that is interpreted or ignored according to the access pattern's semantics.
The other alternative is to let the StateHandlerFactory interface have a
handler-returning method per known access pattern rather than pass this down to
the runners themselves, which also felt odd.
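To make the single-interface approach from the review comment concrete, here is a minimal, self-contained Java sketch. It deliberately does not use Beam's real StateRequestHandlers, Coder, or BoundedWindow types; `SideInputAccessPatterns`, `AccessPattern`, the in-memory window -> key -> values layout, String windows, and UTF-8 key decoding are all hypothetical simplifications. One handler interface receives opaque key bytes, and the access pattern decides whether those bytes are ignored (iterable) or decoded and looked up (multimap).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-ins; these are not Beam's real classes. */
public class SideInputAccessPatterns {

  /** Same shape as the proposed interface: the key arrives as opaque bytes. */
  interface SideInputHandler<V, W> {
    Iterable<V> get(byte[] key, W window);
  }

  /** Hypothetical access patterns; Beam identifies these differently. */
  enum AccessPattern { ITERABLE, MULTIMAP }

  /** Builds a handler over an in-memory side input laid out as window -> key -> values. */
  static <V, W> SideInputHandler<V, W> forSideInput(
      AccessPattern pattern, Map<W, Map<String, List<V>>> data) {
    switch (pattern) {
      case ITERABLE:
        // The key is ignored: every value in the window is returned.
        return (key, window) -> {
          List<V> all = new ArrayList<>();
          for (List<V> values : data.getOrDefault(window, Collections.emptyMap()).values()) {
            all.addAll(values);
          }
          return all;
        };
      case MULTIMAP:
        // The key is decoded (assumed UTF-8 here) and used for the lookup.
        return (key, window) ->
            data.getOrDefault(window, Collections.emptyMap())
                .getOrDefault(new String(key, StandardCharsets.UTF_8), Collections.emptyList());
      default:
        throw new IllegalArgumentException("Unknown access pattern: " + pattern);
    }
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> window1 = new LinkedHashMap<>(); // keeps key order stable
    window1.put("a", Arrays.asList(1, 2));
    window1.put("b", Arrays.asList(3));
    Map<String, Map<String, List<Integer>>> data = new HashMap<>();
    data.put("w1", window1);

    SideInputHandler<Integer, String> multimap = forSideInput(AccessPattern.MULTIMAP, data);
    SideInputHandler<Integer, String> iterable = forSideInput(AccessPattern.ITERABLE, data);

    byte[] keyA = "a".getBytes(StandardCharsets.UTF_8);
    System.out.println(multimap.get(keyA, "w1"));        // [1, 2]
    System.out.println(iterable.get(new byte[0], "w1")); // [1, 2, 3]
  }
}
```

Under these assumptions, supporting another access pattern (for example a range-based one) would mean adding a case that interprets the same `byte[]` key differently, while the handler interface itself stays unchanged.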
Issue Time Tracking
-------------------
Worklog Id: (was: 116962)
Time Spent: 3h (was: 2h 50m)
> Flink batch state request handler
> ---------------------------------
>
> Key: BEAM-4285
> URL: https://issues.apache.org/jira/browse/BEAM-4285
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 3h
> Remaining Estimate: 0h
>
> In order to support side inputs, Flink needs a state service request handler.
> As in the non-portable runner, we can start by handling batch side inputs with Flink
> broadcast variables.
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
> can be used as a starting point.
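A minimal sketch of the broadcast-variable idea, under stated assumptions: the side input is fully materialized on each worker (as a broadcast variable would provide it) and is indexed once into window -> key -> values, after which lookups are served from memory. `BroadcastSideInputIndex` and `Element` are hypothetical names in plain Java; the sketch does not use Flink's broadcast API or Beam's handler interfaces. Because the index is built entirely in the constructor and only read afterwards, concurrent `get` calls are safe, which matches the thread-safety expectation stated on the handler interfaces in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink's or Beam's API): index a fully materialized
 * side input once, as a broadcast variable would provide it, then serve
 * per-key, per-window lookups from memory.
 */
public class BroadcastSideInputIndex<W, K, V> {

  /** One materialized side-input element: a value tagged with its window and key. */
  public static final class Element<W, K, V> {
    final W window;
    final K key;
    final V value;

    public Element(W window, K key, V value) {
      this.window = window;
      this.key = key;
      this.value = value;
    }
  }

  // window -> key -> values. Built entirely in the constructor and only read afterwards,
  // so concurrent calls to get() are safe once construction has completed.
  private final Map<W, Map<K, List<V>>> index = new HashMap<>();

  public BroadcastSideInputIndex(Iterable<Element<W, K, V>> materialized) {
    for (Element<W, K, V> e : materialized) {
      index.computeIfAbsent(e.window, w -> new HashMap<>())
          .computeIfAbsent(e.key, k -> new ArrayList<>())
          .add(e.value);
    }
  }

  /** Returns the values for {@code key} in {@code window}; empty if either is absent. */
  public List<V> get(K key, W window) {
    return Collections.unmodifiableList(
        index.getOrDefault(window, Collections.emptyMap())
            .getOrDefault(key, Collections.emptyList()));
  }

  public static void main(String[] args) {
    List<Element<String, String, Integer>> broadcast = Arrays.asList(
        new Element<>("w1", "a", 1),
        new Element<>("w1", "a", 2),
        new Element<>("w1", "b", 3),
        new Element<>("w2", "a", 4));
    BroadcastSideInputIndex<String, String, Integer> sideInput =
        new BroadcastSideInputIndex<>(broadcast);
    System.out.println(sideInput.get("a", "w1")); // [1, 2]
    System.out.println(sideInput.get("a", "w2")); // [4]
    System.out.println(sideInput.get("c", "w1")); // []
  }
}
```

The trade-off in this sketch is memory for simplicity: the whole side input must fit on every worker, which is the same constraint a broadcast-variable approach implies for batch side inputs.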
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)