[
https://issues.apache.org/jira/browse/BEAM-4285?focusedWorklogId=116940&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116940
]
ASF GitHub Bot logged work on BEAM-4285:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Jun/18 18:28
Start Date: 28/Jun/18 18:28
Worklog Time Spent: 10m
Work Description: jkff closed pull request #5814: [BEAM-4285] Extend side input handlers to handle multiple access patterns.
URL: https://github.com/apache/beam/pull/5814
This pull request was merged from a forked repository. Because GitHub hides the original diff once a pull request from a fork is merged, the diff is reproduced below for provenance:
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 9916d135c8d..53fe744491b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -84,6 +84,11 @@
public static final String SPLITTABLE_PROCESS_ELEMENTS_URN =
getUrn(SplittableParDoComponents.PROCESS_ELEMENTS);
+ public static final String ITERABLE_SIDE_INPUT =
+ getUrn(RunnerApi.StandardSideInputTypes.Enum.ITERABLE);
+ public static final String MULTIMAP_SIDE_INPUT =
+ getUrn(RunnerApi.StandardSideInputTypes.Enum.MULTIMAP);
+
private static final Map<Class<? extends PTransform>,
TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
index 576147b5a8a..fc43d1cfe1e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
@@ -29,7 +29,7 @@
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
-import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandlerFactory;
+import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,11 +58,11 @@ private BatchFlinkExecutableStageContext(JobBundleFactory
jobBundleFactory) {
@Override
public StateRequestHandler getStateRequestHandler(
ExecutableStage executableStage, RuntimeContext runtimeContext) {
- MultimapSideInputHandlerFactory sideInputHandlerFactory =
+ SideInputHandlerFactory sideInputHandlerFactory =
FlinkBatchSideInputHandlerFactory.forStage(executableStage,
runtimeContext);
try {
- return StateRequestHandlers.forMultimapSideInputHandlerFactory(
- ProcessBundleDescriptors.getMultimapSideInputs(executableStage),
sideInputHandlerFactory);
+ return StateRequestHandlers.forSideInputHandlerFactory(
+ ProcessBundleDescriptors.getSideInputs(executableStage),
sideInputHandlerFactory);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
index ef46d598ce1..1bd6b9bc4b1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
@@ -23,17 +23,22 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
-import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler;
-import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandlerFactory;
+import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler;
+import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -43,7 +48,7 @@
* {@link StateRequestHandler} that uses a Flink {@link RuntimeContext} to
access Flink broadcast
* variable that represent side inputs.
*/
-class FlinkBatchSideInputHandlerFactory implements
MultimapSideInputHandlerFactory {
+class FlinkBatchSideInputHandlerFactory implements SideInputHandlerFactory {
// Map from side input id to global PCollection id.
private final Map<SideInputId, PCollectionNode> sideInputToCollection;
@@ -74,19 +79,67 @@ private FlinkBatchSideInputHandlerFactory(
}
@Override
- public <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W>
forSideInput(
+ public <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput(
String transformId,
String sideInputId,
- Coder<K> keyCoder,
- Coder<V> valueCoder,
+ RunnerApi.FunctionSpec accessPattern,
+ Coder<T> elementCoder,
Coder<W> windowCoder) {
+
PCollectionNode collectionNode =
sideInputToCollection.get(
SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
checkArgument(collectionNode != null, "No side input for %s/%s",
transformId, sideInputId);
- List<WindowedValue<KV<K, V>>> broadcastVariable =
- runtimeContext.getBroadcastVariable(collectionNode.getId());
+ if
(PTransformTranslation.ITERABLE_SIDE_INPUT.equals(accessPattern.getUrn())) {
+ @SuppressWarnings("unchecked") // T == V
+ Coder<V> outputCoder = (Coder<V>) elementCoder;
+ return forIterableSideInput(
+ runtimeContext.getBroadcastVariable(collectionNode.getId()),
outputCoder, windowCoder);
+ } else if
(PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())) {
+ @SuppressWarnings("unchecked") // T == KV<?, V>
+ KvCoder<?, V> kvCoder = (KvCoder<?, V>) elementCoder;
+ return forMultimapSideInput(
+ runtimeContext.getBroadcastVariable(collectionNode.getId()),
+ kvCoder.getKeyCoder(),
+ kvCoder.getValueCoder(),
+ windowCoder);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unknown side input access pattern: %s",
accessPattern));
+ }
+ }
+
+ private <T, W extends BoundedWindow> SideInputHandler<T, W>
forIterableSideInput(
+ List<WindowedValue<T>> broadcastVariable, Coder<T> elementCoder,
Coder<W> windowCoder) {
+ ImmutableMultimap.Builder<Object, T> windowToValuesBuilder =
ImmutableMultimap.builder();
+ for (WindowedValue<T> windowedValue : broadcastVariable) {
+ for (BoundedWindow boundedWindow : windowedValue.getWindows()) {
+ @SuppressWarnings("unchecked")
+ W window = (W) boundedWindow;
+ windowToValuesBuilder.put(windowCoder.structuralValue(window),
windowedValue.getValue());
+ }
+ }
+ ImmutableMultimap<Object, T> windowToValues =
windowToValuesBuilder.build();
+
+ return new SideInputHandler<T, W>() {
+ @Override
+ public Iterable<T> get(byte[] key, W window) {
+ return windowToValues.get(windowCoder.structuralValue(window));
+ }
+
+ @Override
+ public Coder<T> resultCoder() {
+ return elementCoder;
+ }
+ };
+ }
+
+ private <K, V, W extends BoundedWindow> SideInputHandler<V, W>
forMultimapSideInput(
+ List<WindowedValue<KV<K, V>>> broadcastVariable,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder,
+ Coder<W> windowCoder) {
ImmutableMultimap.Builder<SideInputKey, V> multimap =
ImmutableMultimap.builder();
for (WindowedValue<KV<K, V>> windowedValue : broadcastVariable) {
K key = windowedValue.getValue().getKey();
@@ -101,28 +154,53 @@ private FlinkBatchSideInputHandlerFactory(
}
}
- return new SideInputHandler<>(multimap.build(), keyCoder, windowCoder);
+ return new MultimapSideInputHandler(multimap.build(), keyCoder,
valueCoder, windowCoder);
}
- private static class SideInputHandler<K, V, W extends BoundedWindow>
- implements MultimapSideInputHandler<K, V, W> {
+ private <T> List<WindowedValue<T>> getBroadcastVariable(String transformId,
String sideInputId) {
+ PCollectionNode collectionNode =
+ sideInputToCollection.get(
+
SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
+ checkArgument(collectionNode != null, "No side input for %s/%s",
transformId, sideInputId);
+ return runtimeContext.getBroadcastVariable(collectionNode.getId());
+ }
+
+ private static class MultimapSideInputHandler<K, V, W extends BoundedWindow>
+ implements SideInputHandler<V, W> {
private final Multimap<SideInputKey, V> collection;
private final Coder<K> keyCoder;
+ private final Coder<V> valueCoder;
private final Coder<W> windowCoder;
- private SideInputHandler(
- Multimap<SideInputKey, V> collection, Coder<K> keyCoder, Coder<W>
windowCoder) {
+ private MultimapSideInputHandler(
+ Multimap<SideInputKey, V> collection,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder,
+ Coder<W> windowCoder) {
this.collection = collection;
this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
this.windowCoder = windowCoder;
}
@Override
- public Iterable<V> get(K key, W window) {
+ public Iterable<V> get(byte[] keyBytes, W window) {
+ K key;
+ try {
+ // TODO: We could skip decoding and just compare encoded values for
deterministic keyCoders.
+ key = keyCoder.decode(new ByteArrayInputStream(keyBytes));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
return collection.get(
SideInputKey.of(keyCoder.structuralValue(key),
windowCoder.structuralValue(window)));
}
+
+ @Override
+ public Coder<V> resultCoder() {
+ return valueCoder;
+ }
}
@AutoValue
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactoryTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactoryTest.java
index f3afd0b5d7d..b5d616a6b30 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactoryTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactoryTest.java
@@ -24,18 +24,23 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import
org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
-import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler;
+import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -65,6 +70,10 @@
private static final String TRANSFORM_ID = "transform-id";
private static final String SIDE_INPUT_NAME = "side-input";
private static final String COLLECTION_ID = "collection";
+ private static final RunnerApi.FunctionSpec MULTIMAP_ACCESS =
+
RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.MULTIMAP_SIDE_INPUT).build();
+ private static final RunnerApi.FunctionSpec ITERABLE_ACCESS =
+
RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.ITERABLE_SIDE_INPUT).build();
private static final ExecutableStage EXECUTABLE_STAGE =
createExecutableStage(
Arrays.asList(
@@ -73,6 +82,8 @@
SIDE_INPUT_NAME,
PipelineNode.pCollection(
COLLECTION_ID,
RunnerApi.PCollection.getDefaultInstance()))));
+ private static final byte[] ENCODED_NULL = encode(null, VoidCoder.of());
+ private static final byte[] ENCODED_FOO = encode("foo",
StringUtf8Coder.of());
@Rule public ExpectedException thrown = ExpectedException.none();
@@ -90,23 +101,27 @@ public void invalidSideInputThrowsException() {
FlinkBatchSideInputHandlerFactory.forStage(stage, context);
thrown.expect(instanceOf(IllegalArgumentException.class));
factory.forSideInput(
- "transform-id", "side-input", VoidCoder.of(), VoidCoder.of(),
GlobalWindow.Coder.INSTANCE);
+ "transform-id",
+ "side-input",
+ MULTIMAP_ACCESS,
+ KvCoder.of(VoidCoder.of(), VoidCoder.of()),
+ GlobalWindow.Coder.INSTANCE);
}
@Test
public void emptyResultForEmptyCollection() {
FlinkBatchSideInputHandlerFactory factory =
FlinkBatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
- MultimapSideInputHandler<Void, Integer, GlobalWindow> handler =
+ SideInputHandler<Integer, GlobalWindow> handler =
factory.forSideInput(
TRANSFORM_ID,
SIDE_INPUT_NAME,
- VoidCoder.of(),
- VarIntCoder.of(),
+ MULTIMAP_ACCESS,
+ KvCoder.of(VoidCoder.of(), VarIntCoder.of()),
GlobalWindow.Coder.INSTANCE);
// We never populated the broadcast variable for "side-input", so the mock
will return an empty
// list.
- Iterable<Integer> result = handler.get(null, GlobalWindow.INSTANCE);
+ Iterable<Integer> result = handler.get(ENCODED_NULL,
GlobalWindow.INSTANCE);
assertThat(result, emptyIterable());
}
@@ -118,14 +133,14 @@ public void singleElementForCollection() {
FlinkBatchSideInputHandlerFactory factory =
FlinkBatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
- MultimapSideInputHandler<Void, Integer, GlobalWindow> handler =
+ SideInputHandler<Integer, GlobalWindow> handler =
factory.forSideInput(
TRANSFORM_ID,
SIDE_INPUT_NAME,
- VoidCoder.of(),
- VarIntCoder.of(),
+ MULTIMAP_ACCESS,
+ KvCoder.of(VoidCoder.of(), VarIntCoder.of()),
GlobalWindow.Coder.INSTANCE);
- Iterable<Integer> result = handler.get(null, GlobalWindow.INSTANCE);
+ Iterable<Integer> result = handler.get(ENCODED_NULL,
GlobalWindow.INSTANCE);
assertThat(result, contains(3));
}
@@ -140,14 +155,14 @@ public void groupsValuesByKey() {
FlinkBatchSideInputHandlerFactory factory =
FlinkBatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
- MultimapSideInputHandler<String, Integer, GlobalWindow> handler =
+ SideInputHandler<Integer, GlobalWindow> handler =
factory.forSideInput(
TRANSFORM_ID,
SIDE_INPUT_NAME,
- StringUtf8Coder.of(),
- VarIntCoder.of(),
+ MULTIMAP_ACCESS,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
GlobalWindow.Coder.INSTANCE);
- Iterable<Integer> result = handler.get("foo", GlobalWindow.INSTANCE);
+ Iterable<Integer> result = handler.get(ENCODED_FOO, GlobalWindow.INSTANCE);
assertThat(result, containsInAnyOrder(2, 5));
}
@@ -170,19 +185,49 @@ public void groupsValuesByWindowAndKey() {
FlinkBatchSideInputHandlerFactory factory =
FlinkBatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
- MultimapSideInputHandler<String, Integer, IntervalWindow> handler =
+ SideInputHandler<Integer, IntervalWindow> handler =
factory.forSideInput(
TRANSFORM_ID,
SIDE_INPUT_NAME,
- StringUtf8Coder.of(),
- VarIntCoder.of(),
+ MULTIMAP_ACCESS,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
IntervalWindowCoder.of());
- Iterable<Integer> resultA = handler.get("foo", windowA);
- Iterable<Integer> resultB = handler.get("foo", windowB);
+ Iterable<Integer> resultA = handler.get(ENCODED_FOO, windowA);
+ Iterable<Integer> resultB = handler.get(ENCODED_FOO, windowB);
assertThat(resultA, containsInAnyOrder(1, 3));
assertThat(resultB, containsInAnyOrder(4, 6));
}
+ @Test
+ public void iterableAccessPattern() {
+ Instant instantA = new DateTime(2018, 1, 1, 1, 1,
DateTimeZone.UTC).toInstant();
+ Instant instantB = new DateTime(2018, 1, 1, 1, 2,
DateTimeZone.UTC).toInstant();
+ Instant instantC = new DateTime(2018, 1, 1, 1, 3,
DateTimeZone.UTC).toInstant();
+ IntervalWindow windowA = new IntervalWindow(instantA, instantB);
+ IntervalWindow windowB = new IntervalWindow(instantB, instantC);
+ when(context.getBroadcastVariable(COLLECTION_ID))
+ .thenReturn(
+ Arrays.asList(
+ WindowedValue.of(1, instantA, windowA, PaneInfo.NO_FIRING),
+ WindowedValue.of(2, instantA, windowA, PaneInfo.NO_FIRING),
+ WindowedValue.of(3, instantB, windowB, PaneInfo.NO_FIRING),
+ WindowedValue.of(4, instantB, windowB, PaneInfo.NO_FIRING)));
+
+ FlinkBatchSideInputHandlerFactory factory =
+ FlinkBatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
+ SideInputHandler<Integer, IntervalWindow> handler =
+ factory.forSideInput(
+ TRANSFORM_ID,
+ SIDE_INPUT_NAME,
+ ITERABLE_ACCESS,
+ VarIntCoder.of(),
+ IntervalWindowCoder.of());
+ Iterable<Integer> resultA = handler.get(null, windowA);
+ Iterable<Integer> resultB = handler.get(null, windowB);
+ assertThat(resultA, containsInAnyOrder(1, 2));
+ assertThat(resultB, containsInAnyOrder(3, 4));
+ }
+
private static ExecutableStage
createExecutableStage(Collection<SideInputReference> sideInputs) {
Components components = Components.getDefaultInstance();
Environment environment = Environment.getDefaultInstance();
@@ -196,4 +241,14 @@ private static ExecutableStage
createExecutableStage(Collection<SideInputReferen
Collections.emptyList(),
Collections.emptyList());
}
+
+ private static <T> byte[] encode(T value, Coder<T> coder) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ coder.encode(value, out);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return out.toByteArray();
+ }
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
index 2c049687faf..24628bdad46 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
@@ -24,6 +24,7 @@
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
+import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -36,6 +37,7 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
@@ -47,7 +49,6 @@
import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -115,8 +116,7 @@ private static ExecutableProcessBundleDescriptor
fromExecutableStageInternal(
Map<Target, Coder<WindowedValue<?>>> outputTargetCoders =
addStageOutputs(dataEndpoint, stage.getOutputPCollections(),
components);
- Map<String, Map<String, MultimapSideInputSpec>> multimapSideInputSpecs =
- addMultimapSideInputs(stage, components);
+ Map<String, Map<String, SideInputSpec>> sideInputSpecs =
addSideInputs(stage, components);
// Copy data from components to ProcessBundleDescriptor.
ProcessBundleDescriptor.Builder bundleDescriptorBuilder =
@@ -132,10 +132,7 @@ private static ExecutableProcessBundleDescriptor
fromExecutableStageInternal(
.putAllTransforms(components.getTransformsMap());
return ExecutableProcessBundleDescriptor.of(
- bundleDescriptorBuilder.build(),
- inputDestination,
- outputTargetCoders,
- multimapSideInputSpecs);
+ bundleDescriptorBuilder.build(), inputDestination, outputTargetCoders,
sideInputSpecs);
}
private static Map<Target, Coder<WindowedValue<?>>> addStageOutputs(
@@ -210,18 +207,17 @@ private static TargetEncoding addStageOutput(
wireCoder);
}
- public static Map<String, Map<String, MultimapSideInputSpec>>
getMultimapSideInputs(
- ExecutableStage stage) throws IOException {
- return addMultimapSideInputs(stage, stage.getComponents().toBuilder());
+ public static Map<String, Map<String, SideInputSpec>>
getSideInputs(ExecutableStage stage)
+ throws IOException {
+ return addSideInputs(stage, stage.getComponents().toBuilder());
}
- private static Map<String, Map<String, MultimapSideInputSpec>>
addMultimapSideInputs(
+ private static Map<String, Map<String, SideInputSpec>> addSideInputs(
ExecutableStage stage, Components.Builder components) throws IOException
{
- ImmutableTable.Builder<String, String, MultimapSideInputSpec> idsToSpec =
- ImmutableTable.builder();
+ ImmutableTable.Builder<String, String, SideInputSpec> idsToSpec =
ImmutableTable.builder();
for (SideInputReference sideInputReference : stage.getSideInputs()) {
// Update the coder specification for side inputs to be length prefixed
so that the
- // SDK and Runner agree on how to encode/decode the key, window, and
values for multimap
+ // SDK and Runner agree on how to encode/decode the key, window, and
values for
// side inputs.
PCollectionNode pcNode = sideInputReference.collection();
PCollection pc = pcNode.getPCollection();
@@ -236,16 +232,28 @@ private static TargetEncoding addStageOutput(
idsToSpec.put(
sideInputReference.transform().getId(),
sideInputReference.localName(),
- MultimapSideInputSpec.of(
+ SideInputSpec.of(
sideInputReference.transform().getId(),
sideInputReference.localName(),
- ((KvCoder) coder.getValueCoder()).getKeyCoder(),
- ((KvCoder) coder.getValueCoder()).getValueCoder(),
+ getAccessPattern(sideInputReference),
+ coder.getValueCoder(),
coder.getWindowCoder()));
}
return idsToSpec.build().rowMap();
}
+ private static RunnerApi.FunctionSpec getAccessPattern(SideInputReference
sideInputReference) {
+ try {
+ return RunnerApi.ParDoPayload.parseFrom(
+
sideInputReference.transform().getTransform().getSpec().getPayload())
+ .getSideInputsMap()
+ .get(sideInputReference.localName())
+ .getAccessPattern();
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@AutoValue
abstract static class TargetEncoding {
abstract BeamFnApi.Target getTarget();
@@ -255,27 +263,27 @@ private static TargetEncoding addStageOutput(
/**
* A container type storing references to the key, value, and window {@link
Coder} used when
- * handling multimap side input state requests.
+ * handling side input state requests.
*/
@AutoValue
- public abstract static class MultimapSideInputSpec<K, V, W extends
BoundedWindow> {
- static <K, V, W extends BoundedWindow> MultimapSideInputSpec<K, V, W> of(
+ public abstract static class SideInputSpec<K, T, W extends BoundedWindow> {
+ public static <T, W extends BoundedWindow> SideInputSpec of(
String transformId,
String sideInputId,
- Coder<K> keyCoder,
- Coder<V> valueCoder,
+ RunnerApi.FunctionSpec accessPattern,
+ Coder<T> elementCoder,
Coder<W> windowCoder) {
- return new AutoValue_ProcessBundleDescriptors_MultimapSideInputSpec(
- transformId, sideInputId, keyCoder, valueCoder, windowCoder);
+ return new AutoValue_ProcessBundleDescriptors_SideInputSpec(
+ transformId, sideInputId, accessPattern, elementCoder, windowCoder);
}
public abstract String transformId();
public abstract String sideInputId();
- public abstract Coder<K> keyCoder();
+ public abstract RunnerApi.FunctionSpec accessPattern();
- public abstract Coder<V> valueCoder();
+ public abstract Coder<T> elementCoder();
public abstract Coder<W> windowCoder();
}
@@ -287,19 +295,18 @@ public static ExecutableProcessBundleDescriptor of(
ProcessBundleDescriptor descriptor,
RemoteInputDestination<WindowedValue<?>> inputDestination,
Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputTargetCoders,
- Map<String, Map<String, MultimapSideInputSpec>>
multimapSideInputSpecs) {
- ImmutableTable.Builder copyOfMultimapSideInputSpecs =
ImmutableTable.builder();
- for (Map.Entry<String, Map<String, MultimapSideInputSpec>> outer :
- multimapSideInputSpecs.entrySet()) {
- for (Map.Entry<String, MultimapSideInputSpec> inner :
outer.getValue().entrySet()) {
- copyOfMultimapSideInputSpecs.put(outer.getKey(), inner.getKey(),
inner.getValue());
+ Map<String, Map<String, SideInputSpec>> sideInputSpecs) {
+ ImmutableTable.Builder copyOfSideInputSpecs = ImmutableTable.builder();
+ for (Map.Entry<String, Map<String, SideInputSpec>> outer :
sideInputSpecs.entrySet()) {
+ for (Map.Entry<String, SideInputSpec> inner :
outer.getValue().entrySet()) {
+ copyOfSideInputSpecs.put(outer.getKey(), inner.getKey(),
inner.getValue());
}
}
return new
AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor(
descriptor,
inputDestination,
Collections.unmodifiableMap(outputTargetCoders),
- copyOfMultimapSideInputSpecs.build().rowMap());
+ copyOfSideInputSpecs.build().rowMap());
}
public abstract ProcessBundleDescriptor getProcessBundleDescriptor();
@@ -317,9 +324,9 @@ public static ExecutableProcessBundleDescriptor of(
public abstract Map<BeamFnApi.Target, Coder<WindowedValue<?>>>
getOutputTargetCoders();
/**
- * Get a mapping from PTransform id to multimap side input id to {@link
MultimapSideInputSpec
- * multimap side inputs} that are used during execution.
+ * Get a mapping from PTransform id to side input id to {@link
SideInputSpec side inputs} that
+ * are used during execution.
*/
- public abstract Map<String, Map<String, MultimapSideInputSpec>>
getMultimapSideInputSpecs();
+ public abstract Map<String, Map<String, SideInputSpec>>
getSideInputSpecs();
}
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java
index 57dc744673c..7b571587462 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java
@@ -34,8 +34,9 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import
org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
-import
org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.MultimapSideInputSpec;
+import
org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.SideInputSpec;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.sdk.fn.stream.DataStreams.ElementDelimitedOutputStream;
@@ -50,56 +51,61 @@
public class StateRequestHandlers {
/**
- * A handler for multimap side inputs.
+ * A handler for side inputs.
*
* <p>Note that this handler is expected to be thread safe as it will be
invoked concurrently.
*/
@ThreadSafe
- public interface MultimapSideInputHandler<K, V, W extends BoundedWindow> {
+ public interface SideInputHandler<V, W extends BoundedWindow> {
/**
* Returns an {@link Iterable} of values representing the side input for
the given key and
* window.
*
+ * <p>The key is interpreted according to the access pattern of side input.
+ *
* <p>TODO: Add support for side input chunking and caching if a {@link
Reiterable} is returned.
*/
- Iterable<V> get(K key, W window);
+ Iterable<V> get(byte[] key, W window);
+
+ /** Returns the {@link Coder} to use for the elements of the resulting
values iterable. */
+ Coder<V> resultCoder();
}
/**
- * A factory which constructs {@link MultimapSideInputHandler}s.
+ * A factory which constructs {@link SideInputHandler}s.
*
* <p>Note that this factory should be thread safe because it will be
invoked concurrently.
*/
@ThreadSafe
- public interface MultimapSideInputHandlerFactory {
+ public interface SideInputHandlerFactory {
/**
- * Returns a {@link MultimapSideInputHandler} for the given {@code
pTransformId} and {@code
- * sideInputId}. The supplied {@code keyCoder}, {@code valueCoder}, and
{@code windowCoder}
- * should be used to encode/decode their respective values.
+ * Returns a {@link SideInputHandler} for the given {@code pTransformId},
{@code sideInputId},
+ * and {@code accessPattern}. The supplied {@code elementCoder} and {@code
windowCoder} should
+ * be used to encode/decode their respective values.
*/
- <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W>
forSideInput(
+ <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput(
String pTransformId,
String sideInputId,
- Coder<K> keyCoder,
- Coder<V> valueCoder,
+ RunnerApi.FunctionSpec accessPattern,
+ Coder<T> elementCoder,
Coder<W> windowCoder);
/** Throws a {@link UnsupportedOperationException} on the first access. */
- static MultimapSideInputHandlerFactory unsupported() {
- return new MultimapSideInputHandlerFactory() {
+ static SideInputHandlerFactory unsupported() {
+ return new SideInputHandlerFactory() {
@Override
- public <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V,
W> forSideInput(
+ public <T, V, W extends BoundedWindow> SideInputHandler<V, W>
forSideInput(
String pTransformId,
String sideInputId,
- Coder<K> keyCoder,
- Coder<V> valueCoder,
+ RunnerApi.FunctionSpec accessPattern,
+ Coder<T> elementCoder,
Coder<W> windowCoder) {
throw new UnsupportedOperationException(
String.format(
"The %s does not support handling sides inputs for
PTransform %s with side "
+ "input id %s.",
- MultimapSideInputHandler.class.getSimpleName(),
pTransformId, sideInputId));
+ SideInputHandler.class.getSimpleName(), pTransformId,
sideInputId));
}
};
}
@@ -163,40 +169,36 @@ static BagUserStateHandlerFactory unsupported() {
}
/**
- * Returns an adapter which converts a {@link
MultimapSideInputHandlerFactory} to a {@link
+ * Returns an adapter which converts a {@link SideInputHandlerFactory} to a
{@link
* StateRequestHandler}.
*
- * <p>The {@link MultimapSideInputHandlerFactory} is required to handle all
multimap side inputs
- * contained within the {@link ExecutableProcessBundleDescriptor}. See {@link
- * ExecutableProcessBundleDescriptor#getMultimapSideInputSpecs} for the set
of multimap side
- * inputs that are contained.
+ * <p>The {@link SideInputHandlerFactory} is required to handle all side
inputs contained within
+ * the {@link ExecutableProcessBundleDescriptor}. See {@link
+ * ExecutableProcessBundleDescriptor#getSideInputSpecs} for the set of side
inputs that are
+ * contained.
*
- * <p>Instances of {@link MultimapSideInputHandler}s returned by the {@link
- * MultimapSideInputHandlerFactory} are cached.
+ * <p>Instances of {@link SideInputHandler}s returned by the {@link
SideInputHandlerFactory} are
+ * cached.
*/
- public static StateRequestHandler forMultimapSideInputHandlerFactory(
- Map<String, Map<String, MultimapSideInputSpec>> sideInputSpecs,
- MultimapSideInputHandlerFactory multimapSideInputHandlerFactory) {
- return new StateRequestHandlerToMultimapSideInputHandlerFactoryAdapter(
- sideInputSpecs, multimapSideInputHandlerFactory);
+ public static StateRequestHandler forSideInputHandlerFactory(
+ Map<String, Map<String, SideInputSpec>> sideInputSpecs,
+ SideInputHandlerFactory sideInputHandlerFactory) {
+ return new StateRequestHandlerToSideInputHandlerFactoryAdapter(
+ sideInputSpecs, sideInputHandlerFactory);
}
- /**
- * An adapter which converts {@link MultimapSideInputHandlerFactory} to
{@link
- * StateRequestHandler}.
- */
- static class StateRequestHandlerToMultimapSideInputHandlerFactoryAdapter
- implements StateRequestHandler {
+ /** An adapter which converts {@link SideInputHandlerFactory} to {@link
StateRequestHandler}. */
+ static class StateRequestHandlerToSideInputHandlerFactoryAdapter implements
StateRequestHandler {
- private final Map<String, Map<String, MultimapSideInputSpec>>
sideInputSpecs;
- private final MultimapSideInputHandlerFactory
multimapSideInputHandlerFactory;
- private final ConcurrentHashMap<MultimapSideInputSpec,
MultimapSideInputHandler> cache;
+ private final Map<String, Map<String, SideInputSpec>> sideInputSpecs;
+ private final SideInputHandlerFactory sideInputHandlerFactory;
+ private final ConcurrentHashMap<SideInputSpec, SideInputHandler> cache;
- StateRequestHandlerToMultimapSideInputHandlerFactoryAdapter(
- Map<String, Map<String, MultimapSideInputSpec>> sideInputSpecs,
- MultimapSideInputHandlerFactory multimapSideInputHandlerFactory) {
+ StateRequestHandlerToSideInputHandlerFactoryAdapter(
+ Map<String, Map<String, SideInputSpec>> sideInputSpecs,
+ SideInputHandlerFactory sideInputHandlerFactory) {
this.sideInputSpecs = sideInputSpecs;
- this.multimapSideInputHandlerFactory = multimapSideInputHandlerFactory;
+ this.sideInputHandlerFactory = sideInputHandlerFactory;
this.cache = new ConcurrentHashMap<>();
}
@@ -211,9 +213,9 @@ public static StateRequestHandler
forMultimapSideInputHandlerFactory(
TypeCase.MULTIMAP_SIDE_INPUT);
StateKey.MultimapSideInput stateKey =
request.getStateKey().getMultimapSideInput();
- MultimapSideInputSpec<?, ?, ?> sideInputReferenceSpec =
+ SideInputSpec<?, ?, ?> sideInputReferenceSpec =
sideInputSpecs.get(stateKey.getPtransformId()).get(stateKey.getSideInputId());
- MultimapSideInputHandler<?, ?, ?> handler =
+ SideInputHandler<?, ?> handler =
cache.computeIfAbsent(sideInputReferenceSpec, this::createHandler);
switch (request.getRequestCase()) {
@@ -234,7 +236,7 @@ public static StateRequestHandler
forMultimapSideInputHandlerFactory(
}
private <K, V, W extends BoundedWindow>
CompletionStage<StateResponse.Builder> handleGetRequest(
- StateRequest request, MultimapSideInputHandler<K, V, W> handler)
throws Exception {
+ StateRequest request, SideInputHandler<V, W> handler) throws Exception
{
// TODO: Add support for continuation tokens when handling state if the
handler
// returned a {@link Reiterable}.
checkState(
@@ -243,17 +245,16 @@ public static StateRequestHandler
forMultimapSideInputHandlerFactory(
StateKey.MultimapSideInput stateKey =
request.getStateKey().getMultimapSideInput();
- MultimapSideInputSpec<K, V, W> sideInputReferenceSpec =
+ SideInputSpec<K, V, W> sideInputReferenceSpec =
sideInputSpecs.get(stateKey.getPtransformId()).get(stateKey.getSideInputId());
- K key =
sideInputReferenceSpec.keyCoder().decode(stateKey.getKey().newInput());
W window =
sideInputReferenceSpec.windowCoder().decode(stateKey.getWindow().newInput());
- Iterable<V> values = handler.get(key, window);
+ Iterable<V> values = handler.get(stateKey.getKey().toByteArray(),
window);
List<ByteString> encodedValues = new ArrayList<>();
ElementDelimitedOutputStream outputStream =
DataStreams.outbound(encodedValues::add);
for (V value : values) {
- sideInputReferenceSpec.valueCoder().encode(value, outputStream);
+ handler.resultCoder().encode(value, outputStream);
outputStream.delimitElement();
}
outputStream.close();
@@ -265,13 +266,13 @@ public static StateRequestHandler
forMultimapSideInputHandlerFactory(
return CompletableFuture.completedFuture(response);
}
- private <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W>
createHandler(
- MultimapSideInputSpec cacheKey) {
- return multimapSideInputHandlerFactory.forSideInput(
+ private <K, V, W extends BoundedWindow> SideInputHandler<V, W>
createHandler(
+ SideInputSpec cacheKey) {
+ return sideInputHandlerFactory.forSideInput(
cacheKey.transformId(),
cacheKey.sideInputId(),
- cacheKey.keyCoder(),
- cacheKey.valueCoder(),
+ cacheKey.accessPattern(),
+ cacheKey.elementCoder(),
cacheKey.windowCoder());
}
}
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index b3c0b1649ed..7f16631f420 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -60,8 +60,8 @@
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
-import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler;
-import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandlerFactory;
+import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler;
+import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -324,21 +324,26 @@ public void processElement(ProcessContext context) {
CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"),
CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C"));
StateRequestHandler stateRequestHandler =
- StateRequestHandlers.forMultimapSideInputHandlerFactory(
- descriptor.getMultimapSideInputSpecs(),
- new MultimapSideInputHandlerFactory() {
+ StateRequestHandlers.forSideInputHandlerFactory(
+ descriptor.getSideInputSpecs(),
+ new SideInputHandlerFactory() {
@Override
- public <K, V, W extends BoundedWindow>
MultimapSideInputHandler<K, V, W> forSideInput(
+ public <T, V, W extends BoundedWindow> SideInputHandler<V, W>
forSideInput(
String pTransformId,
String sideInputId,
- Coder<K> keyCoder,
- Coder<V> valueCoder,
+ RunnerApi.FunctionSpec accessPattern,
+ Coder<T> elementCoder,
Coder<W> windowCoder) {
- return new MultimapSideInputHandler<K, V, W>() {
+ return new SideInputHandler<V, W>() {
@Override
- public Iterable<V> get(K key, W window) {
+ public Iterable<V> get(byte[] key, W window) {
return (Iterable) sideInputData;
}
+
+ @Override
+ public Coder<V> resultCoder() {
+ return ((KvCoder) elementCoder).getValueCoder();
+ }
};
}
});
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 116940)
Time Spent: 2h 50m (was: 2h 40m)
> Flink batch state request handler
> ---------------------------------
>
> Key: BEAM-4285
> URL: https://issues.apache.org/jira/browse/BEAM-4285
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> In order to support side inputs, Flink needs a state service request handler.
> As in the non-portable runner, we can start by handling batch side inputs with Flink
> broadcast variables.
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
> or
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
> can be used as a starting point.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)