You will want to add your own CoderTranslatorRegistrar to the runner for the additional URNs that you want to support, and make sure the SDK that you're using submits those coders using those URNs and payloads. If you're using Java, make sure that the same CoderTranslatorRegistrar is on the classpath and discoverable via a ServiceLoader. If you're using the Python or Go SDKs to create pipelines, you'll need to use their coder registration/translation mechanism (I'm not sure whether they support dynamic registration like the Java SDK does).
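To illustrate the idea, here is a minimal, Beam-free sketch of URN-based coder resolution. Every type in it (Registrar, ModelRegistrar, CustomRegistrar, resolve) is a hypothetical stand-in rather than a Beam class, the URN "example:coder:void:v1" is made up for the example, and the registrars are passed in explicitly instead of being discovered through a ServiceLoader. It only shows why an unregistered URN ends up as a length-prefixed byte array on the runner side, while a registered one resolves to the real coder.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in types; none of these are Beam classes. */
final class UrnCoderRegistrySketch {

  /** Stand-in for a registrar: contributes URN -> coder name mappings. */
  interface Registrar {
    Map<String, String> knownUrns();
  }

  /** Coders the runner understands out of the box (illustrative subset). */
  static final class ModelRegistrar implements Registrar {
    @Override
    public Map<String, String> knownUrns() {
      Map<String, String> urns = new HashMap<>();
      urns.put("beam:coder:bytes:v1", "ByteArrayCoder");
      urns.put("beam:coder:kv:v1", "KvCoder");
      return urns;
    }
  }

  /** Extra registrar a runner author might add; the URN is an assumption. */
  static final class CustomRegistrar implements Registrar {
    @Override
    public Map<String, String> knownUrns() {
      Map<String, String> urns = new HashMap<>();
      urns.put("example:coder:void:v1", "VoidCoder");
      return urns;
    }
  }

  /**
   * Returns the coder registered for the URN, or the opaque
   * length-prefixed fallback when no registrar knows the URN.
   */
  static String resolve(String urn, Registrar... registrars) {
    for (Registrar registrar : registrars) {
      String coder = registrar.knownUrns().get(urn);
      if (coder != null) {
        return coder;
      }
    }
    return "LengthPrefixCoder<ByteArrayCoder>";
  }
}
```

With only ModelRegistrar in play, resolve("example:coder:void:v1", ...) falls back to the length-prefixed byte array; adding CustomRegistrar makes the same URN resolve to VoidCoder. The SDK side has to emit that same URN for the lookup to match.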
On Wed, Oct 3, 2018 at 1:53 PM Shen Li <[email protected]> wrote:

> Hi Lukasz,
>
> Is there a way to get the SDK coders (LengthPrefixCoder<VoidCoder>,
> LengthPrefixCoder<VarIntCoder> etc.) instead of a
> LengthPrefixCoder<ByteArrayCoder> on the runner side from
> RunnerApi.Pipeline? Our runner needs to serialize the key and use its hash
> value to keep some per-key states. Now I am getting the ClassCastException
> as the key seen by the runner (an Integer) is not a Byte array.
>
> Thanks,
> Shen
>
> On Fri, Sep 28, 2018 at 2:20 PM Shen Li <[email protected]> wrote:
>
>> Thank you, Lukasz!
>>
>> Best,
>> Shen
>>
>> On Fri, Sep 28, 2018 at 2:11 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Runners can never know about every coder that a user may want to write,
>>> which is why we need to have a mechanism for Runners to be able to convert
>>> any unknown coder to one it can handle. This is done via
>>> WireCoders.instantiateRunnerWireCoder but this modifies the original coder,
>>> which is why WireCoders.addSdkWireCoder creates the proto definition that
>>> the SDK should be told to use. In your case, you're correct in that
>>> KV<Void, T> becomes KVCoder<LengthPrefixCoder<ByteArrayCoder>,
>>> LengthPrefixCoder<ByteArrayCoder>> on the runner side and on the SDK side
>>> it should be KVCoder<LengthPrefixCoder<VoidCoder>,
>>> LengthPrefixCoder<CoderForT>>. More details in [1].
>>>
>>> 1:
>>> http://doc/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA#heading=h.sh4d5klmtfis
>>>
>>> On Fri, Sep 28, 2018 at 11:02 AM Shen Li <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I noticed that ModelCoderRegistrar only includes 9 out of ~40 coders.
>>>> May I know the rationale behind this decision?
>>>>
>>>> https://github.com/apache/beam/blob/release-2.7.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
>>>>
>>>> I think one consequence of the above configuration is
>>>> that WireCoders.instantiateRunnerWireCoder cannot instantiate KV coders
>>>> correctly, where VoidCoder (key coder) becomes
>>>> LengthPrefixCoder(ByteArrayCoder). What is the appropriate way to get
>>>> KvCoder<Void, T> from RunnerApi.Pipeline?
>>>>
>>>> Thanks,
>>>> Shen
>>>>
>>>
