[ 
https://issues.apache.org/jira/browse/BEAM-3971?focusedWorklogId=117694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117694
 ]

ASF GitHub Bot logged work on BEAM-3971:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Jun/18 00:20
            Start Date: 30/Jun/18 00:20
    Worklog Time Spent: 10m 
      Work Description: bsidhom commented on a change in pull request #5833: 
[BEAM-3971, BEAM-4284] Remove fromProto for Pipeline and PTransform translation.
URL: https://github.com/apache/beam/pull/5833#discussion_r199303724
 
 

 ##########
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
 ##########
 @@ -141,96 +99,28 @@ public FunctionSpec translate(
       final SdkComponents components)
       throws IOException {
 
-    return payloadForCombineLike(
-        new CombineLike() {
-          @Override
-          public SdkFunctionSpec getCombineFn() {
-            return SdkFunctionSpec.newBuilder()
-                .setEnvironmentId(
-                    components.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT))
-                .setSpec(
-                    FunctionSpec.newBuilder()
-                        .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
-                        .setPayload(
-                            ByteString.copyFrom(
-                                SerializableUtils.serializeToByteArray(
-                                    combine.getTransform().getFn())))
-                        .build())
-                .build();
-          }
-
-          @Override
-          public Coder<?> getAccumulatorCoder() {
-            GlobalCombineFn<?, ?, ?> combineFn = combine.getTransform().getFn();
-            try {
-              return extractAccumulatorCoder(combineFn, (AppliedPTransform) combine);
-            } catch (CannotProvideCoderException e) {
-              throw new IllegalStateException(e);
-            }
-          }
-        },
-        components);
-  }
-
-  private static class RawCombine<K, InputT, AccumT, OutputT>
-      extends PTransformTranslation.RawPTransform<
-          PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
-      implements CombineLike {
-
-    private final RunnerApi.PTransform protoTransform;
-    private final transient RehydratedComponents rehydratedComponents;
-    private final FunctionSpec spec;
-    private final CombinePayload payload;
-    private final Coder<AccumT> accumulatorCoder;
-
-    private RawCombine(
-        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
-        throws IOException {
-      this.protoTransform = protoTransform;
-      this.rehydratedComponents = rehydratedComponents;
-      this.spec = protoTransform.getSpec();
-      this.payload = CombinePayload.parseFrom(spec.getPayload());
-
-      // Eagerly extract the coder to throw a good exception here
-      try {
-        this.accumulatorCoder =
-            (Coder<AccumT>) rehydratedComponents.getCoder(payload.getAccumulatorCoderId());
-      } catch (IOException exc) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Failure extracting accumulator coder with id '%s' for %s",
-                payload.getAccumulatorCoderId(), Combine.class.getSimpleName()),
-            exc);
-      }
-    }
-
-    @Override
-    public String getUrn() {
-      return COMBINE_TRANSFORM_URN;
-    }
-
-    @Nonnull
-    @Override
-    public FunctionSpec getSpec() {
-      return spec;
-    }
-
-    @Override
-    public RunnerApi.FunctionSpec migrate(SdkComponents sdkComponents) throws IOException {
-      return RunnerApi.FunctionSpec.newBuilder()
-          .setUrn(COMBINE_TRANSFORM_URN)
-          .setPayload(payloadForCombineLike(this, sdkComponents).toByteString())
+    GlobalCombineFn<?, ?, ?> combineFn = combine.getTransform().getFn();
+    try {
+      return RunnerApi.CombinePayload.newBuilder()
+          .setAccumulatorCoderId(
+              components.registerCoder(
+                  extractAccumulatorCoder(combineFn, (AppliedPTransform) combine)))
+          .setCombineFn(
+              SdkFunctionSpec.newBuilder()
+                  .setEnvironmentId(
+                      components.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT))
 
 Review comment:
   I realize this was already here, but we should probably be careful about how we set the environment. I think it would be preferable to at least use a static method on `Environments` to allow us to override this, e.g., with an environment variable. Wasn't this added recently?
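
   The suggestion above could be sketched roughly as follows. This is a hypothetical standalone illustration, not actual Beam API: the method name `getDefaultEnvironmentUrl` and the variable `BEAM_JAVA_SDK_HARNESS_URL` are invented for the example, and the real `Environments` class works with proto `Environment` messages rather than plain strings.

```java
// Hypothetical sketch of the review suggestion: instead of hard-coding
// JAVA_SDK_HARNESS_ENVIRONMENT at every call site, route the lookup through
// one static method that an environment variable can override.
// All names here are illustrative, not actual Beam API.
public class Environments {

    // Built-in default used when no override is present.
    static final String JAVA_SDK_HARNESS_URL = "beam-java-sdk-harness";

    /** Returns the environment-variable override if set, else the default. */
    public static String getDefaultEnvironmentUrl() {
        String override = System.getenv("BEAM_JAVA_SDK_HARNESS_URL");
        return (override != null && !override.isEmpty())
            ? override
            : JAVA_SDK_HARNESS_URL;
    }

    public static void main(String[] args) {
        // With no override set, this prints the built-in default.
        System.out.println(getDefaultEnvironmentUrl());
    }
}
```

   Call sites such as `components.registerEnvironment(...)` in the diff above would then go through the single static method, so the override applies uniformly.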

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 117694)
    Time Spent: 5h  (was: 4h 50m)

> Pipeline translation utilities should not use SDK construction classes
> ----------------------------------------------------------------------
>
>                 Key: BEAM-3971
>                 URL: https://issues.apache.org/jira/browse/BEAM-3971
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> In general, portable runners will require access to pipeline information not 
> available in rehydrated pipelines while constructing physical plans. 
> Translation utilities should operate directly on protos or on thin, 
> information-preserving wrappers.
> The pipeline fusion utilities already operate on protos directly and can be 
> used as an example of how this could be done.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
