[ 
https://issues.apache.org/jira/browse/BEAM-4076?focusedWorklogId=127917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127917
 ]

ASF GitHub Bot logged work on BEAM-4076:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Jul/18 21:00
            Start Date: 26/Jul/18 21:00
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #6072: [BEAM-4076] Fix 
schemas on Dataflow and FnApi
URL: https://github.com/apache/beam/pull/6072
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 717585c4d7a..aeda31232a2 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -97,7 +97,11 @@ private ParDoSingle(
     @Override
     public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded(), outputCoder);
+          input.getPipeline(),
+          input.getWindowingStrategy(),
+          input.isBounded(),
+          outputCoder,
+          onlyOutputTag);
     }
 
     public DoFn<InputT, OutputT> getFn() {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 8457e0b56df..20adbd567c7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -361,12 +361,21 @@ public IsBounded isBounded() {
   private IsBounded isBounded;
 
   /** A local {@link TupleTag} used in the expansion of this {@link 
PValueBase}. */
-  private final TupleTag<?> tag = new TupleTag<>();
+  private final TupleTag<?> tag;
 
   private PCollection(Pipeline p, WindowingStrategy<?, ?> windowingStrategy, 
IsBounded isBounded) {
     super(p);
     this.windowingStrategy = windowingStrategy;
     this.isBounded = isBounded;
+    this.tag = new TupleTag<>();
+  }
+
+  private PCollection(
+      Pipeline p, WindowingStrategy<?, ?> windowingStrategy, IsBounded 
isBounded, TupleTag<?> tag) {
+    super(p);
+    this.windowingStrategy = windowingStrategy;
+    this.isBounded = isBounded;
+    this.tag = tag;
   }
 
   /**
@@ -408,6 +417,21 @@ private PCollection(Pipeline p, WindowingStrategy<?, ?> 
windowingStrategy, IsBou
     return res;
   }
 
+  /** <b><i>For internal use only; no backwards-compatibility 
guarantees.</i></b> */
+  @Internal
+  public static <T> PCollection<T> createPrimitiveOutputInternal(
+      Pipeline pipeline,
+      WindowingStrategy<?, ?> windowingStrategy,
+      IsBounded isBounded,
+      @Nullable Coder<T> coder,
+      TupleTag<?> tag) {
+    PCollection<T> res = new PCollection<>(pipeline, windowingStrategy, 
isBounded, tag);
+    if (coder != null) {
+      res.setCoder(coder);
+    }
+    return res;
+  }
+
   private static class CoderOrFailure<T> {
     @Nullable private final Coder<T> coder;
     @Nullable private final String failure;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
index 61a341eaed2..92560232284 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
@@ -47,6 +47,7 @@
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Materializations;
@@ -150,7 +151,9 @@ public final RunnerT createRunnerForPTransform(
     final DoFnSignature doFnSignature;
     final TupleTag<OutputT> mainOutputTag;
     final Coder<?> inputCoder;
+    final SchemaCoder<InputT> schemaCoder;
     final Coder<?> keyCoder;
+    final SchemaCoder<OutputT> mainOutputSchemaCoder;
     final Coder<? extends BoundedWindow> windowCoder;
     final WindowingStrategy<InputT, ?> windowingStrategy;
     final Map<TupleTag<?>, SideInputSpec> tagToSideInputSpecMap;
@@ -210,6 +213,17 @@ public final RunnerT createRunnerForPTransform(
         } else {
           this.keyCoder = null;
         }
+        if (inputCoder instanceof SchemaCoder
+            // TODO: Stop passing windowed value coders within PCollections.
+            || (inputCoder instanceof WindowedValue.WindowedValueCoder
+                && (((WindowedValueCoder) inputCoder).getValueCoder() 
instanceof SchemaCoder))) {
+          this.schemaCoder =
+              inputCoder instanceof WindowedValueCoder
+                  ? (SchemaCoder<InputT>) ((WindowedValueCoder) 
inputCoder).getValueCoder()
+                  : ((SchemaCoder<InputT>) inputCoder);
+        } else {
+          this.schemaCoder = null;
+        }
 
         windowingStrategy =
             (WindowingStrategy)
@@ -221,8 +235,14 @@ public final RunnerT createRunnerForPTransform(
           TupleTag<?> outputTag = new TupleTag<>(entry.getKey());
           RunnerApi.PCollection outputPCollection = 
pCollections.get(entry.getValue());
           Coder<?> outputCoder = 
rehydratedComponents.getCoder(outputPCollection.getCoderId());
+          if (outputCoder instanceof WindowedValueCoder) {
+            outputCoder = ((WindowedValueCoder) outputCoder).getValueCoder();
+          }
           outputCoders.put(outputTag, outputCoder);
         }
+        Coder<OutputT> outputCoder = (Coder<OutputT>) 
outputCoders.get(mainOutputTag);
+        mainOutputSchemaCoder =
+            (outputCoder instanceof SchemaCoder) ? (SchemaCoder<OutputT>) 
outputCoder : null;
 
         // Build the map from tag id to side input specification
         for (Map.Entry<String, RunnerApi.SideInput> entry :
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index b8a02f673a6..30129aa258d 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -38,7 +38,6 @@
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
-import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -109,10 +108,6 @@
   /** Only valid during {@link #processElement} and {@link #processTimer}, 
null otherwise. */
   private BoundedWindow currentWindow;
 
-  /** Following fields are only valid if a Schema is set, null otherwise. */
-  @Nullable private final SchemaCoder<InputT> schemaCoder;
-
-  @Nullable private final SchemaCoder<OutputT> mainOutputSchemaCoder;
   @Nullable private final FieldAccessDescriptor fieldAccessDescriptor;
 
   /** Only valid during {@link #processTimer}, null otherwise. */
@@ -165,17 +160,6 @@ public void output(OutputT output, Instant timestamp, 
BoundedWindow window) {
           }
         };
 
-    this.schemaCoder =
-        (context.inputCoder instanceof SchemaCoder)
-            ? (SchemaCoder<InputT>) context.inputCoder
-            : null;
-    if (context.outputCoders != null) {
-      Coder<OutputT> outputCoder = (Coder<OutputT>) 
context.outputCoders.get(context.mainOutputTag);
-      mainOutputSchemaCoder =
-          (outputCoder instanceof SchemaCoder) ? (SchemaCoder<OutputT>) 
outputCoder : null;
-    } else {
-      mainOutputSchemaCoder = null;
-    }
     DoFnSignature doFnSignature = 
DoFnSignatures.getSignature(context.doFn.getClass());
     DoFnSignature.ProcessElementMethod processElementMethod =
         DoFnSignatures.getSignature(context.doFn.getClass()).processElement();
@@ -183,7 +167,7 @@ public void output(OutputT output, Instant timestamp, 
BoundedWindow window) {
     FieldAccessDescriptor fieldAccessDescriptor = null;
     if (rowParameter != null) {
       checkArgument(
-          schemaCoder != null,
+          context.schemaCoder != null,
           "Cannot access object as a row if the input PCollection does not 
have a schema ."
               + "DoFn "
               + context.doFn.getClass()
@@ -209,7 +193,7 @@ public void output(OutputT output, Instant timestamp, 
BoundedWindow window) {
         }
       }
       // Resolve the FieldAccessDescriptor. This converts all field names into 
field ids.
-      fieldAccessDescriptor = 
fieldAccessDescriptor.resolve(schemaCoder.getSchema());
+      fieldAccessDescriptor = 
fieldAccessDescriptor.resolve(context.schemaCoder.getSchema());
     }
     this.fieldAccessDescriptor = fieldAccessDescriptor;
   }
@@ -452,7 +436,7 @@ public InputT element(DoFn<InputT, OutputT> doFn) {
     @Override
     public Row asRow(@Nullable String id) {
       checkState(fieldAccessDescriptor.allFields());
-      return schemaCoder.getToRowFunction().apply(element());
+      return context.schemaCoder.getToRowFunction().apply(element());
     }
 
     @Override
@@ -473,7 +457,7 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
 
     @Override
     public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
-      return DoFnOutputReceivers.rowReceiver(this, null, 
mainOutputSchemaCoder);
+      return DoFnOutputReceivers.rowReceiver(this, null, 
context.mainOutputSchemaCoder);
     }
 
     @Override
@@ -656,7 +640,7 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
 
     @Override
     public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
-      return DoFnOutputReceivers.rowReceiver(this, null, 
mainOutputSchemaCoder);
+      return DoFnOutputReceivers.rowReceiver(this, null, 
context.mainOutputSchemaCoder);
     }
 
     @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 127917)
    Time Spent: 16h 10m  (was: 16h)

> Schema followups
> ----------------
>
>                 Key: BEAM-4076
>                 URL: https://issues.apache.org/jira/browse/BEAM-4076
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model, dsl-sql, sdk-java-core
>            Reporter: Kenneth Knowles
>            Priority: Major
>          Time Spent: 16h 10m
>  Remaining Estimate: 0h
>
> This umbrella bug contains subtasks with followups for Beam schemas, which 
> were moved from SQL to the core Java SDK and made to be type-name-based 
> rather than coder based.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to