This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 684f813 [BEAM-7011] Update Beam SDKs to use the StandardSideInputType
enums. (#8232)
684f813 is described below
commit 684f8130284a7c7979773300d04e5473ca0ac8f3
Author: Lukasz Cwik <[email protected]>
AuthorDate: Mon Apr 8 16:37:42 2019 -0700
[BEAM-7011] Update Beam SDKs to use the StandardSideInputType enums. (#8232)
* [BEAM-7011] Update Beam to use the StandardSideInputType enums.
---
.../pkg/beam/core/runtime/coderx/coderx.shims.go | 87 +++++++++++-----------
sdks/go/pkg/beam/core/runtime/coderx/doc.go | 2 +-
sdks/go/pkg/beam/doc_test.go | 4 +-
.../pkg/beam/runners/dataflow/dataflowlib/fixup.go | 36 +--------
.../beam/sdk/transforms/Materializations.java | 59 ++++++++++++---
.../runners/dataflow/dataflow_runner.py | 7 +-
.../runners/dataflow/dataflow_runner_test.py | 4 +-
.../apache_beam/runners/worker/bundle_processor.py | 5 +-
8 files changed, 101 insertions(+), 103 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/coderx/coderx.shims.go
b/sdks/go/pkg/beam/core/runtime/coderx/coderx.shims.go
index 91cc99f..be6e362 100644
--- a/sdks/go/pkg/beam/core/runtime/coderx/coderx.shims.go
+++ b/sdks/go/pkg/beam/core/runtime/coderx/coderx.shims.go
@@ -45,25 +45,25 @@ func init() {
runtime.RegisterFunction(encVarIntZ)
runtime.RegisterFunction(encVarUintZ)
runtime.RegisterType(reflect.TypeOf((*reflect.Type)(nil)).Elem())
- reflectx.RegisterFunc(reflect.TypeOf((*func(int32)
([]byte))(nil)).Elem(), funcMakerInt32ГSliceOfByte)
- reflectx.RegisterFunc(reflect.TypeOf((*func(int64)
([]byte))(nil)).Elem(), funcMakerInt64ГSliceOfByte)
- reflectx.RegisterFunc(reflect.TypeOf((*func(reflect.Type,[]byte)
(typex.T,error))(nil)).Elem(), funcMakerReflect۰TypeSliceOfByteГTypex۰TError)
- reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
(int32))(nil)).Elem(), funcMakerSliceOfByteГInt32)
- reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
(int64))(nil)).Elem(), funcMakerSliceOfByteГInt64)
- reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
(typex.T))(nil)).Elem(), funcMakerSliceOfByteГTypex۰T)
- reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
(uint32))(nil)).Elem(), funcMakerSliceOfByteГUint32)
- reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
(uint64))(nil)).Elem(), funcMakerSliceOfByteГUint64)
- reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T)
([]byte))(nil)).Elem(), funcMakerTypex۰TГSliceOfByte)
- reflectx.RegisterFunc(reflect.TypeOf((*func(uint32)
([]byte))(nil)).Elem(), funcMakerUint32ГSliceOfByte)
- reflectx.RegisterFunc(reflect.TypeOf((*func(uint64)
([]byte))(nil)).Elem(), funcMakerUint64ГSliceOfByte)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(int32)
[]byte)(nil)).Elem(), funcMakerInt32ГSliceOfByte)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(int64)
[]byte)(nil)).Elem(), funcMakerInt64ГSliceOfByte)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(reflect.Type, []byte)
(typex.T, error))(nil)).Elem(), funcMakerReflect۰TypeSliceOfByteГTypex۰TError)
+ reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
int32)(nil)).Elem(), funcMakerSliceOfByteГInt32)
+ reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
int64)(nil)).Elem(), funcMakerSliceOfByteГInt64)
+ reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
typex.T)(nil)).Elem(), funcMakerSliceOfByteГTypex۰T)
+ reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
uint32)(nil)).Elem(), funcMakerSliceOfByteГUint32)
+ reflectx.RegisterFunc(reflect.TypeOf((*func([]byte)
uint64)(nil)).Elem(), funcMakerSliceOfByteГUint64)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T)
[]byte)(nil)).Elem(), funcMakerTypex۰TГSliceOfByte)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(uint32)
[]byte)(nil)).Elem(), funcMakerUint32ГSliceOfByte)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(uint64)
[]byte)(nil)).Elem(), funcMakerUint64ГSliceOfByte)
}
type callerInt32ГSliceOfByte struct {
- fn func(int32) ([]byte)
+ fn func(int32) []byte
}
func funcMakerInt32ГSliceOfByte(fn interface{}) reflectx.Func {
- f := fn.(func(int32) ([]byte))
+ f := fn.(func(int32) []byte)
return &callerInt32ГSliceOfByte{fn: f}
}
@@ -80,16 +80,16 @@ func (c *callerInt32ГSliceOfByte) Call(args []interface{})
[]interface{} {
return []interface{}{out0}
}
-func (c *callerInt32ГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerInt32ГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.(int32))
}
type callerInt64ГSliceOfByte struct {
- fn func(int64) ([]byte)
+ fn func(int64) []byte
}
func funcMakerInt64ГSliceOfByte(fn interface{}) reflectx.Func {
- f := fn.(func(int64) ([]byte))
+ f := fn.(func(int64) []byte)
return &callerInt64ГSliceOfByte{fn: f}
}
@@ -106,16 +106,16 @@ func (c *callerInt64ГSliceOfByte) Call(args
[]interface{}) []interface{} {
return []interface{}{out0}
}
-func (c *callerInt64ГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerInt64ГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.(int64))
}
type callerReflect۰TypeSliceOfByteГTypex۰TError struct {
- fn func(reflect.Type,[]byte) (typex.T,error)
+ fn func(reflect.Type, []byte) (typex.T, error)
}
func funcMakerReflect۰TypeSliceOfByteГTypex۰TError(fn interface{})
reflectx.Func {
- f := fn.(func(reflect.Type,[]byte) (typex.T,error))
+ f := fn.(func(reflect.Type, []byte) (typex.T, error))
return &callerReflect۰TypeSliceOfByteГTypex۰TError{fn: f}
}
@@ -137,11 +137,11 @@ func (c *callerReflect۰TypeSliceOfByteГTypex۰TError)
Call2x2(arg0, arg1 inter
}
type callerSliceOfByteГInt32 struct {
- fn func([]byte) (int32)
+ fn func([]byte) int32
}
func funcMakerSliceOfByteГInt32(fn interface{}) reflectx.Func {
- f := fn.(func([]byte) (int32))
+ f := fn.(func([]byte) int32)
return &callerSliceOfByteГInt32{fn: f}
}
@@ -158,16 +158,16 @@ func (c *callerSliceOfByteГInt32) Call(args
[]interface{}) []interface{} {
return []interface{}{out0}
}
-func (c *callerSliceOfByteГInt32) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГInt32) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.([]byte))
}
type callerSliceOfByteГInt64 struct {
- fn func([]byte) (int64)
+ fn func([]byte) int64
}
func funcMakerSliceOfByteГInt64(fn interface{}) reflectx.Func {
- f := fn.(func([]byte) (int64))
+ f := fn.(func([]byte) int64)
return &callerSliceOfByteГInt64{fn: f}
}
@@ -184,16 +184,16 @@ func (c *callerSliceOfByteГInt64) Call(args
[]interface{}) []interface{} {
return []interface{}{out0}
}
-func (c *callerSliceOfByteГInt64) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГInt64) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.([]byte))
}
type callerSliceOfByteГTypex۰T struct {
- fn func([]byte) (typex.T)
+ fn func([]byte) typex.T
}
func funcMakerSliceOfByteГTypex۰T(fn interface{}) reflectx.Func {
- f := fn.(func([]byte) (typex.T))
+ f := fn.(func([]byte) typex.T)
return &callerSliceOfByteГTypex۰T{fn: f}
}
@@ -210,16 +210,16 @@ func (c *callerSliceOfByteГTypex۰T) Call(args
[]interface{}) []interface{} {
return []interface{}{out0}
}
-func (c *callerSliceOfByteГTypex۰T) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГTypex۰T) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.([]byte))
}
type callerSliceOfByteГUint32 struct {
- fn func([]byte) (uint32)
+ fn func([]byte) uint32
}
func funcMakerSliceOfByteГUint32(fn interface{}) reflectx.Func {
- f := fn.(func([]byte) (uint32))
+ f := fn.(func([]byte) uint32)
return &callerSliceOfByteГUint32{fn: f}
}
@@ -236,16 +236,16 @@ func (c *callerSliceOfByteГUint32) Call(args
[]interface{}) []interface{} {
return []interface{}{out0}
}
-func (c *callerSliceOfByteГUint32) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГUint32) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.([]byte))
}
type callerSliceOfByteГUint64 struct {
- fn func([]byte) (uint64)
+ fn func([]byte) uint64
}
func funcMakerSliceOfByteГUint64(fn interface{}) reflectx.Func {
- f := fn.(func([]byte) (uint64))
+ f := fn.(func([]byte) uint64)
return &callerSliceOfByteГUint64{fn: f}
}
@@ -262,16 +262,16 @@ func (c *callerSliceOfByteГUint64) Call(args
[]interface{}) []interface{} {
return []interface{}{out0}
}
-func (c *callerSliceOfByteГUint64) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГUint64) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.([]byte))
}
type callerTypex۰TГSliceOfByte struct {
- fn func(typex.T) ([]byte)
+ fn func(typex.T) []byte
}
func funcMakerTypex۰TГSliceOfByte(fn interface{}) reflectx.Func {
- f := fn.(func(typex.T) ([]byte))
+ f := fn.(func(typex.T) []byte)
return &callerTypex۰TГSliceOfByte{fn: f}
}
@@ -288,16 +288,16 @@ func (c *callerTypex۰TГSliceOfByte) Call(args
[]interface{}) []interface{} {
return []interface{}{out0}
}
-func (c *callerTypex۰TГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerTypex۰TГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.(typex.T))
}
type callerUint32ГSliceOfByte struct {
- fn func(uint32) ([]byte)
+ fn func(uint32) []byte
}
func funcMakerUint32ГSliceOfByte(fn interface{}) reflectx.Func {
- f := fn.(func(uint32) ([]byte))
+ f := fn.(func(uint32) []byte)
return &callerUint32ГSliceOfByte{fn: f}
}
@@ -314,16 +314,16 @@ func (c *callerUint32ГSliceOfByte) Call(args
[]interface{}) []interface{} {
return []interface{}{out0}
}
-func (c *callerUint32ГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerUint32ГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.(uint32))
}
type callerUint64ГSliceOfByte struct {
- fn func(uint64) ([]byte)
+ fn func(uint64) []byte
}
func funcMakerUint64ГSliceOfByte(fn interface{}) reflectx.Func {
- f := fn.(func(uint64) ([]byte))
+ f := fn.(func(uint64) []byte)
return &callerUint64ГSliceOfByte{fn: f}
}
@@ -340,9 +340,8 @@ func (c *callerUint64ГSliceOfByte) Call(args []interface{})
[]interface{} {
return []interface{}{out0}
}
-func (c *callerUint64ГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerUint64ГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.(uint64))
}
-
// DO NOT MODIFY: GENERATED CODE
diff --git a/sdks/go/pkg/beam/core/runtime/coderx/doc.go
b/sdks/go/pkg/beam/core/runtime/coderx/doc.go
index 9a8ef29..7cb2bf2 100644
--- a/sdks/go/pkg/beam/core/runtime/coderx/doc.go
+++ b/sdks/go/pkg/beam/core/runtime/coderx/doc.go
@@ -19,4 +19,4 @@ package coderx
//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
//go:generate starcgen --package=coderx
--identifiers=encString,decString,encUint32,decUint32,encInt32,decInt32,encUint64,decUint64,encInt64,decInt64,encVarIntZ,decVarIntZ,encVarUintZ,decVarUintZ,encFloat,decFloat
-//go:generate go fmt
\ No newline at end of file
+//go:generate go fmt
diff --git a/sdks/go/pkg/beam/doc_test.go b/sdks/go/pkg/beam/doc_test.go
index 92a2b03..645926f 100644
--- a/sdks/go/pkg/beam/doc_test.go
+++ b/sdks/go/pkg/beam/doc_test.go
@@ -128,9 +128,9 @@ func ExampleSeq() {
a := textio.Read(s, "...some file path...") // PCollection<string>
beam.Seq(s, a,
- strconv.Atoi, // string to int
+ strconv.Atoi, // string to int
func(i int) float64 { return float64(i) }, // int to float64
- math.Signbit, // float64 to bool
+ math.Signbit, // float64 to bool
) // PCollection<bool>
}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
index 4c2a101..671ceda 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
@@ -16,44 +16,10 @@
package dataflowlib
import (
- "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
- "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
- "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
- "github.com/golang/protobuf/proto"
)
// Fixup proto pipeline with Dataflow quirks.
func Fixup(p *pb.Pipeline) (*pb.Pipeline, error) {
- upd := make(map[string]*pb.PTransform)
-
- for id, t := range p.GetComponents().GetTransforms() {
- if t.GetSpec().GetUrn() != graphx.URNParDo {
- continue
- }
- var payload pb.ParDoPayload
- if err := proto.Unmarshal(t.GetSpec().GetPayload(), &payload);
err != nil {
- continue // ignore: unexpected payload
- }
- if len(payload.SideInputs) == 0 {
- continue
- }
-
- // ParDo w/ side input. Fixup URN.
-
- fixedPayload := pipelinex.ShallowCloneParDoPayload(&payload)
- for k, v := range payload.SideInputs {
- fixedV := pipelinex.ShallowCloneSideInput(v)
- fixedV.AccessPattern =
pipelinex.ShallowCloneFunctionSpec(v.AccessPattern)
- fixedV.AccessPattern.Urn =
"urn:beam:sideinput:materialization:multimap:0.1"
-
- fixedPayload.SideInputs[k] = fixedV
- }
- fixed := pipelinex.ShallowClonePTransform(t)
- fixed.Spec = pipelinex.ShallowCloneFunctionSpec(t.Spec)
- fixed.Spec.Payload = protox.MustEncode(fixedPayload)
-
- upd[id] = fixed
- }
- return pipelinex.Update(p, &pb.Components{Transforms: upd})
+ return p, nil
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
index c9bce5c..9334ddd 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.transforms;
import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardSideInputTypes;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
@@ -31,12 +33,36 @@ import org.apache.beam.sdk.annotations.Internal;
@Internal
public class Materializations {
/**
- * The URN for a {@link Materialization} where the primitive view type is an
multimap of fully
+ * The URN for a {@link Materialization} where the primitive view type is an
iterable of fully
+ * specified windowed values.
+ */
+ @Experimental(Kind.CORE_RUNNERS_ONLY)
+ public static final String ITERABLE_MATERIALIZATION_URN =
+ StandardSideInputTypes.Enum.ITERABLE
+ .getValueDescriptor()
+ .getOptions()
+ .getExtension(RunnerApi.beamUrn);
+
+ /**
+ * The URN for a {@link Materialization} where the primitive view type is a
multimap of fully
* specified windowed values.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static final String MULTIMAP_MATERIALIZATION_URN =
- "urn:beam:sideinput:materialization:multimap:0.1";
+ StandardSideInputTypes.Enum.MULTIMAP
+ .getValueDescriptor()
+ .getOptions()
+ .getExtension(RunnerApi.beamUrn);
+
+ /**
+ * Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when
it declares to use
+ * the {@link Materializations#ITERABLE_MATERIALIZATION_URN iterable
materialization}.
+ *
+ * @param <V>
+ */
+ public interface IterableView<V> {
+ Iterable<V> get();
+ }
/**
* Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when
it declares to use
@@ -50,18 +76,31 @@ public class Materializations {
* <b><i>For internal use only; no backwards-compatibility
guarantees.</i></b>
*
* <p>A {@link Materialization} where the primitive view type is a multimap
with fully specified
- * windowed keys.
+ * windowed values.
*/
@Internal
public static <K, V> Materialization<MultimapView<K, V>> multimap() {
- return new MultimapMaterialization<>();
+ return new Materialization<MultimapView<K, V>>() {
+ @Override
+ public String getUrn() {
+ return MULTIMAP_MATERIALIZATION_URN;
+ }
+ };
}
- private static class MultimapMaterialization<K, V>
- implements Materialization<MultimapView<K, V>> {
- @Override
- public String getUrn() {
- return MULTIMAP_MATERIALIZATION_URN;
- }
+ /**
+ * <b><i>For internal use only; no backwards-compatibility
guarantees.</i></b>
+ *
+ * <p>A {@link Materialization} where the primitive view type is an iterable
with fully specified
+ * windowed values.
+ */
+ @Internal
+ public static <V> Materialization<IterableView<V>> iterable() {
+ return new Materialization<IterableView<V>>() {
+ @Override
+ public String getUrn() {
+ return ITERABLE_MATERIALIZATION_URN;
+ }
+ };
}
}
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e140db5..e3e7bd6 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -1112,9 +1112,6 @@ class DataflowRunner(PipelineRunner):
class _DataflowSideInput(beam.pvalue.AsSideInput):
"""Wraps a side input as a dataflow-compatible side input."""
- # Dataflow does not yet accept the shared urn definition for access.
- DATAFLOW_MULTIMAP_URN = 'urn:beam:sideinput:materialization:multimap:0.1'
-
def _view_options(self):
return {
'data': self._data,
@@ -1134,7 +1131,7 @@ class _DataflowIterableSideInput(_DataflowSideInput):
side_input_data.access_pattern == common_urns.side_inputs.ITERABLE.urn)
iterable_view_fn = side_input_data.view_fn
self._data = beam.pvalue.SideInputData(
- self.DATAFLOW_MULTIMAP_URN,
+ common_urns.side_inputs.MULTIMAP.urn,
side_input_data.window_mapping_fn,
lambda multimap: iterable_view_fn(multimap['']))
@@ -1149,7 +1146,7 @@ class _DataflowMultimapSideInput(_DataflowSideInput):
assert (
side_input_data.access_pattern == common_urns.side_inputs.MULTIMAP.urn)
self._data = beam.pvalue.SideInputData(
- self.DATAFLOW_MULTIMAP_URN,
+ common_urns.side_inputs.MULTIMAP.urn,
side_input_data.window_mapping_fn,
side_input_data.view_fn)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ed5c74a..6c19c79 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -32,11 +32,11 @@ import apache_beam.transforms as ptransform
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import AppliedPTransform
from apache_beam.pipeline import Pipeline
+from apache_beam.portability import common_urns
from apache_beam.pvalue import PCollection
from apache_beam.runners import DataflowRunner
from apache_beam.runners import TestDataflowRunner
from apache_beam.runners import create_runner
-from apache_beam.runners.dataflow import dataflow_runner
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
from apache_beam.runners.dataflow.dataflow_runner import
DataflowRuntimeException
from apache_beam.runners.dataflow.internal.clients import dataflow as
dataflow_api
@@ -385,7 +385,7 @@ class DataflowRunnerTest(unittest.TestCase):
self.assertEqual(2, len(applied_transform.side_inputs))
for side_input in applied_transform.side_inputs:
self.assertEqual(
- dataflow_runner._DataflowSideInput.DATAFLOW_MULTIMAP_URN,
+ common_urns.side_inputs.MULTIMAP.urn,
side_input._side_input_data().access_pattern)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 583f1bb..feb8eae 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -47,7 +47,6 @@ from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners import common
from apache_beam.runners import pipeline_context
-from apache_beam.runners.dataflow import dataflow_runner
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import operations
from apache_beam.runners.worker import statesampler
@@ -236,9 +235,7 @@ class StateBackedSideInputMap(object):
raw_view = _StateBackedIterable(
state_handler, state_key, self._element_coder)
- elif (access_pattern == common_urns.side_inputs.MULTIMAP.urn or
- access_pattern ==
- dataflow_runner._DataflowSideInput.DATAFLOW_MULTIMAP_URN):
+ elif access_pattern == common_urns.side_inputs.MULTIMAP.urn:
cache = {}
key_coder_impl = self._element_coder.key_coder().get_impl()
value_coder = self._element_coder.value_coder()