This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 684f813  [BEAM-7011] Update Beam SDKs to use the StandardSideInputType 
enums. (#8232)
684f813 is described below

commit 684f8130284a7c7979773300d04e5473ca0ac8f3
Author: Lukasz Cwik <[email protected]>
AuthorDate: Mon Apr 8 16:37:42 2019 -0700

    [BEAM-7011] Update Beam SDKs to use the StandardSideInputType enums. (#8232)
    
    * [BEAM-7011] Update Beam to use the StandardSideInputType enums.
---
 .../pkg/beam/core/runtime/coderx/coderx.shims.go   | 87 +++++++++++-----------
 sdks/go/pkg/beam/core/runtime/coderx/doc.go        |  2 +-
 sdks/go/pkg/beam/doc_test.go                       |  4 +-
 .../pkg/beam/runners/dataflow/dataflowlib/fixup.go | 36 +--------
 .../beam/sdk/transforms/Materializations.java      | 59 ++++++++++++---
 .../runners/dataflow/dataflow_runner.py            |  7 +-
 .../runners/dataflow/dataflow_runner_test.py       |  4 +-
 .../apache_beam/runners/worker/bundle_processor.py |  5 +-
 8 files changed, 101 insertions(+), 103 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/coderx/coderx.shims.go 
b/sdks/go/pkg/beam/core/runtime/coderx/coderx.shims.go
index 91cc99f..be6e362 100644
--- a/sdks/go/pkg/beam/core/runtime/coderx/coderx.shims.go
+++ b/sdks/go/pkg/beam/core/runtime/coderx/coderx.shims.go
@@ -45,25 +45,25 @@ func init() {
        runtime.RegisterFunction(encVarIntZ)
        runtime.RegisterFunction(encVarUintZ)
        runtime.RegisterType(reflect.TypeOf((*reflect.Type)(nil)).Elem())
-       reflectx.RegisterFunc(reflect.TypeOf((*func(int32) 
([]byte))(nil)).Elem(), funcMakerInt32ГSliceOfByte)
-       reflectx.RegisterFunc(reflect.TypeOf((*func(int64) 
([]byte))(nil)).Elem(), funcMakerInt64ГSliceOfByte)
-       reflectx.RegisterFunc(reflect.TypeOf((*func(reflect.Type,[]byte) 
(typex.T,error))(nil)).Elem(), funcMakerReflect۰TypeSliceOfByteГTypex۰TError)
-       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
(int32))(nil)).Elem(), funcMakerSliceOfByteГInt32)
-       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
(int64))(nil)).Elem(), funcMakerSliceOfByteГInt64)
-       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
(typex.T))(nil)).Elem(), funcMakerSliceOfByteГTypex۰T)
-       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
(uint32))(nil)).Elem(), funcMakerSliceOfByteГUint32)
-       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
(uint64))(nil)).Elem(), funcMakerSliceOfByteГUint64)
-       reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T) 
([]byte))(nil)).Elem(), funcMakerTypex۰TГSliceOfByte)
-       reflectx.RegisterFunc(reflect.TypeOf((*func(uint32) 
([]byte))(nil)).Elem(), funcMakerUint32ГSliceOfByte)
-       reflectx.RegisterFunc(reflect.TypeOf((*func(uint64) 
([]byte))(nil)).Elem(), funcMakerUint64ГSliceOfByte)
+       reflectx.RegisterFunc(reflect.TypeOf((*func(int32) 
[]byte)(nil)).Elem(), funcMakerInt32ГSliceOfByte)
+       reflectx.RegisterFunc(reflect.TypeOf((*func(int64) 
[]byte)(nil)).Elem(), funcMakerInt64ГSliceOfByte)
+       reflectx.RegisterFunc(reflect.TypeOf((*func(reflect.Type, []byte) 
(typex.T, error))(nil)).Elem(), funcMakerReflect۰TypeSliceOfByteГTypex۰TError)
+       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
int32)(nil)).Elem(), funcMakerSliceOfByteГInt32)
+       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
int64)(nil)).Elem(), funcMakerSliceOfByteГInt64)
+       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
typex.T)(nil)).Elem(), funcMakerSliceOfByteГTypex۰T)
+       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
uint32)(nil)).Elem(), funcMakerSliceOfByteГUint32)
+       reflectx.RegisterFunc(reflect.TypeOf((*func([]byte) 
uint64)(nil)).Elem(), funcMakerSliceOfByteГUint64)
+       reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T) 
[]byte)(nil)).Elem(), funcMakerTypex۰TГSliceOfByte)
+       reflectx.RegisterFunc(reflect.TypeOf((*func(uint32) 
[]byte)(nil)).Elem(), funcMakerUint32ГSliceOfByte)
+       reflectx.RegisterFunc(reflect.TypeOf((*func(uint64) 
[]byte)(nil)).Elem(), funcMakerUint64ГSliceOfByte)
 }
 
 type callerInt32ГSliceOfByte struct {
-       fn func(int32) ([]byte)
+       fn func(int32) []byte
 }
 
 func funcMakerInt32ГSliceOfByte(fn interface{}) reflectx.Func {
-       f := fn.(func(int32) ([]byte))
+       f := fn.(func(int32) []byte)
        return &callerInt32ГSliceOfByte{fn: f}
 }
 
@@ -80,16 +80,16 @@ func (c *callerInt32ГSliceOfByte) Call(args []interface{}) 
[]interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerInt32ГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerInt32ГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.(int32))
 }
 
 type callerInt64ГSliceOfByte struct {
-       fn func(int64) ([]byte)
+       fn func(int64) []byte
 }
 
 func funcMakerInt64ГSliceOfByte(fn interface{}) reflectx.Func {
-       f := fn.(func(int64) ([]byte))
+       f := fn.(func(int64) []byte)
        return &callerInt64ГSliceOfByte{fn: f}
 }
 
@@ -106,16 +106,16 @@ func (c *callerInt64ГSliceOfByte) Call(args 
[]interface{}) []interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerInt64ГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerInt64ГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.(int64))
 }
 
 type callerReflect۰TypeSliceOfByteГTypex۰TError struct {
-       fn func(reflect.Type,[]byte) (typex.T,error)
+       fn func(reflect.Type, []byte) (typex.T, error)
 }
 
 func funcMakerReflect۰TypeSliceOfByteГTypex۰TError(fn interface{}) 
reflectx.Func {
-       f := fn.(func(reflect.Type,[]byte) (typex.T,error))
+       f := fn.(func(reflect.Type, []byte) (typex.T, error))
        return &callerReflect۰TypeSliceOfByteГTypex۰TError{fn: f}
 }
 
@@ -137,11 +137,11 @@ func (c *callerReflect۰TypeSliceOfByteГTypex۰TError) 
Call2x2(arg0, arg1 inter
 }
 
 type callerSliceOfByteГInt32 struct {
-       fn func([]byte) (int32)
+       fn func([]byte) int32
 }
 
 func funcMakerSliceOfByteГInt32(fn interface{}) reflectx.Func {
-       f := fn.(func([]byte) (int32))
+       f := fn.(func([]byte) int32)
        return &callerSliceOfByteГInt32{fn: f}
 }
 
@@ -158,16 +158,16 @@ func (c *callerSliceOfByteГInt32) Call(args 
[]interface{}) []interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerSliceOfByteГInt32) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГInt32) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.([]byte))
 }
 
 type callerSliceOfByteГInt64 struct {
-       fn func([]byte) (int64)
+       fn func([]byte) int64
 }
 
 func funcMakerSliceOfByteГInt64(fn interface{}) reflectx.Func {
-       f := fn.(func([]byte) (int64))
+       f := fn.(func([]byte) int64)
        return &callerSliceOfByteГInt64{fn: f}
 }
 
@@ -184,16 +184,16 @@ func (c *callerSliceOfByteГInt64) Call(args 
[]interface{}) []interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerSliceOfByteГInt64) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГInt64) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.([]byte))
 }
 
 type callerSliceOfByteГTypex۰T struct {
-       fn func([]byte) (typex.T)
+       fn func([]byte) typex.T
 }
 
 func funcMakerSliceOfByteГTypex۰T(fn interface{}) reflectx.Func {
-       f := fn.(func([]byte) (typex.T))
+       f := fn.(func([]byte) typex.T)
        return &callerSliceOfByteГTypex۰T{fn: f}
 }
 
@@ -210,16 +210,16 @@ func (c *callerSliceOfByteГTypex۰T) Call(args 
[]interface{}) []interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerSliceOfByteГTypex۰T) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГTypex۰T) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.([]byte))
 }
 
 type callerSliceOfByteГUint32 struct {
-       fn func([]byte) (uint32)
+       fn func([]byte) uint32
 }
 
 func funcMakerSliceOfByteГUint32(fn interface{}) reflectx.Func {
-       f := fn.(func([]byte) (uint32))
+       f := fn.(func([]byte) uint32)
        return &callerSliceOfByteГUint32{fn: f}
 }
 
@@ -236,16 +236,16 @@ func (c *callerSliceOfByteГUint32) Call(args 
[]interface{}) []interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerSliceOfByteГUint32) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГUint32) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.([]byte))
 }
 
 type callerSliceOfByteГUint64 struct {
-       fn func([]byte) (uint64)
+       fn func([]byte) uint64
 }
 
 func funcMakerSliceOfByteГUint64(fn interface{}) reflectx.Func {
-       f := fn.(func([]byte) (uint64))
+       f := fn.(func([]byte) uint64)
        return &callerSliceOfByteГUint64{fn: f}
 }
 
@@ -262,16 +262,16 @@ func (c *callerSliceOfByteГUint64) Call(args 
[]interface{}) []interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerSliceOfByteГUint64) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerSliceOfByteГUint64) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.([]byte))
 }
 
 type callerTypex۰TГSliceOfByte struct {
-       fn func(typex.T) ([]byte)
+       fn func(typex.T) []byte
 }
 
 func funcMakerTypex۰TГSliceOfByte(fn interface{}) reflectx.Func {
-       f := fn.(func(typex.T) ([]byte))
+       f := fn.(func(typex.T) []byte)
        return &callerTypex۰TГSliceOfByte{fn: f}
 }
 
@@ -288,16 +288,16 @@ func (c *callerTypex۰TГSliceOfByte) Call(args 
[]interface{}) []interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerTypex۰TГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerTypex۰TГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.(typex.T))
 }
 
 type callerUint32ГSliceOfByte struct {
-       fn func(uint32) ([]byte)
+       fn func(uint32) []byte
 }
 
 func funcMakerUint32ГSliceOfByte(fn interface{}) reflectx.Func {
-       f := fn.(func(uint32) ([]byte))
+       f := fn.(func(uint32) []byte)
        return &callerUint32ГSliceOfByte{fn: f}
 }
 
@@ -314,16 +314,16 @@ func (c *callerUint32ГSliceOfByte) Call(args 
[]interface{}) []interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerUint32ГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerUint32ГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.(uint32))
 }
 
 type callerUint64ГSliceOfByte struct {
-       fn func(uint64) ([]byte)
+       fn func(uint64) []byte
 }
 
 func funcMakerUint64ГSliceOfByte(fn interface{}) reflectx.Func {
-       f := fn.(func(uint64) ([]byte))
+       f := fn.(func(uint64) []byte)
        return &callerUint64ГSliceOfByte{fn: f}
 }
 
@@ -340,9 +340,8 @@ func (c *callerUint64ГSliceOfByte) Call(args []interface{}) 
[]interface{} {
        return []interface{}{out0}
 }
 
-func (c *callerUint64ГSliceOfByte) Call1x1(arg0 interface{}) (interface{}) {
+func (c *callerUint64ГSliceOfByte) Call1x1(arg0 interface{}) interface{} {
        return c.fn(arg0.(uint64))
 }
 
-
 // DO NOT MODIFY: GENERATED CODE
diff --git a/sdks/go/pkg/beam/core/runtime/coderx/doc.go 
b/sdks/go/pkg/beam/core/runtime/coderx/doc.go
index 9a8ef29..7cb2bf2 100644
--- a/sdks/go/pkg/beam/core/runtime/coderx/doc.go
+++ b/sdks/go/pkg/beam/core/runtime/coderx/doc.go
@@ -19,4 +19,4 @@ package coderx
 
 //go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
 //go:generate starcgen --package=coderx 
--identifiers=encString,decString,encUint32,decUint32,encInt32,decInt32,encUint64,decUint64,encInt64,decInt64,encVarIntZ,decVarIntZ,encVarUintZ,decVarUintZ,encFloat,decFloat
-//go:generate go fmt
\ No newline at end of file
+//go:generate go fmt
diff --git a/sdks/go/pkg/beam/doc_test.go b/sdks/go/pkg/beam/doc_test.go
index 92a2b03..645926f 100644
--- a/sdks/go/pkg/beam/doc_test.go
+++ b/sdks/go/pkg/beam/doc_test.go
@@ -128,9 +128,9 @@ func ExampleSeq() {
        a := textio.Read(s, "...some file path...") // PCollection<string>
 
        beam.Seq(s, a,
-               strconv.Atoi, // string to int
+               strconv.Atoi,                              // string to int
                func(i int) float64 { return float64(i) }, // int to float64
-               math.Signbit, // float64 to bool
+               math.Signbit,                              // float64 to bool
        ) // PCollection<bool>
 }
 
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
index 4c2a101..671ceda 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
@@ -16,44 +16,10 @@
 package dataflowlib
 
 import (
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-       "github.com/golang/protobuf/proto"
 )
 
 // Fixup proto pipeline with Dataflow quirks.
 func Fixup(p *pb.Pipeline) (*pb.Pipeline, error) {
-       upd := make(map[string]*pb.PTransform)
-
-       for id, t := range p.GetComponents().GetTransforms() {
-               if t.GetSpec().GetUrn() != graphx.URNParDo {
-                       continue
-               }
-               var payload pb.ParDoPayload
-               if err := proto.Unmarshal(t.GetSpec().GetPayload(), &payload); 
err != nil {
-                       continue // ignore: unexpected payload
-               }
-               if len(payload.SideInputs) == 0 {
-                       continue
-               }
-
-               // ParDo w/ side input. Fixup URN.
-
-               fixedPayload := pipelinex.ShallowCloneParDoPayload(&payload)
-               for k, v := range payload.SideInputs {
-                       fixedV := pipelinex.ShallowCloneSideInput(v)
-                       fixedV.AccessPattern = 
pipelinex.ShallowCloneFunctionSpec(v.AccessPattern)
-                       fixedV.AccessPattern.Urn = 
"urn:beam:sideinput:materialization:multimap:0.1"
-
-                       fixedPayload.SideInputs[k] = fixedV
-               }
-               fixed := pipelinex.ShallowClonePTransform(t)
-               fixed.Spec = pipelinex.ShallowCloneFunctionSpec(t.Spec)
-               fixed.Spec.Payload = protox.MustEncode(fixedPayload)
-
-               upd[id] = fixed
-       }
-       return pipelinex.Update(p, &pb.Components{Transforms: upd})
+       return p, nil
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
index c9bce5c..9334ddd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.transforms;
 
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardSideInputTypes;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
@@ -31,12 +33,36 @@ import org.apache.beam.sdk.annotations.Internal;
 @Internal
 public class Materializations {
   /**
-   * The URN for a {@link Materialization} where the primitive view type is an 
multimap of fully
+   * The URN for a {@link Materialization} where the primitive view type is an 
iterable of fully
+   * specified windowed values.
+   */
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  public static final String ITERABLE_MATERIALIZATION_URN =
+      StandardSideInputTypes.Enum.ITERABLE
+          .getValueDescriptor()
+          .getOptions()
+          .getExtension(RunnerApi.beamUrn);
+
+  /**
+   * The URN for a {@link Materialization} where the primitive view type is a 
multimap of fully
    * specified windowed values.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
   public static final String MULTIMAP_MATERIALIZATION_URN =
-      "urn:beam:sideinput:materialization:multimap:0.1";
+      StandardSideInputTypes.Enum.MULTIMAP
+          .getValueDescriptor()
+          .getOptions()
+          .getExtension(RunnerApi.beamUrn);
+
+  /**
+   * Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when 
it declares to use
+   * the {@link Materializations#ITERABLE_MATERIALIZATION_URN iterable 
materialization}.
+   *
+   * @param <V>
+   */
+  public interface IterableView<V> {
+    Iterable<V> get();
+  }
 
   /**
    * Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when 
it declares to use
@@ -50,18 +76,31 @@ public class Materializations {
    * <b><i>For internal use only; no backwards-compatibility 
guarantees.</i></b>
    *
    * <p>A {@link Materialization} where the primitive view type is a multimap 
with fully specified
-   * windowed keys.
+   * windowed values.
    */
   @Internal
   public static <K, V> Materialization<MultimapView<K, V>> multimap() {
-    return new MultimapMaterialization<>();
+    return new Materialization<MultimapView<K, V>>() {
+      @Override
+      public String getUrn() {
+        return MULTIMAP_MATERIALIZATION_URN;
+      }
+    };
   }
 
-  private static class MultimapMaterialization<K, V>
-      implements Materialization<MultimapView<K, V>> {
-    @Override
-    public String getUrn() {
-      return MULTIMAP_MATERIALIZATION_URN;
-    }
+  /**
+   * <b><i>For internal use only; no backwards-compatibility 
guarantees.</i></b>
+   *
+   * <p>A {@link Materialization} where the primitive view type is an iterable 
with fully specified
+   * windowed values.
+   */
+  @Internal
+  public static <V> Materialization<IterableView<V>> iterable() {
+    return new Materialization<IterableView<V>>() {
+      @Override
+      public String getUrn() {
+        return ITERABLE_MATERIALIZATION_URN;
+      }
+    };
   }
 }
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e140db5..e3e7bd6 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -1112,9 +1112,6 @@ class DataflowRunner(PipelineRunner):
 class _DataflowSideInput(beam.pvalue.AsSideInput):
   """Wraps a side input as a dataflow-compatible side input."""
 
-  # Dataflow does not yet accept the shared urn definition for access.
-  DATAFLOW_MULTIMAP_URN = 'urn:beam:sideinput:materialization:multimap:0.1'
-
   def _view_options(self):
     return {
         'data': self._data,
@@ -1134,7 +1131,7 @@ class _DataflowIterableSideInput(_DataflowSideInput):
         side_input_data.access_pattern == common_urns.side_inputs.ITERABLE.urn)
     iterable_view_fn = side_input_data.view_fn
     self._data = beam.pvalue.SideInputData(
-        self.DATAFLOW_MULTIMAP_URN,
+        common_urns.side_inputs.MULTIMAP.urn,
         side_input_data.window_mapping_fn,
         lambda multimap: iterable_view_fn(multimap['']))
 
@@ -1149,7 +1146,7 @@ class _DataflowMultimapSideInput(_DataflowSideInput):
     assert (
         side_input_data.access_pattern == common_urns.side_inputs.MULTIMAP.urn)
     self._data = beam.pvalue.SideInputData(
-        self.DATAFLOW_MULTIMAP_URN,
+        common_urns.side_inputs.MULTIMAP.urn,
         side_input_data.window_mapping_fn,
         side_input_data.view_fn)
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ed5c74a..6c19c79 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -32,11 +32,11 @@ import apache_beam.transforms as ptransform
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.pipeline import AppliedPTransform
 from apache_beam.pipeline import Pipeline
+from apache_beam.portability import common_urns
 from apache_beam.pvalue import PCollection
 from apache_beam.runners import DataflowRunner
 from apache_beam.runners import TestDataflowRunner
 from apache_beam.runners import create_runner
-from apache_beam.runners.dataflow import dataflow_runner
 from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
 from apache_beam.runners.dataflow.dataflow_runner import 
DataflowRuntimeException
 from apache_beam.runners.dataflow.internal.clients import dataflow as 
dataflow_api
@@ -385,7 +385,7 @@ class DataflowRunnerTest(unittest.TestCase):
     self.assertEqual(2, len(applied_transform.side_inputs))
     for side_input in applied_transform.side_inputs:
       self.assertEqual(
-          dataflow_runner._DataflowSideInput.DATAFLOW_MULTIMAP_URN,
+          common_urns.side_inputs.MULTIMAP.urn,
           side_input._side_input_data().access_pattern)
 
 
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py 
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 583f1bb..feb8eae 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -47,7 +47,6 @@ from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import common
 from apache_beam.runners import pipeline_context
-from apache_beam.runners.dataflow import dataflow_runner
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import operations
 from apache_beam.runners.worker import statesampler
@@ -236,9 +235,7 @@ class StateBackedSideInputMap(object):
         raw_view = _StateBackedIterable(
             state_handler, state_key, self._element_coder)
 
-      elif (access_pattern == common_urns.side_inputs.MULTIMAP.urn or
-            access_pattern ==
-            dataflow_runner._DataflowSideInput.DATAFLOW_MULTIMAP_URN):
+      elif access_pattern == common_urns.side_inputs.MULTIMAP.urn:
         cache = {}
         key_coder_impl = self._element_coder.key_coder().get_impl()
         value_coder = self._element_coder.value_coder()

Reply via email to