[
https://issues.apache.org/jira/browse/BEAM-4076?focusedWorklogId=127917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127917
]
ASF GitHub Bot logged work on BEAM-4076:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Jul/18 21:00
Start Date: 26/Jul/18 21:00
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #6072: [BEAM-4076] Fix
schemas on Dataflow and FnApi
URL: https://github.com/apache/beam/pull/6072
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 717585c4d7a..aeda31232a2 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -97,7 +97,11 @@ private ParDoSingle(
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(),
input.isBounded(), outputCoder);
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.isBounded(),
+ outputCoder,
+ onlyOutputTag);
}
public DoFn<InputT, OutputT> getFn() {
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 8457e0b56df..20adbd567c7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -361,12 +361,21 @@ public IsBounded isBounded() {
private IsBounded isBounded;
/** A local {@link TupleTag} used in the expansion of this {@link
PValueBase}. */
- private final TupleTag<?> tag = new TupleTag<>();
+ private final TupleTag<?> tag;
private PCollection(Pipeline p, WindowingStrategy<?, ?> windowingStrategy,
IsBounded isBounded) {
super(p);
this.windowingStrategy = windowingStrategy;
this.isBounded = isBounded;
+ this.tag = new TupleTag<>();
+ }
+
+ private PCollection(
+ Pipeline p, WindowingStrategy<?, ?> windowingStrategy, IsBounded
isBounded, TupleTag<?> tag) {
+ super(p);
+ this.windowingStrategy = windowingStrategy;
+ this.isBounded = isBounded;
+ this.tag = tag;
}
/**
@@ -408,6 +417,21 @@ private PCollection(Pipeline p, WindowingStrategy<?, ?>
windowingStrategy, IsBou
return res;
}
+ /** <b><i>For internal use only; no backwards-compatibility
guarantees.</i></b> */
+ @Internal
+ public static <T> PCollection<T> createPrimitiveOutputInternal(
+ Pipeline pipeline,
+ WindowingStrategy<?, ?> windowingStrategy,
+ IsBounded isBounded,
+ @Nullable Coder<T> coder,
+ TupleTag<?> tag) {
+ PCollection<T> res = new PCollection<>(pipeline, windowingStrategy,
isBounded, tag);
+ if (coder != null) {
+ res.setCoder(coder);
+ }
+ return res;
+ }
+
private static class CoderOrFailure<T> {
@Nullable private final Coder<T> coder;
@Nullable private final String failure;
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
index 61a341eaed2..92560232284 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java
@@ -47,6 +47,7 @@
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Materializations;
@@ -150,7 +151,9 @@ public final RunnerT createRunnerForPTransform(
final DoFnSignature doFnSignature;
final TupleTag<OutputT> mainOutputTag;
final Coder<?> inputCoder;
+ final SchemaCoder<InputT> schemaCoder;
final Coder<?> keyCoder;
+ final SchemaCoder<OutputT> mainOutputSchemaCoder;
final Coder<? extends BoundedWindow> windowCoder;
final WindowingStrategy<InputT, ?> windowingStrategy;
final Map<TupleTag<?>, SideInputSpec> tagToSideInputSpecMap;
@@ -210,6 +213,17 @@ public final RunnerT createRunnerForPTransform(
} else {
this.keyCoder = null;
}
+ if (inputCoder instanceof SchemaCoder
+ // TODO: Stop passing windowed value coders within PCollections.
+ || (inputCoder instanceof WindowedValue.WindowedValueCoder
+ && (((WindowedValueCoder) inputCoder).getValueCoder()
instanceof SchemaCoder))) {
+ this.schemaCoder =
+ inputCoder instanceof WindowedValueCoder
+ ? (SchemaCoder<InputT>) ((WindowedValueCoder)
inputCoder).getValueCoder()
+ : ((SchemaCoder<InputT>) inputCoder);
+ } else {
+ this.schemaCoder = null;
+ }
windowingStrategy =
(WindowingStrategy)
@@ -221,8 +235,14 @@ public final RunnerT createRunnerForPTransform(
TupleTag<?> outputTag = new TupleTag<>(entry.getKey());
RunnerApi.PCollection outputPCollection =
pCollections.get(entry.getValue());
Coder<?> outputCoder =
rehydratedComponents.getCoder(outputPCollection.getCoderId());
+ if (outputCoder instanceof WindowedValueCoder) {
+ outputCoder = ((WindowedValueCoder) outputCoder).getValueCoder();
+ }
outputCoders.put(outputTag, outputCoder);
}
+ Coder<OutputT> outputCoder = (Coder<OutputT>)
outputCoders.get(mainOutputTag);
+ mainOutputSchemaCoder =
+ (outputCoder instanceof SchemaCoder) ? (SchemaCoder<OutputT>)
outputCoder : null;
// Build the map from tag id to side input specification
for (Map.Entry<String, RunnerApi.SideInput> entry :
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index b8a02f673a6..30129aa258d 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -38,7 +38,6 @@
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
-import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
@@ -109,10 +108,6 @@
/** Only valid during {@link #processElement} and {@link #processTimer},
null otherwise. */
private BoundedWindow currentWindow;
- /** Following fields are only valid if a Schema is set, null otherwise. */
- @Nullable private final SchemaCoder<InputT> schemaCoder;
-
- @Nullable private final SchemaCoder<OutputT> mainOutputSchemaCoder;
@Nullable private final FieldAccessDescriptor fieldAccessDescriptor;
/** Only valid during {@link #processTimer}, null otherwise. */
@@ -165,17 +160,6 @@ public void output(OutputT output, Instant timestamp,
BoundedWindow window) {
}
};
- this.schemaCoder =
- (context.inputCoder instanceof SchemaCoder)
- ? (SchemaCoder<InputT>) context.inputCoder
- : null;
- if (context.outputCoders != null) {
- Coder<OutputT> outputCoder = (Coder<OutputT>)
context.outputCoders.get(context.mainOutputTag);
- mainOutputSchemaCoder =
- (outputCoder instanceof SchemaCoder) ? (SchemaCoder<OutputT>)
outputCoder : null;
- } else {
- mainOutputSchemaCoder = null;
- }
DoFnSignature doFnSignature =
DoFnSignatures.getSignature(context.doFn.getClass());
DoFnSignature.ProcessElementMethod processElementMethod =
DoFnSignatures.getSignature(context.doFn.getClass()).processElement();
@@ -183,7 +167,7 @@ public void output(OutputT output, Instant timestamp,
BoundedWindow window) {
FieldAccessDescriptor fieldAccessDescriptor = null;
if (rowParameter != null) {
checkArgument(
- schemaCoder != null,
+ context.schemaCoder != null,
"Cannot access object as a row if the input PCollection does not
have a schema ."
+ "DoFn "
+ context.doFn.getClass()
@@ -209,7 +193,7 @@ public void output(OutputT output, Instant timestamp,
BoundedWindow window) {
}
}
// Resolve the FieldAccessDescriptor. This converts all field names into
field ids.
- fieldAccessDescriptor =
fieldAccessDescriptor.resolve(schemaCoder.getSchema());
+ fieldAccessDescriptor =
fieldAccessDescriptor.resolve(context.schemaCoder.getSchema());
}
this.fieldAccessDescriptor = fieldAccessDescriptor;
}
@@ -452,7 +436,7 @@ public InputT element(DoFn<InputT, OutputT> doFn) {
@Override
public Row asRow(@Nullable String id) {
checkState(fieldAccessDescriptor.allFields());
- return schemaCoder.getToRowFunction().apply(element());
+ return context.schemaCoder.getToRowFunction().apply(element());
}
@Override
@@ -473,7 +457,7 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
@Override
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
- return DoFnOutputReceivers.rowReceiver(this, null,
mainOutputSchemaCoder);
+ return DoFnOutputReceivers.rowReceiver(this, null,
context.mainOutputSchemaCoder);
}
@Override
@@ -656,7 +640,7 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
@Override
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
- return DoFnOutputReceivers.rowReceiver(this, null,
mainOutputSchemaCoder);
+ return DoFnOutputReceivers.rowReceiver(this, null,
context.mainOutputSchemaCoder);
}
@Override
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 127917)
Time Spent: 16h 10m (was: 16h)
> Schema followups
> ----------------
>
> Key: BEAM-4076
> URL: https://issues.apache.org/jira/browse/BEAM-4076
> Project: Beam
> Issue Type: Improvement
> Components: beam-model, dsl-sql, sdk-java-core
> Reporter: Kenneth Knowles
> Priority: Major
> Time Spent: 16h 10m
> Remaining Estimate: 0h
>
> This umbrella bug contains subtasks with follow-ups for Beam schemas, which
> were moved from SQL to the core Java SDK and made to be type-name-based
> rather than coder-based.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)