TheNeuralBit commented on a change in pull request #16513: URL: https://github.com/apache/beam/pull/16513#discussion_r792070969
########## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizer.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction.graph; + +import java.util.AbstractMap.SimpleEntry; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverride; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.ProjectionProducer; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; + +/** See {@link #optimize(Pipeline)}. */ +public class ProjectionPushdownOptimizer { + + /** + * Performs all known projection pushdown optimizations in-place on a Pipeline. + * + * <p>A pushdown optimization will be made wherever there is a {@link ProjectionProducer} that Review comment: Could be helpful to link actuateProjectionPushdown here ```suggestion * <p>A pushdown optimization (i.e. {@link ProjectionProducer#actuateProjectionPushdown}) will be made wherever there is a {@link ProjectionProducer} that ``` (or something) ########## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizerTest.java ########## @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.construction.graph; + +import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor; +import org.apache.beam.sdk.schemas.ProjectionProducer; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link ProjectionPushdownOptimizer}. 
*/ +@RunWith(JUnit4.class) +public class ProjectionPushdownOptimizerTest { + + @Test + public void testSourceDoesNotImplementPushdownProjector() { + Pipeline p = Pipeline.create(); + SimpleSource source = new SimpleSource(FieldAccessDescriptor.withAllFields()); + p.apply(source) + .apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo", "bar"))); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, source)); + } + + @Test + public void testSimpleProjectionPushdown() { + Pipeline p = Pipeline.create(); + SimpleSourceWithPushdown originalSource = + new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields()); + FieldAccessDescriptor downstreamFieldAccess = + FieldAccessDescriptor.withFieldNames("foo", "bar"); + p.apply(originalSource).apply(new FieldAccessTransform(downstreamFieldAccess)); + + SimpleSourceWithPushdown expectedSource = new SimpleSourceWithPushdown(downstreamFieldAccess); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, expectedSource)); + Assert.assertFalse(pipelineHasTransform(p, originalSource)); + } + + @Test + public void testBranchedProjectionPushdown() { + Pipeline p = Pipeline.create(); + SimpleSourceWithPushdown originalSource = + new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields()); + PCollection<Row> input = p.apply(originalSource); + input.apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo"))); + input.apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("bar"))); + + SimpleSourceWithPushdown expectedSource = + new SimpleSourceWithPushdown(FieldAccessDescriptor.withFieldNames("foo", "bar")); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, expectedSource)); + Assert.assertFalse(pipelineHasTransform(p, originalSource)); + } + + @Test + public void testIntermediateProducer() { + Pipeline p = Pipeline.create(); + SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields()); + IntermediateTransformWithPushdown originalT = + new IntermediateTransformWithPushdown(FieldAccessDescriptor.withAllFields()); + FieldAccessDescriptor downstreamFieldAccess = + FieldAccessDescriptor.withFieldNames("foo", "bar"); + p.apply(source).apply(originalT).apply(new FieldAccessTransform(downstreamFieldAccess)); + + // TODO(BEAM-13658) Support pushdown on intermediate transforms. + // For now, test that the pushdown optimizer ignores immediate transforms. + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, originalT)); + } + + @Test + public void testMultipleOutputs() { + Pipeline p = Pipeline.create(); + MultipleOutputSourceWithPushdown originalSource = + new MultipleOutputSourceWithPushdown( + FieldAccessDescriptor.withAllFields(), FieldAccessDescriptor.withAllFields()); Review comment: You might add another case where these schemas are different, just to be safe. ########## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizerTest.java ########## @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.construction.graph; + +import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor; +import org.apache.beam.sdk.schemas.ProjectionProducer; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link ProjectionPushdownOptimizer}. 
*/ +@RunWith(JUnit4.class) +public class ProjectionPushdownOptimizerTest { + + @Test + public void testSourceDoesNotImplementPushdownProjector() { + Pipeline p = Pipeline.create(); + SimpleSource source = new SimpleSource(FieldAccessDescriptor.withAllFields()); + p.apply(source) + .apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo", "bar"))); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, source)); + } + + @Test + public void testSimpleProjectionPushdown() { + Pipeline p = Pipeline.create(); + SimpleSourceWithPushdown originalSource = + new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields()); + FieldAccessDescriptor downstreamFieldAccess = + FieldAccessDescriptor.withFieldNames("foo", "bar"); + p.apply(originalSource).apply(new FieldAccessTransform(downstreamFieldAccess)); + + SimpleSourceWithPushdown expectedSource = new SimpleSourceWithPushdown(downstreamFieldAccess); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, expectedSource)); + Assert.assertFalse(pipelineHasTransform(p, originalSource)); + } + + @Test + public void testBranchedProjectionPushdown() { + Pipeline p = Pipeline.create(); + SimpleSourceWithPushdown originalSource = + new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields()); + PCollection<Row> input = p.apply(originalSource); + input.apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo"))); + input.apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("bar"))); + + SimpleSourceWithPushdown expectedSource = + new SimpleSourceWithPushdown(FieldAccessDescriptor.withFieldNames("foo", "bar")); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, expectedSource)); + Assert.assertFalse(pipelineHasTransform(p, originalSource)); + } + + @Test + public void testIntermediateProducer() { + Pipeline p = Pipeline.create(); + SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields()); + IntermediateTransformWithPushdown originalT = + new IntermediateTransformWithPushdown(FieldAccessDescriptor.withAllFields()); + FieldAccessDescriptor downstreamFieldAccess = + FieldAccessDescriptor.withFieldNames("foo", "bar"); + p.apply(source).apply(originalT).apply(new FieldAccessTransform(downstreamFieldAccess)); + + // TODO(BEAM-13658) Support pushdown on intermediate transforms. + // For now, test that the pushdown optimizer ignores immediate transforms. + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, originalT)); + } + + @Test + public void testMultipleOutputs() { + Pipeline p = Pipeline.create(); + MultipleOutputSourceWithPushdown originalSource = + new MultipleOutputSourceWithPushdown( + FieldAccessDescriptor.withAllFields(), FieldAccessDescriptor.withAllFields()); + PCollectionTuple inputs = p.apply(originalSource); + inputs + .get(originalSource.tag1) + .apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo"))); + inputs + .get(originalSource.tag1) + .apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("bar"))); + inputs + .get(originalSource.tag2) + .apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("baz"))); + inputs + .get(originalSource.tag2) + .apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("qux"))); + + MultipleOutputSourceWithPushdown expectedSource = + new MultipleOutputSourceWithPushdown( + FieldAccessDescriptor.withFieldNames("foo", "bar"), + FieldAccessDescriptor.withFieldNames("baz", "qux")); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, expectedSource)); + Assert.assertFalse(pipelineHasTransform(p, originalSource)); + } + + private static boolean pipelineHasTransform(Pipeline p, PTransform<?, ?> t) { + HasTransformVisitor hasTransformVisitor = new HasTransformVisitor(t); + p.traverseTopologically(hasTransformVisitor); + return 
hasTransformVisitor.found; + } + + private static class HasTransformVisitor extends Defaults { + private final PTransform<?, ?> t; + boolean found = false; + + HasTransformVisitor(PTransform<?, ?> t) { + this.t = t; + } + + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + if (t.equals(node.getTransform())) { + found = true; + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + } + return CompositeBehavior.ENTER_TRANSFORM; + } + } + + private static class FieldAccessTransform extends PTransform<PCollection<Row>, PCollection<Row>> { + private final FieldAccessDescriptor fieldAccessDescriptor; + + FieldAccessTransform(FieldAccessDescriptor fieldAccessDescriptor) { + this.fieldAccessDescriptor = fieldAccessDescriptor; + } + + @Override + public PCollection<Row> expand(PCollection<Row> input) { + return input + .apply( + ParDo.of( + new DoFn<Row, Row>() { + @SuppressWarnings("unused") // used by reflection + @FieldAccess("row") + private final FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessTransform.this.fieldAccessDescriptor; + + @ProcessElement + public void processElement( + @FieldAccess("row") Row row, OutputReceiver<Row> outputReceiver) + throws Exception { + // Do nothing; we don't need to execute this DoFn. 
+ } + })) + .setRowSchema(createStringSchema(fieldAccessDescriptor)); + } + } + + private static Schema createStringSchema(FieldAccessDescriptor fieldAccessDescriptor) { + if (fieldAccessDescriptor.getAllFields()) { + return createStringSchema(FieldAccessDescriptor.withFieldNames("foo", "bar", "baz", "qux")); + } + Schema.Builder schemaBuilder = Schema.builder(); + for (FieldDescriptor field : fieldAccessDescriptor.getFieldsAccessed()) { + schemaBuilder.addStringField(field.getFieldName()); + } + return schemaBuilder.build(); + } + + private abstract static class SchemaTransform<InputT extends PInput> Review comment: nit: maybe clarify this is for transforms that produce a schema'd PCollection ```suggestion private abstract static class SchemaSourceTransform<InputT extends PInput> ``` ########## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PCollectionOutputTagVisitor.java ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.construction.graph; + +import java.util.AbstractMap.SimpleEntry; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.ProjectionProducer; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableBiMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** + * {@link PipelineVisitor} to convert projection pushdown targets from PCollections to TupleTags. + * + * <p>For example, if we can do pushdown on PTransform T's output PCollection P by rewriting T, we + * need to get T's output tag for P. This is necessary because PCollection objects are not + * instantiated until pipeline construction, but output tags are constants that are known before + * pipeline construction, so transform authors can identify them in {@link + * ProjectionProducer#actuateProjectionPushdown(Map)}. Review comment: nit: you might add some `@link` and `@code` tags here ########## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizerTest.java ########## @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction.graph; + +import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor; +import org.apache.beam.sdk.schemas.ProjectionProducer; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link ProjectionPushdownOptimizer}. 
*/ +@RunWith(JUnit4.class) +public class ProjectionPushdownOptimizerTest { + + @Test + public void testSourceDoesNotImplementPushdownProjector() { + Pipeline p = Pipeline.create(); + SimpleSource source = new SimpleSource(FieldAccessDescriptor.withAllFields()); Review comment: It could improve readability to make these sources explicitly specify the schema fields they produce, instead of having the special case for `withAllFields` in `createStringSchema`. ########## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizerTest.java ########## @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.construction.graph; + +import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor; +import org.apache.beam.sdk.schemas.ProjectionProducer; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link ProjectionPushdownOptimizer}. 
*/ +@RunWith(JUnit4.class) +public class ProjectionPushdownOptimizerTest { + + @Test + public void testSourceDoesNotImplementPushdownProjector() { + Pipeline p = Pipeline.create(); + SimpleSource source = new SimpleSource(FieldAccessDescriptor.withAllFields()); + p.apply(source) + .apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo", "bar"))); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, source)); + } + + @Test + public void testSimpleProjectionPushdown() { + Pipeline p = Pipeline.create(); + SimpleSourceWithPushdown originalSource = + new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields()); + FieldAccessDescriptor downstreamFieldAccess = + FieldAccessDescriptor.withFieldNames("foo", "bar"); + p.apply(originalSource).apply(new FieldAccessTransform(downstreamFieldAccess)); + + SimpleSourceWithPushdown expectedSource = new SimpleSourceWithPushdown(downstreamFieldAccess); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, expectedSource)); + Assert.assertFalse(pipelineHasTransform(p, originalSource)); + } + + @Test + public void testBranchedProjectionPushdown() { + Pipeline p = Pipeline.create(); + SimpleSourceWithPushdown originalSource = + new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields()); + PCollection<Row> input = p.apply(originalSource); + input.apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo"))); + input.apply(new FieldAccessTransform(FieldAccessDescriptor.withFieldNames("bar"))); + + SimpleSourceWithPushdown expectedSource = + new SimpleSourceWithPushdown(FieldAccessDescriptor.withFieldNames("foo", "bar")); + + ProjectionPushdownOptimizer.optimize(p); + Assert.assertTrue(pipelineHasTransform(p, expectedSource)); + Assert.assertFalse(pipelineHasTransform(p, originalSource)); + } + + @Test + public void testIntermediateProducer() { + Pipeline p = Pipeline.create(); + SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields()); + IntermediateTransformWithPushdown originalT = + new IntermediateTransformWithPushdown(FieldAccessDescriptor.withAllFields()); + FieldAccessDescriptor downstreamFieldAccess = + FieldAccessDescriptor.withFieldNames("foo", "bar"); + p.apply(source).apply(originalT).apply(new FieldAccessTransform(downstreamFieldAccess)); + + // TODO(BEAM-13658) Support pushdown on intermediate transforms. + // For now, test that the pushdown optimizer ignores immediate transforms. Review comment: Is it worth verifying this behavior? Maybe it would be better to draft the test for BEAM-13658 now and mark it as skipped until that issue is resolved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
