[
https://issues.apache.org/jira/browse/BEAM-4575?focusedWorklogId=116023&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116023
]
ASF GitHub Bot logged work on BEAM-4575:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Jun/18 16:41
Start Date: 26/Jun/18 16:41
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5687: [BEAM-4575][SQL]
Don't wait on Unbounded PCollections
URL: https://github.com/apache/beam/pull/5687
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 8e32a6aa2de..015e8711753 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -29,6 +29,7 @@
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -40,12 +41,16 @@
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
@@ -152,15 +157,6 @@ public boolean isReached() {
}
}
- private static PipelineResult run(
- PipelineOptions options, BeamRelNode node, DoFn<Row, Void> doFn) {
- Pipeline pipeline = Pipeline.create(options);
- BeamSqlRelUtils.toPCollection(pipeline, node).apply(ParDo.of(doFn));
- PipelineResult result = pipeline.run();
- result.waitUntilFinish();
- return result;
- }
-
private static PipelineResult limitRun(
PipelineOptions options,
BeamRelNode node,
@@ -209,7 +205,12 @@ private static PipelineResult limitRun(
"SELECT without INSERT is only supported in DirectRunner in SQL
Shell.");
Collector.globalValues.put(id, values);
- run(options, node, new Collector());
+
+ Pipeline pipeline = Pipeline.create(options);
+ BeamSqlRelUtils.toPCollection(pipeline, node).apply(ParDo.of(new
Collector()));
+ PipelineResult result = pipeline.run();
+ result.waitUntilFinish();
+
Collector.globalValues.remove(id);
return Linq4j.asEnumerable(values);
@@ -324,15 +325,22 @@ public void processElement(ProcessContext context) {
}
private static Enumerable<Object> count(PipelineOptions options, BeamRelNode
node) {
- PipelineResult result = run(options, node, new RowCounter());
- MetricQueryResults metrics =
- result
- .metrics()
- .queryMetrics(
- MetricsFilter.builder()
-
.addNameFilter(MetricNameFilter.named(BeamEnumerableConverter.class, "rows"))
- .build());
- long count = metrics.getCounters().iterator().next().getAttempted();
+ Pipeline pipeline = Pipeline.create(options);
+ BeamSqlRelUtils.toPCollection(pipeline, node).apply(ParDo.of(new
RowCounter()));
+ PipelineResult result = pipeline.run();
+
+ long count = 0;
+ if (!containsUnboundedPCollection(pipeline)) {
+ result.waitUntilFinish();
+ MetricQueryResults metrics =
+ result
+ .metrics()
+ .queryMetrics(
+ MetricsFilter.builder()
+
.addNameFilter(MetricNameFilter.named(BeamEnumerableConverter.class, "rows"))
+ .build());
+ count = metrics.getCounters().iterator().next().getAttempted();
+ }
return Linq4j.singletonEnumerable(count);
}
@@ -360,4 +368,21 @@ private static int getLimitCount(BeamRelNode node) {
throw new RuntimeException(
"Cannot get limit count from RelNode tree with root " +
node.getRelTypeName());
}
+
+ private static boolean containsUnboundedPCollection(Pipeline p) {
+ class BoundednessVisitor extends PipelineVisitor.Defaults {
+ IsBounded boundedness = IsBounded.BOUNDED;
+
+ @Override
+ public void visitValue(PValue value, Node producer) {
+ if (value instanceof PCollection) {
+ boundedness = boundedness.and(((PCollection) value).isBounded());
+ }
+ }
+ }
+
+ BoundednessVisitor visitor = new BoundednessVisitor();
+ p.traverseTopologically(visitor);
+ return visitor.boundedness == IsBounded.UNBOUNDED;
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 116023)
Time Spent: 6h (was: 5h 50m)
> Beam SQL should cleanly transform graph from Calcite
> ----------------------------------------------------
>
> Key: BEAM-4575
> URL: https://issues.apache.org/jira/browse/BEAM-4575
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Andrew Pilloud
> Assignee: Andrew Pilloud
> Priority: Major
> Time Spent: 6h
> Remaining Estimate: 0h
>
> It would be nice if the Beam graph matched the Calcite graph in structure
> with each node generating a PTransform that is applied onto the PCollection
> of its parent. We should also ensure that each Calcite node only appears in
> the Beam graph one time.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)